fix(slack): populate thread session with existing thread history (#7610)

* feat(slack): populate thread session with existing thread history

When a new session is created for a Slack thread, fetch and inject
the full thread history as context. This preserves conversation
continuity so the bot knows what it previously said in the thread.

- Add resolveSlackThreadHistory() to fetch all thread messages
- Add ThreadHistoryBody to context payload
- Use thread history instead of just thread starter for new sessions

Fixes #4470

* chore: remove redundant comments

* fix: use threadContextNote in queue body

* fix(slack): address Greptile review feedback

- P0: Use thread session key (not base session key) for new-session check
  This ensures thread history is injected when the thread session is new,
  even if the base channel session already exists.

- P1: Fetch up to 200 messages and take the most recent N
  Slack API returns messages in chronological order (oldest first).
  Previously we took the first N, now we take the last N for relevant context.

- P1: Batch resolve user names with Promise.all
  Avoid N sequential API calls when resolving user names in thread history.

- P2: Include file-only messages in thread history
  Messages with attachments but no text are now included with a placeholder
  like '[attached: image.png, document.pdf]'.

- P2: Add documentation about intentional 200-message fetch limit
  Clarifies that we intentionally don't paginate; 200 covers most threads.

* style: add braces for curly lint rule

* feat(slack): add thread.initialHistoryLimit config option

Allow users to configure the maximum number of thread messages to fetch
when starting a new thread session. Defaults to 20. Set to 0 to disable
thread history fetching entirely.

This addresses the optional configuration request from #2608.

* chore: trigger CI

* fix(slack): ensure isNewSession=true on first thread turn

recordInboundSession() in prepare.ts creates the thread session entry
before session.ts reads the store, causing isNewSession to be false
on the very first user message in a thread. This prevented thread
context (history/starter) from being injected.

Add IsFirstThreadTurn flag to message context, set when
readSessionUpdatedAt() returns undefined for the thread session key.
session.ts uses this flag to force isNewSession=true.

* style: format prepare.ts for oxfmt

* fix: suppress InboundHistory/ThreadStarterBody when ThreadHistoryBody present (#13912)

When ThreadHistoryBody is fetched from the Slack API (conversations.replies),
it already contains pending messages and the thread starter. Passing both
InboundHistory and ThreadStarterBody alongside ThreadHistoryBody caused
duplicate content in the LLM context on new thread sessions.

Suppress InboundHistory and ThreadStarterBody when ThreadHistoryBody is
present, since it is a strict superset of both.

* remove verbose comment

* fix(slack): paginate thread history context fetch

* fix(slack): wire session file path options after main merge

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Masataka Shinohara
2026-02-13 13:51:04 +09:00
committed by GitHub
parent daf13dbb06
commit b93ad2cd48
13 changed files with 529 additions and 6 deletions

View File

@@ -298,3 +298,132 @@ describe("resolveSlackMedia", () => {
expect(mockFetch).toHaveBeenCalledTimes(2);
});
});
describe("resolveSlackThreadHistory", () => {
afterEach(() => {
vi.resetModules();
vi.restoreAllMocks();
});
it("paginates and returns the latest N messages across pages", async () => {
const replies = vi
.fn()
.mockResolvedValueOnce({
messages: Array.from({ length: 200 }, (_, i) => ({
text: `msg-${i + 1}`,
user: "U1",
ts: `${i + 1}.000`,
})),
response_metadata: { next_cursor: "cursor-2" },
})
.mockResolvedValueOnce({
messages: Array.from({ length: 60 }, (_, i) => ({
text: `msg-${i + 201}`,
user: "U1",
ts: `${i + 201}.000`,
})),
response_metadata: { next_cursor: "" },
});
const { resolveSlackThreadHistory } = await import("./media.js");
const client = {
conversations: { replies },
} as Parameters<typeof resolveSlackThreadHistory>[0]["client"];
const result = await resolveSlackThreadHistory({
channelId: "C1",
threadTs: "1.000",
client,
currentMessageTs: "260.000",
limit: 5,
});
expect(replies).toHaveBeenCalledTimes(2);
expect(replies).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
channel: "C1",
ts: "1.000",
limit: 200,
inclusive: true,
}),
);
expect(replies).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
channel: "C1",
ts: "1.000",
limit: 200,
inclusive: true,
cursor: "cursor-2",
}),
);
expect(result.map((entry) => entry.ts)).toEqual([
"255.000",
"256.000",
"257.000",
"258.000",
"259.000",
]);
});
it("includes file-only messages and drops empty-only entries", async () => {
const replies = vi.fn().mockResolvedValueOnce({
messages: [
{ text: " ", ts: "1.000", files: [{ name: "screenshot.png" }] },
{ text: " ", ts: "2.000" },
{ text: "hello", ts: "3.000", user: "U1" },
],
response_metadata: { next_cursor: "" },
});
const { resolveSlackThreadHistory } = await import("./media.js");
const client = {
conversations: { replies },
} as Parameters<typeof resolveSlackThreadHistory>[0]["client"];
const result = await resolveSlackThreadHistory({
channelId: "C1",
threadTs: "1.000",
client,
limit: 10,
});
expect(result).toHaveLength(2);
expect(result[0]?.text).toBe("[attached: screenshot.png]");
expect(result[1]?.text).toBe("hello");
});
it("returns empty when limit is zero without calling Slack API", async () => {
const replies = vi.fn();
const { resolveSlackThreadHistory } = await import("./media.js");
const client = {
conversations: { replies },
} as Parameters<typeof resolveSlackThreadHistory>[0]["client"];
const result = await resolveSlackThreadHistory({
channelId: "C1",
threadTs: "1.000",
client,
limit: 0,
});
expect(result).toEqual([]);
expect(replies).not.toHaveBeenCalled();
});
it("returns empty when Slack API throws", async () => {
const replies = vi.fn().mockRejectedValueOnce(new Error("slack down"));
const { resolveSlackThreadHistory } = await import("./media.js");
const client = {
conversations: { replies },
} as Parameters<typeof resolveSlackThreadHistory>[0]["client"];
const result = await resolveSlackThreadHistory({
channelId: "C1",
threadTs: "1.000",
client,
limit: 20,
});
expect(result).toEqual([]);
});
});

