From b4d869bd0d388b6d4368207c0c4cdb08d43efc57 Mon Sep 17 00:00:00 2001 From: Trard Date: Mon, 20 Apr 2026 22:57:13 +0300 Subject: [PATCH] Add connector strategy and codex media replies --- .env.example | 8 + package-lock.json | 16 + package.json | 1 + setup.md | 49 ++- src/bot.js | 6 +- src/config.js | 64 +++- src/connectors/index.js | 14 + src/connectors/local-codex.js | 322 ++++++++++++++++++ .../schemas/local-codex-response.schema.json | 36 ++ src/connectors/strategy.js | 13 + src/handlers/message.js | 67 +++- src/index.js | 14 +- src/services/asset-store.js | 76 ++++- src/services/request-processor.js | 34 ++ src/services/telegram-messenger.js | 101 ++++++ 15 files changed, 799 insertions(+), 22 deletions(-) create mode 100644 src/connectors/index.js create mode 100644 src/connectors/local-codex.js create mode 100644 src/connectors/schemas/local-codex-response.schema.json create mode 100644 src/connectors/strategy.js create mode 100644 src/services/request-processor.js create mode 100644 src/services/telegram-messenger.js diff --git a/.env.example b/.env.example index 7ea4075..f9ea631 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,10 @@ BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT= +BOT_CONNECTOR=local-codex +LOCAL_CODEX_COMMAND=codex +LOCAL_CODEX_WORKDIR= +LOCAL_CODEX_SANDBOX=workspace-write +LOCAL_CODEX_MODEL= +LOCAL_CODEX_PROFILE= +LOCAL_CODEX_SKIP_GIT_REPO_CHECK= +LOCAL_CODEX_ADD_DIRS= diff --git a/package-lock.json b/package-lock.json index 72c90ac..df89fd1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5,11 +5,27 @@ "packages": { "": { "dependencies": { + "@grammyjs/auto-retry": "^2.0.2", "@grammyjs/files": "^1.2.0", "dotenv": "^17.4.2", "grammy": "^1.42.0" } }, + "node_modules/@grammyjs/auto-retry": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/@grammyjs/auto-retry/-/auto-retry-2.0.2.tgz", + "integrity": "sha512-b4A4p5jlYDiQtW0c0FXYe11WMkYoiW+5rvOaDMCOk1h7Pu2SDl7B7gmFF8cthWCx2+M2nWLOsBxAgbBG4kKWYg==", + "license": "MIT", + "dependencies": { + "debug": "^4.3.4" + }, + "engines": { + "node": ">=12.20.0 || >=14.13.1" + }, + "peerDependencies": { + "grammy": "^1.10.0" + } + }, "node_modules/@grammyjs/files": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/@grammyjs/files/-/files-1.2.0.tgz", diff --git a/package.json b/package.json index 1cae7ee..8cefc6a 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "start": "node src/index.js" }, "dependencies": { + "@grammyjs/auto-retry": "^2.0.2", "@grammyjs/files": "^1.2.0", "dotenv": "^17.4.2", "grammy": "^1.42.0" diff --git a/setup.md b/setup.md index 99c3831..e9b1c22 100644 --- a/setup.md +++ b/setup.md @@ -19,6 +19,14 @@ npm install ```env BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT= +BOT_CONNECTOR=local-codex +LOCAL_CODEX_COMMAND=codex +LOCAL_CODEX_WORKDIR= +LOCAL_CODEX_SANDBOX=workspace-write +LOCAL_CODEX_MODEL= +LOCAL_CODEX_PROFILE= +LOCAL_CODEX_SKIP_GIT_REPO_CHECK= +LOCAL_CODEX_ADD_DIRS= ``` 3. Start the bot: @@ -31,7 +39,44 @@ npm start - Saves text messages to `assets/request.txt` by appending new lines - Saves photos, videos, voice messages, and documents to `assets/` -- Replies with `Ок` for supported message types +- Immediately replies with `Сообщение доставлено ИИ агенту, оно обрабатывается` +- Sends the saved request to the configured connector strategy +- Returns the connector result back to Telegram as a follow-up reply + +## Connector Strategy + +The bot uses a connector strategy selected by `BOT_CONNECTOR`. + +### `local-codex` + +Runs the local `codex` CLI with `codex exec` for every supported Telegram message. +The connector expects structured JSON output with: + +- `message`: text reply for Telegram +- `attachments`: files to upload back to the user +- attachment fields: `path`, `kind`, `caption` + +Supported attachment `kind` values: + +- `photo` +- `video` +- `animation` +- `document` + +If Codex generates an image, video, or any other file for the user, it should include that file path in `attachments`. Relative paths are resolved from `LOCAL_CODEX_WORKDIR`. + +Relevant variables: + +- `BOT_CONNECTOR=local-codex` +- `LOCAL_CODEX_COMMAND=codex` +- `LOCAL_CODEX_WORKDIR=`: working directory for Codex. Empty means this project root. +- `LOCAL_CODEX_SANDBOX=workspace-write`: sandbox mode passed to `codex exec` +- `LOCAL_CODEX_MODEL=`: optional model override +- `LOCAL_CODEX_PROFILE=`: optional Codex profile +- `LOCAL_CODEX_SKIP_GIT_REPO_CHECK=`: set to `true` if the Codex workdir is not a git repo +- `LOCAL_CODEX_ADD_DIRS=`: comma-separated extra directories for Codex access + +The connector queue is sequential, so messages are processed one by one in arrival order. ## Optional: Local Bot API Server For Files Larger Than 20 MB @@ -111,6 +156,7 @@ Then set this in `.env`: ```env BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT=http://127.0.0.1:8081 +BOT_CONNECTOR=local-codex ``` Restart the bot after that: @@ -124,3 +170,4 @@ npm start - `BOT_API_ROOT` is optional. Leave it empty to use the default Telegram hosted Bot API. - The local Bot API server listens over HTTP by default. - If both the bot and the local Bot API server run on the same machine, `127.0.0.1:8081` is enough. +- `BOT_CONNECTOR` defaults to `local-codex` in the current setup. diff --git a/src/bot.js b/src/bot.js index 34a1bf7..d74ca00 100644 --- a/src/bot.js +++ b/src/bot.js @@ -1,14 +1,16 @@ const { Bot } = require("grammy"); +const { autoRetry } = require("@grammyjs/auto-retry"); const { hydrateFiles } = require("@grammyjs/files"); const { registerMessageHandlers } = require("./handlers/message"); -function createBot({ token, botApiRoot, assetStore }) { +function createBot({ token, botApiRoot, assetStore, requestProcessor }) { const bot = new Bot(token, botApiRoot ? { client: { apiRoot: botApiRoot }, } : undefined); bot.api.config.use(hydrateFiles(bot.token)); + bot.api.config.use(autoRetry({ maxRetryAttempts: 4 })); - registerMessageHandlers(bot, assetStore); + registerMessageHandlers(bot, { assetStore, requestProcessor }); bot.catch((error) => { console.error("Bot error:", error.error); diff --git a/src/config.js b/src/config.js index 04a25de..34f5e7f 100644 --- a/src/config.js +++ b/src/config.js @@ -3,17 +3,79 @@ const dotenv = require("dotenv"); dotenv.config({ quiet: true }); +function readString(value, fallback = null) { + if (typeof value !== "string") { + return fallback; + } + + const normalizedValue = value.trim(); + + return normalizedValue || fallback; +} + +function readBoolean(value, fallback = false) { + const normalizedValue = readString(value); + + if (normalizedValue === null) { + return fallback; + } + + return normalizedValue === "1" || normalizedValue.toLowerCase() === "true"; +} + +function readList(value) { + const normalizedValue = readString(value, ""); + + if (!normalizedValue) { + return []; + } + + return normalizedValue + .split(",") + .map((item) => item.trim()) + .filter(Boolean); +} + +function resolveFromRoot(rootDir, value, fallback) { + const resolvedValue = readString(value, fallback); + + if (!resolvedValue) { + return null; + } + + return path.isAbsolute(resolvedValue) + ? resolvedValue + : path.resolve(rootDir, resolvedValue); +} + const token = process.env.BOT_TOKEN; const botApiRoot = process.env.BOT_API_ROOT; +const rootDir = path.join(__dirname, ".."); if (!token) { throw new Error("Missing BOT_TOKEN in .env"); } -const assetsDir = path.join(__dirname, "..", "assets"); +const assetsDir = path.join(rootDir, "assets"); +const connector = { + type: readString(process.env.BOT_CONNECTOR, "local-codex"), + localCodex: { + command: readString(process.env.LOCAL_CODEX_COMMAND, "codex"), + workdir: resolveFromRoot(rootDir, process.env.LOCAL_CODEX_WORKDIR, rootDir), + sandbox: readString(process.env.LOCAL_CODEX_SANDBOX, "workspace-write"), + model: readString(process.env.LOCAL_CODEX_MODEL), + profile: readString(process.env.LOCAL_CODEX_PROFILE), + skipGitRepoCheck: readBoolean(process.env.LOCAL_CODEX_SKIP_GIT_REPO_CHECK), + addDirs: readList(process.env.LOCAL_CODEX_ADD_DIRS).map( + (value) => resolveFromRoot(rootDir, value, value), + ), + }, +}; module.exports = { assetsDir, botApiRoot, + connector, + rootDir, token, }; diff --git a/src/connectors/index.js b/src/connectors/index.js new file mode 100644 index 0000000..545d5d8 --- /dev/null +++ b/src/connectors/index.js @@ -0,0 +1,14 @@ +const { createLocalCodexConnectorStrategy } = require("./local-codex"); + +function createConnectorStrategy(config) { + switch (config.type) { + case "local-codex": + return createLocalCodexConnectorStrategy(config.localCodex); + default: + throw new Error(`Unsupported connector type: ${config.type}`); + } +} + +module.exports = { + createConnectorStrategy, +}; diff --git a/src/connectors/local-codex.js b/src/connectors/local-codex.js new file mode 100644 index 0000000..9140704 --- /dev/null +++ b/src/connectors/local-codex.js @@ -0,0 +1,322 @@ +const fs = require("node:fs/promises"); +const os = require("node:os"); +const path = require("node:path"); +const { spawn } = require("node:child_process"); +const { ConnectorStrategy } = require("./strategy"); + +const RESPONSE_SCHEMA_PATH = path.join( + __dirname, + "schemas", + "local-codex-response.schema.json", +); +const PHOTO_EXTENSIONS = new Set([".jpg", ".jpeg", ".png", ".webp"]); +const VIDEO_EXTENSIONS = new Set([".mp4", ".mov", ".m4v", ".webm", ".mkv"]); +const ANIMATION_EXTENSIONS = new Set([".gif"]); + +function dedupePaths(paths) { + return [...new Set(paths.filter(Boolean).map((value) => path.resolve(value)))]; +} + +function isInsideDirectory(parentPath, childPath) { + const relativePath = path.relative(parentPath, childPath); + + return relativePath === "" || ( + !relativePath.startsWith("..") && + !path.isAbsolute(relativePath) + ); +} + +function collectImagePaths(attachments) { + return attachments + .filter((attachment) => attachment.kind === "photo") + .map((attachment) => attachment.path); +} + +function formatAttachmentBlock(attachments) { + if (attachments.length === 0) { + return "- no attachments"; + } + + return attachments + .map((attachment, index) => { + const lines = [ + `- attachment ${index + 1}`, + ` kind: ${attachment.kind}`, + ` path: ${attachment.path}`, + ]; + + if (attachment.originalName) { + lines.push(` original name: ${attachment.originalName}`); + } + + if (attachment.caption) { + lines.push(` caption: ${attachment.caption}`); + } + + if (attachment.captionPath) { + lines.push(` caption file: ${attachment.captionPath}`); + } + + return lines.join("\n"); + }) + .join("\n"); +} + +function buildPrompt(request) { + const userText = request.text?.trim() || "(empty)"; + const username = request.source.username || "(unknown)"; + const displayName = request.source.displayName || "(unknown)"; + + return [ + "You are answering a Telegram user through a bot connector.", + "Return a JSON object matching the provided response schema.", + "Default to Russian unless the user clearly asked for another language.", + "Set `message` to the final Telegram text reply.", + "If you created files that should be sent back to the user, include them in `attachments`.", + "Each attachment must contain `path`, `kind`, and `caption`.", + "Use `kind` values `photo`, `video`, `animation`, or `document`.", + "Use an empty string for `caption` when it is not needed.", + "Use an empty array when there is nothing to attach.", + "Paths may be absolute or relative to the Codex workdir.", + "Do not mention raw file paths in `message` unless the user explicitly asked for them.", + "", + "Telegram context:", + `- chat id: ${request.source.chatId}`, + `- message id: ${request.source.messageId}`, + `- username: ${username}`, + `- display name: ${displayName}`, + `- requested at: ${request.requestedAt.toISOString()}`, + "", + "User message:", + userText, + "", + "Attachments:", + formatAttachmentBlock(request.attachments), + ].join("\n"); +} + +function buildCodexArgs(config, request, outputPath) { + const args = [ + "exec", + "--ephemeral", + "--color", + "never", + "--sandbox", + config.sandbox, + "-C", + config.workdir, + "--output-schema", + RESPONSE_SCHEMA_PATH, + "-o", + outputPath, + ]; + + if (config.skipGitRepoCheck) { + args.push("--skip-git-repo-check"); + } + + if (config.model) { + args.push("--model", config.model); + } + + if (config.profile) { + args.push("--profile", config.profile); + } + + const attachmentDirs = request.attachments + .map((attachment) => path.dirname(attachment.path)) + .filter((attachmentDir) => !isInsideDirectory(config.workdir, attachmentDir)); + const addDirs = dedupePaths([ + ...config.addDirs, + ...attachmentDirs, + ]); + + for (const addDir of addDirs) { + args.push("--add-dir", addDir); + } + + for (const imagePath of collectImagePaths(request.attachments)) { + args.push("--image", imagePath); + } + + args.push("-"); + + return args; +} + +function resolveAttachmentPath(config, attachmentPath) { + if (!attachmentPath) { + return null; + } + + return path.isAbsolute(attachmentPath) + ? path.resolve(attachmentPath) + : path.resolve(config.workdir, attachmentPath); +} + +function inferAttachmentKind(filePath, rawKind) { + const normalizedKind = String(rawKind || "").trim().toLowerCase(); + + if (["photo", "video", "animation", "document"].includes(normalizedKind)) { + return normalizedKind; + } + + const extension = path.extname(filePath).toLowerCase(); + + if (PHOTO_EXTENSIONS.has(extension)) { + return "photo"; + } + + if (VIDEO_EXTENSIONS.has(extension)) { + return "video"; + } + + if (ANIMATION_EXTENSIONS.has(extension)) { + return "animation"; + } + + return "document"; +} + +function getAllowedFileRoots(config, request) { + return dedupePaths([ + config.workdir, + ...config.addDirs, + ...request.attachments.map((attachment) => path.dirname(attachment.path)), + ]); +} + +async function normalizeResponseAttachment(config, request, attachment) { + const resolvedPath = resolveAttachmentPath(config, attachment.path); + + if (!resolvedPath) { + return null; + } + + const allowedRoots = getAllowedFileRoots(config, request); + const isAllowed = allowedRoots.some((rootPath) => isInsideDirectory(rootPath, resolvedPath)); + + if (!isAllowed) { + console.warn(`Skipping connector attachment outside allowed roots: ${resolvedPath}`); + return null; + } + + let stats; + + try { + stats = await fs.stat(resolvedPath); + } catch (error) { + console.warn(`Skipping missing connector attachment: ${resolvedPath}`); + return null; + } + + if (!stats.isFile()) { + console.warn(`Skipping non-file connector attachment: ${resolvedPath}`); + return null; + } + + return { + path: resolvedPath, + kind: inferAttachmentKind(resolvedPath, attachment.kind), + caption: String(attachment.caption || "").trim(), + }; +} + +async function parseConnectorResult(config, request, outputPath) { + const rawOutput = await fs.readFile(outputPath, "utf8"); + const parsedOutput = JSON.parse(rawOutput); + const attachments = []; + + for (const attachment of parsedOutput.attachments || []) { + const normalizedAttachment = await normalizeResponseAttachment( + config, + request, + attachment, + ); + + if (normalizedAttachment) { + attachments.push(normalizedAttachment); + } + } + + return { + message: String(parsedOutput.message || "").trim(), + attachments, + }; +} + +function runCodex(command, args, cwd, prompt) { + return new Promise((resolve, reject) => { + const child = spawn(command, args, { + cwd, + env: process.env, + stdio: ["pipe", "pipe", "pipe"], + }); + let stdout = ""; + let stderr = ""; + + child.stdout.on("data", (chunk) => { + stdout += chunk.toString(); + }); + + child.stderr.on("data", (chunk) => { + stderr += chunk.toString(); + }); + + child.stdin.end(prompt); + child.on("error", reject); + child.on("close", (code) => { + if (code === 0) { + resolve({ stdout, stderr }); + return; + } + + const error = new Error( + `Codex exited with code ${code}${stderr ? `: ${stderr.trim()}` : ""}`, + ); + + error.code = code; + error.stdout = stdout; + error.stderr = stderr; + reject(error); + }); + }); +} + +class LocalCodexConnectorStrategy extends ConnectorStrategy { + constructor(config) { + super("local-codex"); + this.config = config; + } + + async process(request) { + const outputPath = path.join( + await fs.mkdtemp(path.join(os.tmpdir(), "bot-codex-")), + "last-message.txt", + ); + const args = buildCodexArgs(this.config, request, outputPath); + const prompt = buildPrompt(request); + + try { + await runCodex(this.config.command, args, this.config.workdir, prompt); + const result = await parseConnectorResult(this.config, request, outputPath); + + return { + connector: this.name, + message: result.message || "Задача обработана.", + attachments: result.attachments, + }; + } finally { + await fs.rm(path.dirname(outputPath), { recursive: true, force: true }); + } + } +} + +function createLocalCodexConnectorStrategy(config) { + return new LocalCodexConnectorStrategy(config); +} + +module.exports = { + LocalCodexConnectorStrategy, + createLocalCodexConnectorStrategy, +}; diff --git a/src/connectors/schemas/local-codex-response.schema.json b/src/connectors/schemas/local-codex-response.schema.json new file mode 100644 index 0000000..c564fc0 --- /dev/null +++ b/src/connectors/schemas/local-codex-response.schema.json @@ -0,0 +1,36 @@ +{ + "type": "object", + "properties": { + "message": { + "type": "string" + }, + "attachments": { + "type": "array", + "items": { + "type": "object", + "properties": { + "path": { + "type": "string" + }, + "kind": { + "type": "string" + }, + "caption": { + "type": "string" + } + }, + "required": [ + "path", + "kind", + "caption" + ], + "additionalProperties": false + } + } + }, + "required": [ + "message", + "attachments" + ], + "additionalProperties": false +} diff --git a/src/connectors/strategy.js b/src/connectors/strategy.js new file mode 100644 index 0000000..b515760 --- /dev/null +++ b/src/connectors/strategy.js @@ -0,0 +1,13 @@ +class ConnectorStrategy { + constructor(name) { + this.name = name; + } + + async process(_request) { + throw new Error(`Connector strategy "${this.name}" must implement process()`); + } +} + +module.exports = { + ConnectorStrategy, +}; diff --git a/src/handlers/message.js b/src/handlers/message.js index 010ec18..65db6d5 100644 --- a/src/handlers/message.js +++ b/src/handlers/message.js @@ -49,26 +49,69 @@ function getAttachmentPayload(message) { return null; } -function registerMessageHandlers(bot, assetStore) { +function getDisplayName(from) { + const parts = [from?.first_name, from?.last_name].filter(Boolean); + + return parts.join(" ").trim(); +} + +async function buildConnectorRequest(ctx, assetStore) { + const { message } = ctx; + const requestedAt = typeof message.date === "number" + ? new Date(message.date * 1000) + : new Date(); + const baseRequest = { + text: "", + attachments: [], + requestedAt, + source: { + api: ctx.api, + chatId: ctx.chat?.id, + messageId: message.message_id, + username: message.from?.username ?? null, + displayName: getDisplayName(message.from) || null, + }, + }; + + if (message.text) { + await assetStore.saveText(ctx, message.text); + return { + ...baseRequest, + text: message.text, + }; + } + + const attachment = getAttachmentPayload(message); + + if (!attachment) { + return null; + } + + const savedAttachment = await assetStore.saveTelegramFile( + ctx, + attachment.fileId, + attachment.options, + ); + + return { + ...baseRequest, + text: attachment.options.caption ?? "", + attachments: [savedAttachment], + }; +} + +function registerMessageHandlers(bot, { assetStore, requestProcessor }) { const deliveredReply = "Сообщение доставлено ИИ агенту, оно обрабатывается"; bot.on("message", async (ctx) => { - const { message } = ctx; + const request = await buildConnectorRequest(ctx, assetStore); - if (message.text) { - await assetStore.saveText(ctx, message.text); - await ctx.reply(deliveredReply); + if (!request) { return; } - const attachment = getAttachmentPayload(message); - - if (!attachment) { - return; - } - - await assetStore.saveTelegramFile(ctx, attachment.fileId, attachment.options); await ctx.reply(deliveredReply); + requestProcessor.enqueue(request); }); } diff --git a/src/index.js b/src/index.js index 21875ac..0995342 100644 --- a/src/index.js +++ b/src/index.js @@ -1,15 +1,23 @@ -const { assetsDir, botApiRoot, token } = require("./config"); +const { assetsDir, botApiRoot, connector: connectorConfig, token } = require("./config"); const { createBot } = require("./bot"); +const { createConnectorStrategy } = require("./connectors"); const { createAssetStore } = require("./services/asset-store"); +const { createRequestProcessor } = require("./services/request-processor"); +const { createTelegramMessenger } = require("./services/telegram-messenger"); const assetStore = createAssetStore({ assetsDir }); -const bot = createBot({ token, botApiRoot, assetStore }); +const connector = createConnectorStrategy(connectorConfig); +const messenger = createTelegramMessenger(); +const requestProcessor = createRequestProcessor({ connector, messenger }); +const bot = createBot({ token, botApiRoot, assetStore, requestProcessor }); async function main() { await assetStore.ensureDir(); await bot.start({ onStart: () => { - console.log(`Bot is running. Files are saved to ${assetsDir}`); + console.log( + `Bot is running. Files are saved to ${assetsDir}. Connector: ${connector.name}`, + ); }, }); } diff --git a/src/services/asset-store.js b/src/services/asset-store.js index f705257..f367dfe 100644 --- a/src/services/asset-store.js +++ b/src/services/asset-store.js @@ -1,6 +1,17 @@ const fs = require("node:fs/promises"); const path = require("node:path"); +const DOWNLOAD_RETRY_LIMIT = 4; +const DOWNLOAD_RETRY_DELAY_MS = 1000; +const RETRYABLE_NETWORK_ERROR_CODES = new Set([ + "ECONNRESET", + "ECONNREFUSED", + "EHOSTUNREACH", + "ENETUNREACH", + "ETIMEDOUT", + "EAI_AGAIN", +]); + function makeMessageKey(ctx, kind) { const chatId = ctx.chat?.id ?? "unknown-chat"; const messageId = ctx.message?.message_id ?? Date.now(); @@ -69,11 +80,56 @@ function formatDownloadedFileEntry({ kind, targetPath, originalName, caption }) async function saveCaption(assetsDir, baseName, caption) { if (!caption) { - return; + return null; } const captionPath = path.join(assetsDir, `${baseName}.txt`); await fs.writeFile(captionPath, caption, "utf8"); + return captionPath; +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function isRetryableNetworkError(error) { + if (!error || typeof error !== "object") { + return false; + } + + if (RETRYABLE_NETWORK_ERROR_CODES.has(error.code)) { + return true; + } + + return typeof error.message === "string" && + error.message.includes("Client network socket disconnected"); +} + +async function downloadWithRetry(telegramFile, targetPath, kind) { + let lastError; + + for (let attempt = 1; attempt <= DOWNLOAD_RETRY_LIMIT; attempt += 1) { + try { + await fs.rm(targetPath, { force: true }); + await telegramFile.download(targetPath); + return; + } catch (error) { + lastError = error; + + if (!isRetryableNetworkError(error) || attempt === DOWNLOAD_RETRY_LIMIT) { + throw error; + } + + const delayMs = DOWNLOAD_RETRY_DELAY_MS * (2 ** (attempt - 1)); + + console.warn( + `Retrying ${kind} download after network error (${error.code ?? error.message})`, + ); + await sleep(delayMs); + } + } + + throw lastError; } function createAssetStore({ assetsDir }) { @@ -86,6 +142,12 @@ function createAssetStore({ assetsDir }) { const requestedAt = getRequestDate(ctx); await appendRequestEntry(assetsDir, text, requestedAt); + return { + kind, + text, + requestedAt, + requestLogPath: getRequestLogPath(assetsDir), + }; }, async saveTelegramFile(ctx, fileId, options) { @@ -105,8 +167,8 @@ function createAssetStore({ assetsDir }) { ); const targetPath = path.join(assetsDir, `${baseName}${extension}`); - await telegramFile.download(targetPath); - await saveCaption(assetsDir, baseName, options.caption); + await downloadWithRetry(telegramFile, targetPath, options.kind); + const captionPath = await saveCaption(assetsDir, baseName, options.caption); await appendRequestEntry( assetsDir, formatDownloadedFileEntry({ @@ -117,6 +179,14 @@ function createAssetStore({ assetsDir }) { }), requestedAt, ); + return { + kind: options.kind, + path: targetPath, + originalName: options.originalName ?? null, + caption: options.caption ?? null, + captionPath, + requestedAt, + }; }, }; } diff --git a/src/services/request-processor.js b/src/services/request-processor.js new file mode 100644 index 0000000..0de7f22 --- /dev/null +++ b/src/services/request-processor.js @@ -0,0 +1,34 @@ +function createRequestProcessor({ connector, messenger }) { + let queue = Promise.resolve(); + + async function processRequest(request) { + try { + const result = await connector.process(request); + + await messenger.sendResult(request, result); + } catch (error) { + console.error(`Connector "${connector.name}" failed:`, error); + await messenger.sendResult( + request, + { + message: `Не удалось обработать сообщение через ${connector.name}. Попробуйте ещё раз.`, + attachments: [], + }, + ); + } + } + + return { + enqueue(request) { + queue = queue + .then(() => processRequest(request)) + .catch((error) => { + console.error("Request queue failed:", error); + }); + }, + }; +} + +module.exports = { + createRequestProcessor, +}; diff --git a/src/services/telegram-messenger.js b/src/services/telegram-messenger.js new file mode 100644 index 0000000..09cee40 --- /dev/null +++ b/src/services/telegram-messenger.js @@ -0,0 +1,101 @@ +const path = require("node:path"); +const { InputFile } = require("grammy"); + +const TELEGRAM_MESSAGE_LIMIT = 4096; +const TELEGRAM_CAPTION_LIMIT = 1024; + +function splitMessage(text, limit = TELEGRAM_MESSAGE_LIMIT) { + const normalizedText = String(text ?? "").trim(); + + if (!normalizedText) { + return []; + } + + const chunks = []; + let cursor = 0; + + while (cursor < normalizedText.length) { + let end = Math.min(cursor + limit, normalizedText.length); + + if (end < normalizedText.length) { + const lastNewline = normalizedText.lastIndexOf("\n", end); + + if (lastNewline > cursor + Math.floor(limit / 2)) { + end = lastNewline; + } + } + + chunks.push(normalizedText.slice(cursor, end).trim()); + cursor = end; + } + + return chunks.filter(Boolean); +} + +function createTelegramMessenger() { + function createReplyOptions(request) { + return { + reply_parameters: { + message_id: request.source.messageId, + }, + }; + } + + function normalizeCaption(caption) { + const normalizedCaption = String(caption || "").trim(); + + if (!normalizedCaption) { + return undefined; + } + + return normalizedCaption.slice(0, TELEGRAM_CAPTION_LIMIT); + } + + async function sendAttachment(request, attachment) { + const inputFile = new InputFile(attachment.path, path.basename(attachment.path)); + const options = { + ...createReplyOptions(request), + caption: normalizeCaption(attachment.caption), + }; + + switch (attachment.kind) { + case "photo": + await request.source.api.sendPhoto(request.source.chatId, inputFile, options); + return; + case "video": + await request.source.api.sendVideo(request.source.chatId, inputFile, options); + return; + case "animation": + await request.source.api.sendAnimation(request.source.chatId, inputFile, options); + return; + default: + await request.source.api.sendDocument(request.source.chatId, inputFile, options); + } + } + + return { + async sendResult(request, result) { + const text = result?.message || ""; + const chunks = splitMessage(text); + + for (let index = 0; index < chunks.length; index += 1) { + const chunk = chunks[index]; + const options = index === 0 ? createReplyOptions(request) : undefined; + + await request.source.api.sendMessage( + request.source.chatId, + chunk, + options, + ); + } + + for (const attachment of result?.attachments || []) { + await sendAttachment(request, attachment); + } + }, + }; +} + +module.exports = { + createTelegramMessenger, +};