feat: 增强 Gemini 服务支持并添加统一调度器

- 新增 unifiedGeminiScheduler.js 统一账户调度服务
- 增强 geminiRoutes.js 支持更多 Gemini API 端点
- 优化 geminiAccountService.js 账户管理和 token 刷新机制
- 添加对 v1internal 端点的完整支持(loadCodeAssist、onboardUser、countTokens、generateContent、streamGenerateContent)
- 改进错误处理和流式响应管理

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
千羽
2025-08-04 14:47:03 +09:00
parent 6d27dd7c94
commit 33837c23aa
3 changed files with 1100 additions and 128 deletions

View File

@@ -5,6 +5,9 @@ const { authenticateApiKey } = require('../middleware/auth');
const geminiAccountService = require('../services/geminiAccountService');
const { sendGeminiRequest, getAvailableModels } = require('../services/geminiRelayService');
const crypto = require('crypto');
const sessionHelper = require('../utils/sessionHelper');
const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler');
const { OAuth2Client } = require('google-auth-library');
// 生成会话哈希
function generateSessionHash(req) {
@@ -272,4 +275,259 @@ router.get('/key-info', authenticateApiKey, async (req, res) => {
}
});
router.post('/v1internal\\:loadCodeAssist', authenticateApiKey, async (req, res) => {
try {
const sessionHash = sessionHelper.generateSessionHash(req.body);
// 使用统一调度选择账号(传递请求的模型)
const requestedModel = req.body.model;
const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(req.apiKey, sessionHash, requestedModel);
const { accessToken, refreshToken } = await geminiAccountService.getAccount(accountId);
logger.info(`accessToken: ${accessToken}`);
const { metadata, cloudaicompanionProject } = req.body;
logger.info('LoadCodeAssist request', {
metadata: metadata || {},
cloudaicompanionProject: cloudaicompanionProject || null,
apiKeyId: req.apiKey?.id || 'unknown'
});
const client = await geminiAccountService.getOauthClient(accessToken, refreshToken);
const response = await geminiAccountService.loadCodeAssist(client, cloudaicompanionProject);
res.json(response);
} catch (error) {
logger.error('Error in loadCodeAssist endpoint', { error: error.message });
res.status(500).json({
error: 'Internal server error',
message: error.message
});
}
});
router.post('/v1internal\\:onboardUser', authenticateApiKey, async (req, res) => {
try {
const { tierId, cloudaicompanionProject, metadata } = req.body;
const sessionHash = sessionHelper.generateSessionHash(req.body);
// 使用统一调度选择账号(传递请求的模型)
const requestedModel = req.body.model;
const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(req.apiKey, sessionHash, requestedModel);
const { accessToken, refreshToken } = await geminiAccountService.getAccount(accountId);
logger.info('OnboardUser request', {
tierId: tierId || 'not provided',
cloudaicompanionProject: cloudaicompanionProject || null,
metadata: metadata || {},
apiKeyId: req.apiKey?.id || 'unknown'
});
const client = await geminiAccountService.getOauthClient(accessToken, refreshToken);
// 如果提供了完整参数直接调用onboardUser
if (tierId && metadata) {
const response = await geminiAccountService.onboardUser(client, tierId, cloudaicompanionProject, metadata);
res.json(response);
} else {
// 否则执行完整的setupUser流程
const response = await geminiAccountService.setupUser(client, cloudaicompanionProject, metadata);
res.json(response);
}
} catch (error) {
logger.error('Error in onboardUser endpoint', { error: error.message });
res.status(500).json({
error: 'Internal server error',
message: error.message
});
}
});
router.post('/v1internal\\:countTokens', authenticateApiKey, async (req, res) => {
try {
// 处理请求体结构,支持直接 contents 或 request.contents
const requestData = req.body.request || req.body;
const { contents, model = 'gemini-2.0-flash-exp' } = requestData;
const sessionHash = sessionHelper.generateSessionHash(req.body);
// 验证必需参数
if (!contents || !Array.isArray(contents)) {
return res.status(400).json({
error: {
message: 'Contents array is required',
type: 'invalid_request_error'
}
});
}
// 使用统一调度选择账号
const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(req.apiKey, sessionHash, model);
const { accessToken, refreshToken } = await geminiAccountService.getAccount(accountId);
logger.info('CountTokens request', {
model: model,
contentsLength: contents.length,
apiKeyId: req.apiKey?.id || 'unknown'
});
const client = await geminiAccountService.getOauthClient(accessToken, refreshToken);
const response = await geminiAccountService.countTokens(client, contents, model);
res.json(response);
} catch (error) {
logger.error('Error in countTokens endpoint', { error: error.message });
res.status(500).json({
error: {
message: error.message || 'Internal server error',
type: 'api_error'
}
});
}
});
router.post('/v1internal\\:generateContent', authenticateApiKey, async (req, res) => {
try {
const { model, project, user_prompt_id, request: requestData } = req.body;
const sessionHash = sessionHelper.generateSessionHash(req.body);
// 验证必需参数
if (!requestData || !requestData.contents) {
return res.status(400).json({
error: {
message: 'Request contents are required',
type: 'invalid_request_error'
}
});
}
// 使用统一调度选择账号
const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(req.apiKey, sessionHash, model);
const account = await geminiAccountService.getAccount(accountId);
const { accessToken, refreshToken } = account;
logger.info('GenerateContent request', {
model: model,
userPromptId: user_prompt_id,
projectId: project || account.projectId,
apiKeyId: req.apiKey?.id || 'unknown'
});
const client = await geminiAccountService.getOauthClient(accessToken, refreshToken);
const response = await geminiAccountService.generateContent(
client,
{ model, request: requestData },
user_prompt_id,
project || account.projectId,
req.apiKey?.id // 使用 API Key ID 作为 session ID
);
res.json(response);
} catch (error) {
logger.error('Error in generateContent endpoint', { error: error.message });
res.status(500).json({
error: {
message: error.message || 'Internal server error',
type: 'api_error'
}
});
}
});
router.post('/v1internal\\:streamGenerateContent', authenticateApiKey, async (req, res) => {
let abortController = null;
try {
const { model, project, user_prompt_id, request: requestData } = req.body;
const sessionHash = sessionHelper.generateSessionHash(req.body);
// 验证必需参数
if (!requestData || !requestData.contents) {
return res.status(400).json({
error: {
message: 'Request contents are required',
type: 'invalid_request_error'
}
});
}
// 使用统一调度选择账号
const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey(req.apiKey, sessionHash, model);
const account = await geminiAccountService.getAccount(accountId);
const { accessToken, refreshToken } = account;
logger.info('StreamGenerateContent request', {
model: model,
userPromptId: user_prompt_id,
projectId: project || account.projectId,
apiKeyId: req.apiKey?.id || 'unknown'
});
// 创建中止控制器
abortController = new AbortController();
// 处理客户端断开连接
req.on('close', () => {
if (abortController && !abortController.signal.aborted) {
logger.info('Client disconnected, aborting stream request');
abortController.abort();
}
});
const client = await geminiAccountService.getOauthClient(accessToken, refreshToken);
const streamResponse = await geminiAccountService.generateContentStream(
client,
{ model, request: requestData },
user_prompt_id,
project || account.projectId,
req.apiKey?.id, // 使用 API Key ID 作为 session ID
abortController.signal // 传递中止信号
);
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
// 直接管道转发流式响应,不进行额外处理
streamResponse.pipe(res, { end: false });
streamResponse.on('end', () => {
logger.info('Stream completed successfully');
res.end();
});
streamResponse.on('error', (error) => {
logger.error('Stream error:', error);
if (!res.headersSent) {
res.status(500).json({
error: {
message: error.message || 'Stream error',
type: 'api_error'
}
});
} else {
res.end();
}
});
} catch (error) {
logger.error('Error in streamGenerateContent endpoint', { error: error.message });
if (!res.headersSent) {
res.status(500).json({
error: {
message: error.message || 'Internal server error',
type: 'api_error'
}
});
}
} finally {
// 清理资源
if (abortController) {
abortController = null;
}
}
});
module.exports = router;

