This commit is contained in:
Eduard Baturin 2026-04-21 00:16:28 +03:00
parent 054e77a370
commit 6a286fb90b
8 changed files with 384 additions and 6 deletions

0
.codex Normal file
View file

View file

@ -1,6 +1,8 @@
BOT_TOKEN=your_telegram_bot_token_here
BOT_API_ROOT=
BOT_CONNECTOR=local-codex
BOT_CONNECTOR=agent-api
AGENT_API_BASE_URL=http://127.0.0.1:8000
AGENT_API_WORKSPACE_DIR=../agent/workspace
LOCAL_CODEX_COMMAND=codex
LOCAL_CODEX_WORKDIR=
LOCAL_CODEX_SKILL_DIR=

24
package-lock.json generated
View file

@ -8,7 +8,8 @@
"@grammyjs/auto-retry": "^2.0.2",
"@grammyjs/files": "^1.2.0",
"dotenv": "^17.4.2",
"grammy": "^1.42.0"
"grammy": "^1.42.0",
"ws": "^8.20.0"
}
},
"node_modules/@grammyjs/auto-retry": {
@ -156,6 +157,27 @@
"tr46": "~0.0.3",
"webidl-conversions": "^3.0.0"
}
},
"node_modules/ws": {
"version": "8.20.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz",
"integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
}
}
}

View file

@ -6,6 +6,7 @@
"@grammyjs/auto-retry": "^2.0.2",
"@grammyjs/files": "^1.2.0",
"dotenv": "^17.4.2",
"grammy": "^1.42.0"
"grammy": "^1.42.0",
"ws": "^8.20.0"
}
}

View file

@ -19,7 +19,9 @@ npm install
```env
BOT_TOKEN=your_telegram_bot_token_here
BOT_API_ROOT=
BOT_CONNECTOR=local-codex
BOT_CONNECTOR=agent-api
AGENT_API_BASE_URL=http://127.0.0.1:8000
AGENT_API_WORKSPACE_DIR=../agent/workspace
LOCAL_CODEX_COMMAND=codex
LOCAL_CODEX_WORKDIR=
LOCAL_CODEX_SKILL_DIR=
@ -48,6 +50,23 @@ npm start
The bot uses a connector strategy selected by `BOT_CONNECTOR`.
### `agent-api`
Connects to the websocket API of the external agent service.
Relevant variables:
- `BOT_CONNECTOR=agent-api`
- `AGENT_API_BASE_URL=http://127.0.0.1:8000`: base URL of the agent service
- `AGENT_API_WORKSPACE_DIR=../agent/workspace`: host path of the shared agent workspace
How it works:
- the bot saves incoming files to `assets/`
- before sending a request to the agent, it copies message attachments to `AGENT_API_WORKSPACE_DIR/telegram-inbox/...`
- the agent receives attachment paths relative to its `/workspace`
- if the agent emits `AGENT_EVENT_SEND_FILE`, the bot resolves the file inside `AGENT_API_WORKSPACE_DIR` and uploads it back to Telegram
### `local-codex`
Runs the local `codex` CLI with `codex exec` for every supported Telegram message.
@ -160,7 +179,7 @@ Then set this in `.env`:
```env
BOT_TOKEN=your_telegram_bot_token_here
BOT_API_ROOT=http://127.0.0.1:8081
BOT_CONNECTOR=local-codex
BOT_CONNECTOR=agent-api
```
Restart the bot after that:
@ -174,4 +193,4 @@ npm start
- `BOT_API_ROOT` is optional. Leave it empty to use the default Telegram hosted Bot API.
- The local Bot API server listens over HTTP by default.
- If both the bot and the local Bot API server run on the same machine, `127.0.0.1:8081` is enough.
- `BOT_CONNECTOR` defaults to `local-codex` in the current setup.
- `BOT_CONNECTOR` can be `agent-api` or `local-codex`.

View file

