diff --git a/src/handlers/geminiHandlers.js b/src/handlers/geminiHandlers.js new file mode 100644 index 00000000..1e9a3968 --- /dev/null +++ b/src/handlers/geminiHandlers.js @@ -0,0 +1,2241 @@ +/** + * Gemini API 处理函数模块 + * + * 该模块包含所有 Gemini API 的处理函数,供 geminiRoutes.js 和 standardGeminiRoutes.js 共享使用。 + * 这样可以避免代码重复,确保处理逻辑的一致性。 + */ + +const logger = require('../utils/logger') +const geminiAccountService = require('../services/geminiAccountService') +const geminiApiAccountService = require('../services/geminiApiAccountService') +const { sendGeminiRequest, getAvailableModels } = require('../services/geminiRelayService') +const crypto = require('crypto') +const sessionHelper = require('../utils/sessionHelper') +const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') +const apiKeyService = require('../services/apiKeyService') +const { updateRateLimitCounters } = require('../utils/rateLimitHelper') +const { parseSSELine } = require('../utils/sseParser') +const axios = require('axios') +const ProxyHelper = require('../utils/proxyHelper') + +// ============================================================================ +// 工具函数 +// ============================================================================ + +/** + * 生成会话哈希 + */ +function generateSessionHash(req) { + const apiKeyPrefix = + req.headers['x-api-key']?.substring(0, 10) || req.headers['x-goog-api-key']?.substring(0, 10) + + const sessionData = [req.headers['user-agent'], req.ip, apiKeyPrefix].filter(Boolean).join(':') + + return crypto.createHash('sha256').update(sessionData).digest('hex') +} + +/** + * 检查 API Key 权限 + */ +function checkPermissions(apiKeyData, requiredPermission = 'gemini') { + const permissions = apiKeyData?.permissions || 'all' + return permissions === 'all' || permissions === requiredPermission +} + +/** + * 确保请求具有 Gemini 访问权限 + */ +function ensureGeminiPermission(req, res) { + const apiKeyData = req.apiKey || {} + if (checkPermissions(apiKeyData, 'gemini')) { + return true + } + + logger.security( + `🚫 API Key ${apiKeyData.id || 'unknown'} 缺少 Gemini 权限,拒绝访问 ${req.originalUrl}` + ) + + res.status(403).json({ + error: { + message: 'This API key does not have permission to access Gemini', + type: 'permission_denied' + } + }) + return false +} + +/** + * 权限检查中间件 + */ +function ensureGeminiPermissionMiddleware(req, res, next) { + if (ensureGeminiPermission(req, res)) { + return next() + } + return undefined +} + +/** + * 应用速率限制跟踪 + */ +async function applyRateLimitTracking(req, usageSummary, model, context = '') { + if (!req.rateLimitInfo) { + return + } + + const label = context ? ` (${context})` : '' + + try { + const { totalTokens, totalCost } = await updateRateLimitCounters( + req.rateLimitInfo, + usageSummary, + model + ) + + if (totalTokens > 0) { + logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`) + } + if (typeof totalCost === 'number' && totalCost > 0) { + logger.api(`💰 Updated rate limit cost count${label}: +$${totalCost.toFixed(6)}`) + } + } catch (error) { + logger.error(`❌ Failed to update rate limit counters${label}:`, error) + } +} + +/** + * 判断对象是否为可读流 + */ +function isReadableStream(value) { + return value && typeof value.on === 'function' && typeof value.pipe === 'function' +} + +/** + * 读取可读流内容为字符串 + */ +async function readStreamToString(stream) { + return new Promise((resolve, reject) => { + let result = '' + + try { + if (typeof stream.setEncoding === 'function') { + stream.setEncoding('utf8') + } + } catch (error) { + logger.warn('设置流编码失败:', error) + } + + stream.on('data', (chunk) => { + result += chunk + }) + + stream.on('end', () => { + resolve(result) + }) + + stream.on('error', (error) => { + reject(error) + }) + }) +} + +/** + * 规范化上游 Axios 错误信息 + */ +async function normalizeAxiosStreamError(error) { + const status = error.response?.status + const statusText = error.response?.statusText + const responseData = error.response?.data + let rawBody = null + let parsedBody = null + + if (responseData) { + try { + if (isReadableStream(responseData)) { + rawBody = await readStreamToString(responseData) + } else if (Buffer.isBuffer(responseData)) { + rawBody = responseData.toString('utf8') + } else if (typeof responseData === 'string') { + rawBody = responseData + } else { + rawBody = JSON.stringify(responseData) + } + } catch (streamError) { + logger.warn('读取 Gemini 上游错误流失败:', streamError) + } + } + + if (rawBody) { + if (typeof rawBody === 'string') { + try { + parsedBody = JSON.parse(rawBody) + } catch (parseError) { + parsedBody = rawBody + } + } else { + parsedBody = rawBody + } + } + + let finalMessage = error.message || 'Internal server error' + if (parsedBody && typeof parsedBody === 'object') { + finalMessage = parsedBody.error?.message || parsedBody.message || finalMessage + } else if (typeof parsedBody === 'string' && parsedBody.trim()) { + finalMessage = parsedBody.trim() + } + + return { + status, + statusText, + message: finalMessage, + parsedBody, + rawBody + } +} + +/** + * 解析账户代理配置 + */ +function parseProxyConfig(account) { + let proxyConfig = null + if (account.proxy) { + try { + proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy + } catch (e) { + logger.warn('Failed to parse proxy configuration:', e) + } + } + return proxyConfig +} + +// ============================================================================ +// 处理函数 - OpenAI 兼容格式(/messages 端点) +// ============================================================================ + +/** + * 处理 OpenAI 兼容格式的消息请求 + */ +async function handleMessages(req, res) { + const startTime = Date.now() + let abortController = null + let accountId + let accountType + let sessionHash + + try { + const apiKeyData = req.apiKey + + // 检查权限 + if (!checkPermissions(apiKeyData, 'gemini')) { + return res.status(403).json({ + error: { + message: 'This API key does not have permission to access Gemini', + type: 'permission_denied' + } + }) + } + + // 提取请求参数 + const { + messages, + model = 'gemini-2.5-flash', + temperature = 0.7, + max_tokens = 4096, + stream = false + } = req.body + + // 验证必需参数 + if (!messages || !Array.isArray(messages) || messages.length === 0) { + return res.status(400).json({ + error: { + message: 'Messages array is required', + type: 'invalid_request_error' + } + }) + } + + // 生成会话哈希用于粘性会话 + sessionHash = generateSessionHash(req) + + // 使用统一调度选择可用的 Gemini 账户(传递请求的模型) + try { + const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( + apiKeyData, + sessionHash, + model, // 传递请求的模型进行过滤 + { allowApiAccounts: true } // 允许调度 API 账户 + ) + ;({ accountId, accountType } = schedulerResult) + } catch (error) { + logger.error('Failed to select Gemini account:', error) + return res.status(503).json({ + error: { + message: error.message || 'No available Gemini accounts', + type: 'service_unavailable' + } + }) + } + + // 判断账户类型:根据 accountType 判断,而非 accountId 前缀 + const isApiAccount = accountType === 'gemini-api' + + // 获取账户详情 + let account + if (isApiAccount) { + account = await geminiApiAccountService.getAccount(accountId) + if (!account) { + return res.status(503).json({ + error: { + message: 'Gemini API account not found', + type: 'service_unavailable' + } + }) + } + logger.info(`Using Gemini API account: ${account.id} for API key: ${apiKeyData.id}`) + // 标记 API 账户被使用 + await geminiApiAccountService.markAccountUsed(account.id) + } else { + account = await geminiAccountService.getAccount(accountId) + if (!account) { + return res.status(503).json({ + error: { + message: 'Gemini OAuth account not found', + type: 'service_unavailable' + } + }) + } + logger.info(`Using Gemini OAuth account: ${account.id} for API key: ${apiKeyData.id}`) + // 标记 OAuth 账户被使用 + await geminiAccountService.markAccountUsed(account.id) + } + + // 创建中止控制器 + abortController = new AbortController() + + // 处理客户端断开连接 + req.on('close', () => { + if (abortController && !abortController.signal.aborted) { + logger.info('Client disconnected, aborting Gemini request') + abortController.abort() + } + }) + + let geminiResponse + + if (isApiAccount) { + // API 账户:直接调用 Google Gemini API + // 转换 OpenAI 格式的 messages 为 Gemini 格式的 contents + const contents = messages.map((msg) => ({ + role: msg.role === 'assistant' ? 'model' : msg.role, + parts: [{ text: msg.content }] + })) + + const requestBody = { + contents, + generationConfig: { + temperature, + maxOutputTokens: max_tokens, + topP: 0.95, + topK: 40 + } + } + + // 解析代理配置 + const proxyConfig = parseProxyConfig(account) + + const apiUrl = stream + ? `${account.baseUrl}/v1beta/models/${model}:streamGenerateContent?key=${account.apiKey}&alt=sse` + : `${account.baseUrl}/v1beta/models/${model}:generateContent?key=${account.apiKey}` + + const axiosConfig = { + method: 'POST', + url: apiUrl, + data: requestBody, + headers: { + 'Content-Type': 'application/json' + }, + responseType: stream ? 'stream' : 'json', + signal: abortController.signal + } + + // 添加代理配置 + if (proxyConfig) { + const proxyHelper = new ProxyHelper() + axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) + axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) + } + + try { + const apiResponse = await axios(axiosConfig) + if (stream) { + geminiResponse = apiResponse.data + } else { + // 转换为 OpenAI 兼容格式 + const geminiData = apiResponse.data + geminiResponse = { + id: crypto.randomUUID(), + object: 'chat.completion', + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + message: { + role: 'assistant', + content: + geminiData.candidates?.[0]?.content?.parts?.[0]?.text || 'No response generated' + }, + finish_reason: 'stop' + } + ], + usage: { + prompt_tokens: geminiData.usageMetadata?.promptTokenCount || 0, + completion_tokens: geminiData.usageMetadata?.candidatesTokenCount || 0, + total_tokens: geminiData.usageMetadata?.totalTokenCount || 0 + } + } + + // 记录使用统计 + if (geminiData.usageMetadata) { + await apiKeyService.recordUsage( + apiKeyData.id, + geminiData.usageMetadata.promptTokenCount || 0, + geminiData.usageMetadata.candidatesTokenCount || 0, + 0, + 0, + model, + accountId + ) + } + } + } catch (error) { + logger.error('Gemini API request failed:', { + status: error.response?.status, + statusText: error.response?.statusText, + data: error.response?.data + }) + throw error + } + } else { + // OAuth 账户:使用现有的 sendGeminiRequest + geminiResponse = await sendGeminiRequest({ + messages, + model, + temperature, + maxTokens: max_tokens, + stream, + accessToken: account.accessToken, + proxy: account.proxy, + apiKeyId: apiKeyData.id, + signal: abortController.signal, + projectId: account.projectId, + accountId: account.id + }) + } + + if (stream) { + // 设置流式响应头 + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + + if (isApiAccount) { + // API 账户:处理 SSE 流并记录使用统计 + let totalUsage = { + promptTokenCount: 0, + candidatesTokenCount: 0, + totalTokenCount: 0 + } + + geminiResponse.on('data', (chunk) => { + try { + const chunkStr = chunk.toString() + res.write(chunkStr) + + // 尝试从 SSE 流中提取 usage 数据 + const lines = chunkStr.split('\n') + for (const line of lines) { + if (line.startsWith('data:')) { + const data = line.substring(5).trim() + if (data && data !== '[DONE]') { + try { + const parsed = JSON.parse(data) + if (parsed.usageMetadata || parsed.response?.usageMetadata) { + totalUsage = parsed.usageMetadata || parsed.response.usageMetadata + } + } catch (e) { + // 解析失败,忽略 + } + } + } + } + } catch (error) { + logger.error('Error processing stream chunk:', error) + } + }) + + geminiResponse.on('end', () => { + res.end() + + // 异步记录使用统计 + if (totalUsage.totalTokenCount > 0) { + apiKeyService + .recordUsage( + apiKeyData.id, + totalUsage.promptTokenCount || 0, + totalUsage.candidatesTokenCount || 0, + 0, + 0, + model, + accountId + ) + .then(() => { + logger.info( + `📊 Recorded Gemini API stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}` + ) + }) + .catch((error) => { + logger.error('Failed to record Gemini API usage:', error) + }) + } + }) + + geminiResponse.on('error', (error) => { + logger.error('Stream error:', error) + if (!res.headersSent) { + res.status(500).json({ + error: { + message: error.message || 'Stream error', + type: 'api_error' + } + }) + } else { + res.end() + } + }) + } else { + // OAuth 账户:使用原有的流式传输逻辑 + for await (const chunk of geminiResponse) { + if (abortController.signal.aborted) { + break + } + res.write(chunk) + } + res.end() + } + } else { + // 非流式响应 + res.json(geminiResponse) + } + + const duration = Date.now() - startTime + logger.info(`Gemini request completed in ${duration}ms`) + } catch (error) { + logger.error('Gemini request error:', error) + + // 处理速率限制 + const errorStatus = error.response?.status || error.status + if (errorStatus === 429 && accountId) { + try { + const rateLimitAccountType = accountType || 'gemini' + await unifiedGeminiScheduler.markAccountRateLimited( + accountId, + rateLimitAccountType, + sessionHash + ) + logger.warn(`⚠️ Gemini account ${accountId} rate limited (/messages), marking as limited`) + } catch (limitError) { + logger.warn('Failed to mark account as rate limited:', limitError) + } + } + + // 返回错误响应 + const status = errorStatus || 500 + const errorResponse = { + error: error.error || { + message: error.message || 'Internal server error', + type: 'api_error' + } + } + + res.status(status).json(errorResponse) + } finally { + // 清理资源 + if (abortController) { + abortController = null + } + } + return undefined +} + +// ============================================================================ +// 处理函数 - 模型列表和详情 +// ============================================================================ + +/** + * 获取可用模型列表 + */ +async function handleModels(req, res) { + try { + const apiKeyData = req.apiKey + + // 检查权限 + if (!checkPermissions(apiKeyData, 'gemini')) { + return res.status(403).json({ + error: { + message: 'This API key does not have permission to access Gemini', + type: 'permission_denied' + } + }) + } + + // 选择账户获取模型列表 + let account = null + try { + const accountSelection = await unifiedGeminiScheduler.selectAccountForApiKey( + apiKeyData, + null, + null + ) + account = await geminiAccountService.getAccount(accountSelection.accountId) + } catch (error) { + logger.warn('Failed to select Gemini account for models endpoint:', error) + } + + if (!account) { + // 返回默认模型列表 + return res.json({ + object: 'list', + data: [ + { + id: 'gemini-2.5-flash', + object: 'model', + created: Date.now() / 1000, + owned_by: 'google' + } + ] + }) + } + + // 获取模型列表 + const models = await getAvailableModels(account.accessToken, account.proxy) + + res.json({ + object: 'list', + data: models + }) + } catch (error) { + logger.error('Failed to get Gemini models:', error) + res.status(500).json({ + error: { + message: 'Failed to retrieve models', + type: 'api_error' + } + }) + } + return undefined +} + +/** + * 获取模型详情(标准 Gemini API 格式) + */ +function handleModelDetails(req, res) { + const { modelName } = req.params + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1' + logger.info(`Standard Gemini API model details request (${version}): ${modelName}`) + + res.json({ + name: `models/${modelName}`, + version: '001', + displayName: modelName, + description: `Gemini model: ${modelName}`, + inputTokenLimit: 1048576, + outputTokenLimit: 8192, + supportedGenerationMethods: ['generateContent', 'streamGenerateContent', 'countTokens'], + temperature: 1.0, + topP: 0.95, + topK: 40 + }) +} + +// ============================================================================ +// 处理函数 - 使用统计和 API Key 信息 +// ============================================================================ + +/** + * 获取使用情况统计 + */ +async function handleUsage(req, res) { + try { + const { usage } = req.apiKey + + res.json({ + object: 'usage', + total_tokens: usage.total.tokens, + total_requests: usage.total.requests, + daily_tokens: usage.daily.tokens, + daily_requests: usage.daily.requests, + monthly_tokens: usage.monthly.tokens, + monthly_requests: usage.monthly.requests + }) + } catch (error) { + logger.error('Failed to get usage stats:', error) + res.status(500).json({ + error: { + message: 'Failed to retrieve usage statistics', + type: 'api_error' + } + }) + } +} + +/** + * 获取 API Key 信息 + */ +async function handleKeyInfo(req, res) { + try { + const keyData = req.apiKey + + res.json({ + id: keyData.id, + name: keyData.name, + permissions: keyData.permissions || 'all', + token_limit: keyData.tokenLimit, + tokens_used: keyData.usage.total.tokens, + tokens_remaining: + keyData.tokenLimit > 0 + ? Math.max(0, keyData.tokenLimit - keyData.usage.total.tokens) + : null, + rate_limit: { + window: keyData.rateLimitWindow, + requests: keyData.rateLimitRequests + }, + concurrency_limit: keyData.concurrencyLimit, + model_restrictions: { + enabled: keyData.enableModelRestriction, + models: keyData.restrictedModels + } + }) + } catch (error) { + logger.error('Failed to get key info:', error) + res.status(500).json({ + error: { + message: 'Failed to retrieve API key information', + type: 'api_error' + } + }) + } +} + +// ============================================================================ +// 处理函数 - v1internal 格式(Gemini CLI 内部格式) +// ============================================================================ + +/** + * 简单端点处理函数工厂(用于直接转发的端点) + */ +function handleSimpleEndpoint(apiMethod) { + return async (req, res) => { + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 从路径参数或请求体中获取模型名 + const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' + const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + requestedModel + ) + const account = await geminiAccountService.getAccount(accountId) + const { accessToken, refreshToken } = account + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`${apiMethod} request (${version})`, { + apiKeyId: req.apiKey?.id || 'unknown', + requestBody: req.body + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient( + accessToken, + refreshToken, + proxyConfig + ) + + // 直接转发请求体,不做特殊处理 + const response = await geminiAccountService.forwardToCodeAssist( + client, + apiMethod, + req.body, + proxyConfig + ) + + res.json(response) + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in ${apiMethod} endpoint (${version})`, { error: error.message }) + res.status(500).json({ + error: 'Internal server error', + message: error.message + }) + } + } +} + +/** + * 处理 loadCodeAssist 请求 + */ +async function handleLoadCodeAssist(req, res) { + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 从路径参数或请求体中获取模型名 + const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' + const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + requestedModel + ) + const account = await geminiAccountService.getAccount(accountId) + const { accessToken, refreshToken, projectId } = account + + const { metadata, cloudaicompanionProject } = req.body + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`LoadCodeAssist request (${version})`, { + metadata: metadata || {}, + requestedProject: cloudaicompanionProject || null, + accountProject: projectId || null, + apiKeyId: req.apiKey?.id || 'unknown' + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) + + // 智能处理项目ID + const effectiveProjectId = projectId || cloudaicompanionProject || null + + logger.info('📋 loadCodeAssist项目ID处理逻辑', { + accountProjectId: projectId, + requestProjectId: cloudaicompanionProject, + effectiveProjectId, + decision: projectId + ? '使用账户配置' + : cloudaicompanionProject + ? '使用请求参数' + : '不使用项目ID' + }) + + const response = await geminiAccountService.loadCodeAssist( + client, + effectiveProjectId, + proxyConfig + ) + + // 如果响应中包含 cloudaicompanionProject,保存到账户作为临时项目 ID + if (response.cloudaicompanionProject && !account.projectId) { + await geminiAccountService.updateTempProjectId(accountId, response.cloudaicompanionProject) + logger.info( + `📋 Cached temporary projectId from loadCodeAssist: ${response.cloudaicompanionProject}` + ) + } + + res.json(response) + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in loadCodeAssist endpoint (${version})`, { error: error.message }) + res.status(500).json({ + error: 'Internal server error', + message: error.message + }) + } +} + +/** + * 处理 onboardUser 请求 + */ +async function handleOnboardUser(req, res) { + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + // 提取请求参数 + const { tierId, cloudaicompanionProject, metadata } = req.body + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 从路径参数或请求体中获取模型名 + const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' + const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + requestedModel + ) + const account = await geminiAccountService.getAccount(accountId) + const { accessToken, refreshToken, projectId } = account + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`OnboardUser request (${version})`, { + tierId: tierId || 'not provided', + requestedProject: cloudaicompanionProject || null, + accountProject: projectId || null, + metadata: metadata || {}, + apiKeyId: req.apiKey?.id || 'unknown' + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) + + // 智能处理项目ID + const effectiveProjectId = projectId || cloudaicompanionProject || null + + logger.info('📋 onboardUser项目ID处理逻辑', { + accountProjectId: projectId, + requestProjectId: cloudaicompanionProject, + effectiveProjectId, + decision: projectId + ? '使用账户配置' + : cloudaicompanionProject + ? '使用请求参数' + : '不使用项目ID' + }) + + // 如果提供了 tierId,直接调用 onboardUser + if (tierId) { + const response = await geminiAccountService.onboardUser( + client, + tierId, + effectiveProjectId, + metadata, + proxyConfig + ) + + res.json(response) + } else { + // 否则执行完整的 setupUser 流程 + const response = await geminiAccountService.setupUser( + client, + effectiveProjectId, + metadata, + proxyConfig + ) + + res.json(response) + } + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in onboardUser endpoint (${version})`, { error: error.message }) + res.status(500).json({ + error: 'Internal server error', + message: error.message + }) + } +} + +/** + * 处理 countTokens 请求 + */ +async function handleCountTokens(req, res) { + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + // 处理请求体结构,支持直接 contents 或 request.contents + const requestData = req.body.request || req.body + const { contents } = requestData + // 从路径参数或请求体中获取模型名 + const model = requestData.model || req.params.modelName || 'gemini-2.5-flash' + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 验证必需参数 + if (!contents || !Array.isArray(contents)) { + return res.status(400).json({ + error: { + message: 'Contents array is required', + type: 'invalid_request_error' + } + }) + } + + // 使用统一调度选择账号 + const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + model + ) + const account = await geminiAccountService.getAccount(accountId) + const { accessToken, refreshToken } = account + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`CountTokens request (${version})`, { + model, + contentsLength: contents.length, + apiKeyId: req.apiKey?.id || 'unknown' + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) + const response = await geminiAccountService.countTokens(client, contents, model, proxyConfig) + + res.json(response) + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in countTokens endpoint (${version})`, { error: error.message }) + res.status(500).json({ + error: { + message: error.message || 'Internal server error', + type: 'api_error' + } + }) + } + return undefined +} + +/** + * 处理 generateContent 请求(v1internal 格式) + */ +async function handleGenerateContent(req, res) { + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + const { project, user_prompt_id, request: requestData } = req.body + // 从路径参数或请求体中获取模型名 + const model = req.body.model || req.params.modelName || 'gemini-2.5-flash' + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 处理不同格式的请求 + let actualRequestData = requestData + if (!requestData) { + if (req.body.messages) { + // 这是 OpenAI 格式的请求,构建 Gemini 格式的 request 对象 + actualRequestData = { + contents: req.body.messages.map((msg) => ({ + role: msg.role === 'assistant' ? 'model' : msg.role, + parts: [{ text: msg.content }] + })), + generationConfig: { + temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7, + maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096, + topP: req.body.top_p !== undefined ? req.body.top_p : 0.95, + topK: req.body.top_k !== undefined ? req.body.top_k : 40 + } + } + } else if (req.body.contents) { + // 直接的 Gemini 格式请求(没有 request 包装) + actualRequestData = req.body + } + } + + // 验证必需参数 + if (!actualRequestData || !actualRequestData.contents) { + return res.status(400).json({ + error: { + message: 'Request contents are required', + type: 'invalid_request_error' + } + }) + } + + // 使用统一调度选择账号(v1internal 不允许 API 账户) + const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + model + ) + const { accountId, accountType } = schedulerResult + + // v1internal 路由只支持 OAuth 账户,不支持 API Key 账户 + if (accountType === 'gemini-api') { + logger.error(`❌ v1internal routes do not support Gemini API accounts. Account: ${accountId}`) + return res.status(400).json({ + error: { + message: + 'This endpoint only supports Gemini OAuth accounts. Gemini API Key accounts are not compatible with v1internal format.', + type: 'invalid_account_type' + } + }) + } + + const account = await geminiAccountService.getAccount(accountId) + if (!account) { + logger.error(`❌ Gemini account not found: ${accountId}`) + return res.status(404).json({ + error: { + message: 'Gemini account not found', + type: 'account_not_found' + } + }) + } + + const { accessToken, refreshToken } = account + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`GenerateContent request (${version})`, { + model, + userPromptId: user_prompt_id, + projectId: project || account.projectId, + apiKeyId: req.apiKey?.id || 'unknown' + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) + + // 智能处理项目ID + const effectiveProjectId = account.projectId || project || null + + logger.info('📋 项目ID处理逻辑', { + accountProjectId: account.projectId, + requestProjectId: project, + effectiveProjectId, + decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID' + }) + + const response = await geminiAccountService.generateContent( + client, + { model, request: actualRequestData }, + user_prompt_id, + effectiveProjectId, + req.apiKey?.id, + proxyConfig + ) + + // 记录使用统计 + if (response?.response?.usageMetadata) { + try { + const usage = response.response.usageMetadata + await apiKeyService.recordUsage( + req.apiKey.id, + usage.promptTokenCount || 0, + usage.candidatesTokenCount || 0, + 0, + 0, + model, + account.id + ) + logger.info( + `📊 Recorded Gemini usage - Input: ${usage.promptTokenCount}, Output: ${usage.candidatesTokenCount}, Total: ${usage.totalTokenCount}` + ) + + await applyRateLimitTracking( + req, + { + inputTokens: usage.promptTokenCount || 0, + outputTokens: usage.candidatesTokenCount || 0, + cacheCreateTokens: 0, + cacheReadTokens: 0 + }, + model, + 'gemini-non-stream' + ) + } catch (error) { + logger.error('Failed to record Gemini usage:', error) + } + } + + res.json(version === 'v1beta' ? response.response : response) + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in generateContent endpoint (${version})`, { + message: error.message, + status: error.response?.status, + statusText: error.response?.statusText, + responseData: error.response?.data, + requestUrl: error.config?.url, + requestMethod: error.config?.method, + stack: error.stack + }) + res.status(500).json({ + error: { + message: error.message || 'Internal server error', + type: 'api_error' + } + }) + } + return undefined +} + +/** + * 处理 streamGenerateContent 请求(v1internal 格式) + */ +async function handleStreamGenerateContent(req, res) { + let abortController = null + + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + const { project, user_prompt_id, request: requestData } = req.body + // 从路径参数或请求体中获取模型名 + const model = req.body.model || req.params.modelName || 'gemini-2.5-flash' + const sessionHash = sessionHelper.generateSessionHash(req.body) + + // 处理不同格式的请求 + let actualRequestData = requestData + if (!requestData) { + if (req.body.messages) { + // 这是 OpenAI 格式的请求,构建 Gemini 格式的 request 对象 + actualRequestData = { + contents: req.body.messages.map((msg) => ({ + role: msg.role === 'assistant' ? 'model' : msg.role, + parts: [{ text: msg.content }] + })), + generationConfig: { + temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7, + maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096, + topP: req.body.top_p !== undefined ? req.body.top_p : 0.95, + topK: req.body.top_k !== undefined ? req.body.top_k : 40 + } + } + } else if (req.body.contents) { + // 直接的 Gemini 格式请求(没有 request 包装) + actualRequestData = req.body + } + } + + // 验证必需参数 + if (!actualRequestData || !actualRequestData.contents) { + return res.status(400).json({ + error: { + message: 'Request contents are required', + type: 'invalid_request_error' + } + }) + } + + // 使用统一调度选择账号(v1internal 不允许 API 账户) + const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + model + ) + const { accountId, accountType } = schedulerResult + + // v1internal 路由只支持 OAuth 账户,不支持 API Key 账户 + if (accountType === 'gemini-api') { + logger.error(`❌ v1internal routes do not support Gemini API accounts. Account: ${accountId}`) + return res.status(400).json({ + error: { + message: + 'This endpoint only supports Gemini OAuth accounts. Gemini API Key accounts are not compatible with v1internal format.', + type: 'invalid_account_type' + } + }) + } + + const account = await geminiAccountService.getAccount(accountId) + if (!account) { + logger.error(`❌ Gemini account not found: ${accountId}`) + return res.status(404).json({ + error: { + message: 'Gemini account not found', + type: 'account_not_found' + } + }) + } + + const { accessToken, refreshToken } = account + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.info(`StreamGenerateContent request (${version})`, { + model, + userPromptId: user_prompt_id, + projectId: project || account.projectId, + apiKeyId: req.apiKey?.id || 'unknown' + }) + + // 创建中止控制器 + abortController = new AbortController() + + // 处理客户端断开连接 + req.on('close', () => { + if (abortController && !abortController.signal.aborted) { + logger.info('Client disconnected, aborting stream request') + abortController.abort() + } + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) + + // 智能处理项目ID + const effectiveProjectId = account.projectId || project || null + + logger.info('📋 流式请求项目ID处理逻辑', { + accountProjectId: account.projectId, + requestProjectId: project, + effectiveProjectId, + decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID' + }) + + const streamResponse = await geminiAccountService.generateContentStream( + client, + { model, request: actualRequestData }, + user_prompt_id, + effectiveProjectId, + req.apiKey?.id, + abortController.signal, + proxyConfig + ) + + // 设置 SSE 响应头 + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + + // 处理流式响应并捕获usage数据 + let streamBuffer = '' + let totalUsage = { + promptTokenCount: 0, + candidatesTokenCount: 0, + totalTokenCount: 0 + } + let usageReported = false + + // SSE 心跳机制 + let heartbeatTimer = null + let lastDataTime = Date.now() + const HEARTBEAT_INTERVAL = 15000 + + const sendHeartbeat = () => { + const timeSinceLastData = Date.now() - lastDataTime + if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { + res.write('\n') + logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) + } + } + + heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) + + streamResponse.on('data', (chunk) => { + try { + lastDataTime = Date.now() + + // 立即转发原始数据 + if (!res.destroyed) { + res.write(chunk) + } + + // 异步提取 usage 数据 + setImmediate(() => { + try { + const chunkStr = chunk.toString() + if (!chunkStr.trim() || !chunkStr.includes('usageMetadata')) { + return + } + + streamBuffer += chunkStr + const lines = streamBuffer.split('\n') + streamBuffer = lines.pop() || '' + + for (const line of lines) { + if (!line.trim() || !line.includes('usageMetadata')) { + continue + } + + try { + const parsed = parseSSELine(line) + if (parsed.type === 'data' && parsed.data.response?.usageMetadata) { + totalUsage = parsed.data.response.usageMetadata + logger.debug('📊 Captured Gemini usage data:', totalUsage) + } + } catch (parseError) { + logger.warn('⚠️ Failed to parse usage line:', parseError.message) + } + } + } catch (error) { + logger.warn('⚠️ Error extracting usage data:', error.message) + } + }) + } catch (error) { + logger.error('Error processing stream chunk:', error) + } + }) + + streamResponse.on('end', () => { + logger.info('Stream completed successfully') + + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + res.end() + + // 异步记录使用统计 + if (!usageReported && totalUsage.totalTokenCount > 0) { + Promise.all([ + apiKeyService.recordUsage( + req.apiKey.id, + totalUsage.promptTokenCount || 0, + totalUsage.candidatesTokenCount || 0, + 0, + 0, + model, + account.id + ), + applyRateLimitTracking( + req, + { + inputTokens: totalUsage.promptTokenCount || 0, + outputTokens: totalUsage.candidatesTokenCount || 0, + cacheCreateTokens: 0, + cacheReadTokens: 0 + }, + model, + 'gemini-stream' + ) + ]) + .then(() => { + logger.info( + `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` + ) + usageReported = true + }) + .catch((error) => { + logger.error('Failed to record Gemini usage:', error) + }) + } + }) + + streamResponse.on('error', (error) => { + logger.error('Stream error:', error) + + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + if (!res.headersSent) { + res.status(500).json({ + error: { + message: error.message || 'Stream error', + type: 'api_error' + } + }) + } else { + if (!res.destroyed) { + try { + res.write( + `data: ${JSON.stringify({ + error: { + message: error.message || 'Stream error', + type: 'stream_error', + code: error.code + } + })}\n\n` + ) + res.write('data: [DONE]\n\n') + } catch (writeError) { + logger.error('Error sending error event:', writeError) + } + } + res.end() + } + }) + } catch (error) { + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' + logger.error(`Error in streamGenerateContent endpoint (${version})`, { + message: error.message, + status: error.response?.status, + statusText: error.response?.statusText, + responseData: error.response?.data, + requestUrl: error.config?.url, + requestMethod: error.config?.method, + stack: error.stack + }) + + if (!res.headersSent) { + res.status(500).json({ + error: { + message: error.message || 'Internal server error', + type: 'api_error' + } + }) + } + } finally { + if (abortController) { + abortController = null + } + } + return undefined +} + +// ============================================================================ +// 处理函数 - 标准 Gemini API 格式(/v1beta/models/:model:generateContent 等) +// ============================================================================ + +/** + * 处理标准 Gemini API 格式的 generateContent(支持 OAuth 和 API 账户) + */ +async function handleStandardGenerateContent(req, res) { + let account = null + let sessionHash = null + let accountId = null + let isApiAccount = false + + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + // 从路径参数中获取模型名 + const model = req.params.modelName || 'gemini-2.0-flash-exp' + sessionHash = sessionHelper.generateSessionHash(req.body) + + // 标准 Gemini API 请求体直接包含 contents 等字段 + const { contents, generationConfig, safetySettings, systemInstruction, tools, toolConfig } = + req.body + + // 验证必需参数 + if (!contents || !Array.isArray(contents) || contents.length === 0) { + return res.status(400).json({ + error: { + message: 'Contents array is required', + type: 'invalid_request_error' + } + }) + } + + // 构建内部 API 需要的请求格式 + const actualRequestData = { + contents, + generationConfig: generationConfig || { + temperature: 0.7, + maxOutputTokens: 4096, + topP: 0.95, + topK: 40 + } + } + + // 只有在 safetySettings 存在且非空时才添加 + if (safetySettings && safetySettings.length > 0) { + actualRequestData.safetySettings = safetySettings + } + + // 添加工具配置 + if (tools) { + actualRequestData.tools = tools + } + + if (toolConfig) { + actualRequestData.toolConfig = toolConfig + } + + // 处理 system instruction + if (systemInstruction) { + if (typeof systemInstruction === 'string' && systemInstruction.trim()) { + actualRequestData.systemInstruction = { + role: 'user', + parts: [{ text: systemInstruction }] + } + } else if (systemInstruction.parts && systemInstruction.parts.length > 0) { + const hasContent = systemInstruction.parts.some( + (part) => part.text && part.text.trim() !== '' + ) + if (hasContent) { + actualRequestData.systemInstruction = { + role: 'user', + parts: systemInstruction.parts + } + } + } + } + + // 使用统一调度选择账号 + const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + model, + { allowApiAccounts: true } + ) + ;({ accountId } = schedulerResult) + const { accountType } = schedulerResult + + isApiAccount = accountType === 'gemini-api' + const actualAccountId = accountId + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1' + + if (isApiAccount) { + account = await geminiApiAccountService.getAccount(actualAccountId) + if (!account) { + return res.status(404).json({ + error: { + message: 'Gemini API account not found', + type: 'account_not_found' + } + }) + } + + logger.info(`Standard Gemini API generateContent request (${version}) - API Key Account`, { + model, + accountId: actualAccountId, + apiKeyId: req.apiKey?.id || 'unknown' + }) + } else { + account = await geminiAccountService.getAccount(actualAccountId) + + logger.info(`Standard Gemini API generateContent request (${version}) - OAuth Account`, { + model, + projectId: account.projectId, + apiKeyId: req.apiKey?.id || 'unknown' + }) + } + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + let response + + if (isApiAccount) { + // Gemini API 账户:直接使用 API Key 请求 + const apiUrl = `${account.baseUrl}/v1beta/models/${model}:generateContent?key=${account.apiKey}` + + const axiosConfig = { + method: 'POST', + url: apiUrl, + data: actualRequestData, + headers: { + 'Content-Type': 'application/json' + } + } + + if (proxyConfig) { + const proxyHelper = new ProxyHelper() + axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) + axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) + } + + try { + const apiResponse = await axios(axiosConfig) + response = { response: apiResponse.data } + } catch (error) { + logger.error('Gemini API request failed:', { + status: error.response?.status, + statusText: error.response?.statusText, + data: error.response?.data + }) + throw error + } + } else { + // OAuth 账户 + const { accessToken, refreshToken } = account + const client = await geminiAccountService.getOauthClient( + accessToken, + refreshToken, + proxyConfig + ) + + let effectiveProjectId = account.projectId || account.tempProjectId || null + + if (!effectiveProjectId) { + try { + logger.info('📋 No projectId available, attempting to fetch from loadCodeAssist...') + const loadResponse = await geminiAccountService.loadCodeAssist(client, null, proxyConfig) + + if (loadResponse.cloudaicompanionProject) { + effectiveProjectId = loadResponse.cloudaicompanionProject + await geminiAccountService.updateTempProjectId(actualAccountId, effectiveProjectId) + logger.info(`📋 Fetched and cached temporary projectId: ${effectiveProjectId}`) + } + } catch (loadError) { + logger.warn('Failed to fetch projectId from loadCodeAssist:', loadError.message) + } + } + + if (!effectiveProjectId) { + return res.status(403).json({ + error: { + message: + 'This account requires a project ID to be configured. Please configure a project ID in the account settings.', + type: 'configuration_required' + } + }) + } + + logger.info('📋 Standard API 项目ID处理逻辑', { + accountProjectId: account.projectId, + tempProjectId: account.tempProjectId, + effectiveProjectId, + decision: account.projectId + ? '使用账户配置' + : account.tempProjectId + ? '使用临时项目ID' + : '从loadCodeAssist获取' + }) + + const userPromptId = `${crypto.randomUUID()}########0` + + response = await geminiAccountService.generateContent( + client, + { model, request: actualRequestData }, + userPromptId, + effectiveProjectId, + req.apiKey?.id, + proxyConfig + ) + } + + // 记录使用统计 + if (response?.response?.usageMetadata) { + try { + const usage = response.response.usageMetadata + await apiKeyService.recordUsage( + req.apiKey.id, + usage.promptTokenCount || 0, + usage.candidatesTokenCount || 0, + 0, + 0, + model, + accountId + ) + logger.info( + `📊 Recorded Gemini usage - Input: ${usage.promptTokenCount}, Output: ${usage.candidatesTokenCount}, Total: ${usage.totalTokenCount}` + ) + } catch (error) { + logger.error('Failed to record Gemini usage:', error) + } + } + + res.json(response.response || response) + } catch (error) { + logger.error(`Error in standard generateContent endpoint`, { + message: error.message, + status: error.response?.status, + statusText: error.response?.statusText, + responseData: error.response?.data, + stack: error.stack + }) + + res.status(500).json({ + error: { + message: error.message || 'Internal server error', + type: 'api_error' + } + }) + } +} + +/** + * 处理标准 Gemini API 格式的 streamGenerateContent(支持 OAuth 和 API 账户) + */ +async function handleStandardStreamGenerateContent(req, res) { + let abortController = null + let account = null + let sessionHash = null + let accountId = null + let isApiAccount = false + + try { + if (!ensureGeminiPermission(req, res)) { + return undefined + } + + // 从路径参数中获取模型名 + const model = req.params.modelName || 'gemini-2.0-flash-exp' + sessionHash = sessionHelper.generateSessionHash(req.body) + + // 标准 Gemini API 请求体直接包含 contents 等字段 + const { contents, generationConfig, safetySettings, systemInstruction, tools, toolConfig } = + req.body + + // 验证必需参数 + if (!contents || !Array.isArray(contents) || contents.length === 0) { + return res.status(400).json({ + error: { + message: 'Contents array is required', + type: 'invalid_request_error' + } + }) + } + + // 构建内部 API 需要的请求格式 + const actualRequestData = { + contents, + generationConfig: generationConfig || { + temperature: 0.7, + maxOutputTokens: 4096, + topP: 0.95, + topK: 40 + } + } + + if (safetySettings && safetySettings.length > 0) { + actualRequestData.safetySettings = safetySettings + } + + if (tools) { + actualRequestData.tools = tools + } + + if (toolConfig) { + actualRequestData.toolConfig = toolConfig + } + + // 处理 system instruction + if (systemInstruction) { + if (typeof systemInstruction === 'string' && systemInstruction.trim()) { + actualRequestData.systemInstruction = { + role: 'user', + parts: [{ text: systemInstruction }] + } + } else if (systemInstruction.parts && systemInstruction.parts.length > 0) { + const hasContent = systemInstruction.parts.some( + (part) => part.text && part.text.trim() !== '' + ) + if (hasContent) { + actualRequestData.systemInstruction = { + role: 'user', + parts: systemInstruction.parts + } + } + } + } + + // 使用统一调度选择账号 + const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( + req.apiKey, + sessionHash, + model, + { allowApiAccounts: true } + ) + ;({ accountId } = schedulerResult) + const { accountType } = schedulerResult + + isApiAccount = accountType === 'gemini-api' + const actualAccountId = accountId + + const version = req.path.includes('v1beta') ? 'v1beta' : 'v1' + + if (isApiAccount) { + account = await geminiApiAccountService.getAccount(actualAccountId) + if (!account) { + return res.status(404).json({ + error: { + message: 'Gemini API account not found', + type: 'account_not_found' + } + }) + } + + logger.info( + `Standard Gemini API streamGenerateContent request (${version}) - API Key Account`, + { + model, + accountId: actualAccountId, + apiKeyId: req.apiKey?.id || 'unknown' + } + ) + } else { + account = await geminiAccountService.getAccount(actualAccountId) + + logger.info( + `Standard Gemini API streamGenerateContent request (${version}) - OAuth Account`, + { + model, + projectId: account.projectId, + apiKeyId: req.apiKey?.id || 'unknown' + } + ) + } + + // 创建中止控制器 + abortController = new AbortController() + + // 处理客户端断开连接 + req.on('close', () => { + if (abortController && !abortController.signal.aborted) { + logger.info('Client disconnected, aborting stream request') + abortController.abort() + } + }) + + // 解析账户的代理配置 + const proxyConfig = parseProxyConfig(account) + + let streamResponse + + if (isApiAccount) { + // Gemini API 账户:直接使用 API Key 请求流式接口 + const apiUrl = `${account.baseUrl}/v1beta/models/${model}:streamGenerateContent?key=${account.apiKey}&alt=sse` + + const axiosConfig = { + method: 'POST', + url: apiUrl, + data: actualRequestData, + headers: { + 'Content-Type': 'application/json' + }, + responseType: 'stream', + signal: abortController.signal + } + + if (proxyConfig) { + const proxyHelper = new ProxyHelper() + axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) + axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) + } + + try { + const apiResponse = await axios(axiosConfig) + streamResponse = apiResponse.data + } catch (error) { + logger.error('Gemini API stream request failed:', { + status: error.response?.status, + statusText: error.response?.statusText, + data: error.response?.data + }) + throw error + } + } else { + // OAuth 账户 + const { accessToken, refreshToken } = account + const client = await geminiAccountService.getOauthClient( + accessToken, + refreshToken, + proxyConfig + ) + + let effectiveProjectId = account.projectId || account.tempProjectId || null + + if (!effectiveProjectId) { + try { + logger.info('📋 No projectId available, attempting to fetch from loadCodeAssist...') + const loadResponse = await geminiAccountService.loadCodeAssist(client, null, proxyConfig) + + if (loadResponse.cloudaicompanionProject) { + effectiveProjectId = loadResponse.cloudaicompanionProject + await geminiAccountService.updateTempProjectId(actualAccountId, effectiveProjectId) + logger.info(`📋 Fetched and cached temporary projectId: ${effectiveProjectId}`) + } + } catch (loadError) { + logger.warn('Failed to fetch projectId from loadCodeAssist:', loadError.message) + } + } + + if (!effectiveProjectId) { + return res.status(403).json({ + error: { + message: + 'This account requires a project ID to be configured. Please configure a project ID in the account settings.', + type: 'configuration_required' + } + }) + } + + logger.info('📋 Standard API 流式项目ID处理逻辑', { + accountProjectId: account.projectId, + tempProjectId: account.tempProjectId, + effectiveProjectId, + decision: account.projectId + ? '使用账户配置' + : account.tempProjectId + ? '使用临时项目ID' + : '从loadCodeAssist获取' + }) + + const userPromptId = `${crypto.randomUUID()}########0` + + streamResponse = await geminiAccountService.generateContentStream( + client, + { model, request: actualRequestData }, + userPromptId, + effectiveProjectId, + req.apiKey?.id, + abortController.signal, + proxyConfig + ) + } + + // 设置 SSE 响应头 + res.setHeader('Content-Type', 'text/event-stream') + res.setHeader('Cache-Control', 'no-cache') + res.setHeader('Connection', 'keep-alive') + res.setHeader('X-Accel-Buffering', 'no') + + // 处理流式响应 + let totalUsage = { + promptTokenCount: 0, + candidatesTokenCount: 0, + totalTokenCount: 0 + } + + let heartbeatTimer = null + let lastDataTime = Date.now() + const HEARTBEAT_INTERVAL = 15000 + + const sendHeartbeat = () => { + const timeSinceLastData = Date.now() - lastDataTime + if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { + res.write('\n') + logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) + } + } + + heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) + + let sseBuffer = '' + + const handleEventBlock = (evt) => { + if (!evt.trim()) { + return + } + + const dataLines = evt.split(/\r?\n/).filter((line) => line.startsWith('data:')) + if (dataLines.length === 0) { + if (!res.destroyed) { + res.write(`${evt}\n\n`) + } + return + } + + const dataPayload = dataLines.map((line) => line.replace(/^data:\s?/, '')).join('\n') + + let processedPayload = null + let parsed = null + + if (dataPayload === '[DONE]') { + processedPayload = '[DONE]' + } else { + try { + parsed = JSON.parse(dataPayload) + + if (parsed.usageMetadata) { + totalUsage = parsed.usageMetadata + } else if (parsed.response?.usageMetadata) { + totalUsage = parsed.response.usageMetadata + } + + processedPayload = JSON.stringify(parsed.response || parsed) + } catch (e) { + // 解析失败,直接转发原始 data + } + } + + const outputChunk = processedPayload === null ? `${evt}\n\n` : `data: ${processedPayload}\n\n` + + if (!res.destroyed) { + res.write(outputChunk) + } + + setImmediate(() => { + try { + const usageSource = + processedPayload && processedPayload !== '[DONE]' ? processedPayload : dataPayload + + if (!usageSource || !usageSource.includes('usageMetadata')) { + return + } + + const usageObj = JSON.parse(usageSource) + const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage + + if (usage && typeof usage === 'object') { + totalUsage = usage + logger.debug('📊 Captured Gemini usage data (async):', totalUsage) + } + } catch (error) { + // 提取用量失败时忽略 + } + }) + } + + streamResponse.on('data', (chunk) => { + try { + lastDataTime = Date.now() + + sseBuffer += chunk.toString() + const events = sseBuffer.split(/\r?\n\r?\n/) + sseBuffer = events.pop() || '' + + for (const evt of events) { + handleEventBlock(evt) + } + } catch (error) { + logger.error('Error processing stream chunk:', error) + } + }) + + streamResponse.on('end', () => { + logger.info('Stream completed successfully') + + if (sseBuffer.trim()) { + try { + handleEventBlock(sseBuffer) + } catch (flushError) { + // 忽略 flush 期间的异常 + } + sseBuffer = '' + } + + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + res.end() + + if (totalUsage.totalTokenCount > 0) { + apiKeyService + .recordUsage( + req.apiKey.id, + totalUsage.promptTokenCount || 0, + totalUsage.candidatesTokenCount || 0, + 0, + 0, + model, + accountId + ) + .then(() => { + logger.info( + `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` + ) + }) + .catch((error) => { + logger.error('Failed to record Gemini usage:', error) + }) + } else { + logger.warn( + `⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}` + ) + } + }) + + streamResponse.on('error', (error) => { + logger.error('Stream error:', error) + + if (heartbeatTimer) { + clearInterval(heartbeatTimer) + heartbeatTimer = null + } + + if (!res.headersSent) { + res.status(500).json({ + error: { + message: error.message || 'Stream error', + type: 'api_error' + } + }) + } else { + if (!res.destroyed) { + try { + res.write( + `data: ${JSON.stringify({ + error: { + message: error.message || 'Stream error', + type: 'stream_error', + code: error.code + } + })}\n\n` + ) + res.write('data: [DONE]\n\n') + } catch (writeError) { + logger.error('Error sending error event:', writeError) + } + } + res.end() + } + }) + } catch (error) { + const normalizedError = await normalizeAxiosStreamError(error) + + logger.error(`Error in standard streamGenerateContent endpoint`, { + message: error.message, + status: error.response?.status, + statusText: error.response?.statusText, + responseData: normalizedError.parsedBody || normalizedError.rawBody, + stack: error.stack + }) + + if (!res.headersSent) { + const statusCode = normalizedError.status || 500 + const responseBody = { + error: { + message: normalizedError.message, + type: 'api_error' + } + } + + if (normalizedError.status) { + responseBody.error.upstreamStatus = normalizedError.status + } + if (normalizedError.statusText) { + responseBody.error.upstreamStatusText = normalizedError.statusText + } + if (normalizedError.parsedBody && typeof normalizedError.parsedBody === 'object') { + responseBody.error.upstreamResponse = normalizedError.parsedBody + } else if (normalizedError.rawBody) { + responseBody.error.upstreamRaw = normalizedError.rawBody + } + + return res.status(statusCode).json(responseBody) + } + } finally { + if (abortController) { + abortController = null + } + } +} + +// ============================================================================ +// 导出 +// ============================================================================ + +module.exports = { + // 工具函数 + generateSessionHash, + checkPermissions, + ensureGeminiPermission, + ensureGeminiPermissionMiddleware, + applyRateLimitTracking, + parseProxyConfig, + normalizeAxiosStreamError, + + // OpenAI 兼容格式处理函数 + handleMessages, + + // 模型相关处理函数 + handleModels, + handleModelDetails, + + // 使用统计和 API Key 信息 + handleUsage, + handleKeyInfo, + + // v1internal 格式处理函数 + handleSimpleEndpoint, + handleLoadCodeAssist, + handleOnboardUser, + handleCountTokens, + handleGenerateContent, + handleStreamGenerateContent, + + // 标准 Gemini API 格式处理函数 + handleStandardGenerateContent, + handleStandardStreamGenerateContent +} diff --git a/src/routes/geminiRoutes.js b/src/routes/geminiRoutes.js index 3d74679d..eefeba57 100644 --- a/src/routes/geminiRoutes.js +++ b/src/routes/geminiRoutes.js @@ -1,1431 +1,108 @@ +/** + * Gemini API 路由模块(精简版) + * + * 该模块只包含 geminiRoutes 独有的路由: + * - /messages - OpenAI 兼容格式消息处理 + * - /models - 模型列表 + * - /usage - 使用统计 + * - /key-info - API Key 信息 + * - /v1internal:listExperiments - 实验列表 + * - /v1beta/models/:modelName:listExperiments - 带模型参数的实验列表 + * + * 其他标准 Gemini API 路由由 standardGeminiRoutes.js 处理。 + * 所有处理函数都从 geminiHandlers.js 导入,以避免代码重复。 + */ + const express = require('express') const router = express.Router() -const logger = require('../utils/logger') const { authenticateApiKey } = require('../middleware/auth') -const geminiAccountService = require('../services/geminiAccountService') -const geminiApiAccountService = require('../services/geminiApiAccountService') -const { sendGeminiRequest, getAvailableModels } = require('../services/geminiRelayService') -const crypto = require('crypto') -const sessionHelper = require('../utils/sessionHelper') -const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') -const apiKeyService = require('../services/apiKeyService') -const { updateRateLimitCounters } = require('../utils/rateLimitHelper') -const { parseSSELine } = require('../utils/sseParser') -const axios = require('axios') -const ProxyHelper = require('../utils/proxyHelper') -// const { OAuth2Client } = require('google-auth-library'); // OAuth2Client is not used in this file -// 生成会话哈希 -function generateSessionHash(req) { - const apiKeyPrefix = - req.headers['x-api-key']?.substring(0, 10) || req.headers['x-goog-api-key']?.substring(0, 10) - - const sessionData = [req.headers['user-agent'], req.ip, apiKeyPrefix].filter(Boolean).join(':') - - return crypto.createHash('sha256').update(sessionData).digest('hex') -} - -// 检查 API Key 权限 -function checkPermissions(apiKeyData, requiredPermission = 'gemini') { - const permissions = apiKeyData.permissions || 'all' - return permissions === 'all' || permissions === requiredPermission -} - -// 确保请求具有 Gemini 访问权限 -function ensureGeminiPermission(req, res) { - const apiKeyData = req.apiKey || {} - if (checkPermissions(apiKeyData, 'gemini')) { - return true - } - - logger.security( - `🚫 API Key ${apiKeyData.id || 'unknown'} 缺少 Gemini 权限,拒绝访问 ${req.originalUrl}` - ) - - res.status(403).json({ - error: { - message: 'This API key does not have permission to access Gemini', - type: 'permission_denied' - } - }) - return false -} - -async function applyRateLimitTracking(req, usageSummary, model, context = '') { - if (!req.rateLimitInfo) { - return - } - - const label = context ? ` (${context})` : '' - - try { - const { totalTokens, totalCost } = await updateRateLimitCounters( - req.rateLimitInfo, - usageSummary, - model - ) - - if (totalTokens > 0) { - logger.api(`📊 Updated rate limit token count${label}: +${totalTokens} tokens`) - } - if (typeof totalCost === 'number' && totalCost > 0) { - logger.api(`💰 Updated rate limit cost count${label}: +$${totalCost.toFixed(6)}`) - } - } catch (error) { - logger.error(`❌ Failed to update rate limit counters${label}:`, error) - } -} - -// Gemini 消息处理端点 -router.post('/messages', authenticateApiKey, async (req, res) => { - const startTime = Date.now() - let abortController = null - let accountId - let accountType - let sessionHash - - try { - const apiKeyData = req.apiKey - - // 检查权限 - if (!checkPermissions(apiKeyData, 'gemini')) { - return res.status(403).json({ - error: { - message: 'This API key does not have permission to access Gemini', - type: 'permission_denied' - } - }) - } - - // 提取请求参数 - const { - messages, - model = 'gemini-2.5-flash', - temperature = 0.7, - max_tokens = 4096, - stream = false - } = req.body - - // 验证必需参数 - if (!messages || !Array.isArray(messages) || messages.length === 0) { - return res.status(400).json({ - error: { - message: 'Messages array is required', - type: 'invalid_request_error' - } - }) - } - - // 生成会话哈希用于粘性会话 - sessionHash = generateSessionHash(req) - - // 使用统一调度选择可用的 Gemini 账户(传递请求的模型) - try { - const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( - apiKeyData, - sessionHash, - model, // 传递请求的模型进行过滤 - { allowApiAccounts: true } // 允许调度 API 账户 - ) - ;({ accountId, accountType } = schedulerResult) - } catch (error) { - logger.error('Failed to select Gemini account:', error) - return res.status(503).json({ - error: { - message: error.message || 'No available Gemini accounts', - type: 'service_unavailable' - } - }) - } - - // 判断账户类型:根据 accountType 判断,而非 accountId 前缀 - const isApiAccount = accountType === 'gemini-api' - - // 获取账户详情 - let account - if (isApiAccount) { - account = await geminiApiAccountService.getAccount(accountId) - if (!account) { - return res.status(503).json({ - error: { - message: 'Gemini API account not found', - type: 'service_unavailable' - } - }) - } - logger.info(`Using Gemini API account: ${account.id} for API key: ${apiKeyData.id}`) - // 标记 API 账户被使用 - await geminiApiAccountService.markAccountUsed(account.id) - } else { - account = await geminiAccountService.getAccount(accountId) - if (!account) { - return res.status(503).json({ - error: { - message: 'Gemini OAuth account not found', - type: 'service_unavailable' - } - }) - } - logger.info(`Using Gemini OAuth account: ${account.id} for API key: ${apiKeyData.id}`) - // 标记 OAuth 账户被使用 - await geminiAccountService.markAccountUsed(account.id) - } - - // 创建中止控制器 - abortController = new AbortController() - - // 处理客户端断开连接 - req.on('close', () => { - if (abortController && !abortController.signal.aborted) { - logger.info('Client disconnected, aborting Gemini request') - abortController.abort() - } - }) - - let geminiResponse - - if (isApiAccount) { - // API 账户:直接调用 Google Gemini API - // 转换 OpenAI 格式的 messages 为 Gemini 格式的 contents - const contents = messages.map((msg) => ({ - role: msg.role === 'assistant' ? 'model' : msg.role, - parts: [{ text: msg.content }] - })) - - const requestBody = { - contents, - generationConfig: { - temperature, - maxOutputTokens: max_tokens, - topP: 0.95, - topK: 40 - } - } - - // 解析代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = - typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const apiUrl = stream - ? `${account.baseUrl}/v1beta/models/${model}:streamGenerateContent?key=${account.apiKey}&alt=sse` - : `${account.baseUrl}/v1beta/models/${model}:generateContent?key=${account.apiKey}` - - const axiosConfig = { - method: 'POST', - url: apiUrl, - data: requestBody, - headers: { - 'Content-Type': 'application/json' - }, - responseType: stream ? 'stream' : 'json', - signal: abortController.signal - } - - // 添加代理配置 - if (proxyConfig) { - const proxyHelper = new ProxyHelper() - axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) - axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) - } - - try { - const apiResponse = await axios(axiosConfig) - if (stream) { - geminiResponse = apiResponse.data - } else { - // 转换为 OpenAI 兼容格式 - const geminiData = apiResponse.data - geminiResponse = { - id: crypto.randomUUID(), - object: 'chat.completion', - created: Math.floor(Date.now() / 1000), - model, - choices: [ - { - index: 0, - message: { - role: 'assistant', - content: - geminiData.candidates?.[0]?.content?.parts?.[0]?.text || 'No response generated' - }, - finish_reason: 'stop' - } - ], - usage: { - prompt_tokens: geminiData.usageMetadata?.promptTokenCount || 0, - completion_tokens: geminiData.usageMetadata?.candidatesTokenCount || 0, - total_tokens: geminiData.usageMetadata?.totalTokenCount || 0 - } - } - - // 记录使用统计 - if (geminiData.usageMetadata) { - await apiKeyService.recordUsage( - apiKeyData.id, - geminiData.usageMetadata.promptTokenCount || 0, - geminiData.usageMetadata.candidatesTokenCount || 0, - 0, - 0, - model, - accountId // 使用原始 accountId(含 api: 前缀) - ) - } - } - } catch (error) { - logger.error('Gemini API request failed:', { - status: error.response?.status, - statusText: error.response?.statusText, - data: error.response?.data - }) - throw error - } - } else { - // OAuth 账户:使用现有的 sendGeminiRequest - geminiResponse = await sendGeminiRequest({ - messages, - model, - temperature, - maxTokens: max_tokens, - stream, - accessToken: account.accessToken, - proxy: account.proxy, - apiKeyId: apiKeyData.id, - signal: abortController.signal, - projectId: account.projectId, - accountId: account.id - }) - } - - if (stream) { - // 设置流式响应头 - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Cache-Control', 'no-cache') - res.setHeader('Connection', 'keep-alive') - res.setHeader('X-Accel-Buffering', 'no') - - if (isApiAccount) { - // API 账户:处理 SSE 流并记录使用统计 - let totalUsage = { - promptTokenCount: 0, - candidatesTokenCount: 0, - totalTokenCount: 0 - } - - geminiResponse.on('data', (chunk) => { - try { - const chunkStr = chunk.toString() - res.write(chunkStr) - - // 尝试从 SSE 流中提取 usage 数据 - const lines = chunkStr.split('\n') - for (const line of lines) { - if (line.startsWith('data:')) { - const data = line.substring(5).trim() - if (data && data !== '[DONE]') { - try { - const parsed = JSON.parse(data) - if (parsed.usageMetadata || parsed.response?.usageMetadata) { - totalUsage = parsed.usageMetadata || parsed.response.usageMetadata - } - } catch (e) { - // 解析失败,忽略 - } - } - } - } - } catch (error) { - logger.error('Error processing stream chunk:', error) - } - }) - - geminiResponse.on('end', () => { - res.end() - - // 异步记录使用统计 - if (totalUsage.totalTokenCount > 0) { - apiKeyService - .recordUsage( - apiKeyData.id, - totalUsage.promptTokenCount || 0, - totalUsage.candidatesTokenCount || 0, - 0, - 0, - model, - accountId // 使用原始 accountId(含 api: 前缀) - ) - .then(() => { - logger.info( - `📊 Recorded Gemini API stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}` - ) - }) - .catch((error) => { - logger.error('Failed to record Gemini API usage:', error) - }) - } - }) - - geminiResponse.on('error', (error) => { - logger.error('Stream error:', error) - if (!res.headersSent) { - res.status(500).json({ - error: { - message: error.message || 'Stream error', - type: 'api_error' - } - }) - } else { - res.end() - } - }) - } else { - // OAuth 账户:使用原有的流式传输逻辑 - for await (const chunk of geminiResponse) { - if (abortController.signal.aborted) { - break - } - res.write(chunk) - } - res.end() - } - } else { - // 非流式响应 - res.json(geminiResponse) - } - - const duration = Date.now() - startTime - logger.info(`Gemini request completed in ${duration}ms`) - } catch (error) { - logger.error('Gemini request error:', error) - - // 处理速率限制 - const errorStatus = error.response?.status || error.status - if (errorStatus === 429 && accountId) { - try { - // 使用已有的 accountType 变量,而非检查前缀 - const rateLimitAccountType = accountType || 'gemini' - await unifiedGeminiScheduler.markAccountRateLimited( - accountId, - rateLimitAccountType, - sessionHash - ) - logger.warn(`⚠️ Gemini account ${accountId} rate limited (/messages), marking as limited`) - } catch (limitError) { - logger.warn('Failed to mark account as rate limited:', limitError) - } - } - - // 返回错误响应 - const status = errorStatus || 500 - const errorResponse = { - error: error.error || { - message: error.message || 'Internal server error', - type: 'api_error' - } - } - - res.status(status).json(errorResponse) - } finally { - // 清理资源 - if (abortController) { - abortController = null - } - } - return undefined -}) - -// 获取可用模型列表 -router.get('/models', authenticateApiKey, async (req, res) => { - try { - const apiKeyData = req.apiKey - - // 检查权限 - if (!checkPermissions(apiKeyData, 'gemini')) { - return res.status(403).json({ - error: { - message: 'This API key does not have permission to access Gemini', - type: 'permission_denied' - } - }) - } - - // 选择账户获取模型列表 - let account = null - try { - const accountSelection = await unifiedGeminiScheduler.selectAccountForApiKey( - apiKeyData, - null, - null - ) - account = await geminiAccountService.getAccount(accountSelection.accountId) - } catch (error) { - logger.warn('Failed to select Gemini account for models endpoint:', error) - } - - if (!account) { - // 返回默认模型列表 - return res.json({ - object: 'list', - data: [ - { - id: 'gemini-2.5-flash', - object: 'model', - created: Date.now() / 1000, - owned_by: 'google' - } - ] - }) - } - - // 获取模型列表 - const models = await getAvailableModels(account.accessToken, account.proxy) - - res.json({ - object: 'list', - data: models - }) - } catch (error) { - logger.error('Failed to get Gemini models:', error) - res.status(500).json({ - error: { - message: 'Failed to retrieve models', - type: 'api_error' - } - }) - } - return undefined -}) - -// 使用情况统计(与 Claude 共用) -router.get('/usage', authenticateApiKey, async (req, res) => { - try { - const { usage } = req.apiKey - - res.json({ - object: 'usage', - total_tokens: usage.total.tokens, - total_requests: usage.total.requests, - daily_tokens: usage.daily.tokens, - daily_requests: usage.daily.requests, - monthly_tokens: usage.monthly.tokens, - monthly_requests: usage.monthly.requests - }) - } catch (error) { - logger.error('Failed to get usage stats:', error) - res.status(500).json({ - error: { - message: 'Failed to retrieve usage statistics', - type: 'api_error' - } - }) - } -}) - -// API Key 信息(与 Claude 共用) -router.get('/key-info', authenticateApiKey, async (req, res) => { - try { - const keyData = req.apiKey - - res.json({ - id: keyData.id, - name: keyData.name, - permissions: keyData.permissions || 'all', - token_limit: keyData.tokenLimit, - tokens_used: keyData.usage.total.tokens, - tokens_remaining: - keyData.tokenLimit > 0 - ? Math.max(0, keyData.tokenLimit - keyData.usage.total.tokens) - : null, - rate_limit: { - window: keyData.rateLimitWindow, - requests: keyData.rateLimitRequests - }, - concurrency_limit: keyData.concurrencyLimit, - model_restrictions: { - enabled: keyData.enableModelRestriction, - models: keyData.restrictedModels - } - }) - } catch (error) { - logger.error('Failed to get key info:', error) - res.status(500).json({ - error: { - message: 'Failed to retrieve API key information', - type: 'api_error' - } - }) - } -}) - -// 通用的简单端点处理函数(用于直接转发的端点) -// 适用于:listExperiments 等不需要特殊业务逻辑的端点 -function handleSimpleEndpoint(apiMethod) { - return async (req, res) => { - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 从路径参数或请求体中获取模型名 - const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' - const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - requestedModel - ) - const account = await geminiAccountService.getAccount(accountId) - const { accessToken, refreshToken } = account - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`${apiMethod} request (${version})`, { - apiKeyId: req.apiKey?.id || 'unknown', - requestBody: req.body - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = - typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient( - accessToken, - refreshToken, - proxyConfig - ) - - // 直接转发请求体,不做特殊处理 - const response = await geminiAccountService.forwardToCodeAssist( - client, - apiMethod, - req.body, - proxyConfig - ) - - res.json(response) - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.error(`Error in ${apiMethod} endpoint (${version})`, { error: error.message }) - res.status(500).json({ - error: 'Internal server error', - message: error.message - }) - } - } -} - -// 共用的 loadCodeAssist 处理函数 -async function handleLoadCodeAssist(req, res) { - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 从路径参数或请求体中获取模型名 - const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' - const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - requestedModel - ) - const account = await geminiAccountService.getAccount(accountId) - const { accessToken, refreshToken, projectId } = account - - const { metadata, cloudaicompanionProject } = req.body - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`LoadCodeAssist request (${version})`, { - metadata: metadata || {}, - requestedProject: cloudaicompanionProject || null, - accountProject: projectId || null, - apiKeyId: req.apiKey?.id || 'unknown' - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) - - // 智能处理项目ID: - // 1. 如果账户配置了项目ID -> 使用账户的项目ID(覆盖请求中的) - // 2. 如果账户没有项目ID -> 使用请求中的cloudaicompanionProject - // 3. 都没有 -> 传null - const effectiveProjectId = projectId || cloudaicompanionProject || null - - logger.info('📋 loadCodeAssist项目ID处理逻辑', { - accountProjectId: projectId, - requestProjectId: cloudaicompanionProject, - effectiveProjectId, - decision: projectId - ? '使用账户配置' - : cloudaicompanionProject - ? '使用请求参数' - : '不使用项目ID' - }) - - const response = await geminiAccountService.loadCodeAssist( - client, - effectiveProjectId, - proxyConfig - ) - - // 如果响应中包含 cloudaicompanionProject,保存到账户作为临时项目 ID - if (response.cloudaicompanionProject && !account.projectId) { - await geminiAccountService.updateTempProjectId(accountId, response.cloudaicompanionProject) - logger.info( - `📋 Cached temporary projectId from loadCodeAssist: ${response.cloudaicompanionProject}` - ) - } - - res.json(response) - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.error(`Error in loadCodeAssist endpoint (${version})`, { error: error.message }) - res.status(500).json({ - error: 'Internal server error', - message: error.message - }) - } -} - -// 共用的 onboardUser 处理函数 -async function handleOnboardUser(req, res) { - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - // 提取请求参数 - const { tierId, cloudaicompanionProject, metadata } = req.body - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 从路径参数或请求体中获取模型名 - const requestedModel = req.body.model || req.params.modelName || 'gemini-2.5-flash' - const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - requestedModel - ) - const account = await geminiAccountService.getAccount(accountId) - const { accessToken, refreshToken, projectId } = account - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`OnboardUser request (${version})`, { - tierId: tierId || 'not provided', - requestedProject: cloudaicompanionProject || null, - accountProject: projectId || null, - metadata: metadata || {}, - apiKeyId: req.apiKey?.id || 'unknown' - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) - - // 智能处理项目ID: - // 1. 如果账户配置了项目ID -> 使用账户的项目ID(覆盖请求中的) - // 2. 如果账户没有项目ID -> 使用请求中的cloudaicompanionProject - // 3. 都没有 -> 传null - const effectiveProjectId = projectId || cloudaicompanionProject || null - - logger.info('📋 onboardUser项目ID处理逻辑', { - accountProjectId: projectId, - requestProjectId: cloudaicompanionProject, - effectiveProjectId, - decision: projectId - ? '使用账户配置' - : cloudaicompanionProject - ? '使用请求参数' - : '不使用项目ID' - }) - - // 如果提供了 tierId,直接调用 onboardUser - if (tierId) { - const response = await geminiAccountService.onboardUser( - client, - tierId, - effectiveProjectId, // 使用处理后的项目ID - metadata, - proxyConfig - ) - - res.json(response) - } else { - // 否则执行完整的 setupUser 流程 - const response = await geminiAccountService.setupUser( - client, - effectiveProjectId, // 使用处理后的项目ID - metadata, - proxyConfig - ) - - res.json(response) - } - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.error(`Error in onboardUser endpoint (${version})`, { error: error.message }) - res.status(500).json({ - error: 'Internal server error', - message: error.message - }) - } -} - -// 共用的 countTokens 处理函数 -async function handleCountTokens(req, res) { - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - // 处理请求体结构,支持直接 contents 或 request.contents - const requestData = req.body.request || req.body - const { contents } = requestData - // 从路径参数或请求体中获取模型名 - const model = requestData.model || req.params.modelName || 'gemini-2.5-flash' - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 验证必需参数 - if (!contents || !Array.isArray(contents)) { - return res.status(400).json({ - error: { - message: 'Contents array is required', - type: 'invalid_request_error' - } - }) - } - - // 使用统一调度选择账号 - const { accountId } = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - model - ) - const account = await geminiAccountService.getAccount(accountId) - const { accessToken, refreshToken } = account - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`CountTokens request (${version})`, { - model, - contentsLength: contents.length, - apiKeyId: req.apiKey?.id || 'unknown' - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) - const response = await geminiAccountService.countTokens(client, contents, model, proxyConfig) - - res.json(response) - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.error(`Error in countTokens endpoint (${version})`, { error: error.message }) - res.status(500).json({ - error: { - message: error.message || 'Internal server error', - type: 'api_error' - } - }) - } - return undefined -} - -// 共用的 generateContent 处理函数 -async function handleGenerateContent(req, res) { - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - const { project, user_prompt_id, request: requestData } = req.body - // 从路径参数或请求体中获取模型名 - const model = req.body.model || req.params.modelName || 'gemini-2.5-flash' - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 处理不同格式的请求 - let actualRequestData = requestData - if (!requestData) { - if (req.body.messages) { - // 这是 OpenAI 格式的请求,构建 Gemini 格式的 request 对象 - actualRequestData = { - contents: req.body.messages.map((msg) => ({ - role: msg.role === 'assistant' ? 'model' : msg.role, - parts: [{ text: msg.content }] - })), - generationConfig: { - temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7, - maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096, - topP: req.body.top_p !== undefined ? req.body.top_p : 0.95, - topK: req.body.top_k !== undefined ? req.body.top_k : 40 - } - } - } else if (req.body.contents) { - // 直接的 Gemini 格式请求(没有 request 包装) - actualRequestData = req.body - } - } - - // 验证必需参数 - if (!actualRequestData || !actualRequestData.contents) { - return res.status(400).json({ - error: { - message: 'Request contents are required', - type: 'invalid_request_error' - } - }) - } - - // 使用统一调度选择账号(v1internal 不允许 API 账户) - const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - model - // 不传 allowApiAccounts: true,所以不会调度 API 账户 - ) - const { accountId, accountType } = schedulerResult - - // v1internal 路由只支持 OAuth 账户,不支持 API Key 账户 - if (accountType === 'gemini-api') { - logger.error(`❌ v1internal routes do not support Gemini API accounts. Account: ${accountId}`) - return res.status(400).json({ - error: { - message: - 'This endpoint only supports Gemini OAuth accounts. Gemini API Key accounts are not compatible with v1internal format.', - type: 'invalid_account_type' - } - }) - } - - const account = await geminiAccountService.getAccount(accountId) - if (!account) { - logger.error(`❌ Gemini account not found: ${accountId}`) - return res.status(404).json({ - error: { - message: 'Gemini account not found', - type: 'account_not_found' - } - }) - } - - const { accessToken, refreshToken } = account - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`GenerateContent request (${version})`, { - model, - userPromptId: user_prompt_id, - projectId: project || account.projectId, - apiKeyId: req.apiKey?.id || 'unknown' - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) - - // 智能处理项目ID: - // 1. 如果账户配置了项目ID -> 使用账户的项目ID(覆盖请求中的) - // 2. 如果账户没有项目ID -> 使用请求中的项目ID(如果有的话) - // 3. 都没有 -> 传null - const effectiveProjectId = account.projectId || project || null - - logger.info('📋 项目ID处理逻辑', { - accountProjectId: account.projectId, - requestProjectId: project, - effectiveProjectId, - decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID' - }) - - const response = await geminiAccountService.generateContent( - client, - { model, request: actualRequestData }, - user_prompt_id, - effectiveProjectId, // 使用智能决策的项目ID - req.apiKey?.id, // 使用 API Key ID 作为 session ID - proxyConfig // 传递代理配置 - ) - - // 记录使用统计 - if (response?.response?.usageMetadata) { - try { - const usage = response.response.usageMetadata - await apiKeyService.recordUsage( - req.apiKey.id, - usage.promptTokenCount || 0, - usage.candidatesTokenCount || 0, - 0, // cacheCreateTokens - 0, // cacheReadTokens - model, - account.id - ) - logger.info( - `📊 Recorded Gemini usage - Input: ${usage.promptTokenCount}, Output: ${usage.candidatesTokenCount}, Total: ${usage.totalTokenCount}` - ) - - await applyRateLimitTracking( - req, - { - inputTokens: usage.promptTokenCount || 0, - outputTokens: usage.candidatesTokenCount || 0, - cacheCreateTokens: 0, - cacheReadTokens: 0 - }, - model, - 'gemini-non-stream' - ) - } catch (error) { - logger.error('Failed to record Gemini usage:', error) - } - } - - res.json(version === 'v1beta' ? response.response : response) - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - // 打印详细的错误信息 - logger.error(`Error in generateContent endpoint (${version})`, { - message: error.message, - status: error.response?.status, - statusText: error.response?.statusText, - responseData: error.response?.data, - requestUrl: error.config?.url, - requestMethod: error.config?.method, - stack: error.stack - }) - res.status(500).json({ - error: { - message: error.message || 'Internal server error', - type: 'api_error' - } - }) - } - return undefined -} - -// 共用的 streamGenerateContent 处理函数 -async function handleStreamGenerateContent(req, res) { - let abortController = null - - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - const { project, user_prompt_id, request: requestData } = req.body - // 从路径参数或请求体中获取模型名 - const model = req.body.model || req.params.modelName || 'gemini-2.5-flash' - const sessionHash = sessionHelper.generateSessionHash(req.body) - - // 处理不同格式的请求 - let actualRequestData = requestData - if (!requestData) { - if (req.body.messages) { - // 这是 OpenAI 格式的请求,构建 Gemini 格式的 request 对象 - actualRequestData = { - contents: req.body.messages.map((msg) => ({ - role: msg.role === 'assistant' ? 'model' : msg.role, - parts: [{ text: msg.content }] - })), - generationConfig: { - temperature: req.body.temperature !== undefined ? req.body.temperature : 0.7, - maxOutputTokens: req.body.max_tokens !== undefined ? req.body.max_tokens : 4096, - topP: req.body.top_p !== undefined ? req.body.top_p : 0.95, - topK: req.body.top_k !== undefined ? req.body.top_k : 40 - } - } - } else if (req.body.contents) { - // 直接的 Gemini 格式请求(没有 request 包装) - actualRequestData = req.body - } - } - - // 验证必需参数 - if (!actualRequestData || !actualRequestData.contents) { - return res.status(400).json({ - error: { - message: 'Request contents are required', - type: 'invalid_request_error' - } - }) - } - - // 使用统一调度选择账号(v1internal 不允许 API 账户) - const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - model - // 不传 allowApiAccounts: true,所以不会调度 API 账户 - ) - const { accountId, accountType } = schedulerResult - - // v1internal 路由只支持 OAuth 账户,不支持 API Key 账户 - if (accountType === 'gemini-api') { - logger.error(`❌ v1internal routes do not support Gemini API accounts. Account: ${accountId}`) - return res.status(400).json({ - error: { - message: - 'This endpoint only supports Gemini OAuth accounts. Gemini API Key accounts are not compatible with v1internal format.', - type: 'invalid_account_type' - } - }) - } - - const account = await geminiAccountService.getAccount(accountId) - if (!account) { - logger.error(`❌ Gemini account not found: ${accountId}`) - return res.status(404).json({ - error: { - message: 'Gemini account not found', - type: 'account_not_found' - } - }) - } - - const { accessToken, refreshToken } = account - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - logger.info(`StreamGenerateContent request (${version})`, { - model, - userPromptId: user_prompt_id, - projectId: project || account.projectId, - apiKeyId: req.apiKey?.id || 'unknown' - }) - - // 创建中止控制器 - abortController = new AbortController() - - // 处理客户端断开连接 - req.on('close', () => { - if (abortController && !abortController.signal.aborted) { - logger.info('Client disconnected, aborting stream request') - abortController.abort() - } - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - const client = await geminiAccountService.getOauthClient(accessToken, refreshToken, proxyConfig) - - // 智能处理项目ID: - // 1. 如果账户配置了项目ID -> 使用账户的项目ID(覆盖请求中的) - // 2. 如果账户没有项目ID -> 使用请求中的项目ID(如果有的话) - // 3. 都没有 -> 传null - const effectiveProjectId = account.projectId || project || null - - logger.info('📋 流式请求项目ID处理逻辑', { - accountProjectId: account.projectId, - requestProjectId: project, - effectiveProjectId, - decision: account.projectId ? '使用账户配置' : project ? '使用请求参数' : '不使用项目ID' - }) - - const streamResponse = await geminiAccountService.generateContentStream( - client, - { model, request: actualRequestData }, - user_prompt_id, - effectiveProjectId, // 使用智能决策的项目ID - req.apiKey?.id, // 使用 API Key ID 作为 session ID - abortController.signal, // 传递中止信号 - proxyConfig // 传递代理配置 - ) - - // 设置 SSE 响应头 - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Cache-Control', 'no-cache') - res.setHeader('Connection', 'keep-alive') - res.setHeader('X-Accel-Buffering', 'no') - - // 处理流式响应并捕获usage数据 - // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制 - let streamBuffer = '' // 缓冲区用于处理不完整的行 - let totalUsage = { - promptTokenCount: 0, - candidatesTokenCount: 0, - totalTokenCount: 0 - } - let usageReported = false // 修复:改为 let 以便后续修改 - - // SSE 心跳机制:防止 Clash 等代理 120 秒超时 - let heartbeatTimer = null - let lastDataTime = Date.now() - const HEARTBEAT_INTERVAL = 15000 // 15 秒 - - const sendHeartbeat = () => { - const timeSinceLastData = Date.now() - lastDataTime - if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { - res.write('\n') // 发送空行保持连接活跃 - logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) - } - } - - heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) - - streamResponse.on('data', (chunk) => { - try { - // 更新最后数据时间 - lastDataTime = Date.now() - - // 1️⃣ 立即转发原始数据(零延迟,最高优先级) - // 对所有版本(v1beta 和 v1internal)都采用透明转发 - if (!res.destroyed) { - res.write(chunk) // 直接转发 Buffer,无需转换和序列化 - } - - // 2️⃣ 异步提取 usage 数据(不阻塞转发) - // 使用 setImmediate 将解析放到下一个事件循环 - setImmediate(() => { - try { - const chunkStr = chunk.toString() - if (!chunkStr.trim()) { - return - } - - // 快速检查是否包含 usage 数据(避免不必要的解析) - if (!chunkStr.includes('usageMetadata')) { - return - } - - // 处理不完整的行 - streamBuffer += chunkStr - const lines = streamBuffer.split('\n') - streamBuffer = lines.pop() || '' - - // 仅解析包含 usage 的行 - for (const line of lines) { - if (!line.trim() || !line.includes('usageMetadata')) { - continue - } - - try { - const parsed = parseSSELine(line) - if (parsed.type === 'data' && parsed.data.response?.usageMetadata) { - totalUsage = parsed.data.response.usageMetadata - logger.debug('📊 Captured Gemini usage data:', totalUsage) - } - } catch (parseError) { - // 解析失败但不影响转发 - logger.warn('⚠️ Failed to parse usage line:', parseError.message) - } - } - } catch (error) { - // 提取失败但不影响转发 - logger.warn('⚠️ Error extracting usage data:', error.message) - } - }) - } catch (error) { - logger.error('Error processing stream chunk:', error) - // 不中断流,继续处理后续数据 - } - }) - - streamResponse.on('end', () => { - logger.info('Stream completed successfully') - - // 清理心跳定时器 - if (heartbeatTimer) { - clearInterval(heartbeatTimer) - heartbeatTimer = null - } - - // 立即结束响应,不阻塞 - res.end() - - // 异步记录使用统计(不阻塞响应) - if (!usageReported && totalUsage.totalTokenCount > 0) { - Promise.all([ - apiKeyService.recordUsage( - req.apiKey.id, - totalUsage.promptTokenCount || 0, - totalUsage.candidatesTokenCount || 0, - 0, // cacheCreateTokens - 0, // cacheReadTokens - model, - account.id - ), - applyRateLimitTracking( - req, - { - inputTokens: totalUsage.promptTokenCount || 0, - outputTokens: totalUsage.candidatesTokenCount || 0, - cacheCreateTokens: 0, - cacheReadTokens: 0 - }, - model, - 'gemini-stream' - ) - ]) - .then(() => { - logger.info( - `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` - ) - usageReported = true - }) - .catch((error) => { - logger.error('Failed to record Gemini usage:', error) - }) - } - }) - - streamResponse.on('error', (error) => { - logger.error('Stream error:', error) - - // 清理心跳定时器 - if (heartbeatTimer) { - clearInterval(heartbeatTimer) - heartbeatTimer = null - } - - if (!res.headersSent) { - // 如果还没发送响应头,可以返回正常的错误响应 - res.status(500).json({ - error: { - message: error.message || 'Stream error', - type: 'api_error' - } - }) - } else { - // 如果已经开始流式传输,发送 SSE 格式的错误事件和结束标记 - // 这样客户端可以正确识别流的结束,避免 "Premature close" 错误 - if (!res.destroyed) { - try { - // 发送错误事件(SSE 格式) - res.write( - `data: ${JSON.stringify({ - error: { - message: error.message || 'Stream error', - type: 'stream_error', - code: error.code - } - })}\n\n` - ) - - // 发送 SSE 结束标记 - res.write('data: [DONE]\n\n') - } catch (writeError) { - logger.error('Error sending error event:', writeError) - } - } - res.end() - } - }) - } catch (error) { - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1internal' - // 打印详细的错误信息 - logger.error(`Error in streamGenerateContent endpoint (${version})`, { - message: error.message, - status: error.response?.status, - statusText: error.response?.statusText, - responseData: error.response?.data, - requestUrl: error.config?.url, - requestMethod: error.config?.method, - stack: error.stack - }) - - if (!res.headersSent) { - res.status(500).json({ - error: { - message: error.message || 'Internal server error', - type: 'api_error' - } - }) - } - } finally { - // 清理资源 - if (abortController) { - abortController = null - } - } - return undefined -} - -// 注册所有路由端点 -// v1internal 版本的端点 -router.post('/v1internal\\:loadCodeAssist', authenticateApiKey, handleLoadCodeAssist) -router.post('/v1internal\\:onboardUser', authenticateApiKey, handleOnboardUser) -router.post('/v1internal\\:countTokens', authenticateApiKey, handleCountTokens) -router.post('/v1internal\\:generateContent', authenticateApiKey, handleGenerateContent) -router.post('/v1internal\\:streamGenerateContent', authenticateApiKey, handleStreamGenerateContent) +// 从 handlers/geminiHandlers.js 导入所有处理函数 +const { + handleMessages, + handleModels, + handleUsage, + handleKeyInfo, + handleSimpleEndpoint, + // 以下函数需要导出供其他模块使用(如 unified.js) + handleGenerateContent, + handleStreamGenerateContent, + handleLoadCodeAssist, + handleOnboardUser, + handleCountTokens, + handleStandardGenerateContent, + handleStandardStreamGenerateContent, + ensureGeminiPermissionMiddleware +} = require('../handlers/geminiHandlers') + +// ============================================================================ +// OpenAI 兼容格式路由 +// ============================================================================ + +/** + * POST /messages + * OpenAI 兼容格式的消息处理端点 + */ +router.post('/messages', authenticateApiKey, handleMessages) + +// ============================================================================ +// 模型和信息路由 +// ============================================================================ + +/** + * GET /models + * 获取可用模型列表 + */ +router.get('/models', authenticateApiKey, handleModels) + +/** + * GET /usage + * 获取使用情况统计 + */ +router.get('/usage', authenticateApiKey, handleUsage) + +/** + * GET /key-info + * 获取 API Key 信息 + */ +router.get('/key-info', authenticateApiKey, handleKeyInfo) + +// ============================================================================ +// v1internal 独有路由(listExperiments) +// ============================================================================ + +/** + * POST /v1internal:listExperiments + * 列出实验(只有 geminiRoutes 定义此路由) + */ router.post( '/v1internal\\:listExperiments', authenticateApiKey, handleSimpleEndpoint('listExperiments') ) -// v1beta 版本的端点 - 支持动态模型名称 -router.post('/v1beta/models/:modelName\\:loadCodeAssist', authenticateApiKey, handleLoadCodeAssist) -router.post('/v1beta/models/:modelName\\:onboardUser', authenticateApiKey, handleOnboardUser) -router.post('/v1beta/models/:modelName\\:countTokens', authenticateApiKey, handleCountTokens) -router.post( - '/v1beta/models/:modelName\\:generateContent', - authenticateApiKey, - handleGenerateContent -) -router.post( - '/v1beta/models/:modelName\\:streamGenerateContent', - authenticateApiKey, - handleStreamGenerateContent -) +/** + * POST /v1beta/models/:modelName:listExperiments + * 带模型参数的实验列表(只有 geminiRoutes 定义此路由) + */ router.post( '/v1beta/models/:modelName\\:listExperiments', authenticateApiKey, handleSimpleEndpoint('listExperiments') ) -// 导出处理函数供标准路由使用 +// ============================================================================ +// 导出 +// ============================================================================ + module.exports = router + +// 导出处理函数供其他模块使用(如 unified.js、standardGeminiRoutes.js) module.exports.handleLoadCodeAssist = handleLoadCodeAssist module.exports.handleOnboardUser = handleOnboardUser module.exports.handleCountTokens = handleCountTokens module.exports.handleGenerateContent = handleGenerateContent module.exports.handleStreamGenerateContent = handleStreamGenerateContent +module.exports.handleStandardGenerateContent = handleStandardGenerateContent +module.exports.handleStandardStreamGenerateContent = handleStandardStreamGenerateContent +module.exports.ensureGeminiPermissionMiddleware = ensureGeminiPermissionMiddleware diff --git a/src/routes/standardGeminiRoutes.js b/src/routes/standardGeminiRoutes.js index ab29752a..fefe611a 100644 --- a/src/routes/standardGeminiRoutes.js +++ b/src/routes/standardGeminiRoutes.js @@ -1,958 +1,46 @@ +/** + * 标准 Gemini API 路由模块 + * + * 该模块处理标准 Gemini API 格式的请求: + * - v1beta/models/:modelName:generateContent + * - v1beta/models/:modelName:streamGenerateContent + * - v1beta/models/:modelName:countTokens + * - v1beta/models/:modelName:loadCodeAssist + * - v1beta/models/:modelName:onboardUser + * - v1/models/:modelName:* (同上) + * - v1internal:* (内部格式) + * - v1beta/models, v1/models (模型列表) + * - v1beta/models/:modelName, v1/models/:modelName (模型详情) + * + * 所有处理函数都从 geminiHandlers.js 导入,以避免代码重复。 + */ + const express = require('express') const router = express.Router() const { authenticateApiKey } = require('../middleware/auth') const logger = require('../utils/logger') -const geminiAccountService = require('../services/geminiAccountService') -const geminiApiAccountService = require('../services/geminiApiAccountService') -const unifiedGeminiScheduler = require('../services/unifiedGeminiScheduler') -const apiKeyService = require('../services/apiKeyService') -const sessionHelper = require('../utils/sessionHelper') -const axios = require('axios') -const ProxyHelper = require('../utils/proxyHelper') - -// 导入 geminiRoutes 中导出的处理函数 -const { handleLoadCodeAssist, handleOnboardUser, handleCountTokens } = require('./geminiRoutes') - -// 检查 API Key 是否具备 Gemini 权限 -function hasGeminiPermission(apiKeyData, requiredPermission = 'gemini') { - const permissions = apiKeyData?.permissions || 'all' - return permissions === 'all' || permissions === requiredPermission -} - -// 确保请求拥有 Gemini 权限 -function ensureGeminiPermission(req, res) { - const apiKeyData = req.apiKey || {} - if (hasGeminiPermission(apiKeyData, 'gemini')) { - return true - } - - logger.security( - `🚫 API Key ${apiKeyData.id || 'unknown'} 缺少 Gemini 权限,拒绝访问 ${req.originalUrl}` - ) - - res.status(403).json({ - error: { - message: 'This API key does not have permission to access Gemini', - type: 'permission_denied' - } - }) - return false -} - -// 供路由中间件复用的权限检查 -function ensureGeminiPermissionMiddleware(req, res, next) { - if (ensureGeminiPermission(req, res)) { - return next() - } - return undefined -} - -// 判断对象是否为可读流 -function isReadableStream(value) { - return value && typeof value.on === 'function' && typeof value.pipe === 'function' -} - -// 读取可读流内容为字符串 -async function readStreamToString(stream) { - return new Promise((resolve, reject) => { - let result = '' - - try { - if (typeof stream.setEncoding === 'function') { - stream.setEncoding('utf8') - } - } catch (error) { - logger.warn('设置流编码失败:', error) - } - - stream.on('data', (chunk) => { - result += chunk - }) - - stream.on('end', () => { - resolve(result) - }) - - stream.on('error', (error) => { - reject(error) - }) - }) -} - -// 规范化上游 Axios 错误信息 -async function normalizeAxiosStreamError(error) { - const status = error.response?.status - const statusText = error.response?.statusText - const responseData = error.response?.data - let rawBody = null - let parsedBody = null - - if (responseData) { - try { - if (isReadableStream(responseData)) { - rawBody = await readStreamToString(responseData) - } else if (Buffer.isBuffer(responseData)) { - rawBody = responseData.toString('utf8') - } else if (typeof responseData === 'string') { - rawBody = responseData - } else { - rawBody = JSON.stringify(responseData) - } - } catch (streamError) { - logger.warn('读取 Gemini 上游错误流失败:', streamError) - } - } - - if (rawBody) { - if (typeof rawBody === 'string') { - try { - parsedBody = JSON.parse(rawBody) - } catch (parseError) { - parsedBody = rawBody - } - } else { - parsedBody = rawBody - } - } - - let finalMessage = error.message || 'Internal server error' - if (parsedBody && typeof parsedBody === 'object') { - finalMessage = parsedBody.error?.message || parsedBody.message || finalMessage - } else if (typeof parsedBody === 'string' && parsedBody.trim()) { - finalMessage = parsedBody.trim() - } - - return { - status, - statusText, - message: finalMessage, - parsedBody, - rawBody - } -} - -// 标准 Gemini API 路由处理器 -// 这些路由将挂载在 /gemini 路径下,处理标准 Gemini API 格式的请求 -// 标准格式: /gemini/v1beta/models/{model}:generateContent - -// 专门处理标准 Gemini API 格式的 generateContent -async function handleStandardGenerateContent(req, res) { - let account = null - let sessionHash = null - let accountId = null // 提升到外部作用域 - let isApiAccount = false // 提升到外部作用域 - - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - // 从路径参数中获取模型名 - const model = req.params.modelName || 'gemini-2.0-flash-exp' - sessionHash = sessionHelper.generateSessionHash(req.body) - - // 标准 Gemini API 请求体直接包含 contents 等字段 - const { contents, generationConfig, safetySettings, systemInstruction, tools, toolConfig } = - req.body - - // 验证必需参数 - if (!contents || !Array.isArray(contents) || contents.length === 0) { - return res.status(400).json({ - error: { - message: 'Contents array is required', - type: 'invalid_request_error' - } - }) - } - - // 构建内部 API 需要的请求格式 - const actualRequestData = { - contents, - generationConfig: generationConfig || { - temperature: 0.7, - maxOutputTokens: 4096, - topP: 0.95, - topK: 40 - } - } - - // 只有在 safetySettings 存在且非空时才添加 - if (safetySettings && safetySettings.length > 0) { - actualRequestData.safetySettings = safetySettings - } - - // 添加工具配置(tools 和 toolConfig) - if (tools) { - actualRequestData.tools = tools - } - - if (toolConfig) { - actualRequestData.toolConfig = toolConfig - } - - // 如果有 system instruction,修正格式并添加到请求体 - // Gemini CLI 的内部 API 需要 role: "user" 字段 - if (systemInstruction) { - // 确保 systemInstruction 格式正确 - if (typeof systemInstruction === 'string' && systemInstruction.trim()) { - actualRequestData.systemInstruction = { - role: 'user', // Gemini CLI 内部 API 需要这个字段 - parts: [{ text: systemInstruction }] - } - } else if (systemInstruction.parts && systemInstruction.parts.length > 0) { - // 检查是否有实际内容 - const hasContent = systemInstruction.parts.some( - (part) => part.text && part.text.trim() !== '' - ) - if (hasContent) { - // 添加 role 字段(Gemini CLI 格式) - actualRequestData.systemInstruction = { - role: 'user', // Gemini CLI 内部 API 需要这个字段 - parts: systemInstruction.parts - } - } - } - } - - // 使用统一调度选择账号 - const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - model, - { allowApiAccounts: true } // 允许调度 API 账户 - ) - ;({ accountId } = schedulerResult) - const { accountType } = schedulerResult - - // 判断账户类型:根据 accountType 判断,而非 accountId 前缀 - isApiAccount = accountType === 'gemini-api' // 赋值而不是声明 - const actualAccountId = accountId // accountId 已经是实际 ID,无需处理前缀 - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1' - - if (isApiAccount) { - // Gemini API 账户:使用 API Key 直接请求 - account = await geminiApiAccountService.getAccount(actualAccountId) - if (!account) { - return res.status(404).json({ - error: { - message: 'Gemini API account not found', - type: 'account_not_found' - } - }) - } - - logger.info(`Standard Gemini API generateContent request (${version}) - API Key Account`, { - model, - accountId: actualAccountId, - apiKeyId: req.apiKey?.id || 'unknown' - }) - } else { - // OAuth 账户:使用原有流程 - account = await geminiAccountService.getAccount(actualAccountId) - - logger.info(`Standard Gemini API generateContent request (${version}) - OAuth Account`, { - model, - projectId: account.projectId, - apiKeyId: req.apiKey?.id || 'unknown' - }) - } - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - let response - - if (isApiAccount) { - // Gemini API 账户:直接使用 API Key 请求 - // baseUrl 填写域名,如 https://generativelanguage.googleapis.com,版本固定为 v1beta - const apiUrl = `${account.baseUrl}/v1beta/models/${model}:generateContent?key=${account.apiKey}` - - // 构建 Axios 配置 - const axiosConfig = { - method: 'POST', - url: apiUrl, - data: actualRequestData, - headers: { - 'Content-Type': 'application/json' - } - } - - // 添加代理配置 - if (proxyConfig) { - const proxyHelper = new ProxyHelper() - axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) - axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) - } - - try { - const apiResponse = await axios(axiosConfig) - response = { response: apiResponse.data } - } catch (error) { - logger.error('Gemini API request failed:', { - status: error.response?.status, - statusText: error.response?.statusText, - data: error.response?.data - }) - throw error - } - } else { - // OAuth 账户:使用原有流程 - const { accessToken, refreshToken } = account - const client = await geminiAccountService.getOauthClient( - accessToken, - refreshToken, - proxyConfig - ) - - // 项目ID优先级:账户配置的项目ID > 临时项目ID > 尝试获取 - let effectiveProjectId = account.projectId || account.tempProjectId || null - - // 如果没有任何项目ID,尝试调用 loadCodeAssist 获取 - if (!effectiveProjectId) { - try { - logger.info('📋 No projectId available, attempting to fetch from loadCodeAssist...') - const loadResponse = await geminiAccountService.loadCodeAssist(client, null, proxyConfig) - - if (loadResponse.cloudaicompanionProject) { - effectiveProjectId = loadResponse.cloudaicompanionProject - // 保存临时项目ID - await geminiAccountService.updateTempProjectId(actualAccountId, effectiveProjectId) - logger.info(`📋 Fetched and cached temporary projectId: ${effectiveProjectId}`) - } - } catch (loadError) { - logger.warn('Failed to fetch projectId from loadCodeAssist:', loadError.message) - } - } - - // 如果还是没有项目ID,返回错误 - if (!effectiveProjectId) { - return res.status(403).json({ - error: { - message: - 'This account requires a project ID to be configured. Please configure a project ID in the account settings.', - type: 'configuration_required' - } - }) - } - - logger.info('📋 Standard API 项目ID处理逻辑', { - accountProjectId: account.projectId, - tempProjectId: account.tempProjectId, - effectiveProjectId, - decision: account.projectId - ? '使用账户配置' - : account.tempProjectId - ? '使用临时项目ID' - : '从loadCodeAssist获取' - }) - - // 生成一个符合 Gemini CLI 格式的 user_prompt_id - const userPromptId = `${require('crypto').randomUUID()}########0` - - // 调用内部 API(cloudcode-pa) - response = await geminiAccountService.generateContent( - client, - { model, request: actualRequestData }, - userPromptId, // 使用生成的 user_prompt_id - effectiveProjectId, // 使用处理后的项目ID - req.apiKey?.id, // 使用 API Key ID 作为 session ID - proxyConfig - ) - } - - // 记录使用统计 - if (response?.response?.usageMetadata) { - try { - const usage = response.response.usageMetadata - await apiKeyService.recordUsage( - req.apiKey.id, - usage.promptTokenCount || 0, - usage.candidatesTokenCount || 0, - 0, // cacheCreateTokens - 0, // cacheReadTokens - model, - accountId // 账户 ID - ) - logger.info( - `📊 Recorded Gemini usage - Input: ${usage.promptTokenCount}, Output: ${usage.candidatesTokenCount}, Total: ${usage.totalTokenCount}` - ) - } catch (error) { - logger.error('Failed to record Gemini usage:', error) - } - } - - // 返回标准 Gemini API 格式的响应 - // 内部 API 返回的是 { response: {...} } 格式,需要提取 - // 注意:不过滤 thought 字段,因为 gemini-cli 会自行处理 - res.json(response.response || response) - } catch (error) { - logger.error(`Error in standard generateContent endpoint`, { - message: error.message, - status: error.response?.status, - statusText: error.response?.statusText, - responseData: error.response?.data, - stack: error.stack - }) - - // 处理速率限制 暂时去掉此处的标记限流的处理 - // if (error.response?.status === 429 && accountId) { - // logger.warn(`⚠️ Gemini account ${accountId} rate limited (Standard API), marking as limited`) - // try { - // const rateLimitAccountType = isApiAccount ? 'gemini-api' : 'gemini' - // await unifiedGeminiScheduler.markAccountRateLimited( - // accountId, // 账户 ID - // rateLimitAccountType, - // sessionHash - // ) - // } catch (limitError) { - // logger.warn('Failed to mark account as rate limited in scheduler:', limitError) - // } - // } - - res.status(500).json({ - error: { - message: error.message || 'Internal server error', - type: 'api_error' - } - }) - } -} - -// 专门处理标准 Gemini API 格式的 streamGenerateContent -async function handleStandardStreamGenerateContent(req, res) { - let abortController = null - let account = null - let sessionHash = null - let accountId = null // 提升到外部作用域 - let isApiAccount = false // 提升到外部作用域 - - try { - if (!ensureGeminiPermission(req, res)) { - return undefined - } - - // 从路径参数中获取模型名 - const model = req.params.modelName || 'gemini-2.0-flash-exp' - sessionHash = sessionHelper.generateSessionHash(req.body) - - // 标准 Gemini API 请求体直接包含 contents 等字段 - const { contents, generationConfig, safetySettings, systemInstruction, tools, toolConfig } = - req.body - - // 验证必需参数 - if (!contents || !Array.isArray(contents) || contents.length === 0) { - return res.status(400).json({ - error: { - message: 'Contents array is required', - type: 'invalid_request_error' - } - }) - } - - // 构建内部 API 需要的请求格式 - const actualRequestData = { - contents, - generationConfig: generationConfig || { - temperature: 0.7, - maxOutputTokens: 4096, - topP: 0.95, - topK: 40 - } - } - - // 只有在 safetySettings 存在且非空时才添加 - if (safetySettings && safetySettings.length > 0) { - actualRequestData.safetySettings = safetySettings - } - - // 添加工具配置(tools 和 toolConfig) - if (tools) { - actualRequestData.tools = tools - } - - if (toolConfig) { - actualRequestData.toolConfig = toolConfig - } - - // 如果有 system instruction,修正格式并添加到请求体 - // Gemini CLI 的内部 API 需要 role: "user" 字段 - if (systemInstruction) { - // 确保 systemInstruction 格式正确 - if (typeof systemInstruction === 'string' && systemInstruction.trim()) { - actualRequestData.systemInstruction = { - role: 'user', // Gemini CLI 内部 API 需要这个字段 - parts: [{ text: systemInstruction }] - } - } else if (systemInstruction.parts && systemInstruction.parts.length > 0) { - // 检查是否有实际内容 - const hasContent = systemInstruction.parts.some( - (part) => part.text && part.text.trim() !== '' - ) - if (hasContent) { - // 添加 role 字段(Gemini CLI 格式) - actualRequestData.systemInstruction = { - role: 'user', // Gemini CLI 内部 API 需要这个字段 - parts: systemInstruction.parts - } - } - } - } - - // 使用统一调度选择账号 - const schedulerResult = await unifiedGeminiScheduler.selectAccountForApiKey( - req.apiKey, - sessionHash, - model, - { allowApiAccounts: true } // 允许调度 API 账户 - ) - ;({ accountId } = schedulerResult) - const { accountType } = schedulerResult - - // 判断账户类型:根据 accountType 判断,而非 accountId 前缀 - isApiAccount = accountType === 'gemini-api' // 赋值而不是声明 - const actualAccountId = accountId // accountId 已经是实际 ID,无需处理前缀 - - const version = req.path.includes('v1beta') ? 'v1beta' : 'v1' - - if (isApiAccount) { - // Gemini API 账户:使用 API Key 直接请求 - account = await geminiApiAccountService.getAccount(actualAccountId) - if (!account) { - return res.status(404).json({ - error: { - message: 'Gemini API account not found', - type: 'account_not_found' - } - }) - } - - logger.info( - `Standard Gemini API streamGenerateContent request (${version}) - API Key Account`, - { - model, - accountId: actualAccountId, - apiKeyId: req.apiKey?.id || 'unknown' - } - ) - } else { - // OAuth 账户:使用原有流程 - account = await geminiAccountService.getAccount(actualAccountId) - - logger.info( - `Standard Gemini API streamGenerateContent request (${version}) - OAuth Account`, - { - model, - projectId: account.projectId, - apiKeyId: req.apiKey?.id || 'unknown' - } - ) - } - - // 创建中止控制器 - abortController = new AbortController() - - // 处理客户端断开连接 - req.on('close', () => { - if (abortController && !abortController.signal.aborted) { - logger.info('Client disconnected, aborting stream request') - abortController.abort() - } - }) - - // 解析账户的代理配置 - let proxyConfig = null - if (account.proxy) { - try { - proxyConfig = typeof account.proxy === 'string' ? JSON.parse(account.proxy) : account.proxy - } catch (e) { - logger.warn('Failed to parse proxy configuration:', e) - } - } - - let streamResponse - - if (isApiAccount) { - // Gemini API 账户:直接使用 API Key 请求流式接口 - // baseUrl 填写域名,版本固定为 v1beta - const apiUrl = `${account.baseUrl}/v1beta/models/${model}:streamGenerateContent?key=${account.apiKey}&alt=sse` - - // 构建 Axios 配置 - const axiosConfig = { - method: 'POST', - url: apiUrl, - data: actualRequestData, - headers: { - 'Content-Type': 'application/json' - }, - responseType: 'stream', - signal: abortController.signal - } - - // 添加代理配置 - if (proxyConfig) { - const proxyHelper = new ProxyHelper() - axiosConfig.httpsAgent = proxyHelper.createProxyAgent(proxyConfig) - axiosConfig.httpAgent = proxyHelper.createProxyAgent(proxyConfig) - } - - try { - const apiResponse = await axios(axiosConfig) - streamResponse = apiResponse.data - } catch (error) { - logger.error('Gemini API stream request failed:', { - status: error.response?.status, - statusText: error.response?.statusText, - data: error.response?.data - }) - throw error - } - } else { - // OAuth 账户:使用原有流程 - const { accessToken, refreshToken } = account - const client = await geminiAccountService.getOauthClient( - accessToken, - refreshToken, - proxyConfig - ) - - // 项目ID优先级:账户配置的项目ID > 临时项目ID > 尝试获取 - let effectiveProjectId = account.projectId || account.tempProjectId || null - - // 如果没有任何项目ID,尝试调用 loadCodeAssist 获取 - if (!effectiveProjectId) { - try { - logger.info('📋 No projectId available, attempting to fetch from loadCodeAssist...') - const loadResponse = await geminiAccountService.loadCodeAssist(client, null, proxyConfig) - - if (loadResponse.cloudaicompanionProject) { - effectiveProjectId = loadResponse.cloudaicompanionProject - // 保存临时项目ID - await geminiAccountService.updateTempProjectId(actualAccountId, effectiveProjectId) - logger.info(`📋 Fetched and cached temporary projectId: ${effectiveProjectId}`) - } - } catch (loadError) { - logger.warn('Failed to fetch projectId from loadCodeAssist:', loadError.message) - } - } - - // 如果还是没有项目ID,返回错误 - if (!effectiveProjectId) { - return res.status(403).json({ - error: { - message: - 'This account requires a project ID to be configured. Please configure a project ID in the account settings.', - type: 'configuration_required' - } - }) - } - - logger.info('📋 Standard API 流式项目ID处理逻辑', { - accountProjectId: account.projectId, - tempProjectId: account.tempProjectId, - effectiveProjectId, - decision: account.projectId - ? '使用账户配置' - : account.tempProjectId - ? '使用临时项目ID' - : '从loadCodeAssist获取' - }) - - // 生成一个符合 Gemini CLI 格式的 user_prompt_id - const userPromptId = `${require('crypto').randomUUID()}########0` - - // 调用内部 API(cloudcode-pa)的流式接口 - streamResponse = await geminiAccountService.generateContentStream( - client, - { model, request: actualRequestData }, - userPromptId, // 使用生成的 user_prompt_id - effectiveProjectId, // 使用处理后的项目ID - req.apiKey?.id, // 使用 API Key ID 作为 session ID - abortController.signal, - proxyConfig - ) - } - - // 设置 SSE 响应头 - res.setHeader('Content-Type', 'text/event-stream') - res.setHeader('Cache-Control', 'no-cache') - res.setHeader('Connection', 'keep-alive') - res.setHeader('X-Accel-Buffering', 'no') - - // 处理流式响应并捕获usage数据 - // 方案 A++:透明转发 + 异步 usage 提取 + SSE 心跳机制 - let totalUsage = { - promptTokenCount: 0, - candidatesTokenCount: 0, - totalTokenCount: 0 - } - - // SSE 心跳机制:防止 Clash 等代理 120 秒超时 - let heartbeatTimer = null - let lastDataTime = Date.now() - const HEARTBEAT_INTERVAL = 15000 // 15 秒 - - const sendHeartbeat = () => { - const timeSinceLastData = Date.now() - lastDataTime - if (timeSinceLastData >= HEARTBEAT_INTERVAL && !res.destroyed) { - res.write('\n') // 发送空行保持连接活跃 - logger.info(`💓 Sent SSE keepalive (gap: ${(timeSinceLastData / 1000).toFixed(1)}s)`) - } - } - - heartbeatTimer = setInterval(sendHeartbeat, HEARTBEAT_INTERVAL) - - // 缓冲区:有些 chunk 内会包含多条 SSE 事件,需要拆分 - let sseBuffer = '' - - // 处理单个 SSE 事件块(不含结尾空行) - const handleEventBlock = (evt) => { - if (!evt.trim()) { - return - } - - // 取出所有 data 行并拼接(兼容多行 data) - const dataLines = evt.split(/\r?\n/).filter((line) => line.startsWith('data:')) - if (dataLines.length === 0) { - // 非 data 事件,直接原样转发 - if (!res.destroyed) { - res.write(`${evt}\n\n`) - } - return - } - - const dataPayload = dataLines.map((line) => line.replace(/^data:\s?/, '')).join('\n') - - let processedPayload = null - let parsed = null - - if (dataPayload === '[DONE]') { - processedPayload = '[DONE]' - } else { - try { - parsed = JSON.parse(dataPayload) - - // 捕获 usage(如果在顶层或 response 内都有可能) - if (parsed.usageMetadata) { - totalUsage = parsed.usageMetadata - } else if (parsed.response?.usageMetadata) { - totalUsage = parsed.response.usageMetadata - } - - // 提取 response 并重新包装 - processedPayload = JSON.stringify(parsed.response || parsed) - } catch (e) { - // 解析失败,直接转发原始 data - } - } - - const outputChunk = processedPayload === null ? `${evt}\n\n` : `data: ${processedPayload}\n\n` - - // 1️⃣ 立即转发处理后的数据 - if (!res.destroyed) { - res.write(outputChunk) - } - - // 2️⃣ 异步提取 usage 数据(兜底,防止上面解析失败未捕获) - setImmediate(() => { - try { - const usageSource = - processedPayload && processedPayload !== '[DONE]' ? processedPayload : dataPayload - - if (!usageSource || !usageSource.includes('usageMetadata')) { - return - } - - // 再尝试一次解析 - const usageObj = JSON.parse(usageSource) - const usage = usageObj.usageMetadata || usageObj.response?.usageMetadata || usageObj.usage - - if (usage && typeof usage === 'object') { - totalUsage = usage - logger.debug('📊 Captured Gemini usage data (async):', totalUsage) - } - } catch (error) { - // 提取用量失败时忽略 - } - }) - } - - streamResponse.on('data', (chunk) => { - try { - // 更新最后数据时间 - lastDataTime = Date.now() - - // 追加到缓冲区后按双换行拆分事件 - sseBuffer += chunk.toString() - const events = sseBuffer.split(/\r?\n\r?\n/) - sseBuffer = events.pop() || '' - - for (const evt of events) { - handleEventBlock(evt) - } - } catch (error) { - logger.error('Error processing stream chunk:', error) - } - }) - - streamResponse.on('end', () => { - logger.info('Stream completed successfully') - - // 处理可能残留在缓冲区的事件(上游未以空行结尾的情况) - if (sseBuffer.trim()) { - try { - handleEventBlock(sseBuffer) - } catch (flushError) { - // 忽略 flush 期间的异常 - } - sseBuffer = '' - } - - // 清理心跳定时器 - if (heartbeatTimer) { - clearInterval(heartbeatTimer) - heartbeatTimer = null - } - - // 立即结束响应,不阻塞 - res.end() - - // 异步记录使用统计(不阻塞响应) - if (totalUsage.totalTokenCount > 0) { - apiKeyService - .recordUsage( - req.apiKey.id, - totalUsage.promptTokenCount || 0, - totalUsage.candidatesTokenCount || 0, - 0, // cacheCreateTokens - 0, // cacheReadTokens - model, - accountId // 使用原始 accountId(含前缀) - ) - .then(() => { - logger.info( - `📊 Recorded Gemini stream usage - Input: ${totalUsage.promptTokenCount}, Output: ${totalUsage.candidatesTokenCount}, Total: ${totalUsage.totalTokenCount}` - ) - }) - .catch((error) => { - logger.error('Failed to record Gemini usage:', error) - }) - } else { - logger.warn( - `⚠️ Stream completed without usage data - totalTokenCount: ${totalUsage.totalTokenCount}` - ) - } - }) - - streamResponse.on('error', (error) => { - logger.error('Stream error:', error) - - // 清理心跳定时器 - if (heartbeatTimer) { - clearInterval(heartbeatTimer) - heartbeatTimer = null - } - - if (!res.headersSent) { - // 如果还没发送响应头,可以返回正常的错误响应 - res.status(500).json({ - error: { - message: error.message || 'Stream error', - type: 'api_error' - } - }) - } else { - // 如果已经开始流式传输,发送 SSE 格式的错误事件和结束标记 - // 这样客户端可以正确识别流的结束,避免 "Premature close" 错误 - if (!res.destroyed) { - try { - // 发送错误事件(SSE 格式) - res.write( - `data: ${JSON.stringify({ - error: { - message: error.message || 'Stream error', - type: 'stream_error', - code: error.code - } - })}\n\n` - ) - - // 发送 SSE 结束标记 - res.write('data: [DONE]\n\n') - } catch (writeError) { - logger.error('Error sending error event:', writeError) - } - } - res.end() - } - }) - } catch (error) { - const normalizedError = await normalizeAxiosStreamError(error) - - logger.error(`Error in standard streamGenerateContent endpoint`, { - message: error.message, - status: error.response?.status, - statusText: error.response?.statusText, - responseData: normalizedError.parsedBody || normalizedError.rawBody, - stack: error.stack - }) - - // 处理速率限制 暂时去掉此处的标记限流的处理 - // if (error.response?.status === 429 && accountId) { - // logger.warn( - // `⚠️ Gemini account ${accountId} rate limited (Standard Stream API), marking as limited` - // ) - // try { - // const rateLimitAccountType = isApiAccount ? 'gemini-api' : 'gemini' - // await unifiedGeminiScheduler.markAccountRateLimited( - // accountId, // 账户 ID - // rateLimitAccountType, - // sessionHash - // ) - // } catch (limitError) { - // logger.warn('Failed to mark account as rate limited in scheduler:', limitError) - // } - // } - - if (!res.headersSent) { - const statusCode = normalizedError.status || 500 - const responseBody = { - error: { - message: normalizedError.message, - type: 'api_error' - } - } - - if (normalizedError.status) { - responseBody.error.upstreamStatus = normalizedError.status - } - if (normalizedError.statusText) { - responseBody.error.upstreamStatusText = normalizedError.statusText - } - if (normalizedError.parsedBody && typeof normalizedError.parsedBody === 'object') { - responseBody.error.upstreamResponse = normalizedError.parsedBody - } else if (normalizedError.rawBody) { - responseBody.error.upstreamRaw = normalizedError.rawBody - } - - return res.status(statusCode).json(responseBody) - } - } finally { - // 清理资源 - if (abortController) { - abortController = null - } - } -} +// 从 handlers/geminiHandlers.js 导入所有处理函数 +const { + ensureGeminiPermissionMiddleware, + handleLoadCodeAssist, + handleOnboardUser, + handleCountTokens, + handleGenerateContent, + handleStreamGenerateContent, + handleStandardGenerateContent, + handleStandardStreamGenerateContent, + handleModels, + handleModelDetails +} = require('../handlers/geminiHandlers') + +// ============================================================================ // v1beta 版本的标准路由 - 支持动态模型名称 +// ============================================================================ + +/** + * POST /v1beta/models/:modelName:loadCodeAssist + */ router.post( '/v1beta/models/:modelName\\:loadCodeAssist', authenticateApiKey, @@ -963,6 +51,9 @@ router.post( } ) +/** + * POST /v1beta/models/:modelName:onboardUser + */ router.post( '/v1beta/models/:modelName\\:onboardUser', authenticateApiKey, @@ -973,6 +64,9 @@ router.post( } ) +/** + * POST /v1beta/models/:modelName:countTokens + */ router.post( '/v1beta/models/:modelName\\:countTokens', authenticateApiKey, @@ -983,7 +77,10 @@ router.post( } ) -// 使用专门的处理函数处理标准 Gemini API 格式 +/** + * POST /v1beta/models/:modelName:generateContent + * 使用专门的标准 API 处理函数(支持 OAuth 和 API 账户) + */ router.post( '/v1beta/models/:modelName\\:generateContent', authenticateApiKey, @@ -991,6 +88,10 @@ router.post( handleStandardGenerateContent ) +/** + * POST /v1beta/models/:modelName:streamGenerateContent + * 使用专门的标准 API 流式处理函数(支持 OAuth 和 API 账户) + */ router.post( '/v1beta/models/:modelName\\:streamGenerateContent', authenticateApiKey, @@ -998,7 +99,13 @@ router.post( handleStandardStreamGenerateContent ) +// ============================================================================ // v1 版本的标准路由(为了完整性,虽然 Gemini 主要使用 v1beta) +// ============================================================================ + +/** + * POST /v1/models/:modelName:generateContent + */ router.post( '/v1/models/:modelName\\:generateContent', authenticateApiKey, @@ -1006,6 +113,9 @@ router.post( handleStandardGenerateContent ) +/** + * POST /v1/models/:modelName:streamGenerateContent + */ router.post( '/v1/models/:modelName\\:streamGenerateContent', authenticateApiKey, @@ -1013,6 +123,9 @@ router.post( handleStandardStreamGenerateContent ) +/** + * POST /v1/models/:modelName:countTokens + */ router.post( '/v1/models/:modelName\\:countTokens', authenticateApiKey, @@ -1023,7 +136,13 @@ router.post( } ) -// v1internal 版本的标准路由(这些使用原有的处理函数,因为格式不同) +// ============================================================================ +// v1internal 版本的标准路由(这些使用内部格式的处理函数) +// ============================================================================ + +/** + * POST /v1internal:loadCodeAssist + */ router.post( '/v1internal\\:loadCodeAssist', authenticateApiKey, @@ -1034,6 +153,9 @@ router.post( } ) +/** + * POST /v1internal:onboardUser + */ router.post( '/v1internal\\:onboardUser', authenticateApiKey, @@ -1044,6 +166,9 @@ router.post( } ) +/** + * POST /v1internal:countTokens + */ router.post( '/v1internal\\:countTokens', authenticateApiKey, @@ -1054,133 +179,86 @@ router.post( } ) -// v1internal 使用不同的处理逻辑,因为它们不包含模型在 URL 中 +/** + * POST /v1internal:generateContent + * v1internal 格式使用内部格式的处理函数 + */ router.post( '/v1internal\\:generateContent', authenticateApiKey, ensureGeminiPermissionMiddleware, (req, res, next) => { logger.info(`Standard Gemini API request (v1internal): ${req.method} ${req.originalUrl}`) - // v1internal 格式不同,使用原有的处理函数 - const { handleGenerateContent } = require('./geminiRoutes') handleGenerateContent(req, res, next) } ) +/** + * POST /v1internal:streamGenerateContent + * v1internal 格式使用内部格式的处理函数 + */ router.post( '/v1internal\\:streamGenerateContent', authenticateApiKey, ensureGeminiPermissionMiddleware, (req, res, next) => { logger.info(`Standard Gemini API request (v1internal): ${req.method} ${req.originalUrl}`) - // v1internal 格式不同,使用原有的处理函数 - const { handleStreamGenerateContent } = require('./geminiRoutes') handleStreamGenerateContent(req, res, next) } ) -// 添加标准 Gemini API 的模型列表端点 -router.get( - '/v1beta/models', - authenticateApiKey, - ensureGeminiPermissionMiddleware, - async (req, res) => { - try { - logger.info('Standard Gemini API models request') - // 直接调用 geminiRoutes 中的模型处理逻辑 - const geminiRoutes = require('./geminiRoutes') - const modelHandler = geminiRoutes.stack.find( - (layer) => layer.route && layer.route.path === '/models' && layer.route.methods.get - ) - if (modelHandler && modelHandler.route.stack[1]) { - // 调用处理函数(跳过第一个 authenticateApiKey 中间件) - modelHandler.route.stack[1].handle(req, res) - } else { - res.status(500).json({ error: 'Models handler not found' }) - } - } catch (error) { - logger.error('Error in standard models endpoint:', error) - res.status(500).json({ - error: { - message: 'Failed to retrieve models', - type: 'api_error' - } - }) - } - } -) +// ============================================================================ +// 模型列表端点 +// ============================================================================ -router.get('/v1/models', authenticateApiKey, ensureGeminiPermissionMiddleware, async (req, res) => { - try { - logger.info('Standard Gemini API models request (v1)') - // 直接调用 geminiRoutes 中的模型处理逻辑 - const geminiRoutes = require('./geminiRoutes') - const modelHandler = geminiRoutes.stack.find( - (layer) => layer.route && layer.route.path === '/models' && layer.route.methods.get - ) - if (modelHandler && modelHandler.route.stack[1]) { - modelHandler.route.stack[1].handle(req, res) - } else { - res.status(500).json({ error: 'Models handler not found' }) - } - } catch (error) { - logger.error('Error in standard models endpoint (v1):', error) - res.status(500).json({ - error: { - message: 'Failed to retrieve models', - type: 'api_error' - } - }) - } +/** + * GET /v1beta/models + * 获取模型列表(v1beta 版本) + */ +router.get('/v1beta/models', authenticateApiKey, ensureGeminiPermissionMiddleware, (req, res) => { + logger.info('Standard Gemini API models request (v1beta)') + handleModels(req, res) }) -// 添加模型详情端点 +/** + * GET /v1/models + * 获取模型列表(v1 版本) + */ +router.get('/v1/models', authenticateApiKey, ensureGeminiPermissionMiddleware, (req, res) => { + logger.info('Standard Gemini API models request (v1)') + handleModels(req, res) +}) + +// ============================================================================ +// 模型详情端点 +// ============================================================================ + +/** + * GET /v1beta/models/:modelName + * 获取模型详情(v1beta 版本) + */ router.get( '/v1beta/models/:modelName', authenticateApiKey, ensureGeminiPermissionMiddleware, - (req, res) => { - const { modelName } = req.params - logger.info(`Standard Gemini API model details request: ${modelName}`) - - res.json({ - name: `models/${modelName}`, - version: '001', - displayName: modelName, - description: `Gemini model: ${modelName}`, - inputTokenLimit: 1048576, - outputTokenLimit: 8192, - supportedGenerationMethods: ['generateContent', 'streamGenerateContent', 'countTokens'], - temperature: 1.0, - topP: 0.95, - topK: 40 - }) - } + handleModelDetails ) +/** + * GET /v1/models/:modelName + * 获取模型详情(v1 版本) + */ router.get( '/v1/models/:modelName', authenticateApiKey, ensureGeminiPermissionMiddleware, - (req, res) => { - const { modelName } = req.params - logger.info(`Standard Gemini API model details request (v1): ${modelName}`) - - res.json({ - name: `models/${modelName}`, - version: '001', - displayName: modelName, - description: `Gemini model: ${modelName}`, - inputTokenLimit: 1048576, - outputTokenLimit: 8192, - supportedGenerationMethods: ['generateContent', 'streamGenerateContent', 'countTokens'], - temperature: 1.0, - topP: 0.95, - topK: 40 - }) - } + handleModelDetails ) +// ============================================================================ +// 初始化日志 +// ============================================================================ + logger.info('Standard Gemini API routes initialized') module.exports = router diff --git a/src/routes/unified.js b/src/routes/unified.js index c1f320d2..a8a8e69d 100644 --- a/src/routes/unified.js +++ b/src/routes/unified.js @@ -2,10 +2,11 @@ const express = require('express') const { authenticateApiKey } = require('../middleware/auth') const logger = require('../utils/logger') const { handleChatCompletion } = require('./openaiClaudeRoutes') +// 从 handlers/geminiHandlers.js 导入处理函数 const { handleGenerateContent: geminiHandleGenerateContent, handleStreamGenerateContent: geminiHandleStreamGenerateContent -} = require('./geminiRoutes') +} = require('../handlers/geminiHandlers') const openaiRoutes = require('./openaiRoutes') const router = express.Router() @@ -16,25 +17,6 @@ function detectBackendFromModel(modelName) { return 'claude' // 默认 Claude } - // 首先尝试使用 modelService 查找模型的 provider - try { - const modelService = require('../services/modelService') - const provider = modelService.getModelProvider(modelName) - - if (provider === 'anthropic') { - return 'claude' - } - if (provider === 'openai') { - return 'openai' - } - if (provider === 'google') { - return 'gemini' - } - } catch (error) { - logger.warn(`⚠️ Failed to detect backend from modelService: ${error.message}`) - } - - // 降级到前缀匹配作为后备方案 const model = modelName.toLowerCase() // Claude 模型 @@ -42,21 +24,16 @@ function detectBackendFromModel(modelName) { return 'claude' } - // OpenAI 模型 - if ( - model.startsWith('gpt-') || - model.startsWith('o1-') || - model.startsWith('o3-') || - model === 'chatgpt-4o-latest' - ) { - return 'openai' - } - // Gemini 模型 if (model.startsWith('gemini-')) { return 'gemini' } + // OpenAI 模型 + if (model.startsWith('gpt-')) { + return 'openai' + } + // 默认使用 Claude return 'claude' } diff --git a/src/services/modelService.js b/src/services/modelService.js index 1bee9dba..49607f55 100644 --- a/src/services/modelService.js +++ b/src/services/modelService.js @@ -1,5 +1,3 @@ -const fs = require('fs') -const path = require('path') const logger = require('../utils/logger') /** @@ -9,54 +7,22 @@ const logger = require('../utils/logger') */ class ModelService { constructor() { - this.modelsFile = path.join(process.cwd(), 'data', 'supported_models.json') - this.supportedModels = null - this.fileWatcher = null + this.supportedModels = this.getDefaultModels() } /** * 初始化模型服务 */ async initialize() { - try { - this.loadModels() - this.setupFileWatcher() - logger.success('✅ Model service initialized successfully') - } catch (error) { - logger.error('❌ Failed to initialize model service:', error) - } + const totalModels = Object.values(this.supportedModels).reduce( + (sum, config) => sum + config.models.length, + 0 + ) + logger.success(`✅ Model service initialized with ${totalModels} models`) } /** - * 加载支持的模型配置 - */ - loadModels() { - try { - if (fs.existsSync(this.modelsFile)) { - const data = fs.readFileSync(this.modelsFile, 'utf8') - this.supportedModels = JSON.parse(data) - - const totalModels = Object.values(this.supportedModels).reduce( - (sum, config) => sum + config.models.length, - 0 - ) - - logger.info(`📋 Loaded ${totalModels} supported models from configuration`) - } else { - logger.warn('⚠️ Supported models file not found, using defaults') - this.supportedModels = this.getDefaultModels() - - // 创建默认配置文件 - this.saveDefaultConfig() - } - } catch (error) { - logger.error('❌ Failed to load supported models:', error) - this.supportedModels = this.getDefaultModels() - } - } - - /** - * 获取默认模型配置(后备方案) + * 获取支持的模型配置 */ getDefaultModels() { return { @@ -64,6 +30,8 @@ class ModelService { provider: 'anthropic', description: 'Claude models from Anthropic', models: [ + 'claude-opus-4-5-20251101', + 'claude-haiku-4-5-20251001', 'claude-sonnet-4-5-20250929', 'claude-opus-4-1-20250805', 'claude-sonnet-4-20250514', @@ -79,55 +47,22 @@ class ModelService { provider: 'openai', description: 'OpenAI GPT models', models: [ - 'gpt-4o', - 'gpt-4o-mini', - 'gpt-4.1', - 'gpt-4.1-mini', - 'gpt-4.1-nano', - 'gpt-4-turbo', - 'gpt-4', - 'gpt-3.5-turbo', - 'o3', - 'o4-mini', - 'chatgpt-4o-latest' + 'gpt-5.1-2025-11-13', + 'gpt-5.1-codex-mini', + 'gpt-5.1-codex', + 'gpt-5.1-codex-max', + 'gpt-5-2025-08-07', + 'gpt-5-codex' ] }, gemini: { provider: 'google', description: 'Google Gemini models', - models: [ - 'gemini-1.5-pro', - 'gemini-1.5-flash', - 'gemini-2.0-flash', - 'gemini-2.0-flash-exp', - 'gemini-2.0-flash-thinking', - 'gemini-2.0-flash-thinking-exp', - 'gemini-2.0-pro', - 'gemini-2.5-flash', - 'gemini-2.5-flash-lite', - 'gemini-2.5-pro' - ] + models: ['gemini-2.5-pro', 'gemini-3-pro-preview', 'gemini-2.5-flash'] } } } - /** - * 保存默认配置到文件 - */ - saveDefaultConfig() { - try { - const dataDir = path.dirname(this.modelsFile) - if (!fs.existsSync(dataDir)) { - fs.mkdirSync(dataDir, { recursive: true }) - } - - fs.writeFileSync(this.modelsFile, JSON.stringify(this.supportedModels, null, 2)) - logger.info('💾 Created default supported_models.json configuration') - } catch (error) { - logger.error('❌ Failed to save default config:', error) - } - } - /** * 获取所有支持的模型(OpenAI API 格式) */ @@ -183,83 +118,27 @@ class ModelService { return model ? model.owned_by : null } - /** - * 重新加载模型配置 - */ - reloadModels() { - logger.info('🔄 Reloading supported models configuration...') - this.loadModels() - } - - /** - * 设置文件监听器(监听配置文件变化) - */ - setupFileWatcher() { - try { - // 如果已有监听器,先关闭 - if (this.fileWatcher) { - this.fileWatcher.close() - this.fileWatcher = null - } - - // 只有文件存在时才设置监听器 - if (!fs.existsSync(this.modelsFile)) { - logger.debug('📋 Models file does not exist yet, skipping file watcher setup') - return - } - - // 使用 fs.watchFile 监听文件变化 - const watchOptions = { - persistent: true, - interval: 60000 // 每60秒检查一次 - } - - let lastMtime = fs.statSync(this.modelsFile).mtimeMs - - fs.watchFile(this.modelsFile, watchOptions, (curr, _prev) => { - if (curr.mtimeMs !== lastMtime) { - lastMtime = curr.mtimeMs - logger.info('📋 Detected change in supported_models.json, reloading...') - this.reloadModels() - } - }) - - // 保存引用以便清理 - this.fileWatcher = { - close: () => fs.unwatchFile(this.modelsFile) - } - - logger.info('👁️ File watcher set up for supported_models.json') - } catch (error) { - logger.error('❌ Failed to setup file watcher:', error) - } - } - /** * 获取服务状态 */ getStatus() { - const totalModels = this.supportedModels - ? Object.values(this.supportedModels).reduce((sum, config) => sum + config.models.length, 0) - : 0 + const totalModels = Object.values(this.supportedModels).reduce( + (sum, config) => sum + config.models.length, + 0 + ) return { - initialized: this.supportedModels !== null, + initialized: true, totalModels, - providers: this.supportedModels ? Object.keys(this.supportedModels) : [], - fileExists: fs.existsSync(this.modelsFile) + providers: Object.keys(this.supportedModels) } } /** - * 清理资源 + * 清理资源(保留接口兼容性) */ cleanup() { - if (this.fileWatcher) { - this.fileWatcher.close() - this.fileWatcher = null - logger.debug('📋 Model service file watcher closed') - } + logger.debug('📋 Model service cleanup (no-op)') } }