surfaces/adapter/max/converter.py
Vladislav Yashnov eed1533cdc max first steps
2026-05-06 00:24:47 +03:00

88 lines
No EOL
2.1 KiB
Python

"""MAX event to internal protocol converter."""
from typing import Union, List
from core.protocol import (
IncomingMessage,
IncomingCommand,
IncomingCallback,
Attachment,
)
def _extract_command(text: str) -> Union[IncomingCommand, IncomingCallback, None]:
if not text.startswith("!"):
return None
parts = text.strip().split(maxsplit=1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ""
if cmd == "!yes":
return IncomingCallback(
user_id="",
platform="max",
chat_id="",
action="confirm",
)
elif cmd == "!no":
return IncomingCallback(
user_id="",
platform="max",
chat_id="",
action="cancel",
)
else:
return IncomingCommand(
user_id="",
platform="max",
chat_id="",
command=cmd.lstrip("!"),
args=args.split() if args else [],
)
def max_message_to_incoming(
    *,
    text: str,
    user_id: str,
    chat_id: str,
    attachments: List[Attachment] = None,
    callback_data: str = None,
    message_id: str = None,
) -> Union[IncomingMessage, IncomingCommand, IncomingCallback]:
    """Convert the pieces of a MAX event into one internal protocol object.

    The checks run in this order:

    1. A truthy ``callback_data`` yields an ``IncomingCallback`` whose action
       is ``callback_data``; its payload holds ``message_id`` when that is
       truthy and is empty otherwise.
    2. A truthy ``text`` that ``_extract_command`` recognises yields the
       parsed object, with ``user_id`` and ``chat_id`` assigned onto it.
    3. Anything else yields an ``IncomingMessage`` carrying the text (or an
       empty string) and the attachments (or an empty list).

    All arguments are keyword-only.
    """
    if callback_data:
        extra = {}
        if message_id:
            extra["message_id"] = message_id
        return IncomingCallback(
            user_id=user_id,
            platform="max",
            chat_id=chat_id,
            action=callback_data,
            payload=extra,
        )

    # Only non-empty text is worth handing to the command parser.
    parsed = _extract_command(text) if text else None
    if parsed is None:
        return IncomingMessage(
            user_id=user_id,
            platform="max",
            chat_id=chat_id,
            text=text if text else "",
            attachments=attachments if attachments else [],
        )

    # The parser leaves the identity fields blank; fill them in here.
    parsed.user_id = user_id
    parsed.chat_id = chat_id
    return parsed
def max_attachment_to_internal(
    *,
    filename: str,
    mime_type: str,
    download_url: str,
) -> Attachment:
    """Build an internal ``Attachment`` from MAX file metadata.

    All arguments are keyword-only. The attachment type is always set to
    ``"document"`` and ``download_url`` is stored as the attachment's ``url``.
    """
    fields = {
        "type": "document",
        "url": download_url,
        "filename": filename,
        "mime_type": mime_type,
    }
    return Attachment(**fields)