# surfaces/tests/adapter/matrix/test_converter.py
#
# 179 lines
# 5.8 KiB
# Python

from __future__ import annotations
from types import SimpleNamespace
import adapter.matrix.converter as converter
from adapter.matrix.converter import from_command, from_room_event
from core.protocol import IncomingCallback, IncomingCommand, IncomingMessage
def text_event(
    body: str, sender: str = "@a:m.org", event_id: str = "$e1"
) -> SimpleNamespace:
    """Build a stand-in object for a Matrix ``m.text`` room event.

    Args:
        body: Text stored on the event's ``body`` attribute.
        sender: Value for the event's ``sender`` attribute.
        event_id: Value for the event's ``event_id`` attribute.

    Returns:
        A ``SimpleNamespace`` with ``sender``, ``body``, ``event_id``,
        ``msgtype="m.text"`` and ``replyto_event_id=None``.
    """
    return SimpleNamespace(
        sender=sender,
        body=body,
        event_id=event_id,
        msgtype="m.text",
        replyto_event_id=None,
    )
def file_event(
    url: str = "mxc://x/y",
    filename: str = "doc.pdf",
    mime: str = "application/pdf",
    sender: str = "@a:m.org",
    event_id: str = "$e2",
) -> SimpleNamespace:
    """Build a stand-in object for a Matrix ``m.file`` room event.

    Args:
        url: Value for the event's ``url`` attribute.
        filename: Stored on the event's ``body`` attribute.
        mime: Value for the event's ``mimetype`` attribute.
        sender: Value for the event's ``sender`` attribute (overridable,
            like in ``text_event``).
        event_id: Value for the event's ``event_id`` attribute.

    Returns:
        A ``SimpleNamespace`` with ``msgtype="m.file"``,
        ``replyto_event_id=None`` and the attributes described above.
    """
    return SimpleNamespace(
        sender=sender,
        body=filename,
        event_id=event_id,
        msgtype="m.file",
        replyto_event_id=None,
        url=url,
        mimetype=mime,
    )
def image_event(
    url: str = "mxc://x/img",
    mime: str = "image/jpeg",
    sender: str = "@a:m.org",
    event_id: str = "$e3",
) -> SimpleNamespace:
    """Build a stand-in object for a Matrix ``m.image`` room event.

    Args:
        url: Value for the event's ``url`` attribute.
        mime: Value for the event's ``mimetype`` attribute.
        sender: Value for the event's ``sender`` attribute (overridable,
            like in ``text_event``).
        event_id: Value for the event's ``event_id`` attribute.

    Returns:
        A ``SimpleNamespace`` with ``body="img.jpg"``,
        ``msgtype="m.image"``, ``replyto_event_id=None`` and the
        attributes described above.
    """
    return SimpleNamespace(
        sender=sender,
        body="img.jpg",
        event_id=event_id,
        msgtype="m.image",
        replyto_event_id=None,
        url=url,
        mimetype=mime,
    )
def content_file_event() -> SimpleNamespace:
    """Build an event whose file details live only in a ``content`` dict.

    The top-level ``msgtype`` is ``None`` and there are no top-level ``url``
    or ``mimetype`` attributes; the ``m.file`` msgtype, file name, URL and
    MIME type are all nested under ``content``. The nested ``body``
    (``"nested.pdf"``) deliberately differs from the top-level ``body``
    (``"doc.pdf"``) so a test can tell which one was used.
    """
    return SimpleNamespace(
        sender="@a:m.org",
        body="doc.pdf",
        event_id="$e4",
        msgtype=None,
        replyto_event_id=None,
        content={
            "msgtype": "m.file",
            "body": "nested.pdf",
            "url": "mxc://x/nested",
            "info": {"mimetype": "application/pdf"},
        },
    )
def source_only_content_file_event() -> SimpleNamespace:
    """Build an event whose file details live only under ``source["content"]``.

    The top-level ``msgtype`` is ``None`` and the object has no ``content``,
    ``url`` or ``mimetype`` attributes; the ``m.file`` msgtype, file name,
    URL and MIME type are reachable only through ``source["content"]``.
    """
    return SimpleNamespace(
        sender="@a:m.org",
        body="doc.pdf",
        event_id="$e5",
        msgtype=None,
        replyto_event_id=None,
        source={
            "content": {
                "msgtype": "m.file",
                "body": "source-only.pdf",
                "url": "mxc://x/source-only",
                "info": {"mimetype": "application/pdf"},
            }
        },
    )
