import asyncio
import os
import time
import uuid
from typing import Any, Dict

from selenium.common.exceptions import WebDriverException

from selenium_BCBS_MA_eligibilityCheckWorker import AutomationBCBSMAEligibilityCheck

# In-memory session store
sessions: Dict[str, Dict[str, Any]] = {}

SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120"))  # seconds

# Seconds a finished session (completed or failed) stays queryable before it
# is evicted from ``sessions``.
SESSION_CLEANUP_DELAY = 30

# Strong references to fire-and-forget cleanup tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it runs.
_background_tasks: set = set()


def make_session_entry() -> str:
    """Create a fresh session record in ``sessions`` and return its id."""
    sid = str(uuid.uuid4())
    now = time.time()
    sessions[sid] = {
        "status": "created",  # created → running → waiting_for_otp → completed / error
        "created_at": now,
        "last_activity": now,
        "bot": None,
        "driver": None,
        "otp_event": asyncio.Event(),
        "otp_value": None,
        "result": None,
        "message": None,
        "type": None,
    }
    return sid


async def _remove_session_later(sid: str, delay: int = 30):
    """Sleep for *delay* seconds, then drop session *sid* from the store."""
    await asyncio.sleep(delay)
    sessions.pop(sid, None)
    print(f"[helpers_bcbs_ma] cleaned session {sid}")


def _schedule_session_cleanup(sid: str, delay: int = SESSION_CLEANUP_DELAY) -> None:
    """Schedule eviction of session *sid*, holding the task until it finishes."""
    task = asyncio.create_task(_remove_session_later(sid, delay))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def _fail(s: Dict[str, Any], message: Any) -> Dict[str, Any]:
    """Mark session *s* as failed with *message* and return the error payload."""
    s["status"] = "error"
    s["message"] = message
    return {"status": "error", "message": message}


async def _wait_for_otp_login(s: Dict[str, Any], bot: Any) -> bool:
    """Poll once per second until the OTP login completes or the timeout hits.

    Returns True when ``bot.submit_otp_step`` reports ``"SUCCESS"`` or when the
    browser URL no longer looks like the OTP page; False after
    ``SESSION_OTP_TIMEOUT`` polls without success.
    """
    driver = bot.driver
    print(f"[BCBS MA] Waiting for OTP (up to {SESSION_OTP_TIMEOUT}s)...")

    for poll in range(SESSION_OTP_TIMEOUT):
        await asyncio.sleep(1)
        s["last_activity"] = time.time()

        try:
            # a) App submitted OTP
            otp_value = s.get("otp_value")
            if otp_value:
                # Consume the code *before* submitting: if the submit raises,
                # the same (possibly wrong) code must not be retried on every
                # subsequent poll.
                s["otp_value"] = None
                print(f"[BCBS MA OTP poll {poll+1}] Submitting OTP from app...")
                otp_result = bot.submit_otp_step(otp_value)
                if isinstance(otp_result, str):
                    if otp_result == "SUCCESS":
                        return True
                    if otp_result.startswith("ERROR"):
                        print(f"[BCBS MA OTP] submit_otp_step returned: {otp_result}")
                        # Don't abort yet — let the poll loop check the browser state

            # b) Detect success by browser URL
            current_url = driver.current_url.lower()
            print(f"[BCBS MA OTP poll {poll+1}/{SESSION_OTP_TIMEOUT}] URL: {current_url[:70]}")
            if "authsvc" not in current_url and "/mga/sps/" not in current_url:
                # Left the OTP page — login completed (user entered OTP in browser)
                print("[BCBS MA OTP] Browser left OTP page — login successful")
                return True
        except Exception as poll_err:
            # Best-effort polling: a transient error must not end the wait.
            print(f"[BCBS MA OTP poll {poll+1}] Error: {poll_err}")

    return False


