diff --git a/service/task_polling.go b/service/task_polling.go index 12395240e..dc85e579e 100644 --- a/service/task_polling.go +++ b/service/task_polling.go @@ -335,6 +335,8 @@ func updateVideoTasks(ctx context.Context, platform constant.TaskPlatform, chann if err := updateVideoSingleTask(ctx, adaptor, cacheGetChannel, taskId, taskM); err != nil { logger.LogError(ctx, fmt.Sprintf("Failed to update video task %s: %s", taskId, err.Error())) } + // sleep 1 second between each task to avoid hitting rate limits of upstream platforms + time.Sleep(1 * time.Second) } return nil } @@ -388,15 +390,33 @@ func updateVideoSingleTask(ctx context.Context, adaptor TaskPollingAdaptor, ch * task.Data = t.Data } else if taskResult, err = adaptor.ParseTaskResult(responseBody); err != nil { return fmt.Errorf("parseTaskResult failed for task %s: %w", taskId, err) - } else { - task.Data = redactVideoResponseBody(responseBody) } + task.Data = redactVideoResponseBody(responseBody) + logger.LogDebug(ctx, fmt.Sprintf("updateVideoSingleTask taskResult: %+v", taskResult)) now := time.Now().Unix() if taskResult.Status == "" { - taskResult = relaycommon.FailTaskInfo("upstream returned empty status") + //taskResult = relaycommon.FailTaskInfo("upstream returned empty status") + errorResult := &dto.GeneralErrorResponse{} + if err = common.Unmarshal(responseBody, &errorResult); err == nil { + openaiError := errorResult.TryToOpenAIError() + if openaiError != nil { + // 返回规范的 OpenAI 错误格式,提取错误信息,判断错误是否为任务失败 + if openaiError.Code == "429" { + // 429 错误通常表示请求过多或速率限制,暂时不认为是任务失败,保持原状态等待下一轮轮询 + return nil + } + + // 其他错误认为是任务失败,记录错误信息并更新任务状态 + taskResult = relaycommon.FailTaskInfo("upstream returned error") + } else { + // unknown error format, log original response + logger.LogError(ctx, fmt.Sprintf("Task %s returned empty status with unrecognized error format, response: %s", taskId, string(responseBody))) + taskResult = relaycommon.FailTaskInfo("upstream returned unrecognized message") + } + } } shouldRefund := false