# -----------------------------------------------------------
# Astra - WhatsApp Client Framework
# Licensed under the Apache License 2.0.
# -----------------------------------------------------------
"""
The ProtocolBridge is the primary conduit for executing commands and
receiving events from the WhatsApp Web browser context.
"""
import logging
import asyncio
import os
from typing import Any, Optional, Callable, Awaitable, Dict
from playwright.async_api import Page
from ..errors import (
BridgeCallError, BridgeMethodNotFoundError,
MessageTimeoutError, RateLimitedError,
ConnectionLostError, PageUnresponsiveError,
)
from ..constants import BRIDGE_NAMESPACE, PROTOCOL_CALL_TIMEOUT
from .js_engine import JS_ENGINE_SOURCE
logger = logging.getLogger("Bridge")
[docs]
class ProtocolBridge:
"""
Manages the two-way bridge between Python and JavaScript.
This class handles the injection of the Astra engine into the browser
and facilitates calling remote methods with automatic result normalization.
"""
def __init__(self, page: Optional[Page] = None):
self._page = page
self._is_active = False
self._on_event: Optional[Callable[[str, Any], Awaitable[None]]] = None
self._connect_count: int = 0
self._progress_callbacks: Dict[str, Callable[[int, int], Any]] = {}
[docs]
async def connect(self):
"""
Initializes the bridge by injecting the core engine scripts.
This method uses persistent init scripts to ensure the bridge
survives page reloads and navigations.
"""
logger.debug("Connecting Protocol Bridge...")
# 1. Expose the event uplink (allows JS to call Python)
try:
# Direct exposure of the uplink function
async def astra_uplink_py(name, payload):
if name == "log":
msg = payload.get("msg", "") if isinstance(payload, dict) else str(payload)
# Suppress WA internal noise that isn't actionable
if "Requiring unknown module" in msg or "ErrorUtils" in msg or "fburl.com" in msg:
return
level = payload.get("level", "log") if isinstance(payload, dict) else "log"
if level == "error":
logger.error(f"[Bridge] {msg}")
elif level == "warn":
logger.warning(f"[Bridge] {msg}")
else:
logger.debug(f"[Bridge] {msg}")
return
await self._process_event(name, payload)
await self._page.expose_function("astra_uplink", astra_uplink_py)
if os.getenv("DEBUG", "false").lower() == "true":
logger.info("Bridge uplink exposed.")
except Exception as e:
logger.debug(f"Uplink Already Exposed: {e}")
# 2. Prepare the Bridge Proxy Snippet
# This snippet maps window.Astra methods to the bridge namespace
bridge_boot = f"""
window.{BRIDGE_NAMESPACE} = window.{BRIDGE_NAMESPACE} || {{}};
Object.assign(window.{BRIDGE_NAMESPACE}, {{
emit: (name, payload) => window.astra_uplink(name, payload),
call: async (method, payload, id) => {{
const engine = window.AstraEngine;
if (!engine || typeof engine[method] !== 'function') {{
console.error(`[Astra] Remote method [${{method}}] not found.`);
throw new Error(`Method not found: ${{method}}`);
}}
// Inject ID into payload if it's an object, otherwise wrap it
const params = (payload && typeof payload === 'object') ? {{ ...payload, _call_id: id }} : {{ value: payload, _call_id: id }};
return await engine[method](params);
}}
}});
console.log("[Astra] Bridge uplink established.");
"""
# 3. Inject Scripts (Persistent + Immediate)
# add_init_script ensures the code runs on every navigation
await self._page.add_init_script(JS_ENGINE_SOURCE)
await self._page.add_init_script(bridge_boot)
# evaluate runs the code immediately for the current page
try:
await self._page.evaluate(JS_ENGINE_SOURCE)
await self._page.evaluate(bridge_boot)
except Exception as e:
logger.debug(f"Immediate injection skipped: {e}")
self._is_active = True
self._connect_count += 1
[docs]
async def call(self, method: str, params: Any = None, timeout: float = PROTOCOL_CALL_TIMEOUT, progress: Optional[Callable] = None) -> Any:
"""
Calls a remote JS method via the bridge with automatic recovery.
If the bridge is unreachable (page closed, bridge missing), it
attempts a single self-heal via ensure_bridge() before retrying.
"""
if not self._is_active:
raise BridgeCallError("Bridge is not connected. Wait for client.start() to complete.", method=method)
call_id = f"call_{asyncio.get_event_loop().time()}"
if progress:
self._progress_callbacks[call_id] = progress
logger.debug(f"Calling remote method: {method} [ID: {call_id}]")
for attempt in range(2): # At most 1 retry after bridge recovery
try:
result = await self._page.evaluate(
f"(args) => window.{BRIDGE_NAMESPACE}.call(args.method, args.params, args.id)",
{"method": method, "params": params, "id": call_id}
)
return result
except Exception as e:
err_text = str(e).lower()
is_recoverable = any(k in err_text for k in [
"target closed", "target page", "not found",
"execution context", "frame was detached",
"page closed", "navigation"
])
if is_recoverable and attempt == 0:
logger.warning(f"Bridge connection error: {err_text}. Attempting recovery...")
healed = await self.ensure_bridge()
if healed:
logger.info("Bridge recovered, retrying...")
continue
else:
logger.error("Bridge recovery failed.")
# Classify the error
if "method not found" in err_text:
raise BridgeMethodNotFoundError(method=method) from e
elif "timeout" in err_text or "sendmsgresultpromise timeout" in err_text:
raise MessageTimeoutError(f"Call to '{method}' timed out.") from e
elif "rate" in err_text or "too many" in err_text:
raise RateLimitedError(f"Rate limited during '{method}'.") from e
elif "target closed" in err_text or "page has been closed" in err_text or "page closed" in err_text:
raise ConnectionLostError(f"Browser lost during '{method}'. Check if the WhatsApp tab was closed or reloaded.") from e
else:
raise BridgeCallError(f"'{method}' failed: {e}", cause=e, method=method) from e
finally:
self._progress_callbacks.pop(call_id, None)
[docs]
async def ensure_bridge(self) -> bool:
"""
Verifies the JS bridge is alive and re-injects it if needed.
Returns True if the bridge is healthy after this call.
"""
try:
if not self._page or self._page.is_closed():
logger.warning("ensure_bridge: page is closed — cannot heal.")
return False
is_alive = await self._page.evaluate(
"() => typeof window.AstraEngine !== 'undefined'"
)
if is_alive:
return True
logger.debug("Bridge re-injected successfully.")
return True
except Exception as exc:
logger.error(f"ensure_bridge failed: {exc}")
return False
[docs]
async def get_bridge_diagnostics(self) -> Dict[str, Any]:
"""
Returns a structured health snapshot of the JS bridge.
"""
try:
if not self._page or self._page.is_closed():
return {"alive": False, "reason": "page_closed"}
return await self._page.evaluate("""
() => {
try {
const store = window.Store || {};
return {
alive: !!window.AstraEngine,
storeReady: !!(store.Chat && store.Msg),
msgListener: !!(window.Astra && window.Astra._msgListenerAttached),
waVersion: (window.Debug && window.Debug.VERSION) || null,
socketState: (store.AppState && store.AppState.state) || 'unknown',
injections: window.AstraInjected || 0,
};
} catch (e) {
return { alive: false, error: e.message };
}
}
""")
except Exception as exc:
return {"alive": False, "error": str(exc)}
[docs]
def set_event_handler(self, handler: Callable[[str, Any], Awaitable[None]]):
"""Sets the sink for incoming browser events."""
self._on_event = handler
async def _process_event(self, name: str, payload: Any):
"""Internal handler for messages arriving from the JS uplink."""
if name == "progress":
call_id = payload.get("id")
if call_id in self._progress_callbacks:
cb = self._progress_callbacks[call_id]
try:
if asyncio.iscoroutinefunction(cb) or asyncio.iscoroutine(cb):
await cb(payload.get("current", 0), payload.get("total", 0))
else:
cb(payload.get("current", 0), payload.get("total", 0))
except Exception as e:
logger.debug(f"Progress callback error: {e}")
return
if self._on_event:
logger.debug(f"[Gate] Forwarding event: {name}")
await self._on_event(name, payload)
@property
def is_connected(self) -> bool:
return self._is_active
[docs]
async def execute(self, code: str) -> Any:
"""
Executes raw JavaScript in the page context.
This is used for bootstrapping and complex low-level operations.
"""
if not self._page or self._page.is_closed():
raise BridgeCallError("Page reached an unreachable state during execute.", method="execute")
try:
return await self._page.evaluate(code)
except Exception as e:
logger.error(f"JavaScript execution failed: {e}")
raise BridgeCallError(f"Execution failed: {e}", cause=e, method="execute") from e
[docs]
async def edit_message_native(self, message_id: str, text: str) -> bool:
"""
High-speed native Playwright fallback for editing messages.
"""
if not self._page or self._page.is_closed():
return False
try:
# 1. Faster selector
msg_locator = self._page.locator(f'div[data-id="{message_id}"]')
if await msg_locator.count() == 0:
await self._page.mouse.wheel(0, 2000)
await asyncio.sleep(0.5)
if await msg_locator.count() == 0: return False
# 2. Optimized hover and menu trigger
await msg_locator.hover()
# Try multiple menu icons for speed
menu_btn = msg_locator.locator('span[data-icon="down-context"], [aria-label="Context menu"]').first
if await menu_btn.count() > 0:
await menu_btn.click()
else:
await msg_locator.click(button="right")
# 3. Find Edit button with tighter wait
edit_btn = self._page.locator('div[role="button"]:has-text("Edit"), [aria-label*="Edit"]').first
await edit_btn.wait_for(state="visible", timeout=1500)
await edit_btn.click()
# 4. Instant type and Enter
input_box = self._page.locator('div[contenteditable="true"]').first
await input_box.focus()
# Select all + Backspace
await self._page.keyboard.down("Meta")
await self._page.keyboard.press("A")
await self._page.keyboard.up("Meta")
await self._page.keyboard.press("Backspace")
await self._page.keyboard.type(text)
await self._page.keyboard.press("Enter")
return True
except Exception as e:
logger.error(f"High-speed native edit failed: {e}")
return False
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# PLAYWRIGHT NATIVE FALLBACKS
# Human-like automation using keyboard, mouse, and locators.
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
async def _pw_click_text(self, text, timeout=3000):
try:
loc = self._page.get_by_text(text, exact=False).first
await loc.wait_for(state="visible", timeout=timeout)
await loc.click()
await asyncio.sleep(0.8)
return True
except Exception:
return False
async def _pw_click_role(self, role, name, timeout=3000):
try:
loc = self._page.get_by_role(role, name=name).first
await loc.wait_for(state="visible", timeout=timeout)
await loc.click()
await asyncio.sleep(0.8)
return True
except Exception:
return False
async def _pw_click_sel(self, sel, timeout=3000):
try:
loc = self._page.locator(sel).first
await loc.wait_for(state="visible", timeout=timeout)
await loc.click()
await asyncio.sleep(0.8)
return True
except Exception:
return False
async def _pw_open_settings(self):
if not self._page or self._page.is_closed(): return False
if await self._pw_click_sel('[data-testid="menu-bar-settings"]'): return True
if await self._pw_click_role("button", "Settings"): return True
if await self._pw_click_sel('span[data-icon="menu"]'):
await asyncio.sleep(0.5)
return await self._pw_click_text("Settings")
return False
async def _pw_open_profile(self):
if not self._page or self._page.is_closed(): return False
if await self._pw_click_sel('[data-testid="menu-bar-profile"]'): return True
if await self._pw_click_role("button", "Profile"): return True
return False
async def _pw_go_back(self):
try:
b = self._page.locator('[data-testid="back"], button[aria-label="Back"]').first
if await b.count() > 0 and await b.is_visible():
await b.click()
await asyncio.sleep(0.5)
return True
except Exception:
pass
await self._page.keyboard.press("Escape")
await asyncio.sleep(0.5)
return True
async def _pw_close_all(self):
for _ in range(4):
try:
await self._page.keyboard.press("Escape")
await asyncio.sleep(0.3)
except Exception:
break
[docs]
async def set_privacy_native(self, category, value):
"""Playwright fallback: Settings → Privacy → Category → Select."""
if not self._page or self._page.is_closed(): return False
try:
logger.info(f"[Playwright] Privacy: {category} → {value}")
if not await self._pw_open_settings():
return False
await asyncio.sleep(1)
if not await self._pw_click_text("Privacy"):
await self._pw_close_all(); return False
await asyncio.sleep(1)
if category == "read_receipts":
t = self._page.locator('input[role="switch"]').first
if await t.count() > 0:
want = value in ("all", "contacts", "true", True)
if await t.is_checked() != want: await t.click()
await self._pw_close_all(); return True
cats = {"last_seen": ["Last seen", "Last seen and online"], "profile_pic": ["Profile photo", "Profile picture"], "about": ["About"], "status": ["Status"]}
for lbl in cats.get(category, [category]):
if await self._pw_click_text(lbl): break
else:
await self._pw_close_all(); return False
await asyncio.sleep(1)
vm = {"all": "Everyone", "contacts": "My contacts", "none": "Nobody", "nobody": "Nobody"}
vl = vm.get(value, value)
if not await self._pw_click_role("radio", vl):
if not await self._pw_click_text(vl):
await self._pw_close_all(); return False
if category == "last_seen":
await asyncio.sleep(0.5)
ol = "Everyone" if value == "all" else "Same as last seen"
await self._pw_click_role("radio", ol) or await self._pw_click_text(ol)
await asyncio.sleep(0.5)
await self._pw_close_all()
logger.info(f"[Playwright] Privacy done: {category} → {value}")
return True
except Exception as e:
logger.error(f"[Playwright] set_privacy_native: {e}")
await self._pw_close_all(); return False
[docs]
async def set_profile_name_native(self, name):
"""Playwright fallback for updating profile name."""
if not self._page or self._page.is_closed(): return False
try:
logger.info(f"[Playwright] Profile name: {name}")
if not await self._pw_open_profile(): return False
await asyncio.sleep(1)
pencils = self._page.locator('span[data-icon="pencil"]')
if await pencils.count() > 0: await pencils.first.click()
else: await self._pw_close_all(); return False
await asyncio.sleep(0.8)
tb = self._page.locator('div[role="textbox"]').first
await tb.click()
await self._page.keyboard.down("Control")
await self._page.keyboard.press("A")
await self._page.keyboard.up("Control")
await self._page.keyboard.press("Backspace")
await self._page.keyboard.type(name, delay=30)
sv = self._page.locator('span[data-icon="checkmark-medium"]').first
if await sv.count() > 0: await sv.click()
else: await self._page.keyboard.press("Enter")
await asyncio.sleep(1)
await self._pw_close_all()
return True
except Exception as e:
logger.error(f"[Playwright] set_profile_name_native: {e}")
await self._pw_close_all(); return False
[docs]
async def set_about_native(self, text):
"""Playwright fallback for updating About/Bio."""
if not self._page or self._page.is_closed(): return False
try:
logger.info(f"[Playwright] About: {text}")
if not await self._pw_open_profile(): return False
await asyncio.sleep(1)
pencils = self._page.locator('span[data-icon="pencil"]')
c = await pencils.count()
if c >= 2: await pencils.nth(1).click()
elif c == 1: await pencils.first.click()
else: await self._pw_close_all(); return False
await asyncio.sleep(0.8)
tb = self._page.locator('div[role="textbox"]').first
await tb.click()
await self._page.keyboard.down("Control")
await self._page.keyboard.press("A")
await self._page.keyboard.up("Control")
await self._page.keyboard.press("Backspace")
await self._page.keyboard.type(text, delay=30)
sv = self._page.locator('span[data-icon="checkmark-medium"]').first
if await sv.count() > 0: await sv.click()
else: await self._page.keyboard.press("Enter")
await asyncio.sleep(1)
await self._pw_close_all()
return True
except Exception as e:
logger.error(f"[Playwright] set_about_native: {e}")
await self._pw_close_all(); return False