mirror of
https://github.com/QuantumNous/new-api.git
synced 2026-03-30 00:46:42 +00:00
feat: disk request body cache (#2780)
* feat: 引入通用 HTTP BodyStorage/DiskCache 缓存配置与管理 - 新增 common/body_storage.go 提供 HTTP 请求体存储抽象和文件缓存能力 - 增加 common/disk_cache_config.go 支持全局磁盘缓存配置 - main.go 挂载缓存初始化流程 - 新增和补充 controller/performance.go (及 unix/windows) 用于缓存性能监控接口 - middleware/body_cleanup.go 自动清理缓存文件 - router 挂载相关接口 - 前端 settings 页面新增性能监控设置 PerformanceSetting - 优化缓存开关状态和模块热插拔能力 - 其他相关文件同步适配缓存扩展 * fix: 修复 BodyStorage 并发安全和错误处理问题 - 修复 diskStorage.Close() 竞态条件,先获取锁再执行 CAS - 为 memoryStorage 添加互斥锁和 closed 状态检查 - 修复 CreateBodyStorageFromReader 在磁盘存储失败时的回退逻辑 - 添加缓存命中统计调用 (IncrementDiskCacheHits/IncrementMemoryCacheHits) - 修复 gin.go 中 Seek 错误被忽略的问题 - 在 api-router 添加 BodyStorageCleanup 中间件 - 修复前端 formatBytes 对异常值的处理 Co-authored-by: Cursor <cursoragent@cursor.com> --------- Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
365
common/body_storage.go
Normal file
365
common/body_storage.go
Normal file
@@ -0,0 +1,365 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// BodyStorage 请求体存储接口
|
||||
type BodyStorage interface {
|
||||
io.ReadSeeker
|
||||
io.Closer
|
||||
// Bytes 获取全部内容
|
||||
Bytes() ([]byte, error)
|
||||
// Size 获取数据大小
|
||||
Size() int64
|
||||
// IsDisk 是否是磁盘存储
|
||||
IsDisk() bool
|
||||
}
|
||||
|
||||
// ErrStorageClosed 存储已关闭错误
|
||||
var ErrStorageClosed = fmt.Errorf("body storage is closed")
|
||||
|
||||
// memoryStorage 内存存储实现
|
||||
type memoryStorage struct {
|
||||
data []byte
|
||||
reader *bytes.Reader
|
||||
size int64
|
||||
closed int32
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newMemoryStorage(data []byte) *memoryStorage {
|
||||
size := int64(len(data))
|
||||
IncrementMemoryBuffers(size)
|
||||
return &memoryStorage{
|
||||
data: data,
|
||||
reader: bytes.NewReader(data),
|
||||
size: size,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *memoryStorage) Read(p []byte) (n int, err error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return 0, ErrStorageClosed
|
||||
}
|
||||
return m.reader.Read(p)
|
||||
}
|
||||
|
||||
func (m *memoryStorage) Seek(offset int64, whence int) (int64, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return 0, ErrStorageClosed
|
||||
}
|
||||
return m.reader.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (m *memoryStorage) Close() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if atomic.CompareAndSwapInt32(&m.closed, 0, 1) {
|
||||
DecrementMemoryBuffers(m.size)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryStorage) Bytes() ([]byte, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if atomic.LoadInt32(&m.closed) == 1 {
|
||||
return nil, ErrStorageClosed
|
||||
}
|
||||
return m.data, nil
|
||||
}
|
||||
|
||||
func (m *memoryStorage) Size() int64 {
|
||||
return m.size
|
||||
}
|
||||
|
||||
func (m *memoryStorage) IsDisk() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// diskStorage 磁盘存储实现
|
||||
type diskStorage struct {
|
||||
file *os.File
|
||||
filePath string
|
||||
size int64
|
||||
closed int32
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newDiskStorage(data []byte, cachePath string) (*diskStorage, error) {
|
||||
// 确定缓存目录
|
||||
dir := cachePath
|
||||
if dir == "" {
|
||||
dir = os.TempDir()
|
||||
}
|
||||
dir = filepath.Join(dir, "new-api-body-cache")
|
||||
|
||||
// 确保目录存在
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create cache directory: %w", err)
|
||||
}
|
||||
|
||||
// 创建临时文件
|
||||
filename := fmt.Sprintf("body-%s-%d.tmp", uuid.New().String()[:8], time.Now().UnixNano())
|
||||
filePath := filepath.Join(dir, filename)
|
||||
|
||||
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
|
||||
// 写入数据
|
||||
n, err := file.Write(data)
|
||||
if err != nil {
|
||||
file.Close()
|
||||
os.Remove(filePath)
|
||||
return nil, fmt.Errorf("failed to write to temp file: %w", err)
|
||||
}
|
||||
|
||||
// 重置文件指针
|
||||
if _, err := file.Seek(0, io.SeekStart); err != nil {
|
||||
file.Close()
|
||||
os.Remove(filePath)
|
||||
return nil, fmt.Errorf("failed to seek temp file: %w", err)
|
||||
}
|
||||
|
||||
size := int64(n)
|
||||
IncrementDiskFiles(size)
|
||||
|
||||
return &diskStorage{
|
||||
file: file,
|
||||
filePath: filePath,
|
||||
size: size,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newDiskStorageFromReader(reader io.Reader, maxBytes int64, cachePath string) (*diskStorage, error) {
|
||||
// 确定缓存目录
|
||||
dir := cachePath
|
||||
if dir == "" {
|
||||
dir = os.TempDir()
|
||||
}
|
||||
dir = filepath.Join(dir, "new-api-body-cache")
|
||||
|
||||
// 确保目录存在
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create cache directory: %w", err)
|
||||
}
|
||||
|
||||
// 创建临时文件
|
||||
filename := fmt.Sprintf("body-%s-%d.tmp", uuid.New().String()[:8], time.Now().UnixNano())
|
||||
filePath := filepath.Join(dir, filename)
|
||||
|
||||
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
|
||||
// 从 reader 读取并写入文件
|
||||
written, err := io.Copy(file, io.LimitReader(reader, maxBytes+1))
|
||||
if err != nil {
|
||||
file.Close()
|
||||
os.Remove(filePath)
|
||||
return nil, fmt.Errorf("failed to write to temp file: %w", err)
|
||||
}
|
||||
|
||||
if written > maxBytes {
|
||||
file.Close()
|
||||
os.Remove(filePath)
|
||||
return nil, ErrRequestBodyTooLarge
|
||||
}
|
||||
|
||||
// 重置文件指针
|
||||
if _, err := file.Seek(0, io.SeekStart); err != nil {
|
||||
file.Close()
|
||||
os.Remove(filePath)
|
||||
return nil, fmt.Errorf("failed to seek temp file: %w", err)
|
||||
}
|
||||
|
||||
IncrementDiskFiles(written)
|
||||
|
||||
return &diskStorage{
|
||||
file: file,
|
||||
filePath: filePath,
|
||||
size: written,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (d *diskStorage) Read(p []byte) (n int, err error) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if atomic.LoadInt32(&d.closed) == 1 {
|
||||
return 0, ErrStorageClosed
|
||||
}
|
||||
return d.file.Read(p)
|
||||
}
|
||||
|
||||
func (d *diskStorage) Seek(offset int64, whence int) (int64, error) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if atomic.LoadInt32(&d.closed) == 1 {
|
||||
return 0, ErrStorageClosed
|
||||
}
|
||||
return d.file.Seek(offset, whence)
|
||||
}
|
||||
|
||||
func (d *diskStorage) Close() error {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if atomic.CompareAndSwapInt32(&d.closed, 0, 1) {
|
||||
d.file.Close()
|
||||
os.Remove(d.filePath)
|
||||
DecrementDiskFiles(d.size)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *diskStorage) Bytes() ([]byte, error) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
if atomic.LoadInt32(&d.closed) == 1 {
|
||||
return nil, ErrStorageClosed
|
||||
}
|
||||
|
||||
// 保存当前位置
|
||||
currentPos, err := d.file.Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 移动到开头
|
||||
if _, err := d.file.Seek(0, io.SeekStart); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 读取全部内容
|
||||
data := make([]byte, d.size)
|
||||
_, err = io.ReadFull(d.file, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 恢复位置
|
||||
if _, err := d.file.Seek(currentPos, io.SeekStart); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (d *diskStorage) Size() int64 {
|
||||
return d.size
|
||||
}
|
||||
|
||||
func (d *diskStorage) IsDisk() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// CreateBodyStorage 根据数据大小创建合适的存储
|
||||
func CreateBodyStorage(data []byte) (BodyStorage, error) {
|
||||
size := int64(len(data))
|
||||
threshold := GetDiskCacheThresholdBytes()
|
||||
|
||||
// 检查是否应该使用磁盘缓存
|
||||
if IsDiskCacheEnabled() &&
|
||||
size >= threshold &&
|
||||
IsDiskCacheAvailable(size) {
|
||||
storage, err := newDiskStorage(data, GetDiskCachePath())
|
||||
if err != nil {
|
||||
// 如果磁盘存储失败,回退到内存存储
|
||||
SysError(fmt.Sprintf("failed to create disk storage, falling back to memory: %v", err))
|
||||
return newMemoryStorage(data), nil
|
||||
}
|
||||
return storage, nil
|
||||
}
|
||||
|
||||
return newMemoryStorage(data), nil
|
||||
}
|
||||
|
||||
// CreateBodyStorageFromReader 从 Reader 创建存储(用于大请求的流式处理)
|
||||
func CreateBodyStorageFromReader(reader io.Reader, contentLength int64, maxBytes int64) (BodyStorage, error) {
|
||||
threshold := GetDiskCacheThresholdBytes()
|
||||
|
||||
// 如果启用了磁盘缓存且内容长度超过阈值,直接使用磁盘存储
|
||||
if IsDiskCacheEnabled() &&
|
||||
contentLength > 0 &&
|
||||
contentLength >= threshold &&
|
||||
IsDiskCacheAvailable(contentLength) {
|
||||
storage, err := newDiskStorageFromReader(reader, maxBytes, GetDiskCachePath())
|
||||
if err != nil {
|
||||
if IsRequestBodyTooLargeError(err) {
|
||||
return nil, err
|
||||
}
|
||||
// 磁盘存储失败,reader 已被消费,无法安全回退
|
||||
// 直接返回错误而非尝试回退(因为 reader 数据已丢失)
|
||||
return nil, fmt.Errorf("disk storage creation failed: %w", err)
|
||||
}
|
||||
IncrementDiskCacheHits()
|
||||
return storage, nil
|
||||
}
|
||||
|
||||
// 使用内存读取
|
||||
data, err := io.ReadAll(io.LimitReader(reader, maxBytes+1))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if int64(len(data)) > maxBytes {
|
||||
return nil, ErrRequestBodyTooLarge
|
||||
}
|
||||
|
||||
storage, err := CreateBodyStorage(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 如果最终使用内存存储,记录内存缓存命中
|
||||
if !storage.IsDisk() {
|
||||
IncrementMemoryCacheHits()
|
||||
} else {
|
||||
IncrementDiskCacheHits()
|
||||
}
|
||||
return storage, nil
|
||||
}
|
||||
|
||||
// CleanupOldCacheFiles 清理旧的缓存文件(用于启动时清理残留)
|
||||
func CleanupOldCacheFiles() {
|
||||
cachePath := GetDiskCachePath()
|
||||
if cachePath == "" {
|
||||
cachePath = os.TempDir()
|
||||
}
|
||||
dir := filepath.Join(cachePath, "new-api-body-cache")
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return // 目录不存在或无法读取
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 删除超过 5 分钟的旧文件
|
||||
if now.Sub(info.ModTime()) > 5*time.Minute {
|
||||
os.Remove(filepath.Join(dir, entry.Name()))
|
||||
}
|
||||
}
|
||||
}
|
||||
156
common/disk_cache_config.go
Normal file
156
common/disk_cache_config.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// DiskCacheConfig 磁盘缓存配置(由 performance_setting 包更新)
|
||||
type DiskCacheConfig struct {
|
||||
// Enabled 是否启用磁盘缓存
|
||||
Enabled bool
|
||||
// ThresholdMB 触发磁盘缓存的请求体大小阈值(MB)
|
||||
ThresholdMB int
|
||||
// MaxSizeMB 磁盘缓存最大总大小(MB)
|
||||
MaxSizeMB int
|
||||
// Path 磁盘缓存目录
|
||||
Path string
|
||||
}
|
||||
|
||||
// 全局磁盘缓存配置
|
||||
var diskCacheConfig = DiskCacheConfig{
|
||||
Enabled: false,
|
||||
ThresholdMB: 10,
|
||||
MaxSizeMB: 1024,
|
||||
Path: "",
|
||||
}
|
||||
var diskCacheConfigMu sync.RWMutex
|
||||
|
||||
// GetDiskCacheConfig 获取磁盘缓存配置
|
||||
func GetDiskCacheConfig() DiskCacheConfig {
|
||||
diskCacheConfigMu.RLock()
|
||||
defer diskCacheConfigMu.RUnlock()
|
||||
return diskCacheConfig
|
||||
}
|
||||
|
||||
// SetDiskCacheConfig 设置磁盘缓存配置
|
||||
func SetDiskCacheConfig(config DiskCacheConfig) {
|
||||
diskCacheConfigMu.Lock()
|
||||
defer diskCacheConfigMu.Unlock()
|
||||
diskCacheConfig = config
|
||||
}
|
||||
|
||||
// IsDiskCacheEnabled 是否启用磁盘缓存
|
||||
func IsDiskCacheEnabled() bool {
|
||||
diskCacheConfigMu.RLock()
|
||||
defer diskCacheConfigMu.RUnlock()
|
||||
return diskCacheConfig.Enabled
|
||||
}
|
||||
|
||||
// GetDiskCacheThresholdBytes 获取磁盘缓存阈值(字节)
|
||||
func GetDiskCacheThresholdBytes() int64 {
|
||||
diskCacheConfigMu.RLock()
|
||||
defer diskCacheConfigMu.RUnlock()
|
||||
return int64(diskCacheConfig.ThresholdMB) << 20
|
||||
}
|
||||
|
||||
// GetDiskCacheMaxSizeBytes 获取磁盘缓存最大大小(字节)
|
||||
func GetDiskCacheMaxSizeBytes() int64 {
|
||||
diskCacheConfigMu.RLock()
|
||||
defer diskCacheConfigMu.RUnlock()
|
||||
return int64(diskCacheConfig.MaxSizeMB) << 20
|
||||
}
|
||||
|
||||
// GetDiskCachePath 获取磁盘缓存目录
|
||||
func GetDiskCachePath() string {
|
||||
diskCacheConfigMu.RLock()
|
||||
defer diskCacheConfigMu.RUnlock()
|
||||
return diskCacheConfig.Path
|
||||
}
|
||||
|
||||
// DiskCacheStats 磁盘缓存统计信息
|
||||
type DiskCacheStats struct {
|
||||
// 当前活跃的磁盘缓存文件数
|
||||
ActiveDiskFiles int64 `json:"active_disk_files"`
|
||||
// 当前磁盘缓存总大小(字节)
|
||||
CurrentDiskUsageBytes int64 `json:"current_disk_usage_bytes"`
|
||||
// 当前内存缓存数量
|
||||
ActiveMemoryBuffers int64 `json:"active_memory_buffers"`
|
||||
// 当前内存缓存总大小(字节)
|
||||
CurrentMemoryUsageBytes int64 `json:"current_memory_usage_bytes"`
|
||||
// 磁盘缓存命中次数
|
||||
DiskCacheHits int64 `json:"disk_cache_hits"`
|
||||
// 内存缓存命中次数
|
||||
MemoryCacheHits int64 `json:"memory_cache_hits"`
|
||||
// 磁盘缓存最大限制(字节)
|
||||
DiskCacheMaxBytes int64 `json:"disk_cache_max_bytes"`
|
||||
// 磁盘缓存阈值(字节)
|
||||
DiskCacheThresholdBytes int64 `json:"disk_cache_threshold_bytes"`
|
||||
}
|
||||
|
||||
var diskCacheStats DiskCacheStats
|
||||
|
||||
// GetDiskCacheStats 获取缓存统计信息
|
||||
func GetDiskCacheStats() DiskCacheStats {
|
||||
stats := DiskCacheStats{
|
||||
ActiveDiskFiles: atomic.LoadInt64(&diskCacheStats.ActiveDiskFiles),
|
||||
CurrentDiskUsageBytes: atomic.LoadInt64(&diskCacheStats.CurrentDiskUsageBytes),
|
||||
ActiveMemoryBuffers: atomic.LoadInt64(&diskCacheStats.ActiveMemoryBuffers),
|
||||
CurrentMemoryUsageBytes: atomic.LoadInt64(&diskCacheStats.CurrentMemoryUsageBytes),
|
||||
DiskCacheHits: atomic.LoadInt64(&diskCacheStats.DiskCacheHits),
|
||||
MemoryCacheHits: atomic.LoadInt64(&diskCacheStats.MemoryCacheHits),
|
||||
DiskCacheMaxBytes: GetDiskCacheMaxSizeBytes(),
|
||||
DiskCacheThresholdBytes: GetDiskCacheThresholdBytes(),
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// IncrementDiskFiles 增加磁盘文件计数
|
||||
func IncrementDiskFiles(size int64) {
|
||||
atomic.AddInt64(&diskCacheStats.ActiveDiskFiles, 1)
|
||||
atomic.AddInt64(&diskCacheStats.CurrentDiskUsageBytes, size)
|
||||
}
|
||||
|
||||
// DecrementDiskFiles 减少磁盘文件计数
|
||||
func DecrementDiskFiles(size int64) {
|
||||
atomic.AddInt64(&diskCacheStats.ActiveDiskFiles, -1)
|
||||
atomic.AddInt64(&diskCacheStats.CurrentDiskUsageBytes, -size)
|
||||
}
|
||||
|
||||
// IncrementMemoryBuffers 增加内存缓存计数
|
||||
func IncrementMemoryBuffers(size int64) {
|
||||
atomic.AddInt64(&diskCacheStats.ActiveMemoryBuffers, 1)
|
||||
atomic.AddInt64(&diskCacheStats.CurrentMemoryUsageBytes, size)
|
||||
}
|
||||
|
||||
// DecrementMemoryBuffers 减少内存缓存计数
|
||||
func DecrementMemoryBuffers(size int64) {
|
||||
atomic.AddInt64(&diskCacheStats.ActiveMemoryBuffers, -1)
|
||||
atomic.AddInt64(&diskCacheStats.CurrentMemoryUsageBytes, -size)
|
||||
}
|
||||
|
||||
// IncrementDiskCacheHits 增加磁盘缓存命中次数
|
||||
func IncrementDiskCacheHits() {
|
||||
atomic.AddInt64(&diskCacheStats.DiskCacheHits, 1)
|
||||
}
|
||||
|
||||
// IncrementMemoryCacheHits 增加内存缓存命中次数
|
||||
func IncrementMemoryCacheHits() {
|
||||
atomic.AddInt64(&diskCacheStats.MemoryCacheHits, 1)
|
||||
}
|
||||
|
||||
// ResetDiskCacheStats 重置统计信息(不重置当前使用量)
|
||||
func ResetDiskCacheStats() {
|
||||
atomic.StoreInt64(&diskCacheStats.DiskCacheHits, 0)
|
||||
atomic.StoreInt64(&diskCacheStats.MemoryCacheHits, 0)
|
||||
}
|
||||
|
||||
// IsDiskCacheAvailable 检查是否可以创建新的磁盘缓存
|
||||
func IsDiskCacheAvailable(requestSize int64) bool {
|
||||
if !IsDiskCacheEnabled() {
|
||||
return false
|
||||
}
|
||||
maxBytes := GetDiskCacheMaxSizeBytes()
|
||||
currentUsage := atomic.LoadInt64(&diskCacheStats.CurrentDiskUsageBytes)
|
||||
return currentUsage+requestSize <= maxBytes
|
||||
}
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
)
|
||||
|
||||
const KeyRequestBody = "key_request_body"
|
||||
const KeyBodyStorage = "key_body_storage"
|
||||
|
||||
var ErrRequestBodyTooLarge = errors.New("request body too large")
|
||||
|
||||
@@ -33,42 +34,99 @@ func IsRequestBodyTooLargeError(err error) bool {
|
||||
}
|
||||
|
||||
func GetRequestBody(c *gin.Context) ([]byte, error) {
|
||||
// 首先检查是否有 BodyStorage 缓存
|
||||
if storage, exists := c.Get(KeyBodyStorage); exists && storage != nil {
|
||||
if bs, ok := storage.(BodyStorage); ok {
|
||||
if _, err := bs.Seek(0, io.SeekStart); err != nil {
|
||||
return nil, fmt.Errorf("failed to seek body storage: %w", err)
|
||||
}
|
||||
return bs.Bytes()
|
||||
}
|
||||
}
|
||||
|
||||
// 检查旧的缓存方式
|
||||
cached, exists := c.Get(KeyRequestBody)
|
||||
if exists && cached != nil {
|
||||
if b, ok := cached.([]byte); ok {
|
||||
return b, nil
|
||||
}
|
||||
}
|
||||
|
||||
maxMB := constant.MaxRequestBodyMB
|
||||
if maxMB <= 0 {
|
||||
// no limit
|
||||
body, err := io.ReadAll(c.Request.Body)
|
||||
_ = c.Request.Body.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Set(KeyRequestBody, body)
|
||||
return body, nil
|
||||
maxMB = 128 // 默认 128MB
|
||||
}
|
||||
maxBytes := int64(maxMB) << 20
|
||||
|
||||
limited := io.LimitReader(c.Request.Body, maxBytes+1)
|
||||
body, err := io.ReadAll(limited)
|
||||
contentLength := c.Request.ContentLength
|
||||
|
||||
// 使用新的存储系统
|
||||
storage, err := CreateBodyStorageFromReader(c.Request.Body, contentLength, maxBytes)
|
||||
_ = c.Request.Body.Close()
|
||||
|
||||
if err != nil {
|
||||
_ = c.Request.Body.Close()
|
||||
if IsRequestBodyTooLargeError(err) {
|
||||
return nil, errors.Wrap(ErrRequestBodyTooLarge, fmt.Sprintf("request body exceeds %d MB", maxMB))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
_ = c.Request.Body.Close()
|
||||
if int64(len(body)) > maxBytes {
|
||||
return nil, errors.Wrap(ErrRequestBodyTooLarge, fmt.Sprintf("request body exceeds %d MB", maxMB))
|
||||
|
||||
// 缓存存储对象
|
||||
c.Set(KeyBodyStorage, storage)
|
||||
|
||||
// 获取字节数据
|
||||
body, err := storage.Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 同时设置旧的缓存键以保持兼容性
|
||||
c.Set(KeyRequestBody, body)
|
||||
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// GetBodyStorage 获取请求体存储对象(用于需要多次读取的场景)
|
||||
func GetBodyStorage(c *gin.Context) (BodyStorage, error) {
|
||||
// 检查是否已有存储
|
||||
if storage, exists := c.Get(KeyBodyStorage); exists && storage != nil {
|
||||
if bs, ok := storage.(BodyStorage); ok {
|
||||
if _, err := bs.Seek(0, io.SeekStart); err != nil {
|
||||
return nil, fmt.Errorf("failed to seek body storage: %w", err)
|
||||
}
|
||||
return bs, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 如果没有,调用 GetRequestBody 创建存储
|
||||
_, err := GetRequestBody(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 再次获取存储
|
||||
if storage, exists := c.Get(KeyBodyStorage); exists && storage != nil {
|
||||
if bs, ok := storage.(BodyStorage); ok {
|
||||
if _, err := bs.Seek(0, io.SeekStart); err != nil {
|
||||
return nil, fmt.Errorf("failed to seek body storage: %w", err)
|
||||
}
|
||||
return bs, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("failed to get body storage")
|
||||
}
|
||||
|
||||
// CleanupBodyStorage 清理请求体存储(应在请求结束时调用)
|
||||
func CleanupBodyStorage(c *gin.Context) {
|
||||
if storage, exists := c.Get(KeyBodyStorage); exists && storage != nil {
|
||||
if bs, ok := storage.(BodyStorage); ok {
|
||||
bs.Close()
|
||||
}
|
||||
c.Set(KeyBodyStorage, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func UnmarshalBodyReusable(c *gin.Context, v any) error {
|
||||
requestBody, err := GetRequestBody(c)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user