diff --git a/src/models/redis.js b/src/models/redis.js index 33edf935..0b1c7ce0 100644 --- a/src/models/redis.js +++ b/src/models/redis.js @@ -405,20 +405,20 @@ class RedisClient { return await this.client.del(key); } - // 🔐 会话管理 + // 🔐 会话管理(用于管理员登录等) async setSession(sessionId, sessionData, ttl = 86400) { - const key = `session:${sessionId}`; + const key = `admin_session:${sessionId}`; await this.client.hset(key, sessionData); await this.client.expire(key, ttl); } async getSession(sessionId) { - const key = `session:${sessionId}`; + const key = `admin_session:${sessionId}`; return await this.client.hgetall(key); } async deleteSession(sessionId) { - const key = `session:${sessionId}`; + const key = `admin_session:${sessionId}`; return await this.client.del(key); } @@ -640,6 +640,22 @@ class RedisClient { } } + // 🔗 会话sticky映射管理 + async setSessionAccountMapping(sessionHash, accountId, ttl = 3600) { + const key = `session:${sessionHash}`; + await this.client.set(key, accountId, 'EX', ttl); + } + + async getSessionAccountMapping(sessionHash) { + const key = `session:${sessionHash}`; + return await this.client.get(key); + } + + async deleteSessionAccountMapping(sessionHash) { + const key = `session:${sessionHash}`; + return await this.client.del(key); + } + // 🧹 清理过期数据 async cleanup() { try { diff --git a/src/services/claudeAccountService.js b/src/services/claudeAccountService.js index 8498bf1b..c37bbf03 100644 --- a/src/services/claudeAccountService.js +++ b/src/services/claudeAccountService.js @@ -286,8 +286,8 @@ class ClaudeAccountService { } } - // 🎯 智能选择可用账户 - async selectAvailableAccount() { + // 🎯 智能选择可用账户(支持sticky会话) + async selectAvailableAccount(sessionHash = null) { try { const accounts = await redis.getAllClaudeAccounts(); @@ -300,6 +300,24 @@ class ClaudeAccountService { throw new Error('No active Claude accounts available'); } + // 如果有会话哈希,检查是否有已映射的账户 + if (sessionHash) { + const mappedAccountId = await redis.getSessionAccountMapping(sessionHash); + if (mappedAccountId) { + // 验证映射的账户是否仍然可用 + const mappedAccount = activeAccounts.find(acc => acc.id === mappedAccountId); + if (mappedAccount) { + logger.info(`🎯 Using sticky session account: ${mappedAccount.name} (${mappedAccountId}) for session ${sessionHash}`); + return mappedAccountId; + } else { + logger.warn(`⚠️ Mapped account ${mappedAccountId} is no longer available, selecting new account`); + // 清理无效的映射 + await redis.deleteSessionAccountMapping(sessionHash); + } + } + } + + // 如果没有映射或映射无效,选择新账户 // 优先选择最近刷新过token的账户 const sortedAccounts = activeAccounts.sort((a, b) => { const aLastRefresh = new Date(a.lastRefreshAt || 0).getTime(); @@ -307,7 +325,15 @@ class ClaudeAccountService { return bLastRefresh - aLastRefresh; }); - return sortedAccounts[0].id; + const selectedAccountId = sortedAccounts[0].id; + + // 如果有会话哈希,建立新的映射 + if (sessionHash) { + await redis.setSessionAccountMapping(sessionHash, selectedAccountId, 3600); // 1小时过期 + logger.info(`🎯 Created new sticky session mapping: ${sortedAccounts[0].name} (${selectedAccountId}) for session ${sessionHash}`); + } + + return selectedAccountId; } catch (error) { logger.error('❌ Failed to select available account:', error); throw error; diff --git a/src/services/claudeRelayService.js b/src/services/claudeRelayService.js index 4e12530e..521c7239 100644 --- a/src/services/claudeRelayService.js +++ b/src/services/claudeRelayService.js @@ -2,6 +2,7 @@ const https = require('https'); const { SocksProxyAgent } = require('socks-proxy-agent'); const { HttpsProxyAgent } = require('https-proxy-agent'); const claudeAccountService = require('./claudeAccountService'); +const sessionHelper = require('../utils/sessionHelper'); const logger = require('../utils/logger'); const config = require('../../config/config'); @@ -16,10 +17,13 @@ class ClaudeRelayService { // 🚀 转发请求到Claude API async relayRequest(requestBody, apiKeyData) { try { - // 选择可用的Claude账户 - const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(); + // 生成会话哈希用于sticky会话 + const sessionHash = sessionHelper.generateSessionHash(requestBody); - logger.info(`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}`); + // 选择可用的Claude账户(支持sticky会话) + const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(sessionHash); + + logger.info(`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}${sessionHash ? `, session: ${sessionHash}` : ''}`); // 获取有效的访问token const accessToken = await claudeAccountService.getValidAccessToken(accountId); @@ -224,10 +228,13 @@ class ClaudeRelayService { // 🌊 处理流式响应(带usage数据捕获) async relayStreamRequestWithUsageCapture(requestBody, apiKeyData, responseStream, usageCallback) { try { - // 选择可用的Claude账户 - const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(); + // 生成会话哈希用于sticky会话 + const sessionHash = sessionHelper.generateSessionHash(requestBody); - logger.info(`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}`); + // 选择可用的Claude账户(支持sticky会话) + const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(sessionHash); + + logger.info(`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}${sessionHash ? `, session: ${sessionHash}` : ''}`); // 获取有效的访问token const accessToken = await claudeAccountService.getValidAccessToken(accountId); diff --git a/src/utils/sessionHelper.js b/src/utils/sessionHelper.js new file mode 100644 index 00000000..6f040b82 --- /dev/null +++ b/src/utils/sessionHelper.js @@ -0,0 +1,118 @@ +const crypto = require('crypto'); +const logger = require('./logger'); + +class SessionHelper { + /** + * 生成会话哈希,用于sticky会话保持 + * 基于Anthropic的prompt caching机制,优先使用cacheable内容 + * @param {Object} requestBody - 请求体 + * @returns {string|null} - 32字符的会话哈希,如果无法生成则返回null + */ + generateSessionHash(requestBody) { + if (!requestBody || typeof requestBody !== 'object') { + return null; + } + + let cacheableContent = ''; + const system = requestBody.system || ''; + const messages = requestBody.messages || []; + + // 1. 优先提取带有cache_control: {"type": "ephemeral"}的内容 + // 检查system中的cacheable内容 + if (Array.isArray(system)) { + for (const part of system) { + if (part && part.cache_control && part.cache_control.type === 'ephemeral') { + cacheableContent += part.text || ''; + } + } + } + + // 检查messages中的cacheable内容 + for (const msg of messages) { + const content = msg.content || ''; + if (Array.isArray(content)) { + for (const part of content) { + if (part && part.cache_control && part.cache_control.type === 'ephemeral') { + if (part.type === 'text') { + cacheableContent += part.text || ''; + } + // 其他类型(如image)不参与hash计算 + } + } + } else if (typeof content === 'string' && msg.cache_control && msg.cache_control.type === 'ephemeral') { + // 罕见情况,但需要检查 + cacheableContent += content; + } + } + + // 2. 如果有cacheable内容,直接使用 + if (cacheableContent) { + const hash = crypto.createHash('sha256').update(cacheableContent).digest('hex').substring(0, 32); + logger.debug(`📋 Session hash generated from cacheable content: ${hash}`); + return hash; + } + + // 3. Fallback: 使用system内容 + if (system) { + let systemText = ''; + if (typeof system === 'string') { + systemText = system; + } else if (Array.isArray(system)) { + systemText = system.map(part => part.text || '').join(''); + } + + if (systemText) { + const hash = crypto.createHash('sha256').update(systemText).digest('hex').substring(0, 32); + logger.debug(`📋 Session hash generated from system content: ${hash}`); + return hash; + } + } + + // 4. 最后fallback: 使用第一条消息内容 + if (messages.length > 0) { + const firstMessage = messages[0]; + let firstMessageText = ''; + + if (typeof firstMessage.content === 'string') { + firstMessageText = firstMessage.content; + } else if (Array.isArray(firstMessage.content)) { + firstMessageText = firstMessage.content + .filter(part => part.type === 'text') + .map(part => part.text || '') + .join(''); + } + + if (firstMessageText) { + const hash = crypto.createHash('sha256').update(firstMessageText).digest('hex').substring(0, 32); + logger.debug(`📋 Session hash generated from first message: ${hash}`); + return hash; + } + } + + // 无法生成会话哈希 + logger.debug('📋 Unable to generate session hash - no suitable content found'); + return null; + } + + /** + * 获取会话的Redis键名 + * @param {string} sessionHash - 会话哈希 + * @returns {string} - Redis键名 + */ + getSessionRedisKey(sessionHash) { + return `session:${sessionHash}`; + } + + /** + * 验证会话哈希格式 + * @param {string} sessionHash - 会话哈希 + * @returns {boolean} - 是否有效 + */ + isValidSessionHash(sessionHash) { + return typeof sessionHash === 'string' && + sessionHash.length === 32 && + /^[a-f0-9]{32}$/.test(sessionHash); + } +} + +module.exports = new SessionHelper(); \ No newline at end of file