@ -72,6 +72,14 @@ if (!token) {
const assetsDir = path.join(rootDir, "assets");
const connector = {
type: readString(process.env.BOT_CONNECTOR, "local-codex"),
agentApi: {
baseUrl: readString(process.env.AGENT_API_BASE_URL, "http://127.0.0.1:8000"),
workspaceDir: resolveFromRoot(
rootDir,
process.env.AGENT_API_WORKSPACE_DIR,
"../agent/workspace",
),
},
localCodex: {
command: readString(process.env.LOCAL_CODEX_COMMAND, "codex"),
workdir: resolveFromRoot(rootDir, process.env.LOCAL_CODEX_WORKDIR, rootDir),

323
src/connectors/agent-api.js Normal file
View file

@ -0,0 +1,323 @@
const fs = require("node:fs/promises");
const path = require("node:path");
const WebSocket = require("ws");
const { ConnectorStrategy } = require("./strategy");
const PHOTO_EXTENSIONS = new Set([".jpg", ".jpeg", ".png", ".webp"]);
const VIDEO_EXTENSIONS = new Set([".mp4", ".mov", ".m4v", ".webm", ".mkv"]);
const ANIMATION_EXTENSIONS = new Set([".gif"]);
const WORKSPACE_INBOX_DIR = "telegram-inbox";
const SOCKET_TIMEOUT_MS = 15000;
function dedupePaths(paths) {
return [...new Set(paths.filter(Boolean).map((value) => path.resolve(value)))];
}
function isInsideDirectory(parentPath, childPath) {
const relativePath = path.relative(parentPath, childPath);
return relativePath === "" || (
!relativePath.startsWith("..") &&
!path.isAbsolute(relativePath)
);
}
function sanitizeFileName(fileName) {
return String(fileName)
.replace(/[^a-zA-Z0-9._-]/g, "_")
.replace(/_+/g, "_") || "file";
}
function buildWebSocketUrl(baseUrl, chatId) {
const url = new URL(baseUrl);
url.protocol = url.protocol === "https:" ? "wss:" : "ws:";
url.pathname = `/v1/agent_ws/${encodeURIComponent(chatId)}/`;
url.search = "";
url.hash = "";
return url.toString();
}
function normalizeWorkspacePath(filePath) {
return String(filePath).replace(/\\/g, "/").replace(/^\/+/, "");
}
function toWorkspaceRelativePath(workspaceDir, filePath) {
const relativePath = path.relative(workspaceDir, filePath);
if (!relativePath || relativePath.startsWith("..") || path.isAbsolute(relativePath)) {
throw new Error(`Path is outside workspace: ${filePath}`);
}
return normalizeWorkspacePath(relativePath);
}
function inferAttachmentKind(filePath) {
const extension = path.extname(filePath).toLowerCase();
if (PHOTO_EXTENSIONS.has(extension)) {
return "photo";
}
if (VIDEO_EXTENSIONS.has(extension)) {
return "video";
}
if (ANIMATION_EXTENSIONS.has(extension)) {
return "animation";
}
return "document";
}
function waitForSocketOpen(ws) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
cleanup();
reject(new Error("Timed out while opening agent websocket"));
}, SOCKET_TIMEOUT_MS);
function cleanup() {
clearTimeout(timeout);
ws.removeListener("open", handleOpen);
ws.removeListener("error", handleError);
ws.removeListener("unexpected-response", handleUnexpectedResponse);
}
function handleOpen() {
cleanup();
resolve();
}
function handleError(error) {
cleanup();
reject(error);
}
function handleUnexpectedResponse(_request, response) {
cleanup();
reject(
new Error(
`Agent websocket handshake failed with status ${response.statusCode ?? "unknown"}`,
),
);
}
ws.once("open", handleOpen);
ws.once("error", handleError);
ws.once("unexpected-response", handleUnexpectedResponse);
});
}
function waitForSocketMessage(ws) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
cleanup();
reject(new Error("Timed out while waiting for agent websocket message"));
}, SOCKET_TIMEOUT_MS);
function cleanup() {
clearTimeout(timeout);
ws.removeListener("message", handleMessage);
ws.removeListener("error", handleError);
ws.removeListener("close", handleClose);
}
function handleMessage(data, isBinary) {
cleanup();
if (isBinary) {
reject(new Error("Agent websocket returned unexpected binary payload"));
return;
}
resolve(String(data));
}
function handleError(error) {
cleanup();
reject(error);
}
function handleClose(code, reason) {
cleanup();
reject(
new Error(
`Agent websocket closed before message (${code}${reason ? `: ${String(reason)}` : ""})`,
),
);
}
ws.once("message", handleMessage);
ws.once("error", handleError);
ws.once("close", handleClose);
});
}
function closeSocket(ws) {
return new Promise((resolve) => {
if (
ws.readyState === WebSocket.CLOSED ||
ws.readyState === WebSocket.CLOSING
) {
resolve();
return;
}
ws.once("close", () => resolve());
ws.close();
});
}
async function copyRequestAttachments(config, request) {
if (request.attachments.length === 0) {
return [];
}
const targetDir = path.join(
config.workspaceDir,
WORKSPACE_INBOX_DIR,
String(request.source.chatId),
String(request.source.messageId),
);
await fs.mkdir(targetDir, { recursive: true });
const copiedAttachments = [];
for (let index = 0; index < request.attachments.length; index += 1) {
const attachment = request.attachments[index];
const fileName = sanitizeFileName(
attachment.originalName || path.basename(attachment.path),
);
const targetPath = path.join(targetDir, `${index + 1}-${fileName}`);
await fs.copyFile(attachment.path, targetPath);
copiedAttachments.push(toWorkspaceRelativePath(config.workspaceDir, targetPath));
}
return copiedAttachments;
}
async function resolveAgentFile(config, workspaceRelativePath) {
const normalizedPath = normalizeWorkspacePath(workspaceRelativePath);
const absolutePath = path.resolve(config.workspaceDir, normalizedPath);
if (!isInsideDirectory(config.workspaceDir, absolutePath)) {
console.warn(`Skipping agent file outside workspace: ${workspaceRelativePath}`);
return null;
}
let stats;
try {
stats = await fs.stat(absolutePath);
} catch (error) {
console.warn(`Skipping missing agent file: ${workspaceRelativePath}`);
return null;
}
if (!stats.isFile()) {
console.warn(`Skipping non-file agent path: ${workspaceRelativePath}`);
return null;
}
return {
path: absolutePath,
kind: inferAttachmentKind(absolutePath),
caption: "",
};
}
class AgentApiConnectorStrategy extends ConnectorStrategy {
constructor(config) {
super("agent-api");
this.config = config;
}
async process(request) {
await fs.mkdir(this.config.workspaceDir, { recursive: true });
const ws = new WebSocket(
buildWebSocketUrl(this.config.baseUrl, request.source.chatId),
{ handshakeTimeout: SOCKET_TIMEOUT_MS },
);
try {
await waitForSocketOpen(ws);
const statusMessage = JSON.parse(await waitForSocketMessage(ws));
if (statusMessage.type !== "STATUS") {
throw new Error(`Expected STATUS from agent, got ${statusMessage.type}`);
}
const attachments = await copyRequestAttachments(this.config, request);
ws.send(JSON.stringify({
type: "USER_MESSAGE",
text: request.text || "",
attachments,
}));
let message = "";
const resultAttachments = [];
const seenAttachmentPaths = new Set();
while (true) {
const event = JSON.parse(await waitForSocketMessage(ws));
switch (event.type) {
case "AGENT_EVENT_TEXT_CHUNK":
message += event.text || "";
break;
case "AGENT_EVENT_SEND_FILE": {
const attachment = await resolveAgentFile(this.config, event.path);
if (attachment && !seenAttachmentPaths.has(attachment.path)) {
seenAttachmentPaths.add(attachment.path);
resultAttachments.push(attachment);
}
break;
}
case "AGENT_EVENT_END":
return {
connector: this.name,
message: message.trim(),
attachments: resultAttachments,
};
case "ERROR":
throw new Error(`Agent error (${event.code}): ${event.details}`);
case "GRACEFUL_DISCONNECT":
return {
connector: this.name,
message: message.trim(),
attachments: resultAttachments,
};
case "AGENT_EVENT_TOOL_CALL_CHUNK":
case "AGENT_EVENT_TOOL_RESULT":
case "AGENT_EVENT_CUSTOM_UPDATE":
break;
default:
console.warn(`Ignoring unknown agent event type: ${event.type}`);
}
}
} finally {
await closeSocket(ws);
}
}
}
function createAgentApiConnectorStrategy(config) {
return new AgentApiConnectorStrategy({
...config,
workspaceDir: path.resolve(config.workspaceDir),
addDirs: dedupePaths(config.addDirs || []),
});
}
module.exports = {
AgentApiConnectorStrategy,
createAgentApiConnectorStrategy,
};

View file

@ -1,7 +1,10 @@
const { createAgentApiConnectorStrategy } = require("./agent-api");
const { createLocalCodexConnectorStrategy } = require("./local-codex");
function createConnectorStrategy(config) {
switch (config.type) {
case "agent-api":
return createAgentApiConnectorStrategy(config.agentApi);
case "local-codex":
return createLocalCodexConnectorStrategy(config.localCodex);
default: