- Route DDMA eligibility through InProcessQueue (concurrency=1) so it queues behind other selenium jobs instead of running concurrently - New ddmaEligibilityProcessor: starts Python session, polls for OTP/ completion via socket events, saves PDF and updates patient DB - Frontend ddma-buton-modal now uses shared app socket + job:update pattern (drops private socket connection) - SeleniumService: upgrade ddma_browser_manager with credential hash tracking, anti-detection options, and startup session clearing; upgrade DDMA worker with firstName/lastName support, PDF via printToPDF, force-logout on credential change; upgrade helpers with dual OTP strategy (app API + browser polling); add /clear-ddma-session endpoint; reduce fixed sleeps with smart WebDriverWait Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
302 lines
13 KiB
Python
Executable File
302 lines
13 KiB
Python
Executable File
import os
|
|
import time
|
|
import asyncio
|
|
from typing import Dict, Any
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import WebDriverException, TimeoutException
|
|
|
|
from selenium_DDMA_eligibilityCheckWorker import AutomationDeltaDentalMAEligibilityCheck
|
|
|
|
# In-memory session store
|
|
sessions: Dict[str, Dict[str, Any]] = {}
|
|
|
|
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120")) # seconds
|
|
|
|
|
|
def make_session_entry() -> str:
|
|
"""Create a new session entry and return its ID."""
|
|
import uuid
|
|
sid = str(uuid.uuid4())
|
|
sessions[sid] = {
|
|
"status": "created", # created → running → waiting_for_otp → completed / error
|
|
"created_at": time.time(),
|
|
"last_activity": time.time(),
|
|
"bot": None, # AutomationDeltaDentalMAEligibilityCheck instance
|
|
"driver": None, # selenium webdriver
|
|
"otp_event": asyncio.Event(),
|
|
"otp_value": None, # OTP submitted from the app
|
|
"result": None,
|
|
"message": None,
|
|
"type": None,
|
|
}
|
|
return sid
|
|
|
|
|
|
async def cleanup_session(sid: str, message: str | None = None):
|
|
"""
|
|
Wake any OTP waiter, set final state, and remove the session entry.
|
|
Safe to call multiple times (idempotent).
|
|
NOTE: Does NOT quit the browser driver — the persistent browser stays alive.
|
|
"""
|
|
s = sessions.get(sid)
|
|
if not s:
|
|
return
|
|
try:
|
|
if s.get("status") not in ("completed", "error", "not_found"):
|
|
s["status"] = "error"
|
|
if message:
|
|
s["message"] = message
|
|
|
|
ev = s.get("otp_event")
|
|
if ev and not ev.is_set():
|
|
ev.set()
|
|
finally:
|
|
sessions.pop(sid, None)
|
|
print(f"[helpers_ddma] cleaned session {sid}")
|
|
|
|
|
|
async def _remove_session_later(sid: str, delay: int = 20):
|
|
await asyncio.sleep(delay)
|
|
await cleanup_session(sid)
|
|
|
|
|
|
async def start_ddma_run(sid: str, data: dict, url: str):
|
|
"""
|
|
Run the full DDMA eligibility workflow for one session.
|
|
Called by agent.py inside a wrapper that manages the semaphore/counters.
|
|
|
|
OTP handling uses two complementary strategies:
|
|
1. Accept OTP submitted from the app (via /submit-otp endpoint → otp_value field)
|
|
2. Poll the browser URL/DOM directly to detect when the user enters OTP themselves
|
|
"""
|
|
s = sessions.get(sid)
|
|
if not s:
|
|
return {"status": "error", "message": "session not found"}
|
|
|
|
s["status"] = "running"
|
|
s["last_activity"] = time.time()
|
|
|
|
try:
|
|
bot = AutomationDeltaDentalMAEligibilityCheck({"data": data})
|
|
bot.config_driver()
|
|
|
|
s["bot"] = bot
|
|
s["driver"] = bot.driver
|
|
s["last_activity"] = time.time()
|
|
|
|
# Navigate to login page
|
|
try:
|
|
if not url:
|
|
raise ValueError("URL not provided for DDMA run")
|
|
bot.driver.maximize_window()
|
|
bot.driver.get(url)
|
|
await asyncio.sleep(1)
|
|
except Exception as e:
|
|
s["status"] = "error"
|
|
s["message"] = f"Navigation failed: {e}"
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": s["message"]}
|
|
|
|
# Login
|
|
try:
|
|
login_result = bot.login(url)
|
|
except WebDriverException as wde:
|
|
s["status"] = "error"
|
|
s["message"] = f"Selenium driver error during login: {wde}"
|
|
await cleanup_session(sid, s["message"])
|
|
return {"status": "error", "message": s["message"]}
|
|
except Exception as e:
|
|
s["status"] = "error"
|
|
s["message"] = f"Unexpected error during login: {e}"
|
|
await cleanup_session(sid, s["message"])
|
|
return {"status": "error", "message": s["message"]}
|
|
|
|
# ── Path: already logged in (persistent session) ──────────────────────
|
|
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
|
|
print("[start_ddma_run] Session persisted - skipping OTP")
|
|
s["status"] = "running"
|
|
s["message"] = "Session persisted"
|
|
|
|
# ── Path: OTP required ────────────────────────────────────────────────
|
|
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
|
|
s["status"] = "waiting_for_otp"
|
|
s["message"] = "OTP required for login - please enter OTP"
|
|
s["last_activity"] = time.time()
|
|
|
|
driver = s["driver"]
|
|
|
|
# Poll every second for up to SESSION_OTP_TIMEOUT seconds.
|
|
# Accept OTP from two sources:
|
|
# a) app API (otp_value set by submit_otp())
|
|
# b) user entering OTP directly in the browser window
|
|
max_polls = SESSION_OTP_TIMEOUT
|
|
login_success = False
|
|
|
|
print(f"[OTP] Polling for OTP completion (up to {SESSION_OTP_TIMEOUT}s)...")
|
|
|
|
for poll in range(max_polls):
|
|
await asyncio.sleep(1)
|
|
s["last_activity"] = time.time()
|
|
|
|
try:
|
|
# a) App submitted OTP via /submit-otp endpoint
|
|
otp_value = s.get("otp_value")
|
|
if otp_value:
|
|
print(f"[OTP poll {poll+1}] OTP received from app, typing it in...")
|
|
try:
|
|
otp_input = driver.find_element(By.XPATH,
|
|
"//input[contains(@aria-label,'Verification') or contains(@placeholder,'verification') or @type='tel' or contains(@aria-lable,'Verification code') or contains(@placeholder,'Enter your verification code')]"
|
|
)
|
|
otp_input.clear()
|
|
otp_input.send_keys(otp_value)
|
|
try:
|
|
verify_btn = driver.find_element(By.XPATH, "//button[@type='button' and @aria-label='Verify']")
|
|
verify_btn.click()
|
|
except Exception:
|
|
otp_input.send_keys("\n")
|
|
print("[OTP] OTP typed and submitted via app")
|
|
s["otp_value"] = None # Clear so we don't re-submit
|
|
await asyncio.sleep(3)
|
|
except Exception as type_err:
|
|
print(f"[OTP] Failed to type OTP from app: {type_err}")
|
|
|
|
# b) Check URL — if we're past OTP page, login succeeded
|
|
current_url = driver.current_url.lower()
|
|
print(f"[OTP poll {poll+1}/{max_polls}] URL: {current_url[:70]}...")
|
|
|
|
if "member" in current_url or "dashboard" in current_url or "eligibility" in current_url:
|
|
try:
|
|
member_search = WebDriverWait(driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
print("[OTP] Member search found — login successful!")
|
|
login_success = True
|
|
break
|
|
except TimeoutException:
|
|
print("[OTP] On member page but search input not found, continuing...")
|
|
|
|
# Check if OTP input is still visible (user hasn't finished)
|
|
try:
|
|
otp_input_elem = driver.find_element(By.XPATH,
|
|
"//input[contains(@aria-label,'Verification') or contains(@placeholder,'verification') or @type='tel']"
|
|
)
|
|
print(f"[OTP poll {poll+1}] OTP input still visible - waiting...")
|
|
except Exception:
|
|
# OTP input gone — may mean login is completing; try members page
|
|
if "onboarding" in current_url or "start" in current_url:
|
|
print("[OTP] OTP input gone, trying to navigate to members page...")
|
|
try:
|
|
driver.get("https://providers.deltadentalma.com/members")
|
|
await asyncio.sleep(2)
|
|
except Exception:
|
|
pass
|
|
|
|
except Exception as poll_err:
|
|
print(f"[OTP poll {poll+1}] Error: {poll_err}")
|
|
|
|
if not login_success:
|
|
# Final check — navigate directly to members page
|
|
try:
|
|
print("[OTP] Final attempt - navigating to members page...")
|
|
driver.get("https://providers.deltadentalma.com/members")
|
|
await asyncio.sleep(3)
|
|
member_search = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
print("[OTP] Member search found — login successful!")
|
|
login_success = True
|
|
except TimeoutException:
|
|
s["status"] = "error"
|
|
s["message"] = "OTP timeout - login not completed"
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": "OTP not completed in time"}
|
|
except Exception as final_err:
|
|
s["status"] = "error"
|
|
s["message"] = f"OTP verification failed: {final_err}"
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": s["message"]}
|
|
|
|
if login_success:
|
|
s["status"] = "running"
|
|
s["message"] = "Login successful after OTP"
|
|
print("[OTP] Proceeding to step1...")
|
|
|
|
# ── Path: login succeeded without OTP ─────────────────────────────────
|
|
elif isinstance(login_result, str) and login_result == "SUCCESS":
|
|
print("[start_ddma_run] Login succeeded without OTP")
|
|
s["status"] = "running"
|
|
s["message"] = "Login succeeded"
|
|
|
|
# ── Path: login error ──────────────────────────────────────────────────
|
|
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
|
|
s["status"] = "error"
|
|
s["message"] = login_result
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": login_result}
|
|
|
|
# ── Step 1: search ────────────────────────────────────────────────────
|
|
step1_result = bot.step1()
|
|
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
|
|
s["status"] = "error"
|
|
s["message"] = step1_result
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": step1_result}
|
|
|
|
# ── Step 2: PDF generation ────────────────────────────────────────────
|
|
step2_result = bot.step2()
|
|
if isinstance(step2_result, dict) and step2_result.get("status") == "success":
|
|
s["status"] = "completed"
|
|
s["result"] = step2_result
|
|
s["message"] = "completed"
|
|
asyncio.create_task(_remove_session_later(sid, 30))
|
|
return step2_result
|
|
else:
|
|
s["status"] = "error"
|
|
if isinstance(step2_result, dict):
|
|
s["message"] = step2_result.get("message", "unknown error")
|
|
else:
|
|
s["message"] = str(step2_result)
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": s["message"]}
|
|
|
|
except Exception as e:
|
|
s["status"] = "error"
|
|
s["message"] = f"worker exception: {e}"
|
|
await cleanup_session(sid)
|
|
return {"status": "error", "message": s["message"]}
|
|
|
|
|
|
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
|
|
"""
|
|
Called when the app sends an OTP via POST /submit-otp.
|
|
Sets otp_value on the session so the polling loop picks it up.
|
|
"""
|
|
s = sessions.get(sid)
|
|
if not s:
|
|
return {"status": "error", "message": "session not found"}
|
|
if s.get("status") != "waiting_for_otp":
|
|
return {"status": "error", "message": f"session not waiting for OTP (state={s.get('status')})"}
|
|
s["otp_value"] = otp
|
|
s["last_activity"] = time.time()
|
|
try:
|
|
s["otp_event"].set()
|
|
except Exception:
|
|
pass
|
|
return {"status": "ok", "message": "otp accepted"}
|
|
|
|
|
|
def get_session_status(sid: str) -> Dict[str, Any]:
|
|
s = sessions.get(sid)
|
|
if not s:
|
|
return {"status": "not_found"}
|
|
return {
|
|
"session_id": sid,
|
|
"status": s.get("status"),
|
|
"message": s.get("message"),
|
|
"created_at": s.get("created_at"),
|
|
"last_activity": s.get("last_activity"),
|
|
"result": s.get("result") if s.get("status") == "completed" else None,
|
|
}
|