diff --git a/.codex b/.codex new file mode 100644 index 0000000..e69de29 diff --git a/.env.example b/.env.example index 4de7c47..d1c7056 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,8 @@ BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT= -BOT_CONNECTOR=local-codex +BOT_CONNECTOR=agent-api +AGENT_API_BASE_URL=http://127.0.0.1:8000 +AGENT_API_WORKSPACE_DIR=../agent/workspace LOCAL_CODEX_COMMAND=codex LOCAL_CODEX_WORKDIR= LOCAL_CODEX_SKILL_DIR= diff --git a/package-lock.json b/package-lock.json index df89fd1..434a9d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,7 +8,8 @@ "@grammyjs/auto-retry": "^2.0.2", "@grammyjs/files": "^1.2.0", "dotenv": "^17.4.2", - "grammy": "^1.42.0" + "grammy": "^1.42.0", + "ws": "^8.20.0" } }, "node_modules/@grammyjs/auto-retry": { @@ -156,6 +157,27 @@ "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } + }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } } } } diff --git a/package.json b/package.json index 8cefc6a..da692c3 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "@grammyjs/auto-retry": "^2.0.2", "@grammyjs/files": "^1.2.0", "dotenv": "^17.4.2", - "grammy": "^1.42.0" + "grammy": "^1.42.0", + "ws": "^8.20.0" } } diff --git a/setup.md b/setup.md index 8c8f21e..cdc45bf 100644 --- a/setup.md +++ b/setup.md @@ -19,7 +19,9 @@ npm install ```env BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT= -BOT_CONNECTOR=local-codex +BOT_CONNECTOR=agent-api +AGENT_API_BASE_URL=http://127.0.0.1:8000 +AGENT_API_WORKSPACE_DIR=../agent/workspace LOCAL_CODEX_COMMAND=codex 
LOCAL_CODEX_WORKDIR= LOCAL_CODEX_SKILL_DIR= @@ -48,6 +50,23 @@ npm start The bot uses a connector strategy selected by `BOT_CONNECTOR`. +### `agent-api` + +Connects to the websocket API of the external agent service. + +Relevant variables: + +- `BOT_CONNECTOR=agent-api` +- `AGENT_API_BASE_URL=http://127.0.0.1:8000`: base URL of the agent service +- `AGENT_API_WORKSPACE_DIR=../agent/workspace`: host path of the shared agent workspace + +How it works: + +- the bot saves incoming files to `assets/` +- before sending a request to the agent, it copies message attachments to `AGENT_API_WORKSPACE_DIR/telegram-inbox/...` +- the agent receives attachment paths relative to its `/workspace` +- if the agent emits `AGENT_EVENT_SEND_FILE`, the bot resolves the file inside `AGENT_API_WORKSPACE_DIR` and uploads it back to Telegram + ### `local-codex` Runs the local `codex` CLI with `codex exec` for every supported Telegram message. @@ -160,7 +179,7 @@ Then set this in `.env`: ```env BOT_TOKEN=your_telegram_bot_token_here BOT_API_ROOT=http://127.0.0.1:8081 -BOT_CONNECTOR=local-codex +BOT_CONNECTOR=agent-api ``` Restart the bot after that: @@ -174,4 +193,4 @@ npm start - `BOT_API_ROOT` is optional. Leave it empty to use the default Telegram hosted Bot API. - The local Bot API server listens over HTTP by default. - If both the bot and the local Bot API server run on the same machine, `127.0.0.1:8081` is enough. -- `BOT_CONNECTOR` defaults to `local-codex` in the current setup. +- `BOT_CONNECTOR` can be `agent-api` or `local-codex`. 
/**
 * Checks whether `childPath` is `parentPath` itself or is located beneath it.
 *
 * The check is purely lexical: it compares the two paths with
 * `path.relative` and does not touch the file system.
 *
 * @param {string} parentPath - Directory that must contain the child.
 * @param {string} childPath - Path to test.
 * @returns {boolean} `true` when the child equals the parent or is nested in it.
 */
function isInsideDirectory(parentPath, childPath) {
  const relativePath = path.relative(parentPath, childPath);

  if (relativePath === "") {
    return true;
  }

  // On Windows a path on another drive comes back absolute.
  if (path.isAbsolute(relativePath)) {
    return false;
  }

  // Only a leading ".." *segment* leaves the parent. A plain prefix test would
  // also reject legitimate children such as "..cache/file.txt" or "...".
  return relativePath !== ".." && !relativePath.startsWith(`..${path.sep}`);
}
/**
 * Converts an absolute host path into a forward-slash path relative to the
 * shared workspace directory.
 *
 * @param {string} workspaceDir - Root of the shared workspace on the host.
 * @param {string} filePath - Path expected to be located inside the workspace.
 * @returns {string} Workspace-relative path, normalized by `normalizeWorkspacePath`.
 * @throws {Error} When `filePath` is the workspace root itself or lies outside it.
 */
