Improve and condense related controls
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import aiohttp
|
||||
@@ -21,7 +22,12 @@ if _signal_http_url:
|
||||
parsed = urlparse(
|
||||
_signal_http_url if "://" in _signal_http_url else f"http://{_signal_http_url}"
|
||||
)
|
||||
SIGNAL_HOST = parsed.hostname or "signal"
|
||||
configured_host = (parsed.hostname or "").strip().lower()
|
||||
runtime = os.getenv("container", "").strip().lower()
|
||||
if configured_host == "signal" and runtime == "podman":
|
||||
SIGNAL_HOST = "127.0.0.1"
|
||||
else:
|
||||
SIGNAL_HOST = parsed.hostname or "signal"
|
||||
SIGNAL_PORT = parsed.port or 8080
|
||||
else:
|
||||
if settings.DEBUG:
|
||||
@@ -141,6 +147,18 @@ def _typing_started(typing_payload):
|
||||
return True
|
||||
|
||||
|
||||
def _identifier_candidates(*values):
|
||||
out = []
|
||||
seen = set()
|
||||
for value in values:
|
||||
cleaned = str(value or "").strip()
|
||||
if not cleaned or cleaned in seen:
|
||||
continue
|
||||
seen.add(cleaned)
|
||||
out.append(cleaned)
|
||||
return out
|
||||
|
||||
|
||||
class NewSignalBot(SignalBot):
|
||||
def __init__(self, ur, service, config):
|
||||
self.ur = ur
|
||||
@@ -242,6 +260,8 @@ class HandleMessage(Command):
|
||||
source_uuid = c.message.source_uuid
|
||||
text = c.message.text
|
||||
ts = c.message.timestamp
|
||||
source_value = c.message.source
|
||||
envelope = raw.get("envelope", {})
|
||||
|
||||
# Message originating from us
|
||||
same_recipient = source_uuid == dest
|
||||
@@ -253,21 +273,33 @@ class HandleMessage(Command):
|
||||
reply_to_others = is_to_bot and not same_recipient # Reply
|
||||
is_outgoing_message = is_from_bot and not is_to_bot # Do not reply
|
||||
|
||||
# Determine the identifier to use
|
||||
identifier_uuid = dest if is_from_bot else source_uuid
|
||||
if not identifier_uuid:
|
||||
envelope_source_uuid = envelope.get("sourceUuid")
|
||||
envelope_source_number = envelope.get("sourceNumber")
|
||||
envelope_source = envelope.get("source")
|
||||
|
||||
primary_identifier = dest if is_from_bot else source_uuid
|
||||
identifier_candidates = _identifier_candidates(
|
||||
primary_identifier,
|
||||
source_uuid,
|
||||
source_number,
|
||||
source_value,
|
||||
envelope_source_uuid,
|
||||
envelope_source_number,
|
||||
envelope_source,
|
||||
dest,
|
||||
)
|
||||
if not identifier_candidates:
|
||||
log.warning("No Signal identifier available for message routing.")
|
||||
return
|
||||
|
||||
# Resolve person identifiers once for this event.
|
||||
identifiers = await sync_to_async(list)(
|
||||
PersonIdentifier.objects.filter(
|
||||
identifier=identifier_uuid,
|
||||
identifier__in=identifier_candidates,
|
||||
service=self.service,
|
||||
)
|
||||
)
|
||||
|
||||
envelope = raw.get("envelope", {})
|
||||
typing_payload = envelope.get("typingMessage")
|
||||
if isinstance(typing_payload, dict):
|
||||
for identifier in identifiers:
|
||||
@@ -300,7 +332,7 @@ class HandleMessage(Command):
|
||||
message_timestamps=read_timestamps,
|
||||
read_ts=read_ts,
|
||||
payload=receipt_payload,
|
||||
read_by=source_uuid,
|
||||
read_by=(source_uuid or source_number or ""),
|
||||
)
|
||||
return
|
||||
|
||||
@@ -380,21 +412,44 @@ class HandleMessage(Command):
|
||||
attachments=xmpp_attachments,
|
||||
)
|
||||
|
||||
# TODO: Permission checks
|
||||
manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True))
|
||||
# Persist message history for every resolved identifier, even when no
|
||||
# manipulations are active, so manual chat windows stay complete.
|
||||
session_cache = {}
|
||||
stored_messages = set()
|
||||
for identifier in identifiers:
|
||||
session_key = (identifier.user.id, identifier.person.id)
|
||||
if session_key in session_cache:
|
||||
chat_session = session_cache[session_key]
|
||||
else:
|
||||
chat_session = await history.get_chat_session(identifier.user, identifier)
|
||||
session_cache[session_key] = chat_session
|
||||
sender_key = source_uuid or source_number or identifier_candidates[0]
|
||||
message_key = (chat_session.id, ts, sender_key)
|
||||
if message_key not in stored_messages:
|
||||
await history.store_message(
|
||||
session=chat_session,
|
||||
sender=sender_key,
|
||||
text=text,
|
||||
ts=ts,
|
||||
outgoing=is_from_bot,
|
||||
)
|
||||
stored_messages.add(message_key)
|
||||
|
||||
# TODO: Permission checks
|
||||
manips = await sync_to_async(list)(Manipulation.objects.filter(enabled=True))
|
||||
for manip in manips:
|
||||
try:
|
||||
person_identifier = await sync_to_async(PersonIdentifier.objects.get)(
|
||||
identifier=identifier_uuid,
|
||||
person_identifier = await sync_to_async(
|
||||
lambda: PersonIdentifier.objects.filter(
|
||||
identifier__in=identifier_candidates,
|
||||
user=manip.user,
|
||||
service="signal",
|
||||
person__in=manip.group.people.all(),
|
||||
)
|
||||
except PersonIdentifier.DoesNotExist:
|
||||
).first()
|
||||
)()
|
||||
if person_identifier is None:
|
||||
log.warning(
|
||||
f"{manip.name}: Message from unknown identifier {identifier_uuid}."
|
||||
f"{manip.name}: Message from unknown identifier(s) "
|
||||
f"{', '.join(identifier_candidates)}."
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -408,19 +463,6 @@ class HandleMessage(Command):
|
||||
)
|
||||
session_cache[session_key] = chat_session
|
||||
|
||||
# Store each incoming/outgoing event once per session.
|
||||
message_key = (chat_session.id, ts, source_uuid)
|
||||
if message_key not in stored_messages:
|
||||
log.info(f"Processing history store message {text}")
|
||||
await history.store_message(
|
||||
session=chat_session,
|
||||
sender=source_uuid,
|
||||
text=text,
|
||||
ts=ts,
|
||||
outgoing=is_from_bot,
|
||||
)
|
||||
stored_messages.add(message_key)
|
||||
|
||||
# Get the total history
|
||||
chat_history = await history.get_chat_history(chat_session)
|
||||
|
||||
@@ -493,9 +535,18 @@ class HandleMessage(Command):
|
||||
else:
|
||||
log.error(f"Mode {manip.mode} is not implemented")
|
||||
|
||||
chat_lookup = {"account": account}
|
||||
if source_uuid:
|
||||
chat_lookup["source_uuid"] = source_uuid
|
||||
elif source_number:
|
||||
chat_lookup["source_number"] = source_number
|
||||
else:
|
||||
return
|
||||
|
||||
await sync_to_async(Chat.objects.update_or_create)(
|
||||
source_uuid=source_uuid,
|
||||
**chat_lookup,
|
||||
defaults={
|
||||
"source_uuid": source_uuid,
|
||||
"source_number": source_number,
|
||||
"source_name": source_name,
|
||||
"account": account,
|
||||
|
||||
@@ -124,6 +124,22 @@ def get_service_warning(service: str) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def request_pairing(service: str, device_name: str = ""):
|
||||
"""
|
||||
Mark a runtime pairing request so UR clients can refresh QR/pair state.
|
||||
"""
|
||||
service_key = _service_key(service)
|
||||
if service_key not in {"whatsapp", "instagram"}:
|
||||
return
|
||||
device = str(device_name or "GIA Device").strip() or "GIA Device"
|
||||
update_runtime_state(
|
||||
service_key,
|
||||
pair_device=device,
|
||||
pair_requested_at=int(time.time()),
|
||||
warning="Waiting for runtime pairing QR.",
|
||||
)
|
||||
|
||||
|
||||
async def _gateway_json(method: str, url: str, payload=None):
|
||||
timeout = aiohttp.ClientTimeout(total=20)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
|
||||
@@ -32,6 +32,7 @@ class WhatsAppClient(ClientBase):
|
||||
self._accounts = []
|
||||
self._chat_presence = None
|
||||
self._chat_presence_media = None
|
||||
self._last_pair_request = 0
|
||||
|
||||
self.enabled = bool(
|
||||
str(getattr(settings, "WHATSAPP_ENABLED", "false")).lower()
|
||||
@@ -120,8 +121,39 @@ class WhatsAppClient(ClientBase):
|
||||
|
||||
# Keep task alive so state/callbacks remain active.
|
||||
while not self._stopping:
|
||||
await self._sync_pair_request()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def _sync_pair_request(self):
|
||||
state = transport.get_runtime_state(self.service)
|
||||
requested_at = int(state.get("pair_requested_at") or 0)
|
||||
if requested_at <= 0 or requested_at <= self._last_pair_request:
|
||||
return
|
||||
self._last_pair_request = requested_at
|
||||
self._publish_state(
|
||||
connected=False,
|
||||
pair_qr="",
|
||||
warning="Waiting for WhatsApp QR from Neonize.",
|
||||
)
|
||||
|
||||
if self._client is None:
|
||||
return
|
||||
|
||||
try:
|
||||
if hasattr(self._client, "disconnect"):
|
||||
await self._maybe_await(self._client.disconnect())
|
||||
except Exception as exc:
|
||||
self.log.warning("whatsapp disconnect before pairing failed: %s", exc)
|
||||
|
||||
try:
|
||||
await self._maybe_await(self._client.connect())
|
||||
except Exception as exc:
|
||||
self._publish_state(
|
||||
connected=False,
|
||||
warning=f"WhatsApp pairing refresh failed: {exc}",
|
||||
)
|
||||
self.log.warning("whatsapp pairing refresh failed: %s", exc)
|
||||
|
||||
def _register_event(self, event_cls, callback):
|
||||
if event_cls is None:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user