def test_plain_text_to_incoming_message():
    """A plain text event converts to an ``IncomingMessage`` without attachments."""
    result = from_room_event(text_event("Hello"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingMessage)
    assert result.text == "Hello"
    assert result.platform == "matrix"
    assert result.chat_id == "C1"
    assert result.attachments == []
def test_bang_command_to_incoming_command():
    """A ``!``-prefixed body converts to an ``IncomingCommand`` with its args."""
    result = from_room_event(text_event("!new Analysis"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "new"
    assert result.args == ["Analysis"]
def test_list_command_maps_to_matrix_list_attachments():
    """``!list`` is expected to map to ``matrix_list_attachments`` with no args."""
    result = from_room_event(text_event("!list"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "matrix_list_attachments"
    assert result.args == []
def test_remove_all_maps_to_matrix_remove_attachment():
    """``!remove all`` is expected to map to ``matrix_remove_attachment`` with ``["all"]``."""
    result = from_room_event(text_event("!remove all"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "matrix_remove_attachment"
    assert result.args == ["all"]
def test_remove_index_maps_to_matrix_remove_attachment():
    """``!remove 2`` is expected to map to ``matrix_remove_attachment`` with ``["2"]``."""
    result = from_room_event(text_event("!remove 2"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "matrix_remove_attachment"
    # The index is asserted as a string, not an int.
    assert result.args == ["2"]
def test_remove_arbitrary_index_maps_to_matrix_remove_attachment():
    """``!remove 99`` is expected to map to ``matrix_remove_attachment`` with ``["99"]``."""
    result = from_room_event(text_event("!remove 99"), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "matrix_remove_attachment"
    assert result.args == ["99"]
def test_skills_alias_to_settings_command():
    """``from_command("!skills", ...)`` is expected to yield the ``settings_skills`` command."""
    result = from_command("!skills", sender="@a:m.org", chat_id="C1")

    assert isinstance(result, IncomingCommand)
    assert result.command == "settings_skills"
def test_yes_to_callback():
    """``!yes`` is expected to become a ``confirm`` callback carrying the room id."""
    result = from_room_event(text_event("!yes"), room_id="!room:example.org", chat_id="C7")

    assert isinstance(result, IncomingCallback)
    assert result.action == "confirm"
    assert result.chat_id == "C7"
    assert result.payload["room_id"] == "!room:example.org"
def test_no_to_callback():
    """``!no`` is expected to become a ``cancel`` callback carrying the room id."""
    result = from_room_event(text_event("!no"), room_id="!room:example.org", chat_id="C7")

    assert isinstance(result, IncomingCallback)
    assert result.action == "cancel"
    assert result.chat_id == "C7"
    assert result.payload["room_id"] == "!room:example.org"
def test_file_attachment():
    """An ``m.file`` event is expected to yield one ``document`` attachment."""
    result = from_room_event(file_event(), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingMessage)
    assert len(result.attachments) == 1

    attachment = result.attachments[0]
    assert attachment.type == "document"
    assert attachment.url == "mxc://x/y"
    assert attachment.filename == "doc.pdf"
    assert attachment.mime_type == "application/pdf"
def test_image_attachment():
    """An ``m.image`` event is expected to yield one ``image`` attachment.

    Mirrors ``test_file_attachment``: besides type, file name and MIME type,
    it also checks the result type, the attachment count and the URL.
    """
    result = from_room_event(image_event(), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingMessage)
    assert len(result.attachments) == 1

    attachment = result.attachments[0]
    assert attachment.type == "image"
    # NOTE(review): assumes the URL is passed through unchanged, as the file
    # attachment test asserts for m.file events - confirm against the converter.
    assert attachment.url == "mxc://x/img"
    assert attachment.filename == "img.jpg"
    assert attachment.mime_type == "image/jpeg"
def test_attachment_falls_back_to_content_payload():
    """Attachment fields are expected to come from the event's ``content`` dict.

    The fixture has no top-level ``msgtype``/``url``/``mimetype``, and its
    nested ``body`` differs from the top-level one, so the asserted values
    can only originate from ``content``.
    """
    result = from_room_event(content_file_event(), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingMessage)

    attachment = result.attachments[0]
    assert attachment.type == "document"
    assert attachment.url == "mxc://x/nested"
    assert attachment.filename == "nested.pdf"
    assert attachment.mime_type == "application/pdf"
def test_attachment_falls_back_to_source_content_payload():
    """Attachment fields are expected to come from ``source["content"]``.

    The fixture has no ``content`` attribute at all, so the asserted values
    can only originate from the dict nested under ``source``.
    """
    result = from_room_event(source_only_content_file_event(), room_id="!r:m.org", chat_id="C1")

    assert isinstance(result, IncomingMessage)

    attachment = result.attachments[0]
    assert attachment.type == "document"
    assert attachment.url == "mxc://x/source-only"
    assert attachment.filename == "source-only.pdf"
    assert attachment.mime_type == "application/pdf"
def test_converter_module_does_not_expose_reaction_callbacks():
    """The converter module must not define a ``from_reaction`` attribute."""
    assert not hasattr(converter, "from_reaction")