function toWorkspaceRelativePath(workspaceDir, filePath) {
  const relativePath = path.relative(workspaceDir, filePath);

  // Only a leading ".." *segment* escapes the workspace; names that merely
  // begin with two dots (for example "..cache/a.txt") are still inside it.
  const escapesWorkspace =
    relativePath === ".." ||
    relativePath.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativePath);

  // An empty relative path means the workspace root, which is not a file.
  if (relativePath === "" || escapesWorkspace) {
    throw new Error(`Path is outside workspace: ${filePath}`);
  }

  return normalizeWorkspacePath(relativePath);
}
/**
 * Closes the agent websocket and resolves once there is nothing left to wait for.
 *
 * This helper never rejects: it is awaited from a `finally` block, so a failure
 * while closing must not replace the result or error of the request itself.
 *
 * @param {WebSocket} ws - Socket created for the current request.
 * @returns {Promise<void>} Resolves when the socket is closed, or immediately
 *   when it is already closed or closing.
 */
function closeSocket(ws) {
  return new Promise((resolve) => {
    if (ws.readyState === WebSocket.CLOSED) {
      resolve();
      return;
    }

    // `ws` reports an aborted handshake (close() while still CONNECTING, e.g.
    // after an open timeout or an unexpected HTTP response) through an "error"
    // event. The wait helpers remove their listeners when they settle, so
    // without a listener here that event would surface as an uncaught
    // exception. The listener is intentionally left attached for late errors.
    ws.on("error", () => {});

    if (ws.readyState === WebSocket.CLOSING) {
      resolve();
      return;
    }

    ws.once("close", () => resolve());
    ws.close();
  });
}
/**
 * Turns a workspace-relative path announced by the agent into an attachment
 * descriptor for the reply.
 *
 * The path is rejected (with a warning, returning `null`) when it is empty,
 * points outside the workspace, cannot be resolved on disk, or is not a
 * regular file.
 *
 * @param {{ workspaceDir: string }} config - Connector settings; only
 *   `workspaceDir` is read here.
 * @param {string} workspaceRelativePath - Path as received from the agent.
 * @returns {Promise<{ path: string, kind: string, caption: string } | null>}
 *   Attachment descriptor, or `null` when the file must be skipped.
 */
async function resolveAgentFile(config, workspaceRelativePath) {
  // A missing path would otherwise be stringified to "undefined"/"null" and
  // looked up as a real file name.
  if (workspaceRelativePath == null || String(workspaceRelativePath).trim() === "") {
    console.warn(`Skipping agent file with empty path: ${workspaceRelativePath}`);
    return null;
  }

  const normalizedPath = normalizeWorkspacePath(workspaceRelativePath);
  const absolutePath = path.resolve(config.workspaceDir, normalizedPath);

  // Cheap lexical check first: catches "../" traversal without touching disk.
  if (!isInsideDirectory(config.workspaceDir, absolutePath)) {
    console.warn(`Skipping agent file outside workspace: ${workspaceRelativePath}`);
    return null;
  }

  let realWorkspaceDir;
  let realPath;
  let stats;

  try {
    [realWorkspaceDir, realPath] = await Promise.all([
      fs.realpath(config.workspaceDir),
      fs.realpath(absolutePath),
    ]);
    stats = await fs.stat(realPath);
  } catch (error) {
    console.warn(`Skipping missing agent file: ${workspaceRelativePath}`);
    return null;
  }

  // `fs.stat` follows symlinks, so a link placed inside the workspace could
  // expose a file outside of it. Compare the fully resolved locations as well.
  if (!isInsideDirectory(realWorkspaceDir, realPath)) {
    console.warn(`Skipping agent file outside workspace: ${workspaceRelativePath}`);
    return null;
  }

  if (!stats.isFile()) {
    console.warn(`Skipping non-file agent path: ${workspaceRelativePath}`);
    return null;
  }

  return {
    path: absolutePath,
    kind: inferAttachmentKind(absolutePath),
    caption: "",
  };
}
/**
 * Factory for the `agent-api` connector.
 *
 * Copies the given settings, replaces `workspaceDir` with its absolute form,
 * and sets `addDirs` to the de-duplicated absolute paths from `config.addDirs`
 * (an empty list when none are configured).
 *
 * @param {object} config - Connector settings; `workspaceDir` is required and
 *   `addDirs` is optional.
 * @returns {AgentApiConnectorStrategy} Strategy bound to the normalized settings.
 */
function createAgentApiConnectorStrategy(config) {
  const absoluteWorkspaceDir = path.resolve(config.workspaceDir);
  const extraDirs = dedupePaths(config.addDirs || []);

  const normalizedConfig = {
    ...config,
    workspaceDir: absoluteWorkspaceDir,
    addDirs: extraDirs,
  };

  return new AgentApiConnectorStrategy(normalizedConfig);
}