mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-19 12:28:37 +00:00
fix: decouple Discord inbound worker timeout from listener timeout (#36602) (thanks @dutifulbob) (#36602)
Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
@@ -144,6 +144,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- Agents/failover service-unavailable handling: stop treating bare proxy/CDN `service unavailable` errors as provider overload while keeping them retryable via the timeout/failover path, so transient outages no longer show false rate-limit warnings or block fallback. (#36646) thanks @jnMetaCode.
|
- Agents/failover service-unavailable handling: stop treating bare proxy/CDN `service unavailable` errors as provider overload while keeping them retryable via the timeout/failover path, so transient outages no longer show false rate-limit warnings or block fallback. (#36646) thanks @jnMetaCode.
|
||||||
- Agents/current-time UTC anchor: append a machine-readable UTC suffix alongside local `Current time:` lines in shared cron-style prompt contexts so agents can compare UTC-stamped workspace timestamps without doing timezone math. (#32423) thanks @jriff.
|
- Agents/current-time UTC anchor: append a machine-readable UTC suffix alongside local `Current time:` lines in shared cron-style prompt contexts so agents can compare UTC-stamped workspace timestamps without doing timezone math. (#32423) thanks @jriff.
|
||||||
- TUI/webchat command-owner scope alignment: treat internal-channel gateway sessions with `operator.admin` as owner-authorized in command auth, restoring cron/gateway/connector tool access for affected TUI/webchat sessions while keeping external channels on identity-based owner checks. (from #35666, #35673, #35704) Thanks @Naylenv, @Octane0411, and @Sid-Qin.
|
- TUI/webchat command-owner scope alignment: treat internal-channel gateway sessions with `operator.admin` as owner-authorized in command auth, restoring cron/gateway/connector tool access for affected TUI/webchat sessions while keeping external channels on identity-based owner checks. (from #35666, #35673, #35704) Thanks @Naylenv, @Octane0411, and @Sid-Qin.
|
||||||
|
- Discord/inbound timeout isolation: separate inbound worker timeout tracking from listener timeout budgets so queued Discord replies are no longer dropped when listener watchdog windows expire mid-run. (#36602) Thanks @dutifulbob.
|
||||||
|
|
||||||
## 2026.3.2
|
## 2026.3.2
|
||||||
|
|
||||||
|
|||||||
@@ -1102,12 +1102,19 @@ openclaw logs --follow
|
|||||||
|
|
||||||
- `Listener DiscordMessageListener timed out after 30000ms for event MESSAGE_CREATE`
|
- `Listener DiscordMessageListener timed out after 30000ms for event MESSAGE_CREATE`
|
||||||
- `Slow listener detected ...`
|
- `Slow listener detected ...`
|
||||||
|
- `discord inbound worker timed out after ...`
|
||||||
|
|
||||||
Canonical knob:
|
Listener budget knob:
|
||||||
|
|
||||||
- single-account: `channels.discord.eventQueue.listenerTimeout`
|
- single-account: `channels.discord.eventQueue.listenerTimeout`
|
||||||
- multi-account: `channels.discord.accounts.<accountId>.eventQueue.listenerTimeout`
|
- multi-account: `channels.discord.accounts.<accountId>.eventQueue.listenerTimeout`
|
||||||
|
|
||||||
|
Worker run timeout knob:
|
||||||
|
|
||||||
|
- single-account: `channels.discord.inboundWorker.runTimeoutMs`
|
||||||
|
- multi-account: `channels.discord.accounts.<accountId>.inboundWorker.runTimeoutMs`
|
||||||
|
- default: `1800000` (30 minutes); set `0` to disable
|
||||||
|
|
||||||
Recommended baseline:
|
Recommended baseline:
|
||||||
|
|
||||||
```json5
|
```json5
|
||||||
@@ -1119,6 +1126,9 @@ openclaw logs --follow
|
|||||||
eventQueue: {
|
eventQueue: {
|
||||||
listenerTimeout: 120000,
|
listenerTimeout: 120000,
|
||||||
},
|
},
|
||||||
|
inboundWorker: {
|
||||||
|
runTimeoutMs: 1800000,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -1126,7 +1136,8 @@ openclaw logs --follow
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Tune this first before adding alternate timeout controls elsewhere.
|
Use `eventQueue.listenerTimeout` for slow listener setup and `inboundWorker.runTimeoutMs`
|
||||||
|
only if you want a separate safety valve for queued agent turns.
|
||||||
|
|
||||||
</Accordion>
|
</Accordion>
|
||||||
|
|
||||||
@@ -1177,7 +1188,8 @@ High-signal Discord fields:
|
|||||||
- startup/auth: `enabled`, `token`, `accounts.*`, `allowBots`
|
- startup/auth: `enabled`, `token`, `accounts.*`, `allowBots`
|
||||||
- policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*`
|
- policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*`
|
||||||
- command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*`
|
- command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*`
|
||||||
- event queue: `eventQueue.listenerTimeout` (canonical), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency`
|
- event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency`
|
||||||
|
- inbound worker: `inboundWorker.runTimeoutMs`
|
||||||
- reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit`
|
- reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit`
|
||||||
- delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage`
|
- delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage`
|
||||||
- streaming: `streaming` (legacy alias: `streamMode`), `draftChunk`, `blockStreaming`, `blockStreamingCoalesce`
|
- streaming: `streaming` (legacy alias: `streamMode`), `draftChunk`, `blockStreaming`, `blockStreamingCoalesce`
|
||||||
|
|||||||
337
docs/experiments/plans/discord-async-inbound-worker.md
Normal file
337
docs/experiments/plans/discord-async-inbound-worker.md
Normal file
@@ -0,0 +1,337 @@
|
|||||||
|
---
|
||||||
|
summary: "Status and next steps for decoupling Discord gateway listeners from long-running agent turns with a Discord-specific inbound worker"
|
||||||
|
owner: "openclaw"
|
||||||
|
status: "in_progress"
|
||||||
|
last_updated: "2026-03-05"
|
||||||
|
title: "Discord Async Inbound Worker Plan"
|
||||||
|
---
|
||||||
|
|
||||||
|
# Discord Async Inbound Worker Plan
|
||||||
|
|
||||||
|
## Objective
|
||||||
|
|
||||||
|
Remove Discord listener timeout as a user-facing failure mode by making inbound Discord turns asynchronous:
|
||||||
|
|
||||||
|
1. Gateway listener accepts and normalizes inbound events quickly.
|
||||||
|
2. A Discord run queue stores serialized jobs keyed by the same ordering boundary we use today.
|
||||||
|
3. A worker executes the actual agent turn outside the Carbon listener lifetime.
|
||||||
|
4. Replies are delivered back to the originating channel or thread after the run completes.
|
||||||
|
|
||||||
|
This is the long-term fix for queued Discord runs timing out at `channels.discord.eventQueue.listenerTimeout` while the agent run itself is still making progress.
|
||||||
|
|
||||||
|
## Current status
|
||||||
|
|
||||||
|
This plan is partially implemented.
|
||||||
|
|
||||||
|
Already done:
|
||||||
|
|
||||||
|
- Discord listener timeout and Discord run timeout are now separate settings.
|
||||||
|
- Accepted inbound Discord turns are enqueued into `src/discord/monitor/inbound-worker.ts`.
|
||||||
|
- The worker now owns the long-running turn instead of the Carbon listener.
|
||||||
|
- Existing per-route ordering is preserved by queue key.
|
||||||
|
- Timeout regression coverage exists for the Discord worker path.
|
||||||
|
|
||||||
|
What this means in plain language:
|
||||||
|
|
||||||
|
- the production timeout bug is fixed
|
||||||
|
- the long-running turn no longer dies just because the Discord listener budget expires
|
||||||
|
- the worker architecture is not finished yet
|
||||||
|
|
||||||
|
What is still missing:
|
||||||
|
|
||||||
|
- `DiscordInboundJob` is still only partially normalized and still carries live runtime references
|
||||||
|
- command semantics (`stop`, `new`, `reset`, future session controls) are not yet fully worker-native
|
||||||
|
- worker observability and operator status are still minimal
|
||||||
|
- there is still no restart durability
|
||||||
|
|
||||||
|
## Why this exists
|
||||||
|
|
||||||
|
Current behavior ties the full agent turn to the listener lifetime:
|
||||||
|
|
||||||
|
- `src/discord/monitor/listeners.ts` applies the timeout and abort boundary.
|
||||||
|
- `src/discord/monitor/message-handler.ts` keeps the queued run inside that boundary.
|
||||||
|
- `src/discord/monitor/message-handler.process.ts` performs media loading, routing, dispatch, typing, draft streaming, and final reply delivery inline.
|
||||||
|
|
||||||
|
That architecture has two bad properties:
|
||||||
|
|
||||||
|
- long but healthy turns can be aborted by the listener watchdog
|
||||||
|
- users can see no reply even when the downstream runtime would have produced one
|
||||||
|
|
||||||
|
Raising the timeout helps but does not change the failure mode.
|
||||||
|
|
||||||
|
## Non-goals
|
||||||
|
|
||||||
|
- Do not redesign non-Discord channels in this pass.
|
||||||
|
- Do not broaden this into a generic all-channel worker framework in the first implementation.
|
||||||
|
- Do not extract a shared cross-channel inbound worker abstraction yet; only share low-level primitives when duplication is obvious.
|
||||||
|
- Do not add durable crash recovery in the first pass unless needed to land safely.
|
||||||
|
- Do not change route selection, binding semantics, or ACP policy in this plan.
|
||||||
|
|
||||||
|
## Current constraints
|
||||||
|
|
||||||
|
The current Discord processing path still depends on some live runtime objects that should not stay inside the long-term job payload:
|
||||||
|
|
||||||
|
- Carbon `Client`
|
||||||
|
- raw Discord event shapes
|
||||||
|
- in-memory guild history map
|
||||||
|
- thread binding manager callbacks
|
||||||
|
- live typing and draft stream state
|
||||||
|
|
||||||
|
We already moved execution onto a worker queue, but the normalization boundary is still incomplete. Right now the worker is "run later in the same process with some of the same live objects," not a fully data-only job boundary.
|
||||||
|
|
||||||
|
## Target architecture
|
||||||
|
|
||||||
|
### 1. Listener stage
|
||||||
|
|
||||||
|
`DiscordMessageListener` remains the ingress point, but its job becomes:
|
||||||
|
|
||||||
|
- run preflight and policy checks
|
||||||
|
- normalize accepted input into a serializable `DiscordInboundJob`
|
||||||
|
- enqueue the job into a per-session or per-channel async queue
|
||||||
|
- return immediately to Carbon once the enqueue succeeds
|
||||||
|
|
||||||
|
The listener should no longer own the end-to-end LLM turn lifetime.
|
||||||
|
|
||||||
|
### 2. Normalized job payload
|
||||||
|
|
||||||
|
Introduce a serializable job descriptor that contains only the data needed to run the turn later.
|
||||||
|
|
||||||
|
Minimum shape:
|
||||||
|
|
||||||
|
- route identity
|
||||||
|
- `agentId`
|
||||||
|
- `sessionKey`
|
||||||
|
- `accountId`
|
||||||
|
- `channel`
|
||||||
|
- delivery identity
|
||||||
|
- destination channel id
|
||||||
|
- reply target message id
|
||||||
|
- thread id if present
|
||||||
|
- sender identity
|
||||||
|
- sender id, label, username, tag
|
||||||
|
- channel context
|
||||||
|
- guild id
|
||||||
|
- channel name or slug
|
||||||
|
- thread metadata
|
||||||
|
- resolved system prompt override
|
||||||
|
- normalized message body
|
||||||
|
- base text
|
||||||
|
- effective message text
|
||||||
|
- attachment descriptors or resolved media references
|
||||||
|
- gating decisions
|
||||||
|
- mention requirement outcome
|
||||||
|
- command authorization outcome
|
||||||
|
- bound session or agent metadata if applicable
|
||||||
|
|
||||||
|
The job payload must not contain live Carbon objects or mutable closures.
|
||||||
|
|
||||||
|
Current implementation status:
|
||||||
|
|
||||||
|
- partially done
|
||||||
|
- `src/discord/monitor/inbound-job.ts` exists and defines the worker handoff
|
||||||
|
- the payload still contains live Discord runtime context and should be reduced further
|
||||||
|
|
||||||
|
### 3. Worker stage
|
||||||
|
|
||||||
|
Add a Discord-specific worker runner responsible for:
|
||||||
|
|
||||||
|
- reconstructing the turn context from `DiscordInboundJob`
|
||||||
|
- loading media and any additional channel metadata needed for the run
|
||||||
|
- dispatching the agent turn
|
||||||
|
- delivering final reply payloads
|
||||||
|
- updating status and diagnostics
|
||||||
|
|
||||||
|
Recommended location:
|
||||||
|
|
||||||
|
- `src/discord/monitor/inbound-worker.ts`
|
||||||
|
- `src/discord/monitor/inbound-job.ts`
|
||||||
|
|
||||||
|
### 4. Ordering model
|
||||||
|
|
||||||
|
Ordering must remain equivalent to today for a given route boundary.
|
||||||
|
|
||||||
|
Recommended key:
|
||||||
|
|
||||||
|
- use the same queue key logic as `resolveDiscordRunQueueKey(...)`
|
||||||
|
|
||||||
|
This preserves existing behavior:
|
||||||
|
|
||||||
|
- one bound agent conversation does not interleave with itself
|
||||||
|
- different Discord channels can still progress independently
|
||||||
|
|
||||||
|
### 5. Timeout model
|
||||||
|
|
||||||
|
After cutover, there are two separate timeout classes:
|
||||||
|
|
||||||
|
- listener timeout
|
||||||
|
- only covers normalization and enqueue
|
||||||
|
- should be short
|
||||||
|
- run timeout
|
||||||
|
- optional, worker-owned, explicit, and user-visible
|
||||||
|
- should not be inherited accidentally from Carbon listener settings
|
||||||
|
|
||||||
|
This removes the current accidental coupling between "Discord gateway listener stayed alive" and "agent run is healthy."
|
||||||
|
|
||||||
|
## Recommended implementation phases
|
||||||
|
|
||||||
|
### Phase 1: normalization boundary
|
||||||
|
|
||||||
|
- Status: partially implemented
|
||||||
|
- Done:
|
||||||
|
- extracted `buildDiscordInboundJob(...)`
|
||||||
|
- added worker handoff tests
|
||||||
|
- Remaining:
|
||||||
|
- make `DiscordInboundJob` plain data only
|
||||||
|
- move live runtime dependencies to worker-owned services instead of per-job payload
|
||||||
|
- stop rebuilding process context by stitching live listener refs back into the job
|
||||||
|
|
||||||
|
### Phase 2: in-memory worker queue
|
||||||
|
|
||||||
|
- Status: implemented
|
||||||
|
- Done:
|
||||||
|
- added `DiscordInboundWorkerQueue` keyed by resolved run queue key
|
||||||
|
- listener enqueues jobs instead of directly awaiting `processDiscordMessage(...)`
|
||||||
|
- worker executes jobs in-process, in memory only
|
||||||
|
|
||||||
|
This is the first functional cutover.
|
||||||
|
|
||||||
|
### Phase 3: process split
|
||||||
|
|
||||||
|
- Status: not started
|
||||||
|
- Move delivery, typing, and draft streaming ownership behind worker-facing adapters.
|
||||||
|
- Replace direct use of live preflight context with worker context reconstruction.
|
||||||
|
- Keep `processDiscordMessage(...)` temporarily as a facade if needed, then split it.
|
||||||
|
|
||||||
|
### Phase 4: command semantics
|
||||||
|
|
||||||
|
- Status: not started
|
||||||
|
Make sure native Discord commands still behave correctly when work is queued:
|
||||||
|
|
||||||
|
- `stop`
|
||||||
|
- `new`
|
||||||
|
- `reset`
|
||||||
|
- any future session-control commands
|
||||||
|
|
||||||
|
The worker queue must expose enough run state for commands to target the active or queued turn.
|
||||||
|
|
||||||
|
### Phase 5: observability and operator UX
|
||||||
|
|
||||||
|
- Status: not started
|
||||||
|
- emit queue depth and active worker counts into monitor status
|
||||||
|
- record enqueue time, start time, finish time, and timeout or cancellation reason
|
||||||
|
- surface worker-owned timeout or delivery failures clearly in logs
|
||||||
|
|
||||||
|
### Phase 6: optional durability follow-up
|
||||||
|
|
||||||
|
- Status: not started
|
||||||
|
Only after the in-memory version is stable:
|
||||||
|
|
||||||
|
- decide whether queued Discord jobs should survive gateway restart
|
||||||
|
- if yes, persist job descriptors and delivery checkpoints
|
||||||
|
- if no, document the explicit in-memory boundary
|
||||||
|
|
||||||
|
This should be a separate follow-up unless restart recovery is required to land.
|
||||||
|
|
||||||
|
## File impact
|
||||||
|
|
||||||
|
Current primary files:
|
||||||
|
|
||||||
|
- `src/discord/monitor/listeners.ts`
|
||||||
|
- `src/discord/monitor/message-handler.ts`
|
||||||
|
- `src/discord/monitor/message-handler.preflight.ts`
|
||||||
|
- `src/discord/monitor/message-handler.process.ts`
|
||||||
|
- `src/discord/monitor/status.ts`
|
||||||
|
|
||||||
|
Current worker files:
|
||||||
|
|
||||||
|
- `src/discord/monitor/inbound-job.ts`
|
||||||
|
- `src/discord/monitor/inbound-worker.ts`
|
||||||
|
- `src/discord/monitor/inbound-job.test.ts`
|
||||||
|
- `src/discord/monitor/message-handler.queue.test.ts`
|
||||||
|
|
||||||
|
Likely next touch points:
|
||||||
|
|
||||||
|
- `src/auto-reply/dispatch.ts`
|
||||||
|
- `src/discord/monitor/reply-delivery.ts`
|
||||||
|
- `src/discord/monitor/thread-bindings.ts`
|
||||||
|
- `src/discord/monitor/native-command.ts`
|
||||||
|
|
||||||
|
## Next step now
|
||||||
|
|
||||||
|
The next step is to make the worker boundary real instead of partial.
|
||||||
|
|
||||||
|
Do this next:
|
||||||
|
|
||||||
|
1. Move live runtime dependencies out of `DiscordInboundJob`
|
||||||
|
2. Keep those dependencies on the Discord worker instance instead
|
||||||
|
3. Reduce queued jobs to plain Discord-specific data:
|
||||||
|
- route identity
|
||||||
|
- delivery target
|
||||||
|
- sender info
|
||||||
|
- normalized message snapshot
|
||||||
|
- gating and binding decisions
|
||||||
|
4. Reconstruct worker execution context from that plain data inside the worker
|
||||||
|
|
||||||
|
In practice, that means:
|
||||||
|
|
||||||
|
- `client`
|
||||||
|
- `threadBindings`
|
||||||
|
- `guildHistories`
|
||||||
|
- `discordRestFetch`
|
||||||
|
- other mutable runtime-only handles
|
||||||
|
|
||||||
|
should stop living on each queued job and instead live on the worker itself or behind worker-owned adapters.
|
||||||
|
|
||||||
|
After that lands, the next follow-up should be command-state cleanup for `stop`, `new`, and `reset`.
|
||||||
|
|
||||||
|
## Testing plan
|
||||||
|
|
||||||
|
Keep the existing timeout repro coverage in:
|
||||||
|
|
||||||
|
- `src/discord/monitor/message-handler.queue.test.ts`
|
||||||
|
|
||||||
|
Add new tests for:
|
||||||
|
|
||||||
|
1. listener returns after enqueue without awaiting full turn
|
||||||
|
2. per-route ordering is preserved
|
||||||
|
3. different channels still run concurrently
|
||||||
|
4. replies are delivered to the original message destination
|
||||||
|
5. `stop` cancels the active worker-owned run
|
||||||
|
6. worker failure produces visible diagnostics without blocking later jobs
|
||||||
|
7. ACP-bound Discord channels still route correctly under worker execution
|
||||||
|
|
||||||
|
## Risks and mitigations
|
||||||
|
|
||||||
|
- Risk: command semantics drift from current synchronous behavior
|
||||||
|
Mitigation: land command-state plumbing in the same cutover, not later
|
||||||
|
|
||||||
|
- Risk: reply delivery loses thread or reply-to context
|
||||||
|
Mitigation: make delivery identity first-class in `DiscordInboundJob`
|
||||||
|
|
||||||
|
- Risk: duplicate sends during retries or queue restarts
|
||||||
|
Mitigation: keep first pass in-memory only, or add explicit delivery idempotency before persistence
|
||||||
|
|
||||||
|
- Risk: `message-handler.process.ts` becomes harder to reason about during migration
|
||||||
|
Mitigation: split into normalization, execution, and delivery helpers before or during worker cutover
|
||||||
|
|
||||||
|
## Acceptance criteria
|
||||||
|
|
||||||
|
The plan is complete when:
|
||||||
|
|
||||||
|
1. Discord listener timeout no longer aborts healthy long-running turns.
|
||||||
|
2. Listener lifetime and agent-turn lifetime are separate concepts in code.
|
||||||
|
3. Existing per-session ordering is preserved.
|
||||||
|
4. ACP-bound Discord channels work through the same worker path.
|
||||||
|
5. `stop` targets the worker-owned run instead of the old listener-owned call stack.
|
||||||
|
6. Timeout and delivery failures become explicit worker outcomes, not silent listener drops.
|
||||||
|
|
||||||
|
## Remaining landing strategy
|
||||||
|
|
||||||
|
Finish this in follow-up PRs:
|
||||||
|
|
||||||
|
1. make `DiscordInboundJob` plain-data only and move live runtime refs onto the worker
|
||||||
|
2. clean up command-state ownership for `stop`, `new`, and `reset`
|
||||||
|
3. add worker observability and operator status
|
||||||
|
4. decide whether durability is needed or explicitly document the in-memory boundary
|
||||||
|
|
||||||
|
This is still a bounded follow-up if kept Discord-only and if we continue to avoid a premature cross-channel worker abstraction.
|
||||||
@@ -1,3 +1,7 @@
|
|||||||
|
import {
|
||||||
|
DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS,
|
||||||
|
DISCORD_DEFAULT_LISTENER_TIMEOUT_MS,
|
||||||
|
} from "../discord/monitor/timeouts.js";
|
||||||
import { MEDIA_AUDIO_FIELD_HELP } from "./media-audio-field-metadata.js";
|
import { MEDIA_AUDIO_FIELD_HELP } from "./media-audio-field-metadata.js";
|
||||||
import { IRC_FIELD_HELP } from "./schema.irc.js";
|
import { IRC_FIELD_HELP } from "./schema.irc.js";
|
||||||
|
|
||||||
@@ -1451,8 +1455,8 @@ export const FIELD_HELP: Record<string, string> = {
|
|||||||
"channels.discord.retry.maxDelayMs": "Maximum retry delay cap in ms for Discord outbound calls.",
|
"channels.discord.retry.maxDelayMs": "Maximum retry delay cap in ms for Discord outbound calls.",
|
||||||
"channels.discord.retry.jitter": "Jitter factor (0-1) applied to Discord retry delays.",
|
"channels.discord.retry.jitter": "Jitter factor (0-1) applied to Discord retry delays.",
|
||||||
"channels.discord.maxLinesPerMessage": "Soft max line count per Discord message (default: 17).",
|
"channels.discord.maxLinesPerMessage": "Soft max line count per Discord message (default: 17).",
|
||||||
"channels.discord.eventQueue.listenerTimeout":
|
"channels.discord.inboundWorker.runTimeoutMs": `Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to ${DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS} and can be disabled with 0. Set per account via channels.discord.accounts.<id>.inboundWorker.runTimeoutMs.`,
|
||||||
"Canonical Discord listener timeout control in ms for gateway event handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts.<id>.eventQueue.listenerTimeout.",
|
"channels.discord.eventQueue.listenerTimeout": `Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is ${DISCORD_DEFAULT_LISTENER_TIMEOUT_MS} in OpenClaw; set per account via channels.discord.accounts.<id>.eventQueue.listenerTimeout.`,
|
||||||
"channels.discord.eventQueue.maxQueueSize":
|
"channels.discord.eventQueue.maxQueueSize":
|
||||||
"Optional Discord EventQueue capacity override (max queued events before backpressure). Set per account via channels.discord.accounts.<id>.eventQueue.maxQueueSize.",
|
"Optional Discord EventQueue capacity override (max queued events before backpressure). Set per account via channels.discord.accounts.<id>.eventQueue.maxQueueSize.",
|
||||||
"channels.discord.eventQueue.maxConcurrency":
|
"channels.discord.eventQueue.maxConcurrency":
|
||||||
|
|||||||
@@ -722,6 +722,7 @@ export const FIELD_LABELS: Record<string, string> = {
|
|||||||
"channels.discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)",
|
"channels.discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)",
|
||||||
"channels.discord.retry.jitter": "Discord Retry Jitter",
|
"channels.discord.retry.jitter": "Discord Retry Jitter",
|
||||||
"channels.discord.maxLinesPerMessage": "Discord Max Lines Per Message",
|
"channels.discord.maxLinesPerMessage": "Discord Max Lines Per Message",
|
||||||
|
"channels.discord.inboundWorker.runTimeoutMs": "Discord Inbound Worker Timeout (ms)",
|
||||||
"channels.discord.eventQueue.listenerTimeout": "Discord EventQueue Listener Timeout (ms)",
|
"channels.discord.eventQueue.listenerTimeout": "Discord EventQueue Listener Timeout (ms)",
|
||||||
"channels.discord.eventQueue.maxQueueSize": "Discord EventQueue Max Queue Size",
|
"channels.discord.eventQueue.maxQueueSize": "Discord EventQueue Max Queue Size",
|
||||||
"channels.discord.eventQueue.maxConcurrency": "Discord EventQueue Max Concurrency",
|
"channels.discord.eventQueue.maxConcurrency": "Discord EventQueue Max Concurrency",
|
||||||
|
|||||||
@@ -330,11 +330,21 @@ export type DiscordAccountConfig = {
|
|||||||
activityType?: 0 | 1 | 2 | 3 | 4 | 5;
|
activityType?: 0 | 1 | 2 | 3 | 4 | 5;
|
||||||
/** Streaming URL (Twitch/YouTube). Required when activityType=1. */
|
/** Streaming URL (Twitch/YouTube). Required when activityType=1. */
|
||||||
activityUrl?: string;
|
activityUrl?: string;
|
||||||
|
/**
|
||||||
|
* In-process worker settings for queued inbound Discord runs.
|
||||||
|
* This is separate from Carbon's eventQueue listener budget.
|
||||||
|
*/
|
||||||
|
inboundWorker?: {
|
||||||
|
/**
|
||||||
|
* Max time (ms) a queued inbound run may execute before OpenClaw aborts it.
|
||||||
|
* Defaults to 1800000 (30 minutes). Set 0 to disable the worker-owned timeout.
|
||||||
|
*/
|
||||||
|
runTimeoutMs?: number;
|
||||||
|
};
|
||||||
/**
|
/**
|
||||||
* Carbon EventQueue configuration. Controls how Discord gateway events are processed.
|
* Carbon EventQueue configuration. Controls how Discord gateway events are processed.
|
||||||
* The most important option is `listenerTimeout` which defaults to 30s in Carbon --
|
* `listenerTimeout` only covers gateway listener work such as normalization and enqueue.
|
||||||
* too short for LLM calls with extended thinking. Set a higher value (e.g. 120000)
|
* It does not control the lifetime of queued inbound agent turns.
|
||||||
* to prevent the event queue from killing long-running message handlers.
|
|
||||||
*/
|
*/
|
||||||
eventQueue?: {
|
eventQueue?: {
|
||||||
/** Max time (ms) a single listener can run before being killed. Default: 120000. */
|
/** Max time (ms) a single listener can run before being killed. Default: 120000. */
|
||||||
|
|||||||
@@ -528,6 +528,12 @@ export const DiscordAccountSchema = z
|
|||||||
.union([z.literal(0), z.literal(1), z.literal(2), z.literal(3), z.literal(4), z.literal(5)])
|
.union([z.literal(0), z.literal(1), z.literal(2), z.literal(3), z.literal(4), z.literal(5)])
|
||||||
.optional(),
|
.optional(),
|
||||||
activityUrl: z.string().url().optional(),
|
activityUrl: z.string().url().optional(),
|
||||||
|
inboundWorker: z
|
||||||
|
.object({
|
||||||
|
runTimeoutMs: z.number().int().nonnegative().optional(),
|
||||||
|
})
|
||||||
|
.strict()
|
||||||
|
.optional(),
|
||||||
eventQueue: z
|
eventQueue: z
|
||||||
.object({
|
.object({
|
||||||
listenerTimeout: z.number().int().positive().optional(),
|
listenerTimeout: z.number().int().positive().optional(),
|
||||||
|
|||||||
148
src/discord/monitor/inbound-job.test.ts
Normal file
148
src/discord/monitor/inbound-job.test.ts
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
import { Message } from "@buape/carbon";
|
||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import { buildDiscordInboundJob, materializeDiscordInboundJob } from "./inbound-job.js";
|
||||||
|
import { createBaseDiscordMessageContext } from "./message-handler.test-harness.js";
|
||||||
|
|
||||||
|
describe("buildDiscordInboundJob", () => {
|
||||||
|
it("keeps live runtime references out of the payload", async () => {
|
||||||
|
const ctx = await createBaseDiscordMessageContext({
|
||||||
|
message: {
|
||||||
|
id: "m1",
|
||||||
|
channelId: "thread-1",
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
attachments: [],
|
||||||
|
channel: {
|
||||||
|
id: "thread-1",
|
||||||
|
isThread: () => true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
data: {
|
||||||
|
guild: { id: "g1", name: "Guild" },
|
||||||
|
message: {
|
||||||
|
id: "m1",
|
||||||
|
channelId: "thread-1",
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
attachments: [],
|
||||||
|
channel: {
|
||||||
|
id: "thread-1",
|
||||||
|
isThread: () => true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
threadChannel: {
|
||||||
|
id: "thread-1",
|
||||||
|
name: "codex",
|
||||||
|
parentId: "forum-1",
|
||||||
|
parent: {
|
||||||
|
id: "forum-1",
|
||||||
|
name: "Forum",
|
||||||
|
},
|
||||||
|
ownerId: "user-1",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const job = buildDiscordInboundJob(ctx);
|
||||||
|
|
||||||
|
expect("runtime" in job.payload).toBe(false);
|
||||||
|
expect("client" in job.payload).toBe(false);
|
||||||
|
expect("threadBindings" in job.payload).toBe(false);
|
||||||
|
expect("discordRestFetch" in job.payload).toBe(false);
|
||||||
|
expect("channel" in job.payload.message).toBe(false);
|
||||||
|
expect("channel" in job.payload.data.message).toBe(false);
|
||||||
|
expect(job.runtime.client).toBe(ctx.client);
|
||||||
|
expect(job.runtime.threadBindings).toBe(ctx.threadBindings);
|
||||||
|
expect(job.payload.threadChannel).toEqual({
|
||||||
|
id: "thread-1",
|
||||||
|
name: "codex",
|
||||||
|
parentId: "forum-1",
|
||||||
|
parent: {
|
||||||
|
id: "forum-1",
|
||||||
|
name: "Forum",
|
||||||
|
},
|
||||||
|
ownerId: "user-1",
|
||||||
|
});
|
||||||
|
expect(() => JSON.stringify(job.payload)).not.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("re-materializes the process context with an overridden abort signal", async () => {
|
||||||
|
const ctx = await createBaseDiscordMessageContext();
|
||||||
|
const job = buildDiscordInboundJob(ctx);
|
||||||
|
const overrideAbortController = new AbortController();
|
||||||
|
|
||||||
|
const rematerialized = materializeDiscordInboundJob(job, overrideAbortController.signal);
|
||||||
|
|
||||||
|
expect(rematerialized.runtime).toBe(ctx.runtime);
|
||||||
|
expect(rematerialized.client).toBe(ctx.client);
|
||||||
|
expect(rematerialized.threadBindings).toBe(ctx.threadBindings);
|
||||||
|
expect(rematerialized.abortSignal).toBe(overrideAbortController.signal);
|
||||||
|
expect(rematerialized.message).toEqual(job.payload.message);
|
||||||
|
expect(rematerialized.data).toEqual(job.payload.data);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("preserves Carbon message getters across queued jobs", async () => {
|
||||||
|
const ctx = await createBaseDiscordMessageContext();
|
||||||
|
const message = new Message(
|
||||||
|
ctx.client as never,
|
||||||
|
{
|
||||||
|
id: "m1",
|
||||||
|
channel_id: "c1",
|
||||||
|
content: "hello",
|
||||||
|
attachments: [{ id: "a1", filename: "note.txt" }],
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
author: {
|
||||||
|
id: "u1",
|
||||||
|
username: "alice",
|
||||||
|
discriminator: "0",
|
||||||
|
avatar: null,
|
||||||
|
},
|
||||||
|
referenced_message: {
|
||||||
|
id: "m0",
|
||||||
|
channel_id: "c1",
|
||||||
|
content: "earlier",
|
||||||
|
attachments: [],
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
author: {
|
||||||
|
id: "u2",
|
||||||
|
username: "bob",
|
||||||
|
discriminator: "0",
|
||||||
|
avatar: null,
|
||||||
|
},
|
||||||
|
type: 0,
|
||||||
|
tts: false,
|
||||||
|
mention_everyone: false,
|
||||||
|
pinned: false,
|
||||||
|
flags: 0,
|
||||||
|
},
|
||||||
|
type: 0,
|
||||||
|
tts: false,
|
||||||
|
mention_everyone: false,
|
||||||
|
pinned: false,
|
||||||
|
flags: 0,
|
||||||
|
} as ConstructorParameters<typeof Message>[1],
|
||||||
|
);
|
||||||
|
const runtimeChannel = { id: "c1", isThread: () => false };
|
||||||
|
Object.defineProperty(message, "channel", {
|
||||||
|
value: runtimeChannel,
|
||||||
|
configurable: true,
|
||||||
|
enumerable: true,
|
||||||
|
writable: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const job = buildDiscordInboundJob({
|
||||||
|
...ctx,
|
||||||
|
message,
|
||||||
|
data: {
|
||||||
|
...ctx.data,
|
||||||
|
message,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const rematerialized = materializeDiscordInboundJob(job);
|
||||||
|
|
||||||
|
expect(job.payload.message).toBeInstanceOf(Message);
|
||||||
|
expect("channel" in job.payload.message).toBe(false);
|
||||||
|
expect(rematerialized.message.content).toBe("hello");
|
||||||
|
expect(rematerialized.message.attachments).toHaveLength(1);
|
||||||
|
expect(rematerialized.message.timestamp).toBe(message.timestamp);
|
||||||
|
expect(rematerialized.message.referencedMessage?.content).toBe("earlier");
|
||||||
|
});
|
||||||
|
});
|
||||||
111
src/discord/monitor/inbound-job.ts
Normal file
111
src/discord/monitor/inbound-job.ts
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
import type { DiscordMessagePreflightContext } from "./message-handler.preflight.types.js";
|
||||||
|
|
||||||
|
type DiscordInboundJobRuntimeField =
|
||||||
|
| "runtime"
|
||||||
|
| "abortSignal"
|
||||||
|
| "guildHistories"
|
||||||
|
| "client"
|
||||||
|
| "threadBindings"
|
||||||
|
| "discordRestFetch";
|
||||||
|
|
||||||
|
export type DiscordInboundJobRuntime = Pick<
|
||||||
|
DiscordMessagePreflightContext,
|
||||||
|
DiscordInboundJobRuntimeField
|
||||||
|
>;
|
||||||
|
|
||||||
|
export type DiscordInboundJobPayload = Omit<
|
||||||
|
DiscordMessagePreflightContext,
|
||||||
|
DiscordInboundJobRuntimeField
|
||||||
|
>;
|
||||||
|
|
||||||
|
export type DiscordInboundJob = {
|
||||||
|
queueKey: string;
|
||||||
|
payload: DiscordInboundJobPayload;
|
||||||
|
runtime: DiscordInboundJobRuntime;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightContext): string {
|
||||||
|
const sessionKey = ctx.route.sessionKey?.trim();
|
||||||
|
if (sessionKey) {
|
||||||
|
return sessionKey;
|
||||||
|
}
|
||||||
|
const baseSessionKey = ctx.baseSessionKey?.trim();
|
||||||
|
if (baseSessionKey) {
|
||||||
|
return baseSessionKey;
|
||||||
|
}
|
||||||
|
return ctx.messageChannelId;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): DiscordInboundJob {
|
||||||
|
const {
|
||||||
|
runtime,
|
||||||
|
abortSignal,
|
||||||
|
guildHistories,
|
||||||
|
client,
|
||||||
|
threadBindings,
|
||||||
|
discordRestFetch,
|
||||||
|
message,
|
||||||
|
data,
|
||||||
|
threadChannel,
|
||||||
|
...payload
|
||||||
|
} = ctx;
|
||||||
|
|
||||||
|
const sanitizedMessage = sanitizeDiscordInboundMessage(message);
|
||||||
|
return {
|
||||||
|
queueKey: resolveDiscordInboundJobQueueKey(ctx),
|
||||||
|
payload: {
|
||||||
|
...payload,
|
||||||
|
message: sanitizedMessage,
|
||||||
|
data: {
|
||||||
|
...data,
|
||||||
|
message: sanitizedMessage,
|
||||||
|
},
|
||||||
|
threadChannel: normalizeDiscordThreadChannel(threadChannel),
|
||||||
|
},
|
||||||
|
runtime: {
|
||||||
|
runtime,
|
||||||
|
abortSignal,
|
||||||
|
guildHistories,
|
||||||
|
client,
|
||||||
|
threadBindings,
|
||||||
|
discordRestFetch,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function materializeDiscordInboundJob(
|
||||||
|
job: DiscordInboundJob,
|
||||||
|
abortSignal?: AbortSignal,
|
||||||
|
): DiscordMessagePreflightContext {
|
||||||
|
return {
|
||||||
|
...job.payload,
|
||||||
|
...job.runtime,
|
||||||
|
abortSignal: abortSignal ?? job.runtime.abortSignal,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function sanitizeDiscordInboundMessage<T extends object>(message: T): T {
|
||||||
|
const descriptors = Object.getOwnPropertyDescriptors(message);
|
||||||
|
delete descriptors.channel;
|
||||||
|
return Object.create(Object.getPrototypeOf(message), descriptors) as T;
|
||||||
|
}
|
||||||
|
|
||||||
|
function normalizeDiscordThreadChannel(
|
||||||
|
threadChannel: DiscordMessagePreflightContext["threadChannel"],
|
||||||
|
): DiscordMessagePreflightContext["threadChannel"] {
|
||||||
|
if (!threadChannel) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
id: threadChannel.id,
|
||||||
|
name: threadChannel.name,
|
||||||
|
parentId: threadChannel.parentId,
|
||||||
|
parent: threadChannel.parent
|
||||||
|
? {
|
||||||
|
id: threadChannel.parent.id,
|
||||||
|
name: threadChannel.parent.name,
|
||||||
|
}
|
||||||
|
: undefined,
|
||||||
|
ownerId: threadChannel.ownerId,
|
||||||
|
};
|
||||||
|
}
|
||||||
105
src/discord/monitor/inbound-worker.ts
Normal file
105
src/discord/monitor/inbound-worker.ts
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
import { createRunStateMachine } from "../../channels/run-state-machine.js";
|
||||||
|
import { danger } from "../../globals.js";
|
||||||
|
import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts";
|
||||||
|
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
|
||||||
|
import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js";
|
||||||
|
import type { RuntimeEnv } from "./message-handler.preflight.types.js";
|
||||||
|
import { processDiscordMessage } from "./message-handler.process.js";
|
||||||
|
import type { DiscordMonitorStatusSink } from "./status.js";
|
||||||
|
import { normalizeDiscordInboundWorkerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js";
|
||||||
|
|
||||||
|
type DiscordInboundWorkerParams = {
|
||||||
|
runtime: RuntimeEnv;
|
||||||
|
setStatus?: DiscordMonitorStatusSink;
|
||||||
|
abortSignal?: AbortSignal;
|
||||||
|
runTimeoutMs?: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type DiscordInboundWorker = {
|
||||||
|
enqueue: (job: DiscordInboundJob) => void;
|
||||||
|
deactivate: () => void;
|
||||||
|
};
|
||||||
|
|
||||||
|
function formatDiscordRunContextSuffix(job: DiscordInboundJob): string {
|
||||||
|
const channelId = job.payload.messageChannelId?.trim();
|
||||||
|
const messageId = job.payload.data?.message?.id?.trim();
|
||||||
|
const details = [
|
||||||
|
channelId ? `channelId=${channelId}` : null,
|
||||||
|
messageId ? `messageId=${messageId}` : null,
|
||||||
|
].filter((entry): entry is string => Boolean(entry));
|
||||||
|
if (details.length === 0) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return ` (${details.join(", ")})`;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function processDiscordInboundJob(params: {
|
||||||
|
job: DiscordInboundJob;
|
||||||
|
runtime: RuntimeEnv;
|
||||||
|
lifecycleSignal?: AbortSignal;
|
||||||
|
runTimeoutMs?: number;
|
||||||
|
}) {
|
||||||
|
const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs);
|
||||||
|
const contextSuffix = formatDiscordRunContextSuffix(params.job);
|
||||||
|
await runDiscordTaskWithTimeout({
|
||||||
|
run: async (abortSignal) => {
|
||||||
|
await processDiscordMessage(materializeDiscordInboundJob(params.job, abortSignal));
|
||||||
|
},
|
||||||
|
timeoutMs,
|
||||||
|
abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal],
|
||||||
|
onTimeout: (resolvedTimeoutMs) => {
|
||||||
|
params.runtime.error?.(
|
||||||
|
danger(
|
||||||
|
`discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
|
||||||
|
decimals: 1,
|
||||||
|
unit: "seconds",
|
||||||
|
})}${contextSuffix}`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
onErrorAfterTimeout: (error) => {
|
||||||
|
params.runtime.error?.(
|
||||||
|
danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createDiscordInboundWorker(
|
||||||
|
params: DiscordInboundWorkerParams,
|
||||||
|
): DiscordInboundWorker {
|
||||||
|
const runQueue = new KeyedAsyncQueue();
|
||||||
|
const runState = createRunStateMachine({
|
||||||
|
setStatus: params.setStatus,
|
||||||
|
abortSignal: params.abortSignal,
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
enqueue(job) {
|
||||||
|
void runQueue
|
||||||
|
.enqueue(job.queueKey, async () => {
|
||||||
|
if (!runState.isActive()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
runState.onRunStart();
|
||||||
|
try {
|
||||||
|
if (!runState.isActive()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await processDiscordInboundJob({
|
||||||
|
job,
|
||||||
|
runtime: params.runtime,
|
||||||
|
lifecycleSignal: params.abortSignal,
|
||||||
|
runTimeoutMs: params.runTimeoutMs,
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
runState.onRunEnd();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch((error) => {
|
||||||
|
params.runtime.error?.(danger(`discord inbound worker failed: ${String(error)}`));
|
||||||
|
});
|
||||||
|
},
|
||||||
|
deactivate: runState.deactivate,
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -34,6 +34,7 @@ import { resolveDiscordChannelInfo } from "./message-utils.js";
|
|||||||
import { setPresence } from "./presence-cache.js";
|
import { setPresence } from "./presence-cache.js";
|
||||||
import { isThreadArchived } from "./thread-bindings.discord-api.js";
|
import { isThreadArchived } from "./thread-bindings.discord-api.js";
|
||||||
import { closeDiscordThreadSessions } from "./thread-session-close.js";
|
import { closeDiscordThreadSessions } from "./thread-session-close.js";
|
||||||
|
import { normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js";
|
||||||
|
|
||||||
type LoadedConfig = ReturnType<typeof import("../../config/config.js").loadConfig>;
|
type LoadedConfig = ReturnType<typeof import("../../config/config.js").loadConfig>;
|
||||||
type RuntimeEnv = import("../../runtime.js").RuntimeEnv;
|
type RuntimeEnv = import("../../runtime.js").RuntimeEnv;
|
||||||
@@ -70,16 +71,8 @@ type DiscordReactionRoutingParams = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
|
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
|
||||||
const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000;
|
|
||||||
const discordEventQueueLog = createSubsystemLogger("discord/event-queue");
|
const discordEventQueueLog = createSubsystemLogger("discord/event-queue");
|
||||||
|
|
||||||
function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number {
|
|
||||||
if (!Number.isFinite(raw) || (raw ?? 0) <= 0) {
|
|
||||||
return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS;
|
|
||||||
}
|
|
||||||
return Math.max(1_000, Math.floor(raw!));
|
|
||||||
}
|
|
||||||
|
|
||||||
function formatListenerContextValue(value: unknown): string | null {
|
function formatListenerContextValue(value: unknown): string | null {
|
||||||
if (value === undefined || value === null) {
|
if (value === undefined || value === null) {
|
||||||
return null;
|
return null;
|
||||||
@@ -138,57 +131,44 @@ async function runDiscordListenerWithSlowLog(params: {
|
|||||||
logger: Logger | undefined;
|
logger: Logger | undefined;
|
||||||
listener: string;
|
listener: string;
|
||||||
event: string;
|
event: string;
|
||||||
run: (abortSignal: AbortSignal) => Promise<void>;
|
run: (abortSignal: AbortSignal | undefined) => Promise<void>;
|
||||||
timeoutMs?: number;
|
timeoutMs?: number;
|
||||||
context?: Record<string, unknown>;
|
context?: Record<string, unknown>;
|
||||||
onError?: (err: unknown) => void;
|
onError?: (err: unknown) => void;
|
||||||
}) {
|
}) {
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs);
|
const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs);
|
||||||
let timedOut = false;
|
|
||||||
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
|
|
||||||
const logger = params.logger ?? discordEventQueueLog;
|
const logger = params.logger ?? discordEventQueueLog;
|
||||||
const abortController = new AbortController();
|
let timedOut = false;
|
||||||
const runPromise = params.run(abortController.signal).catch((err) => {
|
|
||||||
if (timedOut) {
|
try {
|
||||||
const errorName =
|
timedOut = await runDiscordTaskWithTimeout({
|
||||||
err && typeof err === "object" && "name" in err ? String(err.name) : undefined;
|
run: params.run,
|
||||||
if (abortController.signal.aborted && errorName === "AbortError") {
|
timeoutMs,
|
||||||
|
onTimeout: (resolvedTimeoutMs) => {
|
||||||
|
logger.error(
|
||||||
|
danger(
|
||||||
|
`discord handler timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
|
||||||
|
decimals: 1,
|
||||||
|
unit: "seconds",
|
||||||
|
})}${formatListenerContextSuffix(params.context)}`,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
onAbortAfterTimeout: () => {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`,
|
`discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`,
|
||||||
);
|
);
|
||||||
return;
|
},
|
||||||
}
|
onErrorAfterTimeout: (err) => {
|
||||||
logger.error(
|
logger.error(
|
||||||
danger(
|
danger(
|
||||||
`discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`,
|
`discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`,
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
return;
|
},
|
||||||
}
|
|
||||||
throw err;
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
const timeoutPromise = new Promise<"timeout">((resolve) => {
|
|
||||||
timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs);
|
|
||||||
timeoutHandle.unref?.();
|
|
||||||
});
|
});
|
||||||
const result = await Promise.race([
|
if (timedOut) {
|
||||||
runPromise.then(() => "completed" as const),
|
|
||||||
timeoutPromise,
|
|
||||||
]);
|
|
||||||
if (result === "timeout") {
|
|
||||||
timedOut = true;
|
|
||||||
abortController.abort();
|
|
||||||
logger.error(
|
|
||||||
danger(
|
|
||||||
`discord handler timed out after ${formatDurationSeconds(timeoutMs, {
|
|
||||||
decimals: 1,
|
|
||||||
unit: "seconds",
|
|
||||||
})}${formatListenerContextSuffix(params.context)}`,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@@ -198,9 +178,6 @@ async function runDiscordListenerWithSlowLog(params: {
|
|||||||
}
|
}
|
||||||
throw err;
|
throw err;
|
||||||
} finally {
|
} finally {
|
||||||
if (timeoutHandle) {
|
|
||||||
clearTimeout(timeoutHandle);
|
|
||||||
}
|
|
||||||
if (!timedOut) {
|
if (!timedOut) {
|
||||||
logSlowDiscordListener({
|
logSlowDiscordListener({
|
||||||
logger: params.logger,
|
logger: params.logger,
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { ChannelType } from "@buape/carbon";
|
import { ChannelType, type RequestClient } from "@buape/carbon";
|
||||||
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
|
import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js";
|
||||||
import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js";
|
import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js";
|
||||||
import { resolveChunkMode } from "../../auto-reply/chunk.js";
|
import { resolveChunkMode } from "../../auto-reply/chunk.js";
|
||||||
@@ -161,15 +161,17 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
const statusReactionsEnabled = shouldAckReaction();
|
const statusReactionsEnabled = shouldAckReaction();
|
||||||
|
// Discord outbound helpers expect Carbon's request client shape explicitly.
|
||||||
|
const discordRest = client.rest as unknown as RequestClient;
|
||||||
const discordAdapter: StatusReactionAdapter = {
|
const discordAdapter: StatusReactionAdapter = {
|
||||||
setReaction: async (emoji) => {
|
setReaction: async (emoji) => {
|
||||||
await reactMessageDiscord(messageChannelId, message.id, emoji, {
|
await reactMessageDiscord(messageChannelId, message.id, emoji, {
|
||||||
rest: client.rest as never,
|
rest: discordRest,
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
removeReaction: async (emoji) => {
|
removeReaction: async (emoji) => {
|
||||||
await removeReactionDiscord(messageChannelId, message.id, emoji, {
|
await removeReactionDiscord(messageChannelId, message.id, emoji, {
|
||||||
rest: client.rest as never,
|
rest: discordRest,
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import { createNoopThreadBindingManager } from "./thread-bindings.js";
|
|||||||
|
|
||||||
const preflightDiscordMessageMock = vi.hoisted(() => vi.fn());
|
const preflightDiscordMessageMock = vi.hoisted(() => vi.fn());
|
||||||
const processDiscordMessageMock = vi.hoisted(() => vi.fn());
|
const processDiscordMessageMock = vi.hoisted(() => vi.fn());
|
||||||
|
const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn());
|
||||||
|
|
||||||
vi.mock("./message-handler.preflight.js", () => ({
|
vi.mock("./message-handler.preflight.js", () => ({
|
||||||
preflightDiscordMessage: preflightDiscordMessageMock,
|
preflightDiscordMessage: preflightDiscordMessageMock,
|
||||||
@@ -26,7 +27,7 @@ function createDeferred<T = void>() {
|
|||||||
function createHandlerParams(overrides?: {
|
function createHandlerParams(overrides?: {
|
||||||
setStatus?: (patch: Record<string, unknown>) => void;
|
setStatus?: (patch: Record<string, unknown>) => void;
|
||||||
abortSignal?: AbortSignal;
|
abortSignal?: AbortSignal;
|
||||||
listenerTimeoutMs?: number;
|
workerRunTimeoutMs?: number;
|
||||||
}) {
|
}) {
|
||||||
const cfg: OpenClawConfig = {
|
const cfg: OpenClawConfig = {
|
||||||
channels: {
|
channels: {
|
||||||
@@ -65,7 +66,7 @@ function createHandlerParams(overrides?: {
|
|||||||
threadBindings: createNoopThreadBindingManager("default"),
|
threadBindings: createNoopThreadBindingManager("default"),
|
||||||
setStatus: overrides?.setStatus,
|
setStatus: overrides?.setStatus,
|
||||||
abortSignal: overrides?.abortSignal,
|
abortSignal: overrides?.abortSignal,
|
||||||
listenerTimeoutMs: overrides?.listenerTimeoutMs,
|
workerRunTimeoutMs: overrides?.workerRunTimeoutMs,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,6 +86,19 @@ function createMessageData(messageId: string, channelId = "ch-1") {
|
|||||||
|
|
||||||
function createPreflightContext(channelId = "ch-1") {
|
function createPreflightContext(channelId = "ch-1") {
|
||||||
return {
|
return {
|
||||||
|
data: {
|
||||||
|
channel_id: channelId,
|
||||||
|
message: {
|
||||||
|
id: `msg-${channelId}`,
|
||||||
|
channel_id: channelId,
|
||||||
|
attachments: [],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
message: {
|
||||||
|
id: `msg-${channelId}`,
|
||||||
|
channel_id: channelId,
|
||||||
|
attachments: [],
|
||||||
|
},
|
||||||
route: {
|
route: {
|
||||||
sessionKey: `agent:main:discord:channel:${channelId}`,
|
sessionKey: `agent:main:discord:channel:${channelId}`,
|
||||||
},
|
},
|
||||||
@@ -169,7 +183,7 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("applies listener timeout to queued runs so stalled runs do not block the queue", async () => {
|
it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => {
|
||||||
vi.useFakeTimers();
|
vi.useFakeTimers();
|
||||||
try {
|
try {
|
||||||
preflightDiscordMessageMock.mockReset();
|
preflightDiscordMessageMock.mockReset();
|
||||||
@@ -191,7 +205,7 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
|||||||
createPreflightContext(params.data.channel_id),
|
createPreflightContext(params.data.channel_id),
|
||||||
);
|
);
|
||||||
|
|
||||||
const params = createHandlerParams({ listenerTimeoutMs: 50 });
|
const params = createHandlerParams({ workerRunTimeoutMs: 50 });
|
||||||
const handler = createDiscordMessageHandler(params);
|
const handler = createDiscordMessageHandler(params);
|
||||||
|
|
||||||
await expect(
|
await expect(
|
||||||
@@ -211,7 +225,50 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
|||||||
| undefined;
|
| undefined;
|
||||||
expect(firstCtx?.abortSignal?.aborted).toBe(true);
|
expect(firstCtx?.abortSignal?.aborted).toBe(true);
|
||||||
expect(params.runtime.error).toHaveBeenCalledWith(
|
expect(params.runtime.error).toHaveBeenCalledWith(
|
||||||
expect.stringContaining("discord queued run timed out after"),
|
expect.stringContaining("discord inbound worker timed out after"),
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
vi.useRealTimers();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not time out queued runs when the inbound worker timeout is disabled", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
try {
|
||||||
|
preflightDiscordMessageMock.mockReset();
|
||||||
|
processDiscordMessageMock.mockReset();
|
||||||
|
eventualReplyDeliveredMock.mockReset();
|
||||||
|
|
||||||
|
processDiscordMessageMock.mockImplementationOnce(
|
||||||
|
async (ctx: { abortSignal?: AbortSignal }) => {
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
setTimeout(() => {
|
||||||
|
if (!ctx.abortSignal?.aborted) {
|
||||||
|
eventualReplyDeliveredMock();
|
||||||
|
}
|
||||||
|
resolve();
|
||||||
|
}, 80);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
preflightDiscordMessageMock.mockImplementation(
|
||||||
|
async (params: { data: { channel_id: string } }) =>
|
||||||
|
createPreflightContext(params.data.channel_id),
|
||||||
|
);
|
||||||
|
|
||||||
|
const params = createHandlerParams({ workerRunTimeoutMs: 0 });
|
||||||
|
const handler = createDiscordMessageHandler(params);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
handler(createMessageData("m-1") as never, {} as never),
|
||||||
|
).resolves.toBeUndefined();
|
||||||
|
|
||||||
|
await vi.advanceTimersByTimeAsync(80);
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
|
expect(eventualReplyDeliveredMock).toHaveBeenCalledTimes(1);
|
||||||
|
expect(params.runtime.error).not.toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("discord inbound worker timed out after"),
|
||||||
);
|
);
|
||||||
} finally {
|
} finally {
|
||||||
vi.useRealTimers();
|
vi.useRealTimers();
|
||||||
|
|||||||
@@ -3,18 +3,13 @@ import {
|
|||||||
createChannelInboundDebouncer,
|
createChannelInboundDebouncer,
|
||||||
shouldDebounceTextInbound,
|
shouldDebounceTextInbound,
|
||||||
} from "../../channels/inbound-debounce-policy.js";
|
} from "../../channels/inbound-debounce-policy.js";
|
||||||
import { createRunStateMachine } from "../../channels/run-state-machine.js";
|
|
||||||
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
|
import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js";
|
||||||
import { danger } from "../../globals.js";
|
import { danger } from "../../globals.js";
|
||||||
import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts";
|
import { buildDiscordInboundJob } from "./inbound-job.js";
|
||||||
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
|
import { createDiscordInboundWorker } from "./inbound-worker.js";
|
||||||
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
|
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
|
||||||
import { preflightDiscordMessage } from "./message-handler.preflight.js";
|
import { preflightDiscordMessage } from "./message-handler.preflight.js";
|
||||||
import type {
|
import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js";
|
||||||
DiscordMessagePreflightContext,
|
|
||||||
DiscordMessagePreflightParams,
|
|
||||||
} from "./message-handler.preflight.types.js";
|
|
||||||
import { processDiscordMessage } from "./message-handler.process.js";
|
|
||||||
import {
|
import {
|
||||||
hasDiscordMessageStickers,
|
hasDiscordMessageStickers,
|
||||||
resolveDiscordMessageChannelId,
|
resolveDiscordMessageChannelId,
|
||||||
@@ -28,154 +23,13 @@ type DiscordMessageHandlerParams = Omit<
|
|||||||
> & {
|
> & {
|
||||||
setStatus?: DiscordMonitorStatusSink;
|
setStatus?: DiscordMonitorStatusSink;
|
||||||
abortSignal?: AbortSignal;
|
abortSignal?: AbortSignal;
|
||||||
listenerTimeoutMs?: number;
|
workerRunTimeoutMs?: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
|
export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
|
||||||
deactivate: () => void;
|
deactivate: () => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
const DEFAULT_DISCORD_RUN_TIMEOUT_MS = 120_000;
|
|
||||||
const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647;
|
|
||||||
|
|
||||||
function normalizeDiscordRunTimeoutMs(timeoutMs?: number): number {
|
|
||||||
if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
|
|
||||||
return DEFAULT_DISCORD_RUN_TIMEOUT_MS;
|
|
||||||
}
|
|
||||||
return Math.max(1, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS));
|
|
||||||
}
|
|
||||||
|
|
||||||
function isAbortError(error: unknown): boolean {
|
|
||||||
if (typeof error !== "object" || error === null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return "name" in error && String((error as { name?: unknown }).name) === "AbortError";
|
|
||||||
}
|
|
||||||
|
|
||||||
function formatDiscordRunContextSuffix(ctx: DiscordMessagePreflightContext): string {
|
|
||||||
const eventData = ctx as {
|
|
||||||
data?: {
|
|
||||||
channel_id?: string;
|
|
||||||
message?: {
|
|
||||||
id?: string;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
const channelId = ctx.messageChannelId?.trim() || eventData.data?.channel_id?.trim();
|
|
||||||
const messageId = eventData.data?.message?.id?.trim();
|
|
||||||
const details = [
|
|
||||||
channelId ? `channelId=${channelId}` : null,
|
|
||||||
messageId ? `messageId=${messageId}` : null,
|
|
||||||
].filter((entry): entry is string => Boolean(entry));
|
|
||||||
if (details.length === 0) {
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
return ` (${details.join(", ")})`;
|
|
||||||
}
|
|
||||||
|
|
||||||
function mergeAbortSignals(signals: Array<AbortSignal | undefined>): AbortSignal | undefined {
|
|
||||||
const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal));
|
|
||||||
if (activeSignals.length === 0) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
if (activeSignals.length === 1) {
|
|
||||||
return activeSignals[0];
|
|
||||||
}
|
|
||||||
if (typeof AbortSignal.any === "function") {
|
|
||||||
return AbortSignal.any(activeSignals);
|
|
||||||
}
|
|
||||||
const fallbackController = new AbortController();
|
|
||||||
for (const signal of activeSignals) {
|
|
||||||
if (signal.aborted) {
|
|
||||||
fallbackController.abort();
|
|
||||||
return fallbackController.signal;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const abortFallback = () => {
|
|
||||||
fallbackController.abort();
|
|
||||||
for (const signal of activeSignals) {
|
|
||||||
signal.removeEventListener("abort", abortFallback);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
for (const signal of activeSignals) {
|
|
||||||
signal.addEventListener("abort", abortFallback, { once: true });
|
|
||||||
}
|
|
||||||
return fallbackController.signal;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function processDiscordRunWithTimeout(params: {
|
|
||||||
ctx: DiscordMessagePreflightContext;
|
|
||||||
runtime: DiscordMessagePreflightParams["runtime"];
|
|
||||||
lifecycleSignal?: AbortSignal;
|
|
||||||
timeoutMs?: number;
|
|
||||||
}) {
|
|
||||||
const timeoutMs = normalizeDiscordRunTimeoutMs(params.timeoutMs);
|
|
||||||
const timeoutAbortController = new AbortController();
|
|
||||||
const combinedSignal = mergeAbortSignals([
|
|
||||||
params.ctx.abortSignal,
|
|
||||||
params.lifecycleSignal,
|
|
||||||
timeoutAbortController.signal,
|
|
||||||
]);
|
|
||||||
const processCtx =
|
|
||||||
combinedSignal && combinedSignal !== params.ctx.abortSignal
|
|
||||||
? { ...params.ctx, abortSignal: combinedSignal }
|
|
||||||
: params.ctx;
|
|
||||||
const contextSuffix = formatDiscordRunContextSuffix(params.ctx);
|
|
||||||
let timedOut = false;
|
|
||||||
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
|
|
||||||
const processPromise = processDiscordMessage(processCtx).catch((error) => {
|
|
||||||
if (timedOut) {
|
|
||||||
if (timeoutAbortController.signal.aborted && isAbortError(error)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
params.runtime.error?.(
|
|
||||||
danger(`discord queued run failed after timeout: ${String(error)}${contextSuffix}`),
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
const timeoutPromise = new Promise<"timeout">((resolve) => {
|
|
||||||
timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs);
|
|
||||||
timeoutHandle.unref?.();
|
|
||||||
});
|
|
||||||
const result = await Promise.race([
|
|
||||||
processPromise.then(() => "completed" as const),
|
|
||||||
timeoutPromise,
|
|
||||||
]);
|
|
||||||
if (result === "timeout") {
|
|
||||||
timedOut = true;
|
|
||||||
timeoutAbortController.abort();
|
|
||||||
params.runtime.error?.(
|
|
||||||
danger(
|
|
||||||
`discord queued run timed out after ${formatDurationSeconds(timeoutMs, {
|
|
||||||
decimals: 1,
|
|
||||||
unit: "seconds",
|
|
||||||
})}${contextSuffix}`,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (timeoutHandle) {
|
|
||||||
clearTimeout(timeoutHandle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string {
|
|
||||||
const sessionKey = ctx.route.sessionKey?.trim();
|
|
||||||
if (sessionKey) {
|
|
||||||
return sessionKey;
|
|
||||||
}
|
|
||||||
const baseSessionKey = ctx.baseSessionKey?.trim();
|
|
||||||
if (baseSessionKey) {
|
|
||||||
return baseSessionKey;
|
|
||||||
}
|
|
||||||
return ctx.messageChannelId;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function createDiscordMessageHandler(
|
export function createDiscordMessageHandler(
|
||||||
params: DiscordMessageHandlerParams,
|
params: DiscordMessageHandlerParams,
|
||||||
): DiscordMessageHandlerWithLifecycle {
|
): DiscordMessageHandlerWithLifecycle {
|
||||||
@@ -188,39 +42,13 @@ export function createDiscordMessageHandler(
|
|||||||
params.discordConfig?.ackReactionScope ??
|
params.discordConfig?.ackReactionScope ??
|
||||||
params.cfg.messages?.ackReactionScope ??
|
params.cfg.messages?.ackReactionScope ??
|
||||||
"group-mentions";
|
"group-mentions";
|
||||||
const runQueue = new KeyedAsyncQueue();
|
const inboundWorker = createDiscordInboundWorker({
|
||||||
const runState = createRunStateMachine({
|
runtime: params.runtime,
|
||||||
setStatus: params.setStatus,
|
setStatus: params.setStatus,
|
||||||
abortSignal: params.abortSignal,
|
abortSignal: params.abortSignal,
|
||||||
|
runTimeoutMs: params.workerRunTimeoutMs,
|
||||||
});
|
});
|
||||||
|
|
||||||
const enqueueDiscordRun = (ctx: DiscordMessagePreflightContext) => {
|
|
||||||
const queueKey = resolveDiscordRunQueueKey(ctx);
|
|
||||||
void runQueue
|
|
||||||
.enqueue(queueKey, async () => {
|
|
||||||
if (!runState.isActive()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
runState.onRunStart();
|
|
||||||
try {
|
|
||||||
if (!runState.isActive()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await processDiscordRunWithTimeout({
|
|
||||||
ctx,
|
|
||||||
runtime: params.runtime,
|
|
||||||
lifecycleSignal: params.abortSignal,
|
|
||||||
timeoutMs: params.listenerTimeoutMs,
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
runState.onRunEnd();
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((err) => {
|
|
||||||
params.runtime.error?.(danger(`discord process failed: ${String(err)}`));
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
const { debouncer } = createChannelInboundDebouncer<{
|
const { debouncer } = createChannelInboundDebouncer<{
|
||||||
data: DiscordMessageEvent;
|
data: DiscordMessageEvent;
|
||||||
client: Client;
|
client: Client;
|
||||||
@@ -279,7 +107,7 @@ export function createDiscordMessageHandler(
|
|||||||
if (!ctx) {
|
if (!ctx) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
enqueueDiscordRun(ctx);
|
inboundWorker.enqueue(buildDiscordInboundJob(ctx));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const combinedBaseText = entries
|
const combinedBaseText = entries
|
||||||
@@ -324,7 +152,7 @@ export function createDiscordMessageHandler(
|
|||||||
ctxBatch.MessageSidLast = ids[ids.length - 1];
|
ctxBatch.MessageSidLast = ids[ids.length - 1];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enqueueDiscordRun(ctx);
|
inboundWorker.enqueue(buildDiscordInboundJob(ctx));
|
||||||
},
|
},
|
||||||
onError: (err) => {
|
onError: (err) => {
|
||||||
params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`));
|
params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`));
|
||||||
@@ -352,7 +180,7 @@ export function createDiscordMessageHandler(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
handler.deactivate = runState.deactivate;
|
handler.deactivate = inboundWorker.deactivate;
|
||||||
|
|
||||||
return handler;
|
return handler;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ const {
|
|||||||
clientConstructorOptionsMock,
|
clientConstructorOptionsMock,
|
||||||
createDiscordAutoPresenceControllerMock,
|
createDiscordAutoPresenceControllerMock,
|
||||||
createDiscordNativeCommandMock,
|
createDiscordNativeCommandMock,
|
||||||
|
createDiscordMessageHandlerMock,
|
||||||
createNoopThreadBindingManagerMock,
|
createNoopThreadBindingManagerMock,
|
||||||
createThreadBindingManagerMock,
|
createThreadBindingManagerMock,
|
||||||
reconcileAcpThreadBindingsOnStartupMock,
|
reconcileAcpThreadBindingsOnStartupMock,
|
||||||
@@ -49,6 +50,14 @@ const {
|
|||||||
clientFetchUserMock: vi.fn(async (_target: string) => ({ id: "bot-1" })),
|
clientFetchUserMock: vi.fn(async (_target: string) => ({ id: "bot-1" })),
|
||||||
clientGetPluginMock: vi.fn<(_name: string) => unknown>(() => undefined),
|
clientGetPluginMock: vi.fn<(_name: string) => unknown>(() => undefined),
|
||||||
createDiscordNativeCommandMock: vi.fn(() => ({ name: "mock-command" })),
|
createDiscordNativeCommandMock: vi.fn(() => ({ name: "mock-command" })),
|
||||||
|
createDiscordMessageHandlerMock: vi.fn(() =>
|
||||||
|
Object.assign(
|
||||||
|
vi.fn(async () => undefined),
|
||||||
|
{
|
||||||
|
deactivate: vi.fn(),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
),
|
||||||
createNoopThreadBindingManagerMock: vi.fn(() => {
|
createNoopThreadBindingManagerMock: vi.fn(() => {
|
||||||
const manager = { stop: vi.fn() };
|
const manager = { stop: vi.fn() };
|
||||||
createdBindingManagers.push(manager);
|
createdBindingManagers.push(manager);
|
||||||
@@ -248,7 +257,7 @@ vi.mock("./listeners.js", () => ({
|
|||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("./message-handler.js", () => ({
|
vi.mock("./message-handler.js", () => ({
|
||||||
createDiscordMessageHandler: () => ({ handle: vi.fn() }),
|
createDiscordMessageHandler: createDiscordMessageHandlerMock,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock("./native-command.js", () => ({
|
vi.mock("./native-command.js", () => ({
|
||||||
@@ -346,6 +355,14 @@ describe("monitorDiscordProvider", () => {
|
|||||||
refresh: vi.fn(),
|
refresh: vi.fn(),
|
||||||
runNow: vi.fn(),
|
runNow: vi.fn(),
|
||||||
}));
|
}));
|
||||||
|
createDiscordMessageHandlerMock.mockClear().mockImplementation(() =>
|
||||||
|
Object.assign(
|
||||||
|
vi.fn(async () => undefined),
|
||||||
|
{
|
||||||
|
deactivate: vi.fn(),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
);
|
||||||
clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" });
|
clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" });
|
||||||
clientGetPluginMock.mockClear().mockReturnValue(undefined);
|
clientGetPluginMock.mockClear().mockReturnValue(undefined);
|
||||||
createDiscordNativeCommandMock.mockClear().mockReturnValue({ name: "mock-command" });
|
createDiscordNativeCommandMock.mockClear().mockReturnValue({ name: "mock-command" });
|
||||||
@@ -629,6 +646,63 @@ describe("monitorDiscordProvider", () => {
|
|||||||
expect(eventQueue?.listenerTimeout).toBe(300_000);
|
expect(eventQueue?.listenerTimeout).toBe(300_000);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => {
|
||||||
|
const { monitorDiscordProvider } = await import("./provider.js");
|
||||||
|
|
||||||
|
resolveDiscordAccountMock.mockImplementation(() => ({
|
||||||
|
accountId: "default",
|
||||||
|
token: "cfg-token",
|
||||||
|
config: {
|
||||||
|
commands: { native: true, nativeSkills: false },
|
||||||
|
voice: { enabled: false },
|
||||||
|
agentComponents: { enabled: false },
|
||||||
|
execApprovals: { enabled: false },
|
||||||
|
eventQueue: { listenerTimeout: 50_000 },
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
await monitorDiscordProvider({
|
||||||
|
config: baseConfig(),
|
||||||
|
runtime: baseRuntime(),
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(createDiscordMessageHandlerMock).toHaveBeenCalledTimes(1);
|
||||||
|
const firstCall = createDiscordMessageHandlerMock.mock.calls.at(0) as
|
||||||
|
| [{ workerRunTimeoutMs?: number; listenerTimeoutMs?: number }]
|
||||||
|
| undefined;
|
||||||
|
const params = firstCall?.[0];
|
||||||
|
expect(params?.workerRunTimeoutMs).toBeUndefined();
|
||||||
|
expect("listenerTimeoutMs" in (params ?? {})).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("forwards inbound worker timeout config to the Discord message handler", async () => {
|
||||||
|
const { monitorDiscordProvider } = await import("./provider.js");
|
||||||
|
|
||||||
|
resolveDiscordAccountMock.mockImplementation(() => ({
|
||||||
|
accountId: "default",
|
||||||
|
token: "cfg-token",
|
||||||
|
config: {
|
||||||
|
commands: { native: true, nativeSkills: false },
|
||||||
|
voice: { enabled: false },
|
||||||
|
agentComponents: { enabled: false },
|
||||||
|
execApprovals: { enabled: false },
|
||||||
|
inboundWorker: { runTimeoutMs: 300_000 },
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
await monitorDiscordProvider({
|
||||||
|
config: baseConfig(),
|
||||||
|
runtime: baseRuntime(),
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(createDiscordMessageHandlerMock).toHaveBeenCalledTimes(1);
|
||||||
|
const firstCall = createDiscordMessageHandlerMock.mock.calls.at(0) as
|
||||||
|
| [{ workerRunTimeoutMs?: number }]
|
||||||
|
| undefined;
|
||||||
|
const params = firstCall?.[0];
|
||||||
|
expect(params?.workerRunTimeoutMs).toBe(300_000);
|
||||||
|
});
|
||||||
|
|
||||||
it("registers plugin commands as native Discord commands", async () => {
|
it("registers plugin commands as native Discord commands", async () => {
|
||||||
const { monitorDiscordProvider } = await import("./provider.js");
|
const { monitorDiscordProvider } = await import("./provider.js");
|
||||||
listNativeCommandSpecsForConfigMock.mockReturnValue([
|
listNativeCommandSpecsForConfigMock.mockReturnValue([
|
||||||
|
|||||||
@@ -600,8 +600,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
|||||||
if (voiceEnabled) {
|
if (voiceEnabled) {
|
||||||
clientPlugins.push(new VoicePlugin());
|
clientPlugins.push(new VoicePlugin());
|
||||||
}
|
}
|
||||||
// Pass eventQueue config to Carbon so the listener timeout can be tuned.
|
// Pass eventQueue config to Carbon so the gateway listener budget can be tuned.
|
||||||
// Default listenerTimeout is 120s (Carbon defaults to 30s which is too short for LLM calls).
|
// Default listenerTimeout is 120s (Carbon defaults to 30s, which is too short for some
|
||||||
|
// Discord normalization/enqueue work).
|
||||||
const eventQueueOpts = {
|
const eventQueueOpts = {
|
||||||
listenerTimeout: 120_000,
|
listenerTimeout: 120_000,
|
||||||
...discordCfg.eventQueue,
|
...discordCfg.eventQueue,
|
||||||
@@ -683,7 +684,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
|||||||
runtime,
|
runtime,
|
||||||
setStatus: opts.setStatus,
|
setStatus: opts.setStatus,
|
||||||
abortSignal: opts.abortSignal,
|
abortSignal: opts.abortSignal,
|
||||||
listenerTimeoutMs: eventQueueOpts.listenerTimeout,
|
workerRunTimeoutMs: discordCfg.inboundWorker?.runTimeoutMs,
|
||||||
botUserId,
|
botUserId,
|
||||||
guildHistories,
|
guildHistories,
|
||||||
historyLimit,
|
historyLimit,
|
||||||
|
|||||||
120
src/discord/monitor/timeouts.ts
Normal file
120
src/discord/monitor/timeouts.ts
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647;
|
||||||
|
|
||||||
|
export const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000;
|
||||||
|
export const DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS = 30 * 60_000;
|
||||||
|
|
||||||
|
function clampDiscordTimeoutMs(timeoutMs: number, minimumMs: number): number {
|
||||||
|
return Math.max(minimumMs, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS));
|
||||||
|
}
|
||||||
|
|
||||||
|
export function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number {
|
||||||
|
if (!Number.isFinite(raw) || (raw ?? 0) <= 0) {
|
||||||
|
return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS;
|
||||||
|
}
|
||||||
|
return clampDiscordTimeoutMs(raw!, 1_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function normalizeDiscordInboundWorkerTimeoutMs(
|
||||||
|
raw: number | undefined,
|
||||||
|
): number | undefined {
|
||||||
|
if (raw === 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) {
|
||||||
|
return DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS;
|
||||||
|
}
|
||||||
|
return clampDiscordTimeoutMs(raw, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isAbortError(error: unknown): boolean {
|
||||||
|
if (typeof error !== "object" || error === null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return "name" in error && String((error as { name?: unknown }).name) === "AbortError";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function mergeAbortSignals(
|
||||||
|
signals: Array<AbortSignal | undefined>,
|
||||||
|
): AbortSignal | undefined {
|
||||||
|
const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal));
|
||||||
|
if (activeSignals.length === 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
if (activeSignals.length === 1) {
|
||||||
|
return activeSignals[0];
|
||||||
|
}
|
||||||
|
if (typeof AbortSignal.any === "function") {
|
||||||
|
return AbortSignal.any(activeSignals);
|
||||||
|
}
|
||||||
|
const fallbackController = new AbortController();
|
||||||
|
for (const signal of activeSignals) {
|
||||||
|
if (signal.aborted) {
|
||||||
|
fallbackController.abort();
|
||||||
|
return fallbackController.signal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const abortFallback = () => {
|
||||||
|
fallbackController.abort();
|
||||||
|
for (const signal of activeSignals) {
|
||||||
|
signal.removeEventListener("abort", abortFallback);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (const signal of activeSignals) {
|
||||||
|
signal.addEventListener("abort", abortFallback, { once: true });
|
||||||
|
}
|
||||||
|
return fallbackController.signal;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function runDiscordTaskWithTimeout(params: {
|
||||||
|
run: (abortSignal: AbortSignal | undefined) => Promise<void>;
|
||||||
|
timeoutMs?: number;
|
||||||
|
abortSignals?: Array<AbortSignal | undefined>;
|
||||||
|
onTimeout: (timeoutMs: number) => void;
|
||||||
|
onAbortAfterTimeout?: () => void;
|
||||||
|
onErrorAfterTimeout?: (error: unknown) => void;
|
||||||
|
}): Promise<boolean> {
|
||||||
|
const timeoutAbortController = params.timeoutMs ? new AbortController() : undefined;
|
||||||
|
const mergedAbortSignal = mergeAbortSignals([
|
||||||
|
...(params.abortSignals ?? []),
|
||||||
|
timeoutAbortController?.signal,
|
||||||
|
]);
|
||||||
|
|
||||||
|
let timedOut = false;
|
||||||
|
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
const runPromise = params.run(mergedAbortSignal).catch((error) => {
|
||||||
|
if (!timedOut) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
if (timeoutAbortController?.signal.aborted && isAbortError(error)) {
|
||||||
|
params.onAbortAfterTimeout?.();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
params.onErrorAfterTimeout?.(error);
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (!params.timeoutMs) {
|
||||||
|
await runPromise;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const timeoutPromise = new Promise<"timeout">((resolve) => {
|
||||||
|
timeoutHandle = setTimeout(() => resolve("timeout"), params.timeoutMs);
|
||||||
|
timeoutHandle.unref?.();
|
||||||
|
});
|
||||||
|
const result = await Promise.race([
|
||||||
|
runPromise.then(() => "completed" as const),
|
||||||
|
timeoutPromise,
|
||||||
|
]);
|
||||||
|
if (result === "timeout") {
|
||||||
|
timedOut = true;
|
||||||
|
timeoutAbortController?.abort();
|
||||||
|
params.onTimeout(params.timeoutMs);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
if (timeoutHandle) {
|
||||||
|
clearTimeout(timeoutHandle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user