async def start_bcbs_ma_run(sid: str, data: dict, url: str):
    """
    Full BCBS MA eligibility workflow for one session.
    Always fresh Chrome — no persistent session because BCBS MA always
    requires OTP (the prefix changes each login).

    OTP handling:
      a) Accept OTP submitted from the app via /submit-otp (sets otp_value)
      b) Poll the browser directly to detect user entry in the open window

    Args:
        sid: Session id previously returned by ``make_session_entry``.
        data: Payload handed to the worker as ``{"data": data}``.
        url: Login URL passed to ``bot.login``.

    Returns:
        The ``bot.step2()`` result dict on success, otherwise
        ``{"status": "error", "message": ...}``. The session record mirrors
        the outcome in its ``status`` / ``message`` / ``result`` fields.

    Whatever the outcome, the browser is closed and the session is evicted
    from the store ``SESSION_CLEANUP_DELAY`` seconds later.
    """
    # NOTE(review): bot.login / step1 / step2 are called synchronously inside
    # this coroutine; if they block (Selenium calls normally do) the event loop
    # is stalled for their duration. Consider loop.run_in_executor if that
    # matters for the hosting server.
    s = sessions.get(sid)
    if not s:
        return {"status": "error", "message": "session not found"}

    s["status"] = "running"
    s["last_activity"] = time.time()
    bot = None

    try:
        bot = AutomationBCBSMAEligibilityCheck({"data": data})
        bot.config_driver()

        s["bot"] = bot
        s["driver"] = bot.driver

        # ── Login ──────────────────────────────────────────────────────────────
        try:
            login_result = bot.login(url)
        except WebDriverException as e:
            return _fail(s, f"WebDriver error during login: {e}")
        except Exception as e:
            return _fail(s, f"Login failed: {e}")

        if isinstance(login_result, str) and login_result.startswith("ERROR"):
            return _fail(s, login_result)

        # ── OTP required (always for BCBS MA) ─────────────────────────────────
        if isinstance(login_result, str) and login_result == "OTP_REQUIRED":
            s["status"] = "waiting_for_otp"
            s["message"] = "OTP required — enter the 6-digit code from the BCBS MA email"
            s["last_activity"] = time.time()

            if not await _wait_for_otp_login(s, bot):
                return _fail(s, "OTP timeout — login not completed in time")

            s["status"] = "running"
            s["message"] = "Login successful after OTP"
            print("[BCBS MA] OTP accepted, proceeding to eligibility search...")
            await asyncio.sleep(2)

        # ── Already logged in (no OTP path) ──────────────────────────────────
        elif isinstance(login_result, str) and login_result == "SUCCESS":
            print("[BCBS MA] Login succeeded without OTP")
            s["status"] = "running"

        # ── Step 1: search member ──────────────────────────────────────────────
        step1_result = bot.step1()
        if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
            return _fail(s, step1_result)

        # ── Step 2: extract + PDF ──────────────────────────────────────────────
        step2_result = bot.step2()
        if isinstance(step2_result, dict):
            if step2_result.get("status") == "success":
                s["status"] = "completed"
                s["result"] = step2_result
                s["message"] = "completed"
                return step2_result
            return _fail(s, step2_result.get("message", "unknown error"))
        return _fail(s, str(step2_result))

    except Exception as e:
        print(f"[helpers_bcbs_ma] Unexpected error: {e}")
        return _fail(s, f"worker exception: {e}")

    finally:
        # Always close the disposable browser
        try:
            if bot:
                bot.close_driver()
        except Exception as close_err:
            # Closing is best-effort; report instead of hiding the failure.
            print(f"[helpers_bcbs_ma] Error closing browser: {close_err}")
        # Release the browser objects so a finished session does not pin them.
        s["driver"] = None
        s["bot"] = None
        # Evict the session on every outcome, not only on success; failed
        # sessions would otherwise accumulate in memory forever.
        _schedule_session_cleanup(sid)


def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
    """Called by /submit-otp to hand OTP to the polling loop."""
    s = sessions.get(sid)
    if not s:
        return {"status": "error", "message": "session not found"}

    status = s.get("status")
    if status != "waiting_for_otp":
        return {"status": "error", "message": f"session not waiting for OTP (state={status})"}

    s["otp_value"] = otp
    s["last_activity"] = time.time()
    try:
        s["otp_event"].set()
    except Exception as event_err:
        # Best-effort: the polling loop reads otp_value, not the event.
        print(f"[helpers_bcbs_ma] Could not signal otp_event: {event_err}")
    return {"status": "ok", "message": "otp accepted"}


def get_session_status(sid: str) -> Dict[str, Any]:
    """Return a snapshot of session *sid*, or ``{"status": "not_found"}``.

    ``result`` is only populated once the session status is ``"completed"``.
    """
    s = sessions.get(sid)
    if not s:
        return {"status": "not_found"}

    status = s.get("status")
    return {
        "session_id": sid,
        "status": status,
        "message": s.get("message"),
        "created_at": s.get("created_at"),
        "last_activity": s.get("last_activity"),
        "result": s.get("result") if status == "completed" else None,
    }