Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"prepare": "[ -d .git ] && husky || true"
},
"dependencies": {
"@shetty4l/core": "^0.1.34"
"@shetty4l/core": "^0.1.35"
},
"devDependencies": {
"@biomejs/biome": "^2.4.2",
Expand Down
78 changes: 77 additions & 1 deletion src/channels/telegram/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,43 @@ export interface TelegramMessage {
message_thread_id?: number;
}

export interface CallbackQuery {
id: string;
from: {
id: number;
first_name?: string;
username?: string;
};
message?: {
message_id: number;
chat: {
id: number;
};
message_thread_id?: number;
text?: string;
};
data?: string;
}

export interface TelegramUpdate {
update_id: number;
message?: TelegramMessage;
callback_query?: CallbackQuery;
}

export interface InlineKeyboardButton {
text: string;
callback_data: string;
}

export interface InlineKeyboardMarkup {
inline_keyboard: InlineKeyboardButton[][];
}

export interface SendMessageOptions {
threadId?: number;
parseMode?: string;
replyMarkup?: InlineKeyboardMarkup;
}

export class TelegramApiError extends Error {
Expand Down Expand Up @@ -195,7 +224,7 @@ export async function getUpdates(
): Promise<TelegramUpdate[]> {
const payload: Record<string, unknown> = {
timeout: timeoutSec,
allowed_updates: ["message"],
allowed_updates: ["message", "callback_query"],
};
if (offset !== undefined) {
payload.offset = offset;
Expand Down Expand Up @@ -227,6 +256,9 @@ export async function sendMessage(
if (opts.parseMode !== undefined) {
payload.parse_mode = opts.parseMode;
}
if (opts.replyMarkup !== undefined) {
payload.reply_markup = opts.replyMarkup;
}

return callTelegramApi<TelegramMessage>(
botToken,
Expand All @@ -235,3 +267,47 @@ export async function sendMessage(
15000,
);
}

export async function answerCallbackQuery(
botToken: string,
callbackQueryId: string,
text?: string,
): Promise<boolean> {
const payload: Record<string, unknown> = {
callback_query_id: callbackQueryId,
};
if (text !== undefined) {
payload.text = text;
}

return callTelegramApi<boolean>(
botToken,
"answerCallbackQuery",
payload,
15000,
);
}

export async function editMessageReplyMarkup(
botToken: string,
chatId: number,
messageId: number,
replyMarkup: InlineKeyboardMarkup | null,
): Promise<boolean> {
const payload: Record<string, unknown> = {
chat_id: chatId,
message_id: messageId,
};
if (replyMarkup !== null) {
payload.reply_markup = replyMarkup;
} else {
payload.reply_markup = { inline_keyboard: [] };
}

return callTelegramApi<boolean>(
botToken,
"editMessageReplyMarkup",
payload,
15000,
);
}
128 changes: 124 additions & 4 deletions src/channels/telegram/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@ import { createLogger } from "@shetty4l/core/log";
import type { StateLoader } from "@shetty4l/core/state";
import type { TelegramChannelConfig } from "../../config";
import { TelegramChannelState } from "../../state/telegram";
import { TelegramCallback } from "../../state/telegram-callback";
import type { CortexClient, OutboxMessage } from "../cortex-client";
import type { Channel, ChannelStats } from "../index";
import { getUpdates, parseTelegramTopicKey, sendMessage } from "./api";
import {
answerCallbackQuery,
type CallbackQuery,
editMessageReplyMarkup,
getUpdates,
type InlineKeyboardMarkup,
parseTelegramTopicKey,
sendMessage,
} from "./api";
import { chunkMarkdownV2 } from "./chunker";
import { formatForTelegram } from "./format";

Expand Down Expand Up @@ -161,6 +170,13 @@ export class TelegramChannel implements Channel {
for (const update of updates) {
if (!this.running) break;

// Handle callback queries (inline button clicks)
if (update.callback_query) {
await this.handleCallbackQuery(update.callback_query);
s.updateOffset = update.update_id + 1;
continue;
}

// Filter by allowed user IDs
const userId = update.message?.from?.id;
if (!userId || !this.config.allowedUserIds.includes(userId)) {
Expand Down Expand Up @@ -218,6 +234,96 @@ export class TelegramChannel implements Channel {
}
}

private async handleCallbackQuery(query: CallbackQuery): Promise<void> {
const s = this.state ?? this.memoryState;

// Check for duplicate via exists()
if (this.stateLoader) {
const exists = await this.stateLoader.exists(TelegramCallback, query.id);
if (exists) {
log(`duplicate callback query ${query.id}, skipping`);
return;
}
}

// Record callback for deduplication
if (this.stateLoader) {
const callback = this.stateLoader.load(TelegramCallback, query.id);
callback.callbackQueryId = query.id;
callback.chatId = query.message?.chat.id ?? 0;
callback.messageId = query.message?.message_id ?? 0;
callback.userId = query.from.id;
callback.data = query.data ?? "";
callback.processedAt = new Date();
await this.stateLoader.flush();
}

// Answer the callback to dismiss the loading spinner
try {
await answerCallbackQuery(this.config.botToken, query.id);
} catch (e) {
log(`failed to answer callback query ${query.id}: ${e}`);
// Continue processing - answering is not critical
}

// Remove buttons from the message
if (query.message) {
try {
await editMessageReplyMarkup(
this.config.botToken,
query.message.chat.id,
query.message.message_id,
null,
);
} catch (e) {
log(`failed to remove buttons from message: ${e}`);
// Continue processing - button removal is not critical
}
}

// Filter by allowed user IDs
const userId = query.from.id;
if (!this.config.allowedUserIds.includes(userId)) {
log(`ignoring callback from unauthorized user: ${userId}`);
return;
}

// Derive topic key
const chatId = query.message?.chat.id ?? 0;
const threadId = query.message?.message_thread_id;
const topicKey = threadId ? `${chatId}:${threadId}` : `${chatId}`;

// Post to Cortex
const result = await this.cortex.receive({
channel: "telegram",
externalId: `callback:${query.id}`,
data: {
type: "button_callback",
callbackData: query.data,
originalMessageId: query.message?.message_id,
originalMessageText: query.message?.text,
userId,
chatId,
threadId,
topicKey,
},
occurredAt: new Date().toISOString(),
mode: "realtime",
metadata: { topicKey },
});

if (result.ok) {
log(`posted callback ${query.id} to cortex (topic: ${topicKey})`);
s.lastPostAt = new Date();
s.status = "healthy";
s.error = null;
s.consecutiveFailures = 0;
} else {
log(`cortex error for callback ${query.id}: ${result.error}`);
throw new Error(`Cortex error: ${result.error}`);
}
}

// --- Delivery Loop ---

private async runDeliveryLoop(): Promise<void> {
Expand Down Expand Up @@ -305,11 +411,25 @@ export class TelegramChannel implements Channel {
const formatted = formatForTelegram(msg.text);
const chunks = chunkMarkdownV2(formatted);

// Send each chunk
for (const chunk of chunks) {
await sendMessage(this.config.botToken, topic.chatId, chunk, {
// Build inline keyboard from payload.buttons if present
const buttons = msg.payload?.buttons as
| Array<{ label: string; data: string }>
| undefined;
const replyMarkup: InlineKeyboardMarkup | undefined = buttons?.length
? {
inline_keyboard: [
buttons.map((b) => ({ text: b.label, callback_data: b.data })),
],
}
: undefined;

// Send each chunk (only last chunk gets buttons)
for (let i = 0; i < chunks.length; i++) {
const isLastChunk = i === chunks.length - 1;
await sendMessage(this.config.botToken, topic.chatId, chunks[i], {
threadId: topic.threadId,
parseMode: "MarkdownV2",
replyMarkup: isLastChunk ? replyMarkup : undefined,
});
}

Expand Down
29 changes: 29 additions & 0 deletions src/state/telegram-callback.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Persisted state for Telegram callback queries (inline button clicks).
*
* Used for deduplication — each callback is stored with its ID to prevent
* duplicate processing on retries or redeliveries.
*/

import { Field, Persisted } from "@shetty4l/core/state";

@Persisted("telegram_callbacks")
export class TelegramCallback {
/** Unique callback query ID from Telegram. */
@Field("string") callbackQueryId: string = "";

/** Chat ID where the callback originated. */
@Field("number") chatId: number = 0;

/** Message ID that contained the inline keyboard. */
@Field("number") messageId: number = 0;

/** User ID who clicked the button. */
@Field("number") userId: number = 0;

/** Callback data string from the button. */
@Field("string") data: string = "";

/** When this callback was processed. */
@Field("date") processedAt: Date | null = null;
}
Loading