# -----------------------------------------------------------
# Astra - WhatsApp Client Framework
# Licensed under the Apache License 2.0.
# -----------------------------------------------------------
"""
This module implements the Conversation API for Astra.
It allows for stateful, interactive dialog flows within a chat.
"""
import asyncio
import logging
from typing import Optional, Any, TYPE_CHECKING
from ..models import Message
if TYPE_CHECKING:
from .client import Client
logger = logging.getLogger("Chat")
[docs]
class Conversation:
"""
Conversation manager.
Provides an imperative interface for interactive message flows.
Example:
async with client.conversation(chat_id) as conv:
await conv.send_message("What's your name?")
name_msg = await conv.get_response()
await conv.send_message(f"Hello, {name_msg.text}!")
"""
def __init__(self, client: 'Client', chat_id: str, timeout: float = 120.0):
self._client = client
self.chat_id = chat_id
self.timeout = timeout
self._queue = asyncio.Queue()
self._subscription_id = None
async def __aenter__(self):
# Register a temporary listener for this chat
from ..events import Filters
# Define a filter that only catches messages from this specific chat
# Special case: allow outgoing messages if we are in a self-chat for testing
criteria = Filters.chat(self.chat_id)
# Don't strictly enforce incoming if we're in the same chat as 'me'
# (This helps with subagent/manual testing in self-chat)
try:
me = await self._client.get_me()
if me and me.id.serialized != self.chat_id:
criteria &= Filters.incoming
except:
# Fallback to incoming only if get_me fails
criteria &= Filters.incoming
async def _handler(msg: Message):
await self._queue.put(msg)
self._subscription_id = self._client.dispatcher.subscribe("message", _handler, criteria=criteria)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._subscription_id:
self._client.dispatcher.unsubscribe(self._subscription_id)
[docs]
async def send_message(self, text: str, **kwargs) -> Message:
"""Sends a message to the current conversation chat."""
return await self._client.send_message(self.chat_id, text, **kwargs)
[docs]
async def get_response(self, timeout: Optional[float] = None) -> Message:
"""
Waits for a response message in the conversation.
Skips the message if it was sent by the bot (e.g. the prompt).
"""
to = timeout or self.timeout
try:
while True:
msg = await asyncio.wait_for(self._queue.get(), timeout=to)
# Skip the prompt we just sent if it was captured
if msg.id == getattr(self, '_last_sent_id', None):
continue
return msg
except asyncio.TimeoutError:
logger.warning(f"Conversation with {self.chat_id} timed out.")
raise
[docs]
async def ask(self, text: str, timeout: Optional[float] = None, **kwargs) -> Message:
"""Helper to send a message and immediately wait for a response."""
sent = await self.send_message(text, **kwargs)
self._last_sent_id = sent.id
return await self.get_response(timeout=timeout)