feat: 添加智能sticky会话保持功能

- 新增 sessionHelper.js 实现会话哈希生成,基于Anthropic的prompt caching机制
- 扩展 claudeAccountService.selectAvailableAccount 支持会话绑定
- 更新 claudeRelayService 集成会话保持功能
- 添加 Redis 会话映射存储,支持1小时过期
- 支持故障转移:绑定账户不可用时自动重新选择
- 三级fallback策略:cacheable内容 → system内容 → 第一条消息
- 完全向后兼容,自动启用无需配置

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
shaw
2025-07-15 18:15:53 +08:00
parent 2257b42527
commit b2b8d8c719
4 changed files with 180 additions and 13 deletions

View File

@@ -405,20 +405,20 @@ class RedisClient {
return await this.client.del(key);
}
// 🔐 会话管理
// 🔐 会话管理(用于管理员登录等)
async setSession(sessionId, sessionData, ttl = 86400) {
const key = `session:${sessionId}`;
const key = `admin_session:${sessionId}`;
await this.client.hset(key, sessionData);
await this.client.expire(key, ttl);
}
async getSession(sessionId) {
const key = `session:${sessionId}`;
const key = `admin_session:${sessionId}`;
return await this.client.hgetall(key);
}
async deleteSession(sessionId) {
const key = `session:${sessionId}`;
const key = `admin_session:${sessionId}`;
return await this.client.del(key);
}
@@ -640,6 +640,22 @@ class RedisClient {
}
}
// 🔗 会话sticky映射管理
async setSessionAccountMapping(sessionHash, accountId, ttl = 3600) {
const key = `session:${sessionHash}`;
await this.client.set(key, accountId, 'EX', ttl);
}
async getSessionAccountMapping(sessionHash) {
const key = `session:${sessionHash}`;
return await this.client.get(key);
}
async deleteSessionAccountMapping(sessionHash) {
const key = `session:${sessionHash}`;
return await this.client.del(key);
}
// 🧹 清理过期数据
async cleanup() {
try {

View File

@@ -286,8 +286,8 @@ class ClaudeAccountService {
}
}
// 🎯 智能选择可用账户
async selectAvailableAccount() {
// 🎯 智能选择可用账户支持sticky会话
async selectAvailableAccount(sessionHash = null) {
try {
const accounts = await redis.getAllClaudeAccounts();
@@ -300,6 +300,24 @@ class ClaudeAccountService {
throw new Error('No active Claude accounts available');
}
// 如果有会话哈希,检查是否有已映射的账户
if (sessionHash) {
const mappedAccountId = await redis.getSessionAccountMapping(sessionHash);
if (mappedAccountId) {
// 验证映射的账户是否仍然可用
const mappedAccount = activeAccounts.find(acc => acc.id === mappedAccountId);
if (mappedAccount) {
logger.info(`🎯 Using sticky session account: ${mappedAccount.name} (${mappedAccountId}) for session ${sessionHash}`);
return mappedAccountId;
} else {
logger.warn(`⚠️ Mapped account ${mappedAccountId} is no longer available, selecting new account`);
// 清理无效的映射
await redis.deleteSessionAccountMapping(sessionHash);
}
}
}
// 如果没有映射或映射无效,选择新账户
// 优先选择最近刷新过token的账户
const sortedAccounts = activeAccounts.sort((a, b) => {
const aLastRefresh = new Date(a.lastRefreshAt || 0).getTime();
@@ -307,7 +325,15 @@ class ClaudeAccountService {
return bLastRefresh - aLastRefresh;
});
return sortedAccounts[0].id;
const selectedAccountId = sortedAccounts[0].id;
// 如果有会话哈希,建立新的映射
if (sessionHash) {
await redis.setSessionAccountMapping(sessionHash, selectedAccountId, 3600); // 1小时过期
logger.info(`🎯 Created new sticky session mapping: ${sortedAccounts[0].name} (${selectedAccountId}) for session ${sessionHash}`);
}
return selectedAccountId;
} catch (error) {
logger.error('❌ Failed to select available account:', error);
throw error;

View File

@@ -2,6 +2,7 @@ const https = require('https');
const { SocksProxyAgent } = require('socks-proxy-agent');
const { HttpsProxyAgent } = require('https-proxy-agent');
const claudeAccountService = require('./claudeAccountService');
const sessionHelper = require('../utils/sessionHelper');
const logger = require('../utils/logger');
const config = require('../../config/config');
@@ -16,10 +17,13 @@ class ClaudeRelayService {
// 🚀 转发请求到Claude API
async relayRequest(requestBody, apiKeyData) {
try {
// 选择可用的Claude账户
const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount();
// 生成会话哈希用于sticky会话
const sessionHash = sessionHelper.generateSessionHash(requestBody);
logger.info(`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}`);
// 选择可用的Claude账户支持sticky会话
const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(sessionHash);
logger.info(`📤 Processing API request for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}${sessionHash ? `, session: ${sessionHash}` : ''}`);
// 获取有效的访问token
const accessToken = await claudeAccountService.getValidAccessToken(accountId);
@@ -224,10 +228,13 @@ class ClaudeRelayService {
// 🌊 处理流式响应带usage数据捕获
async relayStreamRequestWithUsageCapture(requestBody, apiKeyData, responseStream, usageCallback) {
try {
// 选择可用的Claude账户
const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount();
// 生成会话哈希用于sticky会话
const sessionHash = sessionHelper.generateSessionHash(requestBody);
logger.info(`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}`);
// 选择可用的Claude账户支持sticky会话
const accountId = apiKeyData.claudeAccountId || await claudeAccountService.selectAvailableAccount(sessionHash);
logger.info(`📡 Processing streaming API request with usage capture for key: ${apiKeyData.name || apiKeyData.id}, account: ${accountId}${sessionHash ? `, session: ${sessionHash}` : ''}`);
// 获取有效的访问token
const accessToken = await claudeAccountService.getValidAccessToken(accountId);

118
src/utils/sessionHelper.js Normal file
View File

@@ -0,0 +1,118 @@
const crypto = require('crypto');
const logger = require('./logger');
class SessionHelper {
/**
* 生成会话哈希用于sticky会话保持
* 基于Anthropic的prompt caching机制优先使用cacheable内容
* @param {Object} requestBody - 请求体
* @returns {string|null} - 32字符的会话哈希如果无法生成则返回null
*/
generateSessionHash(requestBody) {
if (!requestBody || typeof requestBody !== 'object') {
return null;
}
let cacheableContent = '';
const system = requestBody.system || '';
const messages = requestBody.messages || [];
// 1. 优先提取带有cache_control: {"type": "ephemeral"}的内容
// 检查system中的cacheable内容
if (Array.isArray(system)) {
for (const part of system) {
if (part && part.cache_control && part.cache_control.type === 'ephemeral') {
cacheableContent += part.text || '';
}
}
}
// 检查messages中的cacheable内容
for (const msg of messages) {
const content = msg.content || '';
if (Array.isArray(content)) {
for (const part of content) {
if (part && part.cache_control && part.cache_control.type === 'ephemeral') {
if (part.type === 'text') {
cacheableContent += part.text || '';
}
// 其他类型如image不参与hash计算
}
}
} else if (typeof content === 'string' && msg.cache_control && msg.cache_control.type === 'ephemeral') {
// 罕见情况,但需要检查
cacheableContent += content;
}
}
// 2. 如果有cacheable内容直接使用
if (cacheableContent) {
const hash = crypto.createHash('sha256').update(cacheableContent).digest('hex').substring(0, 32);
logger.debug(`📋 Session hash generated from cacheable content: ${hash}`);
return hash;
}
// 3. Fallback: 使用system内容
if (system) {
let systemText = '';
if (typeof system === 'string') {
systemText = system;
} else if (Array.isArray(system)) {
systemText = system.map(part => part.text || '').join('');
}
if (systemText) {
const hash = crypto.createHash('sha256').update(systemText).digest('hex').substring(0, 32);
logger.debug(`📋 Session hash generated from system content: ${hash}`);
return hash;
}
}
// 4. 最后fallback: 使用第一条消息内容
if (messages.length > 0) {
const firstMessage = messages[0];
let firstMessageText = '';
if (typeof firstMessage.content === 'string') {
firstMessageText = firstMessage.content;
} else if (Array.isArray(firstMessage.content)) {
firstMessageText = firstMessage.content
.filter(part => part.type === 'text')
.map(part => part.text || '')
.join('');
}
if (firstMessageText) {
const hash = crypto.createHash('sha256').update(firstMessageText).digest('hex').substring(0, 32);
logger.debug(`📋 Session hash generated from first message: ${hash}`);
return hash;
}
}
// 无法生成会话哈希
logger.debug('📋 Unable to generate session hash - no suitable content found');
return null;
}
/**
* 获取会话的Redis键名
* @param {string} sessionHash - 会话哈希
* @returns {string} - Redis键名
*/
getSessionRedisKey(sessionHash) {
return `session:${sessionHash}`;
}
/**
* 验证会话哈希格式
* @param {string} sessionHash - 会话哈希
* @returns {boolean} - 是否有效
*/
isValidSessionHash(sessionHash) {
return typeof sessionHash === 'string' &&
sessionHash.length === 32 &&
/^[a-f0-9]{32}$/.test(sessionHash);
}
}
module.exports = new SessionHelper();