View File

@@ -206,3 +206,91 @@ export async function resolveSlackThreadStarter(params: {
return null;
}
}
export type SlackThreadMessage = {
text: string;
userId?: string;
ts?: string;
botId?: string;
files?: SlackFile[];
};
type SlackRepliesPageMessage = {
text?: string;
user?: string;
bot_id?: string;
ts?: string;
files?: SlackFile[];
};
type SlackRepliesPage = {
messages?: SlackRepliesPageMessage[];
response_metadata?: { next_cursor?: string };
};
/**
* Fetches the most recent messages in a Slack thread (excluding the current message).
* Used to populate thread context when a new thread session starts.
*
* Uses cursor pagination and keeps only the latest N retained messages so long threads
* still produce up-to-date context without unbounded memory growth.
*/
export async function resolveSlackThreadHistory(params: {
channelId: string;
threadTs: string;
client: SlackWebClient;
currentMessageTs?: string;
limit?: number;
}): Promise<SlackThreadMessage[]> {
const maxMessages = params.limit ?? 20;
if (!Number.isFinite(maxMessages) || maxMessages <= 0) {
return [];
}
// Slack recommends no more than 200 per page.
const fetchLimit = 200;
const retained: SlackRepliesPageMessage[] = [];
let cursor: string | undefined;
try {
do {
const response = (await params.client.conversations.replies({
channel: params.channelId,
ts: params.threadTs,
limit: fetchLimit,
inclusive: true,
...(cursor ? { cursor } : {}),
})) as SlackRepliesPage;
for (const msg of response.messages ?? []) {
// Keep messages with text OR file attachments
if (!msg.text?.trim() && !msg.files?.length) {
continue;
}
if (params.currentMessageTs && msg.ts === params.currentMessageTs) {
continue;
}
retained.push(msg);
if (retained.length > maxMessages) {
retained.shift();
}
}
const next = response.response_metadata?.next_cursor;
cursor = typeof next === "string" && next.trim().length > 0 ? next.trim() : undefined;
} while (cursor);
return retained.map((msg) => ({
// For file-only messages, create a placeholder showing attached filenames
text: msg.text?.trim()
? msg.text
: `[attached: ${msg.files?.map((f) => f.name ?? "file").join(", ")}]`,
userId: msg.user,
botId: msg.bot_id,
ts: msg.ts,
files: msg.files,
}));
} catch {
return [];
}
}

View File