View File

@@ -709,6 +709,336 @@ async function setAccountRateLimited(accountId, isLimited = true) {
await updateAccount(accountId, updates);
}
// 获取配置的OAuth客户端 - 参考GeminiCliSimulator的getOauthClient方法
async function getOauthClient(accessToken, refreshToken) {
const client = new OAuth2Client({
clientId: OAUTH_CLIENT_ID,
clientSecret: OAUTH_CLIENT_SECRET,
});
const creds = {
'access_token': accessToken,
'refresh_token': refreshToken,
'scope': 'https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/userinfo.profile openid https://www.googleapis.com/auth/userinfo.email',
'token_type': 'Bearer',
'expiry_date': 1754269905646
};
// 设置凭据
client.setCredentials(creds);
// 验证凭据本地有效性
const { token } = await client.getAccessToken();
if (!token) {
return false;
}
// 验证服务器端token状态检查是否被撤销
await client.getTokenInfo(token);
logger.info('✅ OAuth客户端已创建');
return client;
}
// 调用 Google Code Assist API 的 loadCodeAssist 方法
async function loadCodeAssist(client, projectId = null) {
const axios = require('axios');
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const { token } = await client.getAccessToken();
// 创建ClientMetadata
const clientMetadata = {
ideType: 'IDE_UNSPECIFIED',
platform: 'PLATFORM_UNSPECIFIED',
pluginType: 'GEMINI',
duetProject: projectId,
};
const request = {
cloudaicompanionProject: projectId,
metadata: clientMetadata,
};
const response = await axios({
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:loadCodeAssist`,
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: request,
timeout: 30000,
});
logger.info('📋 loadCodeAssist API调用成功');
return response.data;
}
// 获取onboard层级 - 参考GeminiCliSimulator的getOnboardTier方法
function getOnboardTier(loadRes) {
// 用户层级枚举
const UserTierId = {
LEGACY: 'LEGACY',
FREE: 'FREE',
PRO: 'PRO'
};
if (loadRes.currentTier) {
return loadRes.currentTier;
}
for (const tier of loadRes.allowedTiers || []) {
if (tier.isDefault) {
return tier;
}
}
return {
name: '',
description: '',
id: UserTierId.LEGACY,
userDefinedCloudaicompanionProject: true,
};
}
// 调用 Google Code Assist API 的 onboardUser 方法(包含轮询逻辑)
async function onboardUser(client, tierId, projectId, clientMetadata) {
const axios = require('axios');
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const { token } = await client.getAccessToken();
const onboardReq = {
tierId: tierId,
cloudaicompanionProject: projectId,
metadata: clientMetadata,
};
logger.info('📋 开始onboardUser API调用', { tierId, projectId });
// 轮询onboardUser直到长运行操作完成
let lroRes = await axios({
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:onboardUser`,
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: onboardReq,
timeout: 30000,
});
let attempts = 0;
const maxAttempts = 12; // 最多等待1分钟5秒 * 12次
while (!lroRes.data.done && attempts < maxAttempts) {
logger.info(`⏳ 等待onboardUser完成... (${attempts + 1}/${maxAttempts})`);
await new Promise(resolve => setTimeout(resolve, 5000));
lroRes = await axios({
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:onboardUser`,
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: onboardReq,
timeout: 30000,
});
attempts++;
}
if (!lroRes.data.done) {
throw new Error('onboardUser操作超时');
}
logger.info('✅ onboardUser API调用完成');
return lroRes.data;
}
// 完整的用户设置流程 - 参考setup.ts的逻辑
async function setupUser(client, initialProjectId = null, clientMetadata = null) {
logger.info('🚀 setupUser 开始', { initialProjectId, hasClientMetadata: !!clientMetadata });
let projectId = initialProjectId || process.env.GOOGLE_CLOUD_PROJECT || null;
logger.info('📋 初始项目ID', { projectId, fromEnv: !!process.env.GOOGLE_CLOUD_PROJECT });
// 默认的ClientMetadata
if (!clientMetadata) {
clientMetadata = {
ideType: 'IDE_UNSPECIFIED',
platform: 'PLATFORM_UNSPECIFIED',
pluginType: 'GEMINI',
duetProject: projectId,
};
logger.info('🔧 使用默认 ClientMetadata');
}
// 调用loadCodeAssist
logger.info('📞 调用 loadCodeAssist...');
const loadRes = await loadCodeAssist(client, projectId);
logger.info('✅ loadCodeAssist 完成', { hasCloudaicompanionProject: !!loadRes.cloudaicompanionProject });
// 如果没有projectId尝试从loadRes获取
if (!projectId && loadRes.cloudaicompanionProject) {
projectId = loadRes.cloudaicompanionProject;
logger.info('📋 从 loadCodeAssist 获取项目ID', { projectId });
}
const tier = getOnboardTier(loadRes);
logger.info('🎯 获取用户层级', { tierId: tier.id, userDefinedProject: tier.userDefinedCloudaicompanionProject });
if (tier.userDefinedCloudaiCompanionProject && !projectId) {
throw new Error('此账号需要设置GOOGLE_CLOUD_PROJECT环境变量或提供projectId');
}
// 调用onboardUser
logger.info('📞 调用 onboardUser...', { tierId: tier.id, projectId });
const lroRes = await onboardUser(client, tier.id, projectId, clientMetadata);
logger.info('✅ onboardUser 完成', { hasDone: !!lroRes.done, hasResponse: !!lroRes.response });
const result = {
projectId: lroRes.response?.cloudaicompanionProject?.id || projectId || '',
userTier: tier.id,
loadRes,
onboardRes: lroRes.response || {}
};
logger.info('🎯 setupUser 完成', { resultProjectId: result.projectId, userTier: result.userTier });
return result;
}
// 调用 Code Assist API 计算 token 数量
async function countTokens(client, contents, model = 'gemini-2.0-flash-exp') {
const axios = require('axios');
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const { token } = await client.getAccessToken();
// 按照 gemini-cli 的转换格式构造请求
const request = {
request: {
model: `models/${model}`,
contents: contents
}
};
logger.info('📊 countTokens API调用开始', { model, contentsLength: contents.length });
const response = await axios({
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:countTokens`,
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: request,
timeout: 30000,
});
logger.info('✅ countTokens API调用成功', { totalTokens: response.data.totalTokens });
return response.data;
}
// 调用 Code Assist API 生成内容(非流式)
async function generateContent(client, requestData, userPromptId, projectId = null, sessionId = null) {
const axios = require('axios');
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const { token } = await client.getAccessToken();
// 按照 gemini-cli 的转换格式构造请求
const request = {
model: requestData.model,
project: projectId,
user_prompt_id: userPromptId,
request: {
...requestData.request,
session_id: sessionId
}
};
logger.info('🤖 generateContent API调用开始', {
model: requestData.model,
userPromptId,
projectId,
sessionId
});
const response = await axios({
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:generateContent`,
method: 'POST',
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: request,
timeout: 60000, // 生成内容可能需要更长时间
});
logger.info('✅ generateContent API调用成功');
return response.data;
}
// 调用 Code Assist API 生成内容(流式)
async function generateContentStream(client, requestData, userPromptId, projectId = null, sessionId = null, signal = null) {
const axios = require('axios');
const CODE_ASSIST_ENDPOINT = 'https://cloudcode-pa.googleapis.com';
const CODE_ASSIST_API_VERSION = 'v1internal';
const { token } = await client.getAccessToken();
// 按照 gemini-cli 的转换格式构造请求
const request = {
model: requestData.model,
project: projectId,
user_prompt_id: userPromptId,
request: {
...requestData.request,
session_id: sessionId
}
};
logger.info('🌊 streamGenerateContent API调用开始', {
model: requestData.model,
userPromptId,
projectId,
sessionId
});
const axiosConfig = {
url: `${CODE_ASSIST_ENDPOINT}/${CODE_ASSIST_API_VERSION}:streamGenerateContent`,
method: 'POST',
params: {
alt: 'sse'
},
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json',
},
data: request,
responseType: 'stream',
timeout: 60000,
};
// 如果提供了中止信号,添加到配置中
if (signal) {
axiosConfig.signal = signal;
}
const response = await axios(axiosConfig);
logger.info('✅ streamGenerateContent API调用成功开始流式传输');
return response.data; // 返回流对象
}
module.exports = {
generateAuthUrl,
pollAuthorizationStatus,
@@ -724,6 +1054,14 @@ module.exports = {
markAccountUsed,
setAccountRateLimited,
isTokenExpired,
getOauthClient,
loadCodeAssist,
getOnboardTier,
onboardUser,
setupUser,
countTokens,
generateContent,
generateContentStream,
OAUTH_CLIENT_ID,
OAUTH_SCOPES
};

View File

@@ -0,0 +1,376 @@
const geminiAccountService = require('./geminiAccountService');
const accountGroupService = require('./accountGroupService');
const redis = require('../models/redis');
const logger = require('../utils/logger');
class UnifiedGeminiScheduler {
constructor() {
this.SESSION_MAPPING_PREFIX = 'unified_gemini_session_mapping:';
}
// 🎯 统一调度Gemini账号
async selectAccountForApiKey(apiKeyData, sessionHash = null, requestedModel = null) {
try {
// 如果API Key绑定了专属账户或分组优先使用
if (apiKeyData.geminiAccountId) {
// 检查是否是分组
if (apiKeyData.geminiAccountId.startsWith('group:')) {
const groupId = apiKeyData.geminiAccountId.replace('group:', '');
logger.info(`🎯 API key ${apiKeyData.name} is bound to group ${groupId}, selecting from group`);
return await this.selectAccountFromGroup(groupId, sessionHash, requestedModel, apiKeyData);
}
// 普通专属账户
const boundAccount = await geminiAccountService.getAccount(apiKeyData.geminiAccountId);
if (boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error') {
logger.info(`🎯 Using bound dedicated Gemini account: ${boundAccount.name} (${apiKeyData.geminiAccountId}) for API key ${apiKeyData.name}`);
return {
accountId: apiKeyData.geminiAccountId,
accountType: 'gemini'
};
} else {
logger.warn(`⚠️ Bound Gemini account ${apiKeyData.geminiAccountId} is not available, falling back to pool`);
}
}
// 如果有会话哈希,检查是否有已映射的账户
if (sessionHash) {
const mappedAccount = await this._getSessionMapping(sessionHash);
if (mappedAccount) {
// 验证映射的账户是否仍然可用
const isAvailable = await this._isAccountAvailable(mappedAccount.accountId, mappedAccount.accountType);
if (isAvailable) {
logger.info(`🎯 Using sticky session account: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}`);
return mappedAccount;
} else {
logger.warn(`⚠️ Mapped account ${mappedAccount.accountId} is no longer available, selecting new account`);
await this._deleteSessionMapping(sessionHash);
}
}
}
// 获取所有可用账户
const availableAccounts = await this._getAllAvailableAccounts(apiKeyData, requestedModel);
if (availableAccounts.length === 0) {
// 提供更详细的错误信息
if (requestedModel) {
throw new Error(`No available Gemini accounts support the requested model: ${requestedModel}`);
} else {
throw new Error('No available Gemini accounts');
}
}
// 按优先级和最后使用时间排序
const sortedAccounts = this._sortAccountsByPriority(availableAccounts);
// 选择第一个账户
const selectedAccount = sortedAccounts[0];
// 如果有会话哈希,建立新的映射
if (sessionHash) {
await this._setSessionMapping(sessionHash, selectedAccount.accountId, selectedAccount.accountType);
logger.info(`🎯 Created new sticky session mapping: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) for session ${sessionHash}`);
}
logger.info(`🎯 Selected account: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) with priority ${selectedAccount.priority} for API key ${apiKeyData.name}`);
return {
accountId: selectedAccount.accountId,
accountType: selectedAccount.accountType
};
} catch (error) {
logger.error('❌ Failed to select account for API key:', error);
throw error;
}
}
// 📋 获取所有可用账户
async _getAllAvailableAccounts(apiKeyData, requestedModel = null) {
const availableAccounts = [];
// 如果API Key绑定了专属账户优先返回
if (apiKeyData.geminiAccountId) {
const boundAccount = await geminiAccountService.getAccount(apiKeyData.geminiAccountId);
if (boundAccount && boundAccount.isActive === 'true' && boundAccount.status !== 'error') {
const isRateLimited = await this.isAccountRateLimited(boundAccount.id);
if (!isRateLimited) {
logger.info(`🎯 Using bound dedicated Gemini account: ${boundAccount.name} (${apiKeyData.geminiAccountId})`);
return [{
...boundAccount,
accountId: boundAccount.id,
accountType: 'gemini',
priority: parseInt(boundAccount.priority) || 50,
lastUsedAt: boundAccount.lastUsedAt || '0'
}];
}
} else {
logger.warn(`⚠️ Bound Gemini account ${apiKeyData.geminiAccountId} is not available`);
}
}
// 获取所有Gemini账户共享池
const geminiAccounts = await geminiAccountService.getAllAccounts();
for (const account of geminiAccounts) {
if (account.isActive === 'true' &&
account.status !== 'error' &&
(account.accountType === 'shared' || !account.accountType) && // 兼容旧数据
account.schedulable !== 'false') { // 检查是否可调度
// 检查token是否过期
const isExpired = geminiAccountService.isTokenExpired(account);
if (isExpired && !account.refreshToken) {
logger.warn(`⚠️ Gemini account ${account.name} token expired and no refresh token available`);
continue;
}
// 检查是否被限流
const isRateLimited = await this.isAccountRateLimited(account.id);
if (!isRateLimited) {
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'gemini',
priority: parseInt(account.priority) || 50, // 默认优先级50
lastUsedAt: account.lastUsedAt || '0'
});
}
}
}
logger.info(`📊 Total available Gemini accounts: ${availableAccounts.length}`);
return availableAccounts;
}
// 🔢 按优先级和最后使用时间排序账户
_sortAccountsByPriority(accounts) {
return accounts.sort((a, b) => {
// 首先按优先级排序(数字越小优先级越高)
if (a.priority !== b.priority) {
return a.priority - b.priority;
}
// 优先级相同时,按最后使用时间排序(最久未使用的优先)
const aLastUsed = new Date(a.lastUsedAt || 0).getTime();
const bLastUsed = new Date(b.lastUsedAt || 0).getTime();
return aLastUsed - bLastUsed;
});
}
// 🔍 检查账户是否可用
async _isAccountAvailable(accountId, accountType) {
try {
if (accountType === 'gemini') {
const account = await geminiAccountService.getAccount(accountId);
if (!account || account.isActive !== 'true' || account.status === 'error') {
return false;
}
// 检查是否可调度
if (account.schedulable === 'false') {
logger.info(`🚫 Gemini account ${accountId} is not schedulable`);
return false;
}
return !(await this.isAccountRateLimited(accountId));
}
return false;
} catch (error) {
logger.warn(`⚠️ Failed to check account availability: ${accountId}`, error);
return false;
}
}
// 🔗 获取会话映射
async _getSessionMapping(sessionHash) {
const client = redis.getClientSafe();
const mappingData = await client.get(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`);
if (mappingData) {
try {
return JSON.parse(mappingData);
} catch (error) {
logger.warn('⚠️ Failed to parse session mapping:', error);
return null;
}
}
return null;
}
// 💾 设置会话映射
async _setSessionMapping(sessionHash, accountId, accountType) {
const client = redis.getClientSafe();
const mappingData = JSON.stringify({ accountId, accountType });
// 设置1小时过期
await client.setex(
`${this.SESSION_MAPPING_PREFIX}${sessionHash}`,
3600,
mappingData
);
}
// 🗑️ 删除会话映射
async _deleteSessionMapping(sessionHash) {
const client = redis.getClientSafe();
await client.del(`${this.SESSION_MAPPING_PREFIX}${sessionHash}`);
}
// 🚫 标记账户为限流状态
async markAccountRateLimited(accountId, accountType, sessionHash = null) {
try {
if (accountType === 'gemini') {
await geminiAccountService.setAccountRateLimited(accountId, true);
}
// 删除会话映射
if (sessionHash) {
await this._deleteSessionMapping(sessionHash);
}
return { success: true };
} catch (error) {
logger.error(`❌ Failed to mark account as rate limited: ${accountId} (${accountType})`, error);
throw error;
}
}
// ✅ 移除账户的限流状态
async removeAccountRateLimit(accountId, accountType) {
try {
if (accountType === 'gemini') {
await geminiAccountService.setAccountRateLimited(accountId, false);
}
return { success: true };
} catch (error) {
logger.error(`❌ Failed to remove rate limit for account: ${accountId} (${accountType})`, error);
throw error;
}
}
// 🔍 检查账户是否处于限流状态
async isAccountRateLimited(accountId) {
try {
const account = await geminiAccountService.getAccount(accountId);
if (!account) return false;
if (account.rateLimitStatus === 'limited' && account.rateLimitedAt) {
const limitedAt = new Date(account.rateLimitedAt).getTime();
const now = Date.now();
const limitDuration = 60 * 60 * 1000; // 1小时
return now < (limitedAt + limitDuration);
}
return false;
} catch (error) {
logger.error(`❌ Failed to check rate limit status: ${accountId}`, error);
return false;
}
}
// 👥 从分组中选择账户
async selectAccountFromGroup(groupId, sessionHash = null, requestedModel = null, apiKeyData = null) {
try {
// 获取分组信息
const group = await accountGroupService.getGroup(groupId);
if (!group) {
throw new Error(`Group ${groupId} not found`);
}
if (group.platform !== 'gemini') {
throw new Error(`Group ${group.name} is not a Gemini group`);
}
logger.info(`👥 Selecting account from Gemini group: ${group.name}`);
// 如果有会话哈希,检查是否有已映射的账户
if (sessionHash) {
const mappedAccount = await this._getSessionMapping(sessionHash);
if (mappedAccount) {
// 验证映射的账户是否属于这个分组
const memberIds = await accountGroupService.getGroupMembers(groupId);
if (memberIds.includes(mappedAccount.accountId)) {
const isAvailable = await this._isAccountAvailable(mappedAccount.accountId, mappedAccount.accountType);
if (isAvailable) {
logger.info(`🎯 Using sticky session account from group: ${mappedAccount.accountId} (${mappedAccount.accountType}) for session ${sessionHash}`);
return mappedAccount;
}
}
// 如果映射的账户不可用或不在分组中,删除映射
await this._deleteSessionMapping(sessionHash);
}
}
// 获取分组内的所有账户
const memberIds = await accountGroupService.getGroupMembers(groupId);
if (memberIds.length === 0) {
throw new Error(`Group ${group.name} has no members`);
}
const availableAccounts = [];
// 获取所有成员账户的详细信息
for (const memberId of memberIds) {
const account = await geminiAccountService.getAccount(memberId);
if (!account) {
logger.warn(`⚠️ Gemini account ${memberId} not found in group ${group.name}`);
continue;
}
// 检查账户是否可用
if (account.isActive === 'true' &&
account.status !== 'error' &&
account.schedulable !== 'false') {
// 检查token是否过期
const isExpired = geminiAccountService.isTokenExpired(account);
if (isExpired && !account.refreshToken) {
logger.warn(`⚠️ Gemini account ${account.name} in group token expired and no refresh token available`);
continue;
}
// 检查是否被限流
const isRateLimited = await this.isAccountRateLimited(account.id);
if (!isRateLimited) {
availableAccounts.push({
...account,
accountId: account.id,
accountType: 'gemini',
priority: parseInt(account.priority) || 50,
lastUsedAt: account.lastUsedAt || '0'
});
}
}
}
if (availableAccounts.length === 0) {
throw new Error(`No available accounts in Gemini group ${group.name}`);
}
// 使用现有的优先级排序逻辑
const sortedAccounts = this._sortAccountsByPriority(availableAccounts);
// 选择第一个账户
const selectedAccount = sortedAccounts[0];
// 如果有会话哈希,建立新的映射
if (sessionHash) {
await this._setSessionMapping(sessionHash, selectedAccount.accountId, selectedAccount.accountType);
logger.info(`🎯 Created new sticky session mapping in group: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) for session ${sessionHash}`);
}
logger.info(`🎯 Selected account from Gemini group ${group.name}: ${selectedAccount.name} (${selectedAccount.accountId}, ${selectedAccount.accountType}) with priority ${selectedAccount.priority}`);
return {
accountId: selectedAccount.accountId,
accountType: selectedAccount.accountType
};
} catch (error) {
logger.error(`❌ Failed to select account from Gemini group ${groupId}:`, error);
throw error;
}
}
}
module.exports = new UnifiedGeminiScheduler();