fix(whatsapp): image downloading, bridge reuse, LID allowlist, Baileys 7.x compat
Salvaged from PR #2162 by @Zindar. Reply prefix changes are excluded (already on main via the configurable prefix from #1756).

Bridge improvements (bridge.js):
- Download incoming images to ~/.hermes/image_cache/ via downloadMediaMessage so the agent can actually see user-sent photos
- Add the getMessage callback required for Baileys 7.x E2EE session re-establishment (without it, some messages arrive as null)
- Build a LID→phone reverse map for allowlist resolution (WhatsApp LID format)
- Add a placeholder body for media without a caption: [image received]
- Bind express to 127.0.0.1 instead of 0.0.0.0 for security
- Use 127.0.0.1 consistently throughout (more reliable than localhost)

Adapter improvements (whatsapp.py):
- Detect and reuse an already-running bridge (only if status=connected)
- Handle local file paths from bridge-cached images in _build_message_event
- Don't kill external bridges on disconnect
- Use 127.0.0.1 throughout for consistency with the bridge binding

Fix vs. the original PR: bridge reuse now checks status=connected, not just HTTP 200. A disconnected bridge gets restarted instead of reused.

Co-authored-by: Zindar <zindar@users.noreply.github.com>
This commit is contained in:
parent
8f6ecd5c64
commit
a5beb6d8f0
2 changed files with 90 additions and 20 deletions
|
|
@ -182,9 +182,31 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
# Ensure session directory exists
|
# Ensure session directory exists
|
||||||
self._session_path.mkdir(parents=True, exist_ok=True)
|
self._session_path.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Check if bridge is already running and connected
|
||||||
|
import aiohttp
|
||||||
|
import asyncio
|
||||||
|
try:
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
async with session.get(
|
||||||
|
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||||
|
timeout=aiohttp.ClientTimeout(total=2)
|
||||||
|
) as resp:
|
||||||
|
if resp.status == 200:
|
||||||
|
data = await resp.json()
|
||||||
|
bridge_status = data.get("status", "unknown")
|
||||||
|
if bridge_status == "connected":
|
||||||
|
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
|
||||||
|
self._running = True
|
||||||
|
self._bridge_process = None # Not managed by us
|
||||||
|
asyncio.create_task(self._poll_messages())
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
|
||||||
|
except Exception:
|
||||||
|
pass # Bridge not running, start a new one
|
||||||
|
|
||||||
# Kill any orphaned bridge from a previous gateway run
|
# Kill any orphaned bridge from a previous gateway run
|
||||||
_kill_port_process(self._bridge_port)
|
_kill_port_process(self._bridge_port)
|
||||||
import asyncio
|
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
# Start the bridge process in its own process group.
|
# Start the bridge process in its own process group.
|
||||||
|
|
@ -232,7 +254,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
try:
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.get(
|
async with session.get(
|
||||||
f"http://localhost:{self._bridge_port}/health",
|
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||||
timeout=aiohttp.ClientTimeout(total=2)
|
timeout=aiohttp.ClientTimeout(total=2)
|
||||||
) as resp:
|
) as resp:
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
|
|
@ -264,7 +286,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
try:
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.get(
|
async with session.get(
|
||||||
f"http://localhost:{self._bridge_port}/health",
|
f"http://127.0.0.1:{self._bridge_port}/health",
|
||||||
timeout=aiohttp.ClientTimeout(total=2)
|
timeout=aiohttp.ClientTimeout(total=2)
|
||||||
) as resp:
|
) as resp:
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
|
|
@ -326,9 +348,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
self._bridge_process.kill()
|
self._bridge_process.kill()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[{self.name}] Error stopping bridge: {e}")
|
print(f"[{self.name}] Error stopping bridge: {e}")
|
||||||
|
else:
|
||||||
# Also kill any orphaned bridge processes on our port
|
# Bridge was not started by us, don't kill it
|
||||||
_kill_port_process(self._bridge_port)
|
print(f"[{self.name}] Disconnecting (external bridge left running)")
|
||||||
|
|
||||||
self._running = False
|
self._running = False
|
||||||
self._bridge_process = None
|
self._bridge_process = None
|
||||||
|
|
@ -358,7 +380,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
payload["replyTo"] = reply_to
|
payload["replyTo"] = reply_to
|
||||||
|
|
||||||
async with session.post(
|
async with session.post(
|
||||||
f"http://localhost:{self._bridge_port}/send",
|
f"http://127.0.0.1:{self._bridge_port}/send",
|
||||||
json=payload,
|
json=payload,
|
||||||
timeout=aiohttp.ClientTimeout(total=30)
|
timeout=aiohttp.ClientTimeout(total=30)
|
||||||
) as resp:
|
) as resp:
|
||||||
|
|
@ -394,7 +416,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
import aiohttp
|
import aiohttp
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(
|
async with session.post(
|
||||||
f"http://localhost:{self._bridge_port}/edit",
|
f"http://127.0.0.1:{self._bridge_port}/edit",
|
||||||
json={
|
json={
|
||||||
"chatId": chat_id,
|
"chatId": chat_id,
|
||||||
"messageId": message_id,
|
"messageId": message_id,
|
||||||
|
|
@ -439,7 +461,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.post(
|
async with session.post(
|
||||||
f"http://localhost:{self._bridge_port}/send-media",
|
f"http://127.0.0.1:{self._bridge_port}/send-media",
|
||||||
json=payload,
|
json=payload,
|
||||||
timeout=aiohttp.ClientTimeout(total=120),
|
timeout=aiohttp.ClientTimeout(total=120),
|
||||||
) as resp:
|
) as resp:
|
||||||
|
|
@ -515,7 +537,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
await session.post(
|
await session.post(
|
||||||
f"http://localhost:{self._bridge_port}/typing",
|
f"http://127.0.0.1:{self._bridge_port}/typing",
|
||||||
json={"chatId": chat_id},
|
json={"chatId": chat_id},
|
||||||
timeout=aiohttp.ClientTimeout(total=5)
|
timeout=aiohttp.ClientTimeout(total=5)
|
||||||
)
|
)
|
||||||
|
|
@ -532,7 +554,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.get(
|
async with session.get(
|
||||||
f"http://localhost:{self._bridge_port}/chat/{chat_id}",
|
f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}",
|
||||||
timeout=aiohttp.ClientTimeout(total=10)
|
timeout=aiohttp.ClientTimeout(total=10)
|
||||||
) as resp:
|
) as resp:
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
|
|
@ -559,7 +581,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
try:
|
try:
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
async with session.get(
|
async with session.get(
|
||||||
f"http://localhost:{self._bridge_port}/messages",
|
f"http://127.0.0.1:{self._bridge_port}/messages",
|
||||||
timeout=aiohttp.ClientTimeout(total=30)
|
timeout=aiohttp.ClientTimeout(total=30)
|
||||||
) as resp:
|
) as resp:
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
|
|
@ -621,6 +643,11 @@ class WhatsAppAdapter(BasePlatformAdapter):
|
||||||
print(f"[{self.name}] Failed to cache image: {e}", flush=True)
|
print(f"[{self.name}] Failed to cache image: {e}", flush=True)
|
||||||
cached_urls.append(url)
|
cached_urls.append(url)
|
||||||
media_types.append("image/jpeg")
|
media_types.append("image/jpeg")
|
||||||
|
elif msg_type == MessageType.PHOTO and os.path.isabs(url):
|
||||||
|
# Local file path — bridge already downloaded the image
|
||||||
|
cached_urls.append(url)
|
||||||
|
media_types.append("image/jpeg")
|
||||||
|
print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
|
||||||
elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
|
elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
|
||||||
try:
|
try:
|
||||||
cached_path = await cache_audio_from_url(url, ext=".ogg")
|
cached_path = await cache_audio_from_url(url, ext=".ogg")
|
||||||
|
|
|
||||||
|
|
@ -18,12 +18,13 @@
|
||||||
* node bridge.js --port 3000 --session ~/.hermes/whatsapp/session
|
* node bridge.js --port 3000 --session ~/.hermes/whatsapp/session
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion } from '@whiskeysockets/baileys';
|
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage } from '@whiskeysockets/baileys';
|
||||||
import express from 'express';
|
import express from 'express';
|
||||||
import { Boom } from '@hapi/boom';
|
import { Boom } from '@hapi/boom';
|
||||||
import pino from 'pino';
|
import pino from 'pino';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
import { mkdirSync, readFileSync, existsSync } from 'fs';
|
import { mkdirSync, readFileSync, writeFileSync, existsSync, readdirSync } from 'fs';
|
||||||
|
import { randomBytes } from 'crypto';
|
||||||
import qrcode from 'qrcode-terminal';
|
import qrcode from 'qrcode-terminal';
|
||||||
|
|
||||||
// Parse CLI args
|
// Parse CLI args
|
||||||
|
|
@ -41,6 +42,7 @@ const WHATSAPP_DEBUG =
|
||||||
|
|
||||||
const PORT = parseInt(getArg('port', '3000'), 10);
|
const PORT = parseInt(getArg('port', '3000'), 10);
|
||||||
const SESSION_DIR = getArg('session', path.join(process.env.HOME || '~', '.hermes', 'whatsapp', 'session'));
|
const SESSION_DIR = getArg('session', path.join(process.env.HOME || '~', '.hermes', 'whatsapp', 'session'));
|
||||||
|
const IMAGE_CACHE_DIR = path.join(process.env.HOME || '~', '.hermes', 'image_cache');
|
||||||
const PAIR_ONLY = args.includes('--pair-only');
|
const PAIR_ONLY = args.includes('--pair-only');
|
||||||
const WHATSAPP_MODE = getArg('mode', process.env.WHATSAPP_MODE || 'self-chat'); // "bot" or "self-chat"
|
const WHATSAPP_MODE = getArg('mode', process.env.WHATSAPP_MODE || 'self-chat'); // "bot" or "self-chat"
|
||||||
const ALLOWED_USERS = (process.env.WHATSAPP_ALLOWED_USERS || '').split(',').map(s => s.trim()).filter(Boolean);
|
const ALLOWED_USERS = (process.env.WHATSAPP_ALLOWED_USERS || '').split(',').map(s => s.trim()).filter(Boolean);
|
||||||
|
|
@ -55,6 +57,22 @@ function formatOutgoingMessage(message) {
|
||||||
|
|
||||||
mkdirSync(SESSION_DIR, { recursive: true });
|
mkdirSync(SESSION_DIR, { recursive: true });
|
||||||
|
|
||||||
|
/**
 * Build a LID → phone reverse map from the session directory.
 *
 * Scans SESSION_DIR for files named `lid-mapping-{phone}.json`, where
 * `{phone}` is all digits, and reads each file's JSON payload as the LID
 * belonging to that phone number.
 *
 * Failures are isolated per file: an unreadable or malformed mapping file is
 * logged and skipped, so one bad file cannot discard every other mapping.
 * If the directory itself cannot be listed, an empty map is returned.
 *
 * @returns {Object<string, string>} Map of LID (as a string) to phone number.
 */
function buildLidMap() {
  const map = {};

  let entries;
  try {
    entries = readdirSync(SESSION_DIR);
  } catch (err) {
    // Best-effort: without a readable session directory there is nothing to map.
    console.error('[bridge] Failed to list session directory for LID map:', err.message);
    return map;
  }

  for (const fileName of entries) {
    const match = fileName.match(/^lid-mapping-(\d+)\.json$/);
    if (!match) continue;
    const phone = match[1];

    try {
      const lid = JSON.parse(readFileSync(path.join(SESSION_DIR, fileName), 'utf8'));
      // Only scalar payloads make usable keys; objects/arrays would stringify
      // to junk such as "[object Object]".
      const isScalar = typeof lid === 'string' || typeof lid === 'number';
      if (isScalar && lid) map[String(lid)] = phone;
    } catch (err) {
      // Skip just this file and keep the mappings gathered so far.
      console.error(`[bridge] Skipping unreadable LID mapping ${fileName}:`, err.message);
    }
  }

  return map;
}
|
||||||
|
let lidToPhone = buildLidMap();
|
||||||
|
|
||||||
const logger = pino({ level: 'warn' });
|
const logger = pino({ level: 'warn' });
|
||||||
|
|
||||||
// Message queue for polling
|
// Message queue for polling
|
||||||
|
|
@ -80,9 +98,16 @@ async function startSocket() {
|
||||||
browser: ['Hermes Agent', 'Chrome', '120.0'],
|
browser: ['Hermes Agent', 'Chrome', '120.0'],
|
||||||
syncFullHistory: false,
|
syncFullHistory: false,
|
||||||
markOnlineOnConnect: false,
|
markOnlineOnConnect: false,
|
||||||
|
// Required for Baileys 7.x: without this, incoming messages that need
|
||||||
|
// E2EE session re-establishment are silently dropped (msg.message === null)
|
||||||
|
getMessage: async (key) => {
|
||||||
|
// We don't maintain a message store, so return a placeholder.
|
||||||
|
// This is enough for Baileys to complete the retry handshake.
|
||||||
|
return { conversation: '' };
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
sock.ev.on('creds.update', saveCreds);
|
sock.ev.on('creds.update', () => { saveCreds(); lidToPhone = buildLidMap(); });
|
||||||
|
|
||||||
sock.ev.on('connection.update', (update) => {
|
sock.ev.on('connection.update', (update) => {
|
||||||
const { connection, lastDisconnect, qr } = update;
|
const { connection, lastDisconnect, qr } = update;
|
||||||
|
|
@ -120,7 +145,7 @@ async function startSocket() {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
sock.ev.on('messages.upsert', ({ messages, type }) => {
|
sock.ev.on('messages.upsert', async ({ messages, type }) => {
|
||||||
// In self-chat mode, your own messages commonly arrive as 'append' rather
|
// In self-chat mode, your own messages commonly arrive as 'append' rather
|
||||||
// than 'notify'. Accept both and filter agent echo-backs below.
|
// than 'notify'. Accept both and filter agent echo-backs below.
|
||||||
if (type !== 'notify' && type !== 'append') return;
|
if (type !== 'notify' && type !== 'append') return;
|
||||||
|
|
@ -163,9 +188,10 @@ async function startSocket() {
|
||||||
if (!isSelfChat) continue;
|
if (!isSelfChat) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check allowlist for messages from others
|
// Check allowlist for messages from others (resolve LID → phone if needed)
|
||||||
if (!msg.key.fromMe && ALLOWED_USERS.length > 0 && !ALLOWED_USERS.includes(senderNumber)) {
|
if (!msg.key.fromMe && ALLOWED_USERS.length > 0) {
|
||||||
continue;
|
const resolvedNumber = lidToPhone[senderNumber] || senderNumber;
|
||||||
|
if (!ALLOWED_USERS.includes(resolvedNumber)) continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Extract message body
|
// Extract message body
|
||||||
|
|
@ -182,6 +208,18 @@ async function startSocket() {
|
||||||
body = msg.message.imageMessage.caption || '';
|
body = msg.message.imageMessage.caption || '';
|
||||||
hasMedia = true;
|
hasMedia = true;
|
||||||
mediaType = 'image';
|
mediaType = 'image';
|
||||||
|
try {
|
||||||
|
const buf = await downloadMediaMessage(msg, 'buffer', {}, { logger, reuploadRequest: sock.updateMediaMessage });
|
||||||
|
const mime = msg.message.imageMessage.mimetype || 'image/jpeg';
|
||||||
|
const extMap = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/webp': '.webp', 'image/gif': '.gif' };
|
||||||
|
const ext = extMap[mime] || '.jpg';
|
||||||
|
mkdirSync(IMAGE_CACHE_DIR, { recursive: true });
|
||||||
|
const filePath = path.join(IMAGE_CACHE_DIR, `img_${randomBytes(6).toString('hex')}${ext}`);
|
||||||
|
writeFileSync(filePath, buf);
|
||||||
|
mediaUrls.push(filePath);
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[bridge] Failed to download image:', err.message);
|
||||||
|
}
|
||||||
} else if (msg.message.videoMessage) {
|
} else if (msg.message.videoMessage) {
|
||||||
body = msg.message.videoMessage.caption || '';
|
body = msg.message.videoMessage.caption || '';
|
||||||
hasMedia = true;
|
hasMedia = true;
|
||||||
|
|
@ -195,6 +233,11 @@ async function startSocket() {
|
||||||
mediaType = 'document';
|
mediaType = 'document';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For media without caption, use a placeholder so the API message is never empty
|
||||||
|
if (hasMedia && !body) {
|
||||||
|
body = `[${mediaType} received]`;
|
||||||
|
}
|
||||||
|
|
||||||
// Ignore Hermes' own reply messages in self-chat mode to avoid loops.
|
// Ignore Hermes' own reply messages in self-chat mode to avoid loops.
|
||||||
if (msg.key.fromMe && ((REPLY_PREFIX && body.startsWith(REPLY_PREFIX)) || recentlySentIds.has(msg.key.id))) {
|
if (msg.key.fromMe && ((REPLY_PREFIX && body.startsWith(REPLY_PREFIX)) || recentlySentIds.has(msg.key.id))) {
|
||||||
if (WHATSAPP_DEBUG) {
|
if (WHATSAPP_DEBUG) {
|
||||||
|
|
@ -433,7 +476,7 @@ if (PAIR_ONLY) {
|
||||||
console.log();
|
console.log();
|
||||||
startSocket();
|
startSocket();
|
||||||
} else {
|
} else {
|
||||||
app.listen(PORT, () => {
|
app.listen(PORT, '127.0.0.1', () => {
|
||||||
console.log(`🌉 WhatsApp bridge listening on port ${PORT} (mode: ${WHATSAPP_MODE})`);
|
console.log(`🌉 WhatsApp bridge listening on port ${PORT} (mode: ${WHATSAPP_MODE})`);
|
||||||
console.log(`📁 Session stored in: ${SESSION_DIR}`);
|
console.log(`📁 Session stored in: ${SESSION_DIR}`);
|
||||||
if (ALLOWED_USERS.length > 0) {
|
if (ALLOWED_USERS.length > 0) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue