feat: integrate DDMA eligibility into BullMQ queue with persistent session

- Route DDMA eligibility through InProcessQueue (concurrency=1) so it
  queues behind other selenium jobs instead of running concurrently
- New ddmaEligibilityProcessor: starts Python session, polls for OTP/
  completion via socket events, saves PDF and updates patient DB
- Frontend ddma-buton-modal now uses shared app socket + job:update
  pattern (drops private socket connection)
- SeleniumService: upgrade ddma_browser_manager with credential hash
  tracking, anti-detection options, and startup session clearing;
  upgrade DDMA worker with firstName/lastName support, PDF via
  printToPDF, force-logout on credential change; upgrade helpers with
  dual OTP strategy (app API + browser polling); add /clear-ddma-session
  endpoint; reduce fixed sleeps with smart WebDriverWait

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ff
2026-04-16 09:21:47 -04:00
parent a1cccc8716
commit 289ea426d3
9 changed files with 1429 additions and 1239 deletions

View File

@@ -12,8 +12,21 @@ import os
import time
import helpers_ddma_eligibility as hddma
# Import startup session-clear functions
from ddma_browser_manager import clear_ddma_session_on_startup
from dotenv import load_dotenv
load_dotenv()
load_dotenv()
# Clear DDMA session on startup so fresh login is required after PC restart.
# Device trust tokens are preserved so OTP is still skipped after first login.
print("=" * 50)
print("SELENIUM AGENT STARTING - CLEARING DDMA SESSION")
print("=" * 50)
clear_ddma_session_on_startup()
print("=" * 50)
print("SESSION CLEAR COMPLETE")
print("=" * 50)
app = FastAPI()
@@ -264,6 +277,21 @@ async def session_status(sid: str):
return s
# ── Session management endpoints ─────────────────────────────────────────────
@app.post("/clear-ddma-session")
async def clear_ddma_session_endpoint():
"""
Clears the DDMA browser session (cookies + cached credentials).
Call this when DDMA credentials are deleted or changed.
"""
try:
clear_ddma_session_on_startup()
return {"status": "success", "message": "DDMA session cleared"}
except Exception as e:
return {"status": "error", "message": str(e)}
# ✅ Health Check Endpoint
@app.get("/")
async def health_check():

View File

@@ -1,19 +1,30 @@
"""
Minimal browser manager for DDMA - only handles persistent profile and keeping browser alive.
Does NOT modify any login/OTP logic.
Browser manager for DDMA (Delta Dental MA) - persistent profile with session management.
- Uses --user-data-dir for persistent profile (device trust tokens)
- Clears session cookies on startup (after PC restart) to force fresh login
- Tracks credentials to detect changes mid-session (triggers logout)
- Anti-detection options to avoid bot detection
"""
import os
import glob
import shutil
import hashlib
import threading
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Ensure DISPLAY is set for Chrome to work (needed when running from SSH/background)
if not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = ":0"
class DDMABrowserManager:
"""
Singleton that manages a persistent Chrome browser instance.
- Uses --user-data-dir for persistent profile (device trust tokens, cookies)
- Keeps browser alive between patient runs
- Uses --user-data-dir for persistent profile (device trust tokens)
- Clears session cookies on startup (after PC restart)
- Tracks credentials to detect changes mid-session
"""
_instance = None
_lock = threading.Lock()
@@ -25,55 +36,223 @@ class DDMABrowserManager:
cls._instance._driver = None
cls._instance.profile_dir = os.path.abspath("chrome_profile_ddma")
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
cls._instance._needs_session_clear = False
os.makedirs(cls._instance.profile_dir, exist_ok=True)
os.makedirs(cls._instance.download_dir, exist_ok=True)
return cls._instance
def clear_session_on_startup(self):
"""
Clear session cookies from Chrome profile on startup.
This forces a fresh login after PC restart.
Preserves device trust tokens (LocalStorage, IndexedDB) to avoid OTPs.
"""
print("[DDMA BrowserManager] Clearing session on startup...")
try:
# Clear credentials tracking file
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
print("[DDMA BrowserManager] Cleared credentials tracking file")
# Clear session-related Chrome profile files
session_files = [
"Cookies",
"Cookies-journal",
"Login Data",
"Login Data-journal",
"Web Data",
"Web Data-journal",
]
for filename in session_files:
for base in [os.path.join(self.profile_dir, "Default"), self.profile_dir]:
filepath = os.path.join(base, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DDMA BrowserManager] Removed {filename}")
except Exception as e:
print(f"[DDMA BrowserManager] Could not remove {filename}: {e}")
# Clear Session Storage (contains login state)
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
if os.path.exists(session_storage_dir):
try:
shutil.rmtree(session_storage_dir)
print("[DDMA BrowserManager] Cleared Session Storage")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear Session Storage: {e}")
# Clear Local Storage (may contain auth tokens)
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
if os.path.exists(local_storage_dir):
try:
shutil.rmtree(local_storage_dir)
print("[DDMA BrowserManager] Cleared Local Storage")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear Local Storage: {e}")
# Clear IndexedDB (may contain auth tokens)
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
if os.path.exists(indexeddb_dir):
try:
shutil.rmtree(indexeddb_dir)
print("[DDMA BrowserManager] Cleared IndexedDB")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear IndexedDB: {e}")
# Clear browser caches
cache_dirs = [
os.path.join(self.profile_dir, "Default", "Cache"),
os.path.join(self.profile_dir, "Default", "Code Cache"),
os.path.join(self.profile_dir, "Default", "GPUCache"),
os.path.join(self.profile_dir, "Default", "Service Worker"),
os.path.join(self.profile_dir, "Cache"),
os.path.join(self.profile_dir, "Code Cache"),
os.path.join(self.profile_dir, "GPUCache"),
os.path.join(self.profile_dir, "Service Worker"),
os.path.join(self.profile_dir, "ShaderCache"),
]
for cache_dir in cache_dirs:
if os.path.exists(cache_dir):
try:
shutil.rmtree(cache_dir)
print(f"[DDMA BrowserManager] Cleared {os.path.basename(cache_dir)}")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear {os.path.basename(cache_dir)}: {e}")
self._needs_session_clear = True
print("[DDMA BrowserManager] Session cleared - will require fresh login")
except Exception as e:
print(f"[DDMA BrowserManager] Error clearing session: {e}")
# ── Credential hash tracking ──────────────────────────────────────────────
def _hash_credentials(self, username: str) -> str:
return hashlib.sha256(username.encode()).hexdigest()[:16]
def get_last_credentials_hash(self) -> str | None:
try:
if os.path.exists(self._credentials_file):
with open(self._credentials_file, 'r') as f:
return f.read().strip()
except Exception:
pass
return None
def save_credentials_hash(self, username: str):
try:
cred_hash = self._hash_credentials(username)
with open(self._credentials_file, 'w') as f:
f.write(cred_hash)
except Exception as e:
print(f"[DDMA BrowserManager] Failed to save credentials hash: {e}")
def credentials_changed(self, username: str) -> bool:
last_hash = self.get_last_credentials_hash()
if last_hash is None:
return False # No previous credentials stored — not a change
current_hash = self._hash_credentials(username)
changed = last_hash != current_hash
if changed:
print("[DDMA BrowserManager] Credentials changed — logout required")
return changed
def clear_credentials_hash(self):
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
except Exception as e:
print(f"[DDMA BrowserManager] Failed to clear credentials hash: {e}")
# ── Chrome process management ─────────────────────────────────────────────
def _kill_existing_chrome_for_profile(self):
"""Kill any existing Chrome processes using this profile and remove lock files."""
import subprocess
import time as time_module
try:
result = subprocess.run(
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
capture_output=True, text=True
)
if result.stdout.strip():
for pid in result.stdout.strip().split('\n'):
try:
subprocess.run(["kill", "-9", pid], check=False)
except Exception:
pass
time_module.sleep(1)
except Exception:
pass
for lock_file in ["SingletonLock", "SingletonSocket", "SingletonCookie"]:
lock_path = os.path.join(self.profile_dir, lock_file)
try:
if os.path.islink(lock_path) or os.path.exists(lock_path):
os.remove(lock_path)
except Exception:
pass
# ── Driver lifecycle ──────────────────────────────────────────────────────
def get_driver(self, headless=False):
"""Get or create the persistent browser instance."""
with self._lock:
if self._driver is None:
print("[BrowserManager] Driver is None, creating new driver")
print("[DDMA BrowserManager] Driver is None, creating new driver")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
elif not self._is_alive():
print("[BrowserManager] Driver not alive, recreating")
print("[DDMA BrowserManager] Driver not alive, recreating")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
else:
print("[BrowserManager] Reusing existing driver")
print("[DDMA BrowserManager] Reusing existing driver")
return self._driver
def _is_alive(self):
"""Check if browser is still responsive."""
try:
if self._driver is None:
return False
url = self._driver.current_url
print(f"[BrowserManager] Driver alive, current URL: {url[:50]}...")
print(f"[DDMA BrowserManager] Driver alive, current URL: {url[:50]}...")
return True
except Exception as e:
print(f"[BrowserManager] Driver not alive: {e}")
print(f"[DDMA BrowserManager] Driver not alive: {e}")
return False
def _create_driver(self, headless=False):
"""Create browser with persistent profile."""
"""Create browser with persistent profile and anti-detection options."""
if self._driver:
try:
self._driver.quit()
except:
except Exception:
pass
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless")
# Persistent profile - THIS IS THE KEY for device trust
# Persistent profile — keeps device trust tokens between runs
options.add_argument(f"--user-data-dir={self.profile_dir}")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
# Anti-detection
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("--disable-infobars")
prefs = {
"download.default_directory": self.download_dir,
"plugins.always_open_pdf_externally": True,
"download.prompt_for_download": False,
"download.directory_upgrade": True
"download.directory_upgrade": True,
}
options.add_experimental_option("prefs", prefs)
@@ -81,22 +260,39 @@ class DDMABrowserManager:
self._driver = webdriver.Chrome(service=service, options=options)
self._driver.maximize_window()
# Remove webdriver property to avoid detection
try:
self._driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except Exception:
pass
self._needs_session_clear = False
def quit_driver(self):
"""Quit browser (only call on shutdown)."""
"""Quit browser (only call on shutdown — NOT between patients)."""
with self._lock:
if self._driver:
try:
self._driver.quit()
except:
except Exception:
pass
self._driver = None
# Singleton accessor
# ── Singleton accessor ────────────────────────────────────────────────────────
_manager = None
def get_browser_manager():
def get_browser_manager() -> DDMABrowserManager:
global _manager
if _manager is None:
_manager = DDMABrowserManager()
return _manager
def clear_ddma_session_on_startup():
"""Called by agent.py on startup to clear DDMA session (after PC restart)."""
manager = get_browser_manager()
manager.clear_session_on_startup()

View File

@@ -5,7 +5,7 @@ from typing import Dict, Any
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium_DDMA_eligibilityCheckWorker import AutomationDeltaDentalMAEligibilityCheck
@@ -20,13 +20,13 @@ def make_session_entry() -> str:
import uuid
sid = str(uuid.uuid4())
sessions[sid] = {
"status": "created", # created -> running -> waiting_for_otp -> otp_submitted -> completed / error
"status": "created", # created running waiting_for_otp completed / error
"created_at": time.time(),
"last_activity": time.time(),
"bot": None, # worker instance
"bot": None, # AutomationDeltaDentalMAEligibilityCheck instance
"driver": None, # selenium webdriver
"otp_event": asyncio.Event(),
"otp_value": None,
"otp_value": None, # OTP submitted from the app
"result": None,
"message": None,
"type": None,
@@ -36,37 +36,26 @@ def make_session_entry() -> str:
async def cleanup_session(sid: str, message: str | None = None):
"""
Close driver (if any), wake OTP waiter, set final state, and remove session entry.
Idempotent: safe to call multiple times.
Wake any OTP waiter, set final state, and remove the session entry.
Safe to call multiple times (idempotent).
NOTE: Does NOT quit the browser driver — the persistent browser stays alive.
"""
s = sessions.get(sid)
if not s:
return
try:
# Ensure final state
try:
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
except Exception:
pass
# Wake any OTP waiter (so awaiting coroutines don't hang)
try:
ev = s.get("otp_event")
if ev and not ev.is_set():
ev.set()
except Exception:
pass
# NOTE: Do NOT quit driver - keep browser alive for next patient
# Browser manager handles the persistent browser instance
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
ev = s.get("otp_event")
if ev and not ev.is_set():
ev.set()
finally:
# Remove session entry from map
sessions.pop(sid, None)
print(f"[helpers] cleaned session {sid}")
print(f"[helpers_ddma] cleaned session {sid}")
async def _remove_session_later(sid: str, delay: int = 20):
await asyncio.sleep(delay)
@@ -75,8 +64,12 @@ async def _remove_session_later(sid: str, delay: int = 20):
async def start_ddma_run(sid: str, data: dict, url: str):
"""
Run the DDMA workflow for a session (WITHOUT managing semaphore/counters).
Called by agent.py inside a wrapper that handles queue/counters.
Run the full DDMA eligibility workflow for one session.
Called by agent.py inside a wrapper that manages the semaphore/counters.
OTP handling uses two complementary strategies:
1. Accept OTP submitted from the app (via /submit-otp endpoint → otp_value field)
2. Poll the browser URL/DOM directly to detect when the user enters OTP themselves
"""
s = sessions.get(sid)
if not s:
@@ -93,7 +86,7 @@ async def start_ddma_run(sid: str, data: dict, url: str):
s["driver"] = bot.driver
s["last_activity"] = time.time()
# Navigate to login URL
# Navigate to login page
try:
if not url:
raise ValueError("URL not provided for DDMA run")
@@ -120,89 +113,130 @@ async def start_ddma_run(sid: str, data: dict, url: str):
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
# Already logged in - session persisted from profile, skip to step1
# ── Path: already logged in (persistent session) ──────────────────────
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
print("[start_ddma_run] Session persisted - skipping OTP")
s["status"] = "running"
s["message"] = "Session persisted"
# Continue to step1 below
# OTP required path
# ── Path: OTP required ────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
s["status"] = "waiting_for_otp"
s["message"] = "OTP required for login"
s["message"] = "OTP required for login - please enter OTP"
s["last_activity"] = time.time()
try:
await asyncio.wait_for(s["otp_event"].wait(), timeout=SESSION_OTP_TIMEOUT)
except asyncio.TimeoutError:
s["status"] = "error"
s["message"] = "OTP timeout"
await cleanup_session(sid)
return {"status": "error", "message": "OTP not provided in time"}
driver = s["driver"]
otp_value = s.get("otp_value")
if not otp_value:
s["status"] = "error"
s["message"] = "OTP missing after event"
await cleanup_session(sid)
return {"status": "error", "message": "OTP missing after event"}
# Poll every second for up to SESSION_OTP_TIMEOUT seconds.
# Accept OTP from two sources:
# a) app API (otp_value set by submit_otp())
# b) user entering OTP directly in the browser window
max_polls = SESSION_OTP_TIMEOUT
login_success = False
# Submit OTP - check if it's in a popup window
try:
driver = s["driver"]
wait = WebDriverWait(driver, 30)
# Check if there's a popup window and switch to it
original_window = driver.current_window_handle
all_windows = driver.window_handles
if len(all_windows) > 1:
for window in all_windows:
if window != original_window:
driver.switch_to.window(window)
print(f"[OTP] Switched to popup window for OTP entry")
break
print(f"[OTP] Polling for OTP completion (up to {SESSION_OTP_TIMEOUT}s)...")
otp_input = wait.until(
EC.presence_of_element_located(
(By.XPATH, "//input[contains(@aria-lable,'Verification code') or contains(@placeholder,'Enter your verification code')]")
)
)
otp_input.clear()
otp_input.send_keys(otp_value)
for poll in range(max_polls):
await asyncio.sleep(1)
s["last_activity"] = time.time()
try:
submit_btn = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//button[@type='button' and @aria-label='Verify']")
# a) App submitted OTP via /submit-otp endpoint
otp_value = s.get("otp_value")
if otp_value:
print(f"[OTP poll {poll+1}] OTP received from app, typing it in...")
try:
otp_input = driver.find_element(By.XPATH,
"//input[contains(@aria-label,'Verification') or contains(@placeholder,'verification') or @type='tel' or contains(@aria-lable,'Verification code') or contains(@placeholder,'Enter your verification code')]"
)
otp_input.clear()
otp_input.send_keys(otp_value)
try:
verify_btn = driver.find_element(By.XPATH, "//button[@type='button' and @aria-label='Verify']")
verify_btn.click()
except Exception:
otp_input.send_keys("\n")
print("[OTP] OTP typed and submitted via app")
s["otp_value"] = None # Clear so we don't re-submit
await asyncio.sleep(3)
except Exception as type_err:
print(f"[OTP] Failed to type OTP from app: {type_err}")
# b) Check URL — if we're past OTP page, login succeeded
current_url = driver.current_url.lower()
print(f"[OTP poll {poll+1}/{max_polls}] URL: {current_url[:70]}...")
if "member" in current_url or "dashboard" in current_url or "eligibility" in current_url:
try:
member_search = WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[OTP] Member search found — login successful!")
login_success = True
break
except TimeoutException:
print("[OTP] On member page but search input not found, continuing...")
# Check if OTP input is still visible (user hasn't finished)
try:
otp_input_elem = driver.find_element(By.XPATH,
"//input[contains(@aria-label,'Verification') or contains(@placeholder,'verification') or @type='tel']"
)
print(f"[OTP poll {poll+1}] OTP input still visible - waiting...")
except Exception:
# OTP input gone — may mean login is completing; try members page
if "onboarding" in current_url or "start" in current_url:
print("[OTP] OTP input gone, trying to navigate to members page...")
try:
driver.get("https://providers.deltadentalma.com/members")
await asyncio.sleep(2)
except Exception:
pass
except Exception as poll_err:
print(f"[OTP poll {poll+1}] Error: {poll_err}")
if not login_success:
# Final check — navigate directly to members page
try:
print("[OTP] Final attempt - navigating to members page...")
driver.get("https://providers.deltadentalma.com/members")
await asyncio.sleep(3)
member_search = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
submit_btn.click()
except Exception:
otp_input.send_keys("\n")
# Wait for verification and switch back to main window if needed
await asyncio.sleep(2)
if len(driver.window_handles) > 0:
driver.switch_to.window(driver.window_handles[0])
print("[OTP] Member search found — login successful!")
login_success = True
except TimeoutException:
s["status"] = "error"
s["message"] = "OTP timeout - login not completed"
await cleanup_session(sid)
return {"status": "error", "message": "OTP not completed in time"}
except Exception as final_err:
s["status"] = "error"
s["message"] = f"OTP verification failed: {final_err}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
s["status"] = "otp_submitted"
s["last_activity"] = time.time()
await asyncio.sleep(0.5)
if login_success:
s["status"] = "running"
s["message"] = "Login successful after OTP"
print("[OTP] Proceeding to step1...")
except Exception as e:
s["status"] = "error"
s["message"] = f"Failed to submit OTP into page: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
# ── Path: login succeeded without OTP ─────────────────────────────────
elif isinstance(login_result, str) and login_result == "SUCCESS":
print("[start_ddma_run] Login succeeded without OTP")
s["status"] = "running"
s["message"] = "Login succeeded"
# ── Path: login error ──────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = login_result
await cleanup_session(sid)
return {"status": "error", "message": login_result}
# Step 1
# ── Step 1: search ────────────────────────────────────────────────────
step1_result = bot.step1()
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
s["status"] = "error"
@@ -210,7 +244,7 @@ async def start_ddma_run(sid: str, data: dict, url: str):
await cleanup_session(sid)
return {"status": "error", "message": step1_result}
# Step 2 (PDF)
# ── Step 2: PDF generation ────────────────────────────────────────────
step2_result = bot.step2()
if isinstance(step2_result, dict) and step2_result.get("status") == "success":
s["status"] = "completed"
@@ -235,12 +269,15 @@ async def start_ddma_run(sid: str, data: dict, url: str):
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
"""Set OTP for a session and wake waiting runner."""
"""
Called when the app sends an OTP via POST /submit-otp.
Sets otp_value on the session so the polling loop picks it up.
"""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
if s.get("status") != "waiting_for_otp":
return {"status": "error", "message": f"session not waiting for otp (state={s.get('status')})"}
return {"status": "error", "message": f"session not waiting for OTP (state={s.get('status')})"}
s["otp_value"] = otp
s["last_activity"] = time.time()
try:

View File

@@ -1,25 +1,29 @@
from selenium.common.exceptions import TimeoutException
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
from ddma_browser_manager import get_browser_manager
class AutomationDeltaDentalMAEligibilityCheck:
class AutomationDeltaDentalMAEligibilityCheck:
def __init__(self, data):
self.headless = False
self.driver = None
self.data = data.get("data", {}) if isinstance(data, dict) else {}
# Flatten values for convenience
self.memberId = self.data.get("memberId", "")
self.dateOfBirth = self.data.get("dateOfBirth", "")
self.firstName = self.data.get("firstName", "")
self.lastName = self.data.get("lastName", "")
self.massddma_username = self.data.get("massddmaUsername", "")
self.massddma_password = self.data.get("massddmaPassword", "")
@@ -31,23 +35,74 @@ class AutomationDeltaDentalMAEligibilityCheck:
# Use persistent browser from manager (keeps device trust tokens)
self.driver = get_browser_manager().get_driver(self.headless)
def _force_logout(self):
"""Force logout by clearing cookies when credentials change."""
try:
print("[DDMA login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
# Try to click logout button if visible
try:
self.driver.get("https://providers.deltadentalma.com/")
time.sleep(2)
logout_selectors = [
"//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
"//*[contains(@class, 'logout') or contains(@class, 'signout')]"
]
for selector in logout_selectors:
try:
logout_btn = WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
logout_btn.click()
print("[DDMA login] Clicked logout button")
time.sleep(2)
break
except TimeoutException:
continue
except Exception as e:
print(f"[DDMA login] Could not click logout button: {e}")
# Clear cookies as backup
try:
self.driver.delete_all_cookies()
print("[DDMA login] Cleared all cookies")
except Exception as e:
print(f"[DDMA login] Error clearing cookies: {e}")
browser_manager.clear_credentials_hash()
print("[DDMA login] Logout complete")
return True
except Exception as e:
print(f"[DDMA login] Error during forced logout: {e}")
return False
def login(self, url):
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
# First check if we're already on a logged-in page (from previous run)
# Check if credentials changed — force logout first
if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# Check if already on a logged-in page (persistent session from profile)
try:
current_url = self.driver.current_url
print(f"[login] Current URL: {current_url}")
# Check if we're on any logged-in page (dashboard, member pages, etc.)
logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
is_logged_in_url = any(pattern in current_url.lower() for pattern in logged_in_patterns)
if is_logged_in_url and "onboarding" not in current_url.lower():
print(f"[login] Already on logged-in page - skipping login entirely")
# Navigate directly to member search if not already there
print("[login] Already on logged-in page - skipping login entirely")
if "member" not in current_url.lower():
# Try to find a link to member search or just check for search input
try:
member_search = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
@@ -55,13 +110,11 @@ class AutomationDeltaDentalMAEligibilityCheck:
print("[login] Found member search input - returning ALREADY_LOGGED_IN")
return "ALREADY_LOGGED_IN"
except TimeoutException:
# Try navigating to members page
members_url = "https://providers.deltadentalma.com/members"
print(f"[login] Navigating to members page: {members_url}")
self.driver.get(members_url)
time.sleep(2)
# Verify we have the member search input
try:
member_search = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
@@ -72,16 +125,16 @@ class AutomationDeltaDentalMAEligibilityCheck:
print("[login] Could not find member search, will try login")
except Exception as e:
print(f"[login] Error checking current state: {e}")
# Navigate to login URL
self.driver.get(url)
time.sleep(2) # Wait for page to load and any redirects
# Check if we got redirected to member search (session still valid)
time.sleep(2)
# Check if session redirected us straight to member search
try:
current_url = self.driver.current_url
print(f"[login] URL after navigation: {current_url}")
if "onboarding" not in current_url.lower():
member_search = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
@@ -91,7 +144,7 @@ class AutomationDeltaDentalMAEligibilityCheck:
return "ALREADY_LOGGED_IN"
except TimeoutException:
print("[login] Proceeding with login")
# Dismiss any "Authentication flow continued in another tab" modal
modal_dismissed = False
try:
@@ -102,21 +155,18 @@ class AutomationDeltaDentalMAEligibilityCheck:
print("[login] Dismissed authentication modal")
modal_dismissed = True
time.sleep(2)
# Check if a popup window opened for authentication
all_windows = self.driver.window_handles
print(f"[login] Windows after modal dismiss: {len(all_windows)}")
if len(all_windows) > 1:
# Switch to the auth popup
original_window = self.driver.current_window_handle
for window in all_windows:
if window != original_window:
self.driver.switch_to.window(window)
print(f"[login] Switched to auth popup window")
print("[login] Switched to auth popup window")
break
# Look for OTP input in the popup
try:
otp_candidate = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located(
@@ -129,14 +179,12 @@ class AutomationDeltaDentalMAEligibilityCheck:
except TimeoutException:
print("[login] No OTP in popup, checking main window")
self.driver.switch_to.window(original_window)
except TimeoutException:
pass # No modal present
# If modal was dismissed but no popup, page might have changed - wait and check
if modal_dismissed:
time.sleep(2)
# Check if we're now on member search page (already authenticated)
try:
member_search = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
@@ -146,8 +194,8 @@ class AutomationDeltaDentalMAEligibilityCheck:
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
# Try to fill login form
# Fill login form
try:
email_field = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, "//input[@name='username' and @type='text']"))
@@ -155,7 +203,7 @@ class AutomationDeltaDentalMAEligibilityCheck:
except TimeoutException:
print("[login] Could not find login form - page may have changed")
return "ERROR: Login form not found"
email_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='username' and @type='text']")))
email_field.clear()
email_field.send_keys(self.massddma_username)
@@ -164,19 +212,23 @@ class AutomationDeltaDentalMAEligibilityCheck:
password_field.clear()
password_field.send_keys(self.massddma_password)
# remember me
# Remember me
try:
remember_me_checkbox = wait.until(EC.element_to_be_clickable(
(By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
))
remember_me_checkbox.click()
except:
except Exception:
print("[login] Remember me checkbox not found (continuing).")
login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")))
login_button.click()
# OTP detection
# Save credentials hash after login attempt
if self.massddma_username:
browser_manager.save_credentials_hash(self.massddma_username)
# OTP detection — wait up to 30 seconds for OTP input
try:
otp_candidate = WebDriverWait(self.driver, 30).until(
EC.presence_of_element_located(
@@ -188,170 +240,394 @@ class AutomationDeltaDentalMAEligibilityCheck:
return "OTP_REQUIRED"
except TimeoutException:
print("[login] No OTP input detected in allowed time.")
# Check if we're now on the member search page (login succeeded without OTP)
try:
current_url = self.driver.current_url.lower()
if "member" in current_url or "dashboard" in current_url:
member_search = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[login] Login successful - now on member search page")
return "SUCCESS"
except TimeoutException:
pass
# Check for error messages
try:
error_elem = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, "//*[contains(@class,'error') or contains(text(),'invalid') or contains(text(),'failed')]"))
)
print(f"[login] Login failed - error detected: {error_elem.text}")
return f"ERROR:LOGIN FAILED: {error_elem.text}"
except TimeoutException:
pass
if "onboarding" in self.driver.current_url.lower() or "login" in self.driver.current_url.lower():
print("[login] Login failed - still on login page")
return "ERROR:LOGIN FAILED: Still on login page"
print("[login] Assuming login succeeded (no errors detected)")
return "SUCCESS"
except Exception as e:
print("[login] Exception during login:", e)
return f"ERROR:LOGIN FAILED: {e}"
def step1(self):
"""Fill search form with all available fields (flexible search)."""
wait = WebDriverWait(self.driver, 30)
try:
# Fill Member ID
member_id_input = wait.until(EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]')))
member_id_input.clear()
member_id_input.send_keys(self.memberId)
fields = []
if self.memberId:
fields.append(f"ID: {self.memberId}")
if self.firstName:
fields.append(f"FirstName: {self.firstName}")
if self.lastName:
fields.append(f"LastName: {self.lastName}")
if self.dateOfBirth:
fields.append(f"DOB: {self.dateOfBirth}")
print(f"[DDMA step1] Starting search with: {', '.join(fields)}")
# Fill DOB parts
try:
dob_parts = self.dateOfBirth.split("-")
year = dob_parts[0] # "1964"
month = dob_parts[1].zfill(2) # "04"
day = dob_parts[2].zfill(2) # "17"
except Exception as e:
print(f"Error parsing DOB: {e}")
return "ERROR: PARSING DOB"
# 1) locate the specific member DOB container
dob_container = wait.until(
EC.presence_of_element_located(
(By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
)
)
# 2) find the editable spans *inside that container* using relative XPaths
month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
# Helper to click, select-all and type (pure send_keys approach)
def replace_with_sendkeys(el, value):
# focus (same as click)
el.click()
# select all (Ctrl+A) and delete (some apps pick up BACKSPACE better — we use BACKSPACE after select)
el.send_keys(Keys.CONTROL, "a")
el.send_keys(Keys.BACKSPACE)
# type the value
el.send_keys(value)
# optionally blur or tab out if app expects it
# el.send_keys(Keys.TAB)
replace_with_sendkeys(month_elem, month)
time.sleep(0.05)
replace_with_sendkeys(day_elem, day)
time.sleep(0.05)
replace_with_sendkeys(year_elem, year)
# 1. Fill Member ID if provided
if self.memberId:
try:
member_id_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="Search by member ID"]')
))
member_id_input.clear()
member_id_input.send_keys(self.memberId)
print(f"[DDMA step1] Entered Member ID: {self.memberId}")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA step1] Warning: Could not fill Member ID: {e}")
# 2. Fill DOB if provided
if self.dateOfBirth:
try:
dob_parts = self.dateOfBirth.split("-")
year = dob_parts[0]
month = dob_parts[1].zfill(2)
day = dob_parts[2].zfill(2)
# Click Continue button
continue_btn = wait.until(EC.element_to_be_clickable((By.XPATH, '//button[@data-testid="member-search_search-button"]')))
dob_container = wait.until(
EC.presence_of_element_located(
(By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
)
)
month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
replace_with_sendkeys(month_elem, month)
time.sleep(0.05)
replace_with_sendkeys(day_elem, day)
time.sleep(0.05)
replace_with_sendkeys(year_elem, year)
print(f"[DDMA step1] Filled DOB: {month}/{day}/{year}")
except Exception as e:
print(f"[DDMA step1] Warning: Could not fill DOB: {e}")
# 3. Fill First Name if provided
if self.firstName:
try:
first_name_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="First name - 1 char minimum" or contains(@placeholder,"first name") or contains(@name,"firstName")]')
))
first_name_input.clear()
first_name_input.send_keys(self.firstName)
print(f"[DDMA step1] Entered First Name: {self.firstName}")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA step1] Warning: Could not fill First Name: {e}")
# 4. Fill Last Name if provided
if self.lastName:
try:
last_name_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="Last name - 2 char minimum" or contains(@placeholder,"last name") or contains(@name,"lastName")]')
))
last_name_input.clear()
last_name_input.send_keys(self.lastName)
print(f"[DDMA step1] Entered Last Name: {self.lastName}")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA step1] Warning: Could not fill Last Name: {e}")
time.sleep(0.3)
# Click Search button
continue_btn = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//button[@data-testid="member-search_search-button"]')
))
continue_btn.click()
# Check for error message
print("[DDMA step1] Clicked Search button")
# Wait for either results row or no-results message (up to 15s)
try:
error_msg = WebDriverWait(self.driver, 5).until(EC.presence_of_element_located(
(By.XPATH, '//div[@data-testid="member-search-result-no-results"]')
))
if error_msg:
print("Error: Invalid Member ID or Date of Birth.")
return "ERROR: INVALID MEMBERID OR DOB"
WebDriverWait(self.driver, 15).until(
EC.any_of(
EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
EC.presence_of_element_located((By.XPATH, '//div[@data-testid="member-search-result-no-results"]')),
)
)
except TimeoutException:
pass # proceed and let step2 handle missing results
# Check for no-results error
try:
error_msg = self.driver.find_element(By.XPATH, '//div[@data-testid="member-search-result-no-results"]')
if error_msg:
print("[DDMA step1] Error: No results found")
return "ERROR: INVALID SEARCH CRITERIA"
except Exception:
pass
print("[DDMA step1] Search completed successfully")
return "Success"
except Exception as e:
print(f"Error while step1 i.e Cheking the MemberId and DOB in: {e}")
return "ERROR:STEP1"
except Exception as e:
print(f"[DDMA step1] Exception: {e}")
return f"ERROR:STEP1 - {e}"
def step2(self):
"""Navigate to patient detail page and generate PDF."""
wait = WebDriverWait(self.driver, 90)
try:
# 1) find the eligibility <a> inside the correct cell
status_link = wait.until(EC.presence_of_element_located((
By.XPATH,
"(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
)))
import re
eligibilityText = status_link.text.strip().lower()
# Wait for results table
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
)
except TimeoutException:
print("[DDMA step2] Warning: Results table not found within timeout")
# 2) finding patient name.
patient_name_div = wait.until(EC.presence_of_element_located((
By.XPATH,
'//div[@class="flex flex-row w-full items-center"]'
)))
eligibilityText = "unknown"
foundMemberId = ""
patientName = ""
patientName = patient_name_div.text.strip().lower()
# Extract data from first result row
try:
first_row = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]")
row_text = first_row.text.strip()
print(f"[DDMA step2] First row text: {row_text[:150]}...")
if row_text:
lines = row_text.split('\n')
# Extract patient name (first line, strip DOB if present)
if lines:
potential_name = lines[0].strip()
potential_name = re.sub(r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', potential_name, flags=re.IGNORECASE).strip()
if potential_name and not potential_name.startswith('DOB') and not potential_name.isdigit():
patientName = potential_name
print(f"[DDMA step2] Extracted patient name: '{patientName}'")
# Extract Member ID
for line in lines:
line = line.strip()
if line and re.match(r'^[A-Z0-9]{5,}$', line) and not line.startswith('DOB'):
foundMemberId = line
print(f"[DDMA step2] Extracted Member ID: {foundMemberId}")
break
if not foundMemberId and self.memberId:
foundMemberId = self.memberId
print(f"[DDMA step2] Using input Member ID: {foundMemberId}")
except Exception as e:
print(f"[DDMA step2] Error extracting data from row: {e}")
if self.memberId:
foundMemberId = self.memberId
# Extract eligibility status
try:
short_wait = WebDriverWait(self.driver, 3)
status_link = short_wait.until(EC.presence_of_element_located((
By.XPATH,
"(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
)))
eligibilityText = status_link.text.strip().lower()
print(f"[DDMA step2] Found eligibility status: {eligibilityText}")
except Exception:
try:
alt_status = self.driver.find_element(By.XPATH, "//*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]")
eligibilityText = alt_status.text.strip().lower()
if "active" in eligibilityText or "eligible" in eligibilityText:
eligibilityText = "active"
elif "inactive" in eligibilityText:
eligibilityText = "inactive"
print(f"[DDMA step2] Found eligibility via alternative: {eligibilityText}")
except Exception:
pass
# Navigate to detailed patient page
print("[DDMA step2] Navigating to patient detail page...")
patient_name_clicked = False
detail_url = None
patient_link_selectors = [
"(//table//tbody//tr)[1]//td[1]//a",
"(//tbody//tr)[1]//a[contains(@href, 'member-details')]",
"(//tbody//tr)[1]//a[contains(@href, 'member')]",
]
for selector in patient_link_selectors:
try:
patient_link = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, selector))
)
link_text = patient_link.text.strip()
href = patient_link.get_attribute("href")
print(f"[DDMA step2] Found patient link: text='{link_text}', href={href}")
if link_text and not patientName:
patientName = link_text
if href and "member-details" in href:
detail_url = href
patient_name_clicked = True
break
except Exception as e:
print(f"[DDMA step2] Selector '{selector}' failed: {e}")
continue
if not detail_url:
try:
all_links = self.driver.find_elements(By.XPATH, "//a[contains(@href, 'member-details')]")
if all_links:
detail_url = all_links[0].get_attribute("href")
patient_name_clicked = True
print(f"[DDMA step2] Fallback member-details link: {detail_url}")
except Exception as e:
print(f"[DDMA step2] Could not find member-details link: {e}")
if patient_name_clicked and detail_url:
print(f"[DDMA step2] Navigating directly to: {detail_url}")
self.driver.get(detail_url)
# Wait for page to be ready
try:
WebDriverWait(self.driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
except Exception:
print("[DDMA step2] Warning: document.readyState did not become 'complete'")
# Wait for meaningful content to appear
content_selectors = [
"//div[contains(@class,'member') or contains(@class,'detail') or contains(@class,'patient')]",
"//h1", "//h2", "//table",
"//*[contains(text(),'Member ID') or contains(text(),'Name') or contains(text(),'Date of Birth')]",
]
for selector in content_selectors:
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, selector))
)
print(f"[DDMA step2] Content loaded: {selector}")
break
except Exception:
continue
time.sleep(1) # Brief settle for any late-rendering elements
# Try to extract patient name from detail page if not already found
if not patientName:
for selector in ["//h1", "//h2", "//*[contains(@class,'patient-name') or contains(@class,'member-name')]"]:
try:
name_elem = self.driver.find_element(By.XPATH, selector)
name_text = name_elem.text.strip()
if name_text and len(name_text) > 1:
if not any(x in name_text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'date', 'print']):
patientName = name_text
print(f"[DDMA step2] Found patient name on detail page: {patientName}")
break
except Exception:
continue
else:
print("[DDMA step2] Warning: Could not navigate to patient detail page")
if not patientName:
try:
name_elem = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]//td[1]")
patientName = name_elem.text.strip()
except Exception:
pass
if not patientName:
print("[DDMA step2] Could not extract patient name")
# Clean patient name
if patientName:
cleaned = re.sub(r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', patientName, flags=re.IGNORECASE).strip()
if cleaned:
patientName = cleaned
# Wait for page ready before PDF
try:
WebDriverWait(self.driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
except Exception:
print("Warning: document.readyState did not become 'complete' within timeout")
# Give some time for lazy content to finish rendering (adjust if needed)
time.sleep(0.6)
# Get total page size and DPR
total_width = int(self.driver.execute_script(
"return Math.max(document.body.scrollWidth, document.documentElement.scrollWidth, document.documentElement.clientWidth);"
))
total_height = int(self.driver.execute_script(
"return Math.max(document.body.scrollHeight, document.documentElement.scrollHeight, document.documentElement.clientHeight);"
))
dpr = float(self.driver.execute_script("return window.devicePixelRatio || 1;"))
# Set device metrics to the full page size so Page.captureScreenshot captures everything
# Note: Some pages are extremely tall; if you hit memory limits, you can capture in chunks.
self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', {
"mobile": False,
"width": total_width,
"height": total_height,
"deviceScaleFactor": dpr,
"screenOrientation": {"angle": 0, "type": "portraitPrimary"}
})
# Small pause for layout to settle after emulation change
time.sleep(0.15)
# Capture screenshot (base64 PNG)
result = self.driver.execute_cdp_cmd("Page.captureScreenshot", {"format": "png", "fromSurface": True})
image_data = base64.b64decode(result.get('data', ''))
screenshot_path = os.path.join(self.download_dir, f"ss_{self.memberId}.png")
with open(screenshot_path, "wb") as f:
f.write(image_data)
# Restore original metrics to avoid affecting further interactions
try:
self.driver.execute_cdp_cmd('Emulation.clearDeviceMetricsOverride', {})
except Exception:
# non-fatal: continue
pass
print("Screenshot saved at:", screenshot_path)
# Close the browser window after screenshot (session preserved in profile)
# Generate PDF via Chrome DevTools Protocol
print("[DDMA step2] Generating PDF of patient detail page...")
pdf_options = {
"landscape": False,
"displayHeaderFooter": False,
"printBackground": True,
"preferCSSPageSize": True,
"paperWidth": 8.5,
"paperHeight": 11,
"marginTop": 0.4,
"marginBottom": 0.4,
"marginLeft": 0.4,
"marginRight": 0.4,
"scale": 0.9,
}
result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
pdf_data = base64.b64decode(result.get('data', ''))
pdf_id = foundMemberId or self.memberId or "unknown"
pdf_path = os.path.join(self.download_dir, f"eligibility_{pdf_id}.pdf")
with open(pdf_path, "wb") as f:
f.write(pdf_data)
print(f"[DDMA step2] PDF saved at: {pdf_path}")
# Close the browser window after PDF (session preserved in profile)
try:
from ddma_browser_manager import get_browser_manager
get_browser_manager().quit_driver()
print("[step2] Browser closed - session preserved in profile")
except Exception as e:
print(f"[step2] Error closing browser: {e}")
output = {
"status": "success",
"eligibility": eligibilityText,
"ss_path": screenshot_path,
"patientName":patientName
}
return output
print(f"[DDMA step2] Final — PatientName: '{patientName}', MemberID: '{foundMemberId}'")
return {
"status": "success",
"eligibility": eligibilityText,
"ss_path": pdf_path, # kept for backward compatibility
"pdf_path": pdf_path, # explicit pdf_path
"patientName": patientName,
"memberId": foundMemberId,
}
except Exception as e:
print("ERROR in step2:", e)
# Empty the download folder (remove files / symlinks only)
# Cleanup download dir on error
try:
dl = os.path.abspath(self.download_dir)
if os.path.isdir(dl):
@@ -360,20 +636,14 @@ class AutomationDeltaDentalMAEligibilityCheck:
try:
if os.path.isfile(item) or os.path.islink(item):
os.remove(item)
print(f"[cleanup] removed: {item}")
except Exception as rm_err:
print(f"[cleanup] failed to remove {item}: {rm_err}")
print(f"[cleanup] emptied download dir: {dl}")
else:
print(f"[cleanup] download dir does not exist: {dl}")
except Exception as cleanup_exc:
print(f"[cleanup] unexpected error while cleaning downloads dir: {cleanup_exc}")
print(f"[cleanup] unexpected error: {cleanup_exc}")
return {"status": "error", "message": str(e)}
# NOTE: Do NOT quit driver here - keep browser alive for next patient
def main_workflow(self, url):
try:
try:
self.config_driver()
self.driver.maximize_window()
time.sleep(3)
@@ -393,9 +663,6 @@ class AutomationDeltaDentalMAEligibilityCheck:
return {"status": "error", "message": step2_result.get("message")}
return step2_result
except Exception as e:
return {
"status": "error",
"message": e
}
# NOTE: Do NOT quit driver - keep browser alive for next patient
except Exception as e:
return {"status": "error", "message": str(e)}
# NOTE: Do NOT quit driver — keep browser alive for next patient