@@ -1,10 +1,15 @@
import type { App } from "@slack/bolt";
import { describe, expect, it } from "vitest";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../../config/config.js";
import type { RuntimeEnv } from "../../../runtime.js";
import type { ResolvedSlackAccount } from "../../accounts.js";
import type { SlackMessageEvent } from "../../types.js";
import { expectInboundContextContract } from "../../../../test/helpers/inbound-contract.js";
import { resolveAgentRoute } from "../../../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../../../routing/session-key.js";
import { createSlackMonitorContext } from "../context.js";
import { prepareSlackMessage } from "./prepare.js";
@@ -236,6 +241,207 @@ describe("slack prepareSlackMessage inbound contract", () => {
expect(prepared!.ctxPayload.MessageThreadId).toBe("1.000");
});
it("marks first thread turn and injects thread history for a new thread session", async () => {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-slack-thread-"));
const storePath = path.join(tmpDir, "sessions.json");
try {
const replies = vi
.fn()
.mockResolvedValueOnce({
messages: [{ text: "starter", user: "U2", ts: "100.000" }],
})
.mockResolvedValueOnce({
messages: [
{ text: "starter", user: "U2", ts: "100.000" },
{ text: "assistant reply", bot_id: "B1", ts: "100.500" },
{ text: "follow-up question", user: "U1", ts: "100.800" },
{ text: "current message", user: "U1", ts: "101.000" },
],
response_metadata: { next_cursor: "" },
});
const slackCtx = createSlackMonitorContext({
cfg: {
session: { store: storePath },
channels: { slack: { enabled: true, replyToMode: "all", groupPolicy: "open" } },
} as OpenClawConfig,
accountId: "default",
botToken: "token",
app: { client: { conversations: { replies } } } as App,
runtime: {} as RuntimeEnv,
botUserId: "B1",
teamId: "T1",
apiAppId: "A1",
historyLimit: 0,
sessionScope: "per-sender",
mainKey: "main",
dmEnabled: true,
dmPolicy: "open",
allowFrom: [],
groupDmEnabled: true,
groupDmChannels: [],
defaultRequireMention: false,
groupPolicy: "open",
useAccessGroups: false,
reactionMode: "off",
reactionAllowlist: [],
replyToMode: "all",
threadHistoryScope: "thread",
threadInheritParent: false,
slashCommand: {
enabled: false,
name: "openclaw",
sessionPrefix: "slack:slash",
ephemeral: true,
},
textLimit: 4000,
ackReactionScope: "group-mentions",
mediaMaxBytes: 1024,
removeAckAfterReply: false,
});
slackCtx.resolveUserName = async (id: string) => ({
name: id === "U1" ? "Alice" : "Bob",
});
slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" });
const account: ResolvedSlackAccount = {
accountId: "default",
enabled: true,
botTokenSource: "config",
appTokenSource: "config",
config: {
replyToMode: "all",
thread: { initialHistoryLimit: 20 },
},
};
const message: SlackMessageEvent = {
channel: "C123",
channel_type: "channel",
user: "U1",
text: "current message",
ts: "101.000",
thread_ts: "100.000",
} as SlackMessageEvent;
const prepared = await prepareSlackMessage({
ctx: slackCtx,
account,
message,
opts: { source: "message" },
});
expect(prepared).toBeTruthy();
expect(prepared!.ctxPayload.IsFirstThreadTurn).toBe(true);
expect(prepared!.ctxPayload.ThreadHistoryBody).toContain("assistant reply");
expect(prepared!.ctxPayload.ThreadHistoryBody).toContain("follow-up question");
expect(prepared!.ctxPayload.ThreadHistoryBody).not.toContain("current message");
expect(replies).toHaveBeenCalledTimes(2);
} finally {
fs.rmSync(tmpDir, { recursive: true, force: true });
}
});
it("does not mark first thread turn when thread session already exists in store", async () => {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-slack-thread-"));
const storePath = path.join(tmpDir, "sessions.json");
try {
const cfg = {
session: { store: storePath },
channels: { slack: { enabled: true, replyToMode: "all", groupPolicy: "open" } },
} as OpenClawConfig;
const route = resolveAgentRoute({
cfg,
channel: "slack",
accountId: "default",
teamId: "T1",
peer: { kind: "channel", id: "C123" },
});
const threadKeys = resolveThreadSessionKeys({
baseSessionKey: route.sessionKey,
threadId: "200.000",
});
fs.writeFileSync(
storePath,
JSON.stringify({ [threadKeys.sessionKey]: { updatedAt: Date.now() } }, null, 2),
);
const replies = vi.fn().mockResolvedValue({
messages: [{ text: "starter", user: "U2", ts: "200.000" }],
});
const slackCtx = createSlackMonitorContext({
cfg,
accountId: "default",
botToken: "token",
app: { client: { conversations: { replies } } } as App,
runtime: {} as RuntimeEnv,
botUserId: "B1",
teamId: "T1",
apiAppId: "A1",
historyLimit: 0,
sessionScope: "per-sender",
mainKey: "main",
dmEnabled: true,
dmPolicy: "open",
allowFrom: [],
groupDmEnabled: true,
groupDmChannels: [],
defaultRequireMention: false,
groupPolicy: "open",
useAccessGroups: false,
reactionMode: "off",
reactionAllowlist: [],
replyToMode: "all",
threadHistoryScope: "thread",
threadInheritParent: false,
slashCommand: {
enabled: false,
name: "openclaw",
sessionPrefix: "slack:slash",
ephemeral: true,
},
textLimit: 4000,
ackReactionScope: "group-mentions",
mediaMaxBytes: 1024,
removeAckAfterReply: false,
});
slackCtx.resolveUserName = async () => ({ name: "Alice" });
slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" });
const account: ResolvedSlackAccount = {
accountId: "default",
enabled: true,
botTokenSource: "config",
appTokenSource: "config",
config: {
replyToMode: "all",
thread: { initialHistoryLimit: 20 },
},
};
const message: SlackMessageEvent = {
channel: "C123",
channel_type: "channel",
user: "U1",
text: "reply in old thread",
ts: "201.000",
thread_ts: "200.000",
} as SlackMessageEvent;
const prepared = await prepareSlackMessage({
ctx: slackCtx,
account,
message,
opts: { source: "message" },
});
expect(prepared).toBeTruthy();
expect(prepared!.ctxPayload.IsFirstThreadTurn).toBeUndefined();
expect(prepared!.ctxPayload.ThreadHistoryBody).toBeUndefined();
} finally {
fs.rmSync(tmpDir, { recursive: true, force: true });
}
});
it("includes thread_ts and parent_user_id metadata in thread replies", async () => {
const slackCtx = createSlackMonitorContext({
cfg: {

View File

@@ -44,7 +44,11 @@ import { resolveSlackEffectiveAllowFrom } from "../auth.js";
import { resolveSlackChannelConfig } from "../channel-config.js";
import { stripSlackMentionsForCommandDetection } from "../commands.js";
import { normalizeSlackChannelType, type SlackMonitorContext } from "../context.js";
import { resolveSlackMedia, resolveSlackThreadStarter } from "../media.js";
import {
resolveSlackMedia,
resolveSlackThreadHistory,
resolveSlackThreadStarter,
} from "../media.js";
export async function prepareSlackMessage(params: {
ctx: SlackMonitorContext;
@@ -461,6 +465,8 @@ export async function prepareSlackMessage(params: {
systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined;
let threadStarterBody: string | undefined;
let threadHistoryBody: string | undefined;
let threadSessionPreviousTimestamp: number | undefined;
let threadLabel: string | undefined;
let threadStarterMedia: Awaited<ReturnType<typeof resolveSlackMedia>> = null;
if (isThreadReply && threadTs) {
@@ -490,6 +496,64 @@ export async function prepareSlackMessage(params: {
} else {
threadLabel = `Slack thread ${roomLabel}`;
}
// Fetch full thread history for new thread sessions
// This provides context of previous messages (including bot replies) in the thread
// Use the thread session key (not base session key) to determine if this is a new session
const threadInitialHistoryLimit = account.config?.thread?.initialHistoryLimit ?? 20;
threadSessionPreviousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey, // Thread-specific session key
});
if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) {
const threadHistory = await resolveSlackThreadHistory({
channelId: message.channel,
threadTs,
client: ctx.app.client,
currentMessageTs: message.ts,
limit: threadInitialHistoryLimit,
});
if (threadHistory.length > 0) {
// Batch resolve user names to avoid N sequential API calls
const uniqueUserIds = [
...new Set(threadHistory.map((m) => m.userId).filter((id): id is string => Boolean(id))),
];
const userMap = new Map<string, { name?: string }>();
await Promise.all(
uniqueUserIds.map(async (id) => {
const user = await ctx.resolveUserName(id);
if (user) {
userMap.set(id, user);
}
}),
);
const historyParts: string[] = [];
for (const historyMsg of threadHistory) {
const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null;
const msgSenderName =
msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown");
const isBot = Boolean(historyMsg.botId);
const role = isBot ? "assistant" : "user";
const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${message.channel}]`;
historyParts.push(
formatInboundEnvelope({
channel: "Slack",
from: `${msgSenderName} (${role})`,
timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined,
body: msgWithId,
chatType: "channel",
envelope: envelopeOptions,
}),
);
}
threadHistoryBody = historyParts.join("\n\n");
logVerbose(
`slack: populated thread history with ${threadHistory.length} messages for new session`,
);
}
}
}
// Use thread starter media if current message has none
@@ -529,6 +593,9 @@ export async function prepareSlackMessage(params: {
MessageThreadId: threadContext.messageThreadId,
ParentSessionKey: threadKeys.parentSessionKey,
ThreadStarterBody: threadStarterBody,
ThreadHistoryBody: threadHistoryBody,
IsFirstThreadTurn:
isThreadReply && threadTs && !threadSessionPreviousTimestamp ? true : undefined,
ThreadLabel: threadLabel,
Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined,
WasMentioned: isRoomish ? effectiveWasMentioned : undefined,