Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/junior-evals/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"@sentry/junior": "workspace:*",
"@sentry/junior-github": "workspace:*",
"@sentry/junior-sentry": "workspace:*",
"chat": "4.27.0",
"chat": "4.28.1",
"typescript": "^5.9.3",
"vitest": "^4.1.5",
"vitest-evals": "0.9.0-beta.1"
Expand Down
8 changes: 4 additions & 4 deletions packages/junior/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
},
"dependencies": {
"@ai-sdk/gateway": "^3.0.110",
"@chat-adapter/slack": "4.27.0",
"@chat-adapter/state-memory": "4.27.0",
"@chat-adapter/state-redis": "4.27.0",
"@chat-adapter/slack": "4.28.1",
"@chat-adapter/state-memory": "4.28.1",
"@chat-adapter/state-redis": "4.28.1",
"@logtape/logtape": "^2.0.7",
"@mariozechner/pi-agent-core": "0.73.0",
"@mariozechner/pi-ai": "0.73.0",
Expand All @@ -48,7 +48,7 @@
"@vercel/sandbox": "2.0.0-beta.19",
"ai": "^6.0.175",
"bash-tool": "^1.3.16",
"chat": "4.27.0",
"chat": "4.28.1",
"hono": "^4.12.6",
"just-bash": "2.14.2",
"jose": "^6.2.2",
Expand Down
85 changes: 30 additions & 55 deletions packages/junior/src/chat/ingress/junior-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ type ChatInternals = {
function enqueueBackgroundTask(
options: WebhookOptions | undefined,
task: Promise<void>,
): void {
): Promise<void> {
if (!options?.waitUntil) {
throw new Error("Chat background processing requires waitUntil");
}
options.waitUntil(task);
return task;
}

export class JuniorChat<
Expand All @@ -73,81 +74,55 @@ export class JuniorChat<
/**
* Normalize Slack thread IDs before the SDK's concurrency queue.
*
* The SDK uses the `threadId` parameter as the lock/queue key
* (Chat.handleIncomingMessage → getLockKey). @chat-adapter/slack
* (as of 4.22.0) builds DM thread IDs as `slack:<channel>:` (empty
* thread_ts) when the Slack event has no `thread_ts` field — it uses
* `event.thread_ts || ""` instead of falling back to `event.ts`.
* See @chat-adapter/slack/dist/index.js:1466.
*
* A DM root event arrives as `slack:D123:` while a reply in the same
* thread carries `slack:D123:<ts>`, splitting the lock/state/subscription
* keys and breaking conversation continuity.
*
* We fix this by resolving the message eagerly (even when the adapter
* provides a factory), deriving the canonical thread ID from
* `raw.channel` + `raw.thread_ts ?? raw.ts`, and passing both the
* normalized threadId and concrete message to super.processMessage.
*
* Remove this override when @chat-adapter/slack uses `event.ts` as
* the DM thread_ts fallback.
* Slack DM roots can arrive with an empty thread timestamp, while
* later replies include the root timestamp. Resolve factories before
* delegating so the lock/state/subscription key is canonicalized before
* the SDK computes its per-thread queue key.
*/
override processMessage(
adapter: Adapter,
threadId: string,
messageOrFactory: Message | (() => Promise<Message>),
options?: WebhookOptions,
): void {
): Promise<void> {
if (typeof messageOrFactory === "function") {
// The SDK uses threadId as the lock key *before* resolving the
// factory (Chat.processMessage:2207). We must resolve eagerly so
// we can pass the normalized threadId to super. The SDK's own
// processMessage wraps the work in waitUntil, so we do the same.
const runtime = this as unknown as ChatInternals;
enqueueBackgroundTask(
return enqueueBackgroundTask(
options,
(async (): Promise<void> => {
let message: Message;
try {
const message = await messageOrFactory();
if (isExternalSlackUser(message.raw as Record<string, unknown>)) {
return;
}
const normalized = normalizeIncomingSlackThreadId(
threadId,
message,
);
if (normalized !== threadId && "threadId" in message) {
(message as unknown as Record<string, unknown>).threadId =
normalized;
}
super.processMessage(adapter, normalized, message, options);
message = await messageOrFactory();
} catch (error) {
runtime.logger?.error?.("Message factory resolution error", {
error,
threadId,
});
return;
}
if (isExternalSlackUser(message.raw as Record<string, unknown>)) {
return;
}
const normalized = normalizeIncomingSlackThreadId(threadId, message);
if (normalized !== threadId && "threadId" in message) {
(message as unknown as Record<string, unknown>).threadId =
normalized;
}
await super.processMessage(adapter, normalized, message, options);
})(),
);
return;
}
if (isExternalSlackUser(messageOrFactory.raw as Record<string, unknown>)) {
return;

const message = messageOrFactory;
if (isExternalSlackUser(message.raw as Record<string, unknown>)) {
return Promise.resolve();
}
enqueueBackgroundTask(
options,
(async (): Promise<void> => {
const normalized = normalizeIncomingSlackThreadId(
threadId,
messageOrFactory,
);
if (normalized !== threadId && "threadId" in messageOrFactory) {
(messageOrFactory as unknown as Record<string, unknown>).threadId =
normalized;
}
super.processMessage(adapter, normalized, messageOrFactory, options);
})(),
);

const normalized = normalizeIncomingSlackThreadId(threadId, message);
if (normalized !== threadId && "threadId" in message) {
(message as unknown as Record<string, unknown>).threadId = normalized;
}
return super.processMessage(adapter, normalized, message, options);
}

override processReaction(
Expand Down
76 changes: 38 additions & 38 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading