feat(eligibility-check) - add CCA eligibility workflow with new routes and frontend components; enhance patient data processing and eligibility status updates; update insurance provider handling across various workflows

This commit is contained in:
2026-02-25 22:38:33 -05:00
parent 27e6e6a4a0
commit 4cb7ec7e2e
26 changed files with 2893 additions and 346 deletions

View File

@@ -12,12 +12,14 @@ import helpers_ddma_eligibility as hddma
import helpers_dentaquest_eligibility as hdentaquest
import helpers_unitedsco_eligibility as hunitedsco
import helpers_deltains_eligibility as hdeltains
import helpers_cca_eligibility as hcca
# Import session clear functions for startup
from ddma_browser_manager import clear_ddma_session_on_startup
from dentaquest_browser_manager import clear_dentaquest_session_on_startup
from unitedsco_browser_manager import clear_unitedsco_session_on_startup
from deltains_browser_manager import clear_deltains_session_on_startup
from cca_browser_manager import clear_cca_session_on_startup
from dotenv import load_dotenv
load_dotenv()
@@ -31,6 +33,7 @@ clear_ddma_session_on_startup()
clear_dentaquest_session_on_startup()
clear_unitedsco_session_on_startup()
clear_deltains_session_on_startup()
clear_cca_session_on_startup()
print("=" * 50)
print("SESSION CLEAR COMPLETE - FRESH LOGINS REQUIRED")
print("=" * 50)
@@ -425,6 +428,48 @@ async def deltains_session_status(sid: str):
return s
# Endpoint:9 - CCA eligibility (background, no OTP)
async def _cca_worker_wrapper(sid: str, data: dict, url: str):
global active_jobs, waiting_jobs
async with semaphore:
async with lock:
waiting_jobs -= 1
active_jobs += 1
try:
await hcca.start_cca_run(sid, data, url)
finally:
async with lock:
active_jobs -= 1
@app.post("/cca-eligibility")
async def cca_eligibility(request: Request):
global waiting_jobs
body = await request.json()
data = body.get("data", {})
sid = hcca.make_session_entry()
hcca.sessions[sid]["type"] = "cca_eligibility"
hcca.sessions[sid]["last_activity"] = time.time()
async with lock:
waiting_jobs += 1
asyncio.create_task(_cca_worker_wrapper(sid, data, url="https://pwp.sciondental.com/PWP/Landing"))
return {"status": "started", "session_id": sid}
@app.get("/cca-session/{sid}/status")
async def cca_session_status(sid: str):
s = hcca.get_session_status(sid)
if s.get("status") == "not_found":
raise HTTPException(status_code=404, detail="session not found")
return s
@app.post("/submit-otp")
async def submit_otp(request: Request):
"""
@@ -511,6 +556,15 @@ async def clear_deltains_session():
return {"status": "error", "message": str(e)}
@app.post("/clear-cca-session")
async def clear_cca_session_endpoint():
try:
clear_cca_session_on_startup()
return {"status": "success", "message": "CCA session cleared"}
except Exception as e:
return {"status": "error", "message": str(e)}
if __name__ == "__main__":
host = os.getenv("HOST")
port = int(os.getenv("PORT"))

View File

@@ -0,0 +1,292 @@
"""
Browser manager for CCA (Commonwealth Care Alliance) via ScionDental portal.
Handles persistent Chrome profile, cookie save/restore, and credential tracking.
No OTP required for this provider.
"""
import os
import json
import shutil
import hashlib
import threading
import subprocess
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
if not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = ":0"
class CCABrowserManager:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._driver = None
cls._instance.profile_dir = os.path.abspath("chrome_profile_cca")
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
cls._instance._cookies_file = os.path.join(cls._instance.profile_dir, ".saved_cookies.json")
cls._instance._needs_session_clear = False
os.makedirs(cls._instance.profile_dir, exist_ok=True)
os.makedirs(cls._instance.download_dir, exist_ok=True)
return cls._instance
def save_cookies(self):
try:
if not self._driver:
return
cookies = self._driver.get_cookies()
if not cookies:
return
with open(self._cookies_file, "w") as f:
json.dump(cookies, f)
print(f"[CCA BrowserManager] Saved {len(cookies)} cookies to disk")
except Exception as e:
print(f"[CCA BrowserManager] Failed to save cookies: {e}")
def restore_cookies(self):
if not os.path.exists(self._cookies_file):
print("[CCA BrowserManager] No saved cookies file found")
return False
try:
with open(self._cookies_file, "r") as f:
cookies = json.load(f)
if not cookies:
print("[CCA BrowserManager] Saved cookies file is empty")
return False
try:
self._driver.get("https://pwp.sciondental.com/favicon.ico")
time.sleep(2)
except Exception:
self._driver.get("https://pwp.sciondental.com")
time.sleep(3)
restored = 0
for cookie in cookies:
try:
for key in ["sameSite", "storeId", "hostOnly", "session"]:
cookie.pop(key, None)
cookie["sameSite"] = "None"
self._driver.add_cookie(cookie)
restored += 1
except Exception:
pass
print(f"[CCA BrowserManager] Restored {restored}/{len(cookies)} cookies")
return restored > 0
except Exception as e:
print(f"[CCA BrowserManager] Failed to restore cookies: {e}")
return False
def clear_saved_cookies(self):
try:
if os.path.exists(self._cookies_file):
os.remove(self._cookies_file)
print("[CCA BrowserManager] Cleared saved cookies file")
except Exception as e:
print(f"[CCA BrowserManager] Failed to clear saved cookies: {e}")
def clear_session_on_startup(self):
print("[CCA BrowserManager] Clearing session on startup...")
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
self.clear_saved_cookies()
session_files = [
"Cookies", "Cookies-journal",
"Login Data", "Login Data-journal",
"Web Data", "Web Data-journal",
]
for filename in session_files:
for base in [os.path.join(self.profile_dir, "Default"), self.profile_dir]:
filepath = os.path.join(base, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
except Exception:
pass
for dirname in ["Session Storage", "Local Storage", "IndexedDB"]:
dirpath = os.path.join(self.profile_dir, "Default", dirname)
if os.path.exists(dirpath):
try:
shutil.rmtree(dirpath)
except Exception:
pass
for cache_name in ["Cache", "Code Cache", "GPUCache", "Service Worker", "ShaderCache"]:
for base in [os.path.join(self.profile_dir, "Default"), self.profile_dir]:
cache_dir = os.path.join(base, cache_name)
if os.path.exists(cache_dir):
try:
shutil.rmtree(cache_dir)
except Exception:
pass
self._needs_session_clear = True
print("[CCA BrowserManager] Session cleared - will require fresh login")
except Exception as e:
print(f"[CCA BrowserManager] Error clearing session: {e}")
def _hash_credentials(self, username: str) -> str:
return hashlib.sha256(username.encode()).hexdigest()[:16]
def get_last_credentials_hash(self):
try:
if os.path.exists(self._credentials_file):
with open(self._credentials_file, 'r') as f:
return f.read().strip()
except Exception:
pass
return None
def save_credentials_hash(self, username: str):
try:
cred_hash = self._hash_credentials(username)
with open(self._credentials_file, 'w') as f:
f.write(cred_hash)
except Exception as e:
print(f"[CCA BrowserManager] Failed to save credentials hash: {e}")
def credentials_changed(self, username: str) -> bool:
last_hash = self.get_last_credentials_hash()
if last_hash is None:
return False
current_hash = self._hash_credentials(username)
changed = last_hash != current_hash
if changed:
print("[CCA BrowserManager] Credentials changed - logout required")
return changed
def clear_credentials_hash(self):
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
except Exception:
pass
def _kill_existing_chrome_for_profile(self):
try:
result = subprocess.run(
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
capture_output=True, text=True
)
if result.stdout.strip():
for pid in result.stdout.strip().split('\n'):
try:
subprocess.run(["kill", "-9", pid], check=False)
except Exception:
pass
time.sleep(1)
except Exception:
pass
for lock_file in ["SingletonLock", "SingletonSocket", "SingletonCookie"]:
lock_path = os.path.join(self.profile_dir, lock_file)
try:
if os.path.islink(lock_path) or os.path.exists(lock_path):
os.remove(lock_path)
except Exception:
pass
def get_driver(self, headless=False):
with self._lock:
need_cookie_restore = False
if self._driver is None:
print("[CCA BrowserManager] Driver is None, creating new driver")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
need_cookie_restore = True
elif not self._is_alive():
print("[CCA BrowserManager] Driver not alive, recreating")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
need_cookie_restore = True
else:
print("[CCA BrowserManager] Reusing existing driver")
if need_cookie_restore and os.path.exists(self._cookies_file):
print("[CCA BrowserManager] Restoring saved cookies into new browser...")
self.restore_cookies()
return self._driver
def _is_alive(self):
try:
if self._driver is None:
return False
_ = self._driver.current_url
return True
except Exception:
return False
def _create_driver(self, headless=False):
if self._driver:
try:
self._driver.quit()
except Exception:
pass
self._driver = None
time.sleep(1)
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless")
options.add_argument(f"--user-data-dir={self.profile_dir}")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("--disable-infobars")
prefs = {
"download.default_directory": self.download_dir,
"plugins.always_open_pdf_externally": True,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"credentials_enable_service": False,
"profile.password_manager_enabled": False,
"profile.password_manager_leak_detection": False,
}
options.add_experimental_option("prefs", prefs)
service = Service(ChromeDriverManager().install())
self._driver = webdriver.Chrome(service=service, options=options)
self._driver.maximize_window()
try:
self._driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception:
pass
self._needs_session_clear = False
def quit_driver(self):
with self._lock:
if self._driver:
try:
self._driver.quit()
except Exception:
pass
self._driver = None
self._kill_existing_chrome_for_profile()
_manager = None
def get_browser_manager():
global _manager
if _manager is None:
_manager = CCABrowserManager()
return _manager
def clear_cca_session_on_startup():
manager = get_browser_manager()
manager.clear_session_on_startup()

View File

@@ -0,0 +1,180 @@
import os
import time
import asyncio
from typing import Dict, Any
from selenium.common.exceptions import WebDriverException
from selenium_CCA_eligibilityCheckWorker import AutomationCCAEligibilityCheck
from cca_browser_manager import get_browser_manager
sessions: Dict[str, Dict[str, Any]] = {}
def make_session_entry() -> str:
import uuid
sid = str(uuid.uuid4())
sessions[sid] = {
"status": "created",
"created_at": time.time(),
"last_activity": time.time(),
"bot": None,
"driver": None,
"result": None,
"message": None,
"type": None,
}
return sid
async def cleanup_session(sid: str, message: str | None = None):
s = sessions.get(sid)
if not s:
return
try:
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
finally:
sessions.pop(sid, None)
async def _remove_session_later(sid: str, delay: int = 30):
await asyncio.sleep(delay)
await cleanup_session(sid)
def _close_browser(bot):
try:
bm = get_browser_manager()
try:
bm.save_cookies()
except Exception:
pass
try:
bm.quit_driver()
print("[CCA] Browser closed")
except Exception:
pass
except Exception as e:
print(f"[CCA] Could not close browser: {e}")
async def start_cca_run(sid: str, data: dict, url: str):
"""
Run the CCA eligibility check workflow (no OTP):
1. Login
2. Search patient by Subscriber ID + DOB
3. Extract eligibility info + PDF
"""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
s["status"] = "running"
s["last_activity"] = time.time()
bot = None
try:
bot = AutomationCCAEligibilityCheck({"data": data})
bot.config_driver()
s["bot"] = bot
s["driver"] = bot.driver
s["last_activity"] = time.time()
try:
bot.driver.maximize_window()
except Exception:
pass
try:
login_result = bot.login(url)
except WebDriverException as wde:
s["status"] = "error"
s["message"] = f"Selenium driver error during login: {wde}"
s["result"] = {"status": "error", "message": s["message"]}
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": s["message"]}
except Exception as e:
s["status"] = "error"
s["message"] = f"Unexpected error during login: {e}"
s["result"] = {"status": "error", "message": s["message"]}
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": s["message"]}
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
s["status"] = "running"
s["message"] = "Session persisted"
print("[CCA] Session persisted - skipping login")
get_browser_manager().save_cookies()
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = login_result
s["result"] = {"status": "error", "message": login_result}
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": login_result}
elif isinstance(login_result, str) and login_result == "SUCCESS":
print("[CCA] Login succeeded")
s["status"] = "running"
s["message"] = "Login succeeded"
get_browser_manager().save_cookies()
# Step 1 - search patient and verify eligibility
step1_result = bot.step1()
print(f"[CCA] step1 result: {step1_result}")
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step1_result
s["result"] = {"status": "error", "message": step1_result}
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": step1_result}
# Step 2 - extract eligibility info + PDF
step2_result = bot.step2()
print(f"[CCA] step2 result: {step2_result.get('status') if isinstance(step2_result, dict) else step2_result}")
if isinstance(step2_result, dict):
s["status"] = "completed"
s["result"] = step2_result
s["message"] = "completed"
asyncio.create_task(_remove_session_later(sid, 60))
return step2_result
else:
s["status"] = "error"
s["message"] = f"step2 returned unexpected result: {step2_result}"
s["result"] = {"status": "error", "message": s["message"]}
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": s["message"]}
except Exception as e:
if s:
s["status"] = "error"
s["message"] = f"worker exception: {e}"
s["result"] = {"status": "error", "message": s["message"]}
if bot:
_close_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": f"worker exception: {e}"}
def get_session_status(sid: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "not_found"}
return {
"session_id": sid,
"status": s.get("status"),
"message": s.get("message"),
"created_at": s.get("created_at"),
"last_activity": s.get("last_activity"),
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
}

View File

@@ -0,0 +1,723 @@
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
import re
import glob
from datetime import datetime
from cca_browser_manager import get_browser_manager
LOGIN_URL = "https://pwp.sciondental.com/PWP/Landing"
LANDING_URL = "https://pwp.sciondental.com/PWP/Landing"
class AutomationCCAEligibilityCheck:
def __init__(self, data):
self.headless = False
self.driver = None
self.data = data.get("data", {}) if isinstance(data, dict) else {}
self.memberId = self.data.get("memberId", "")
self.dateOfBirth = self.data.get("dateOfBirth", "")
self.firstName = self.data.get("firstName", "")
self.lastName = self.data.get("lastName", "")
self.cca_username = self.data.get("cca_username", "")
self.cca_password = self.data.get("cca_password", "")
self.download_dir = get_browser_manager().download_dir
os.makedirs(self.download_dir, exist_ok=True)
def config_driver(self):
self.driver = get_browser_manager().get_driver(self.headless)
def _close_browser(self):
browser_manager = get_browser_manager()
try:
browser_manager.save_cookies()
except Exception as e:
print(f"[CCA] Failed to save cookies before close: {e}")
try:
browser_manager.quit_driver()
print("[CCA] Browser closed")
except Exception as e:
print(f"[CCA] Could not close browser: {e}")
def _force_logout(self):
try:
print("[CCA login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
try:
self.driver.delete_all_cookies()
except Exception:
pass
browser_manager.clear_credentials_hash()
print("[CCA login] Logout complete")
return True
except Exception as e:
print(f"[CCA login] Error during forced logout: {e}")
return False
def _page_has_logged_in_content(self):
"""Quick check if the current page shows logged-in portal content."""
try:
body_text = self.driver.find_element(By.TAG_NAME, "body").text
return ("Verify Patient Eligibility" in body_text
or "Patient Management" in body_text
or "Submit a Claim" in body_text
or "Claim Inquiry" in body_text)
except Exception:
return False
def login(self, url):
"""
Login to ScionDental portal for CCA.
No OTP required - simple username/password login.
Returns: ALREADY_LOGGED_IN, SUCCESS, or ERROR:...
"""
browser_manager = get_browser_manager()
try:
if self.cca_username and browser_manager.credentials_changed(self.cca_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# Check current page state first (no navigation needed)
try:
current_url = self.driver.current_url
print(f"[CCA login] Current URL: {current_url}")
if ("sciondental.com" in current_url
and "login" not in current_url.lower()
and self._page_has_logged_in_content()):
print("[CCA login] Already logged in")
return "ALREADY_LOGGED_IN"
except Exception as e:
print(f"[CCA login] Error checking current state: {e}")
# Navigate to landing page to check session
print("[CCA login] Checking session at landing page...")
self.driver.get(LANDING_URL)
try:
WebDriverWait(self.driver, 10).until(
lambda d: "sciondental.com" in d.current_url
)
except TimeoutException:
pass
time.sleep(2)
current_url = self.driver.current_url
print(f"[CCA login] After landing nav URL: {current_url}")
if self._page_has_logged_in_content():
print("[CCA login] Session still valid")
return "ALREADY_LOGGED_IN"
# Session expired — navigate to login URL
print("[CCA login] Session not valid, navigating to login page...")
self.driver.get(url)
time.sleep(2)
current_url = self.driver.current_url
print(f"[CCA login] After login nav URL: {current_url}")
# Enter username
print("[CCA login] Looking for username field...")
username_entered = False
for sel in [
(By.ID, "Username"),
(By.NAME, "Username"),
(By.XPATH, "//input[@type='text']"),
]:
try:
field = WebDriverWait(self.driver, 6).until(
EC.presence_of_element_located(sel))
if field.is_displayed():
field.clear()
field.send_keys(self.cca_username)
username_entered = True
print(f"[CCA login] Username entered via {sel}")
break
except Exception:
continue
if not username_entered:
if self._page_has_logged_in_content():
return "ALREADY_LOGGED_IN"
return "ERROR: Could not find username field"
# Enter password
print("[CCA login] Looking for password field...")
pw_entered = False
for sel in [
(By.ID, "Password"),
(By.NAME, "Password"),
(By.XPATH, "//input[@type='password']"),
]:
try:
field = self.driver.find_element(*sel)
if field.is_displayed():
field.clear()
field.send_keys(self.cca_password)
pw_entered = True
print(f"[CCA login] Password entered via {sel}")
break
except Exception:
continue
if not pw_entered:
return "ERROR: Password field not found"
# Click login button
for sel in [
(By.XPATH, "//button[@type='submit']"),
(By.XPATH, "//input[@type='submit']"),
(By.XPATH, "//button[contains(text(),'Sign In') or contains(text(),'Log In') or contains(text(),'Login')]"),
(By.XPATH, "//input[@value='Sign In' or @value='Log In' or @value='Login']"),
]:
try:
btn = self.driver.find_element(*sel)
if btn.is_displayed():
btn.click()
print(f"[CCA login] Clicked login button via {sel}")
break
except Exception:
continue
if self.cca_username:
browser_manager.save_credentials_hash(self.cca_username)
# Wait for page to load after login
try:
WebDriverWait(self.driver, 15).until(
lambda d: "Landing" in d.current_url
or "Dental" in d.current_url
or "Home" in d.current_url
)
print("[CCA login] Redirected to portal page")
except TimeoutException:
time.sleep(3)
current_url = self.driver.current_url
print(f"[CCA login] After login submit URL: {current_url}")
# Check for login errors
try:
body_text = self.driver.find_element(By.TAG_NAME, "body").text
if "invalid" in body_text.lower() and ("password" in body_text.lower() or "username" in body_text.lower()):
return "ERROR: Invalid username or password"
except Exception:
pass
if self._page_has_logged_in_content():
print("[CCA login] Login successful")
return "SUCCESS"
if "Landing" in current_url or "Home" in current_url or "Dental" in current_url:
return "SUCCESS"
# Check for errors
try:
errors = self.driver.find_elements(By.XPATH,
"//*[contains(@class,'error') or contains(@class,'alert-danger') or contains(@class,'validation-summary')]")
for err in errors:
if err.is_displayed() and err.text.strip():
return f"ERROR: {err.text.strip()[:200]}"
except Exception:
pass
print("[CCA login] Login completed (assuming success)")
return "SUCCESS"
except Exception as e:
print(f"[CCA login] Exception: {e}")
return f"ERROR:LOGIN FAILED: {e}"
def _format_dob(self, dob_str):
if dob_str and "-" in dob_str:
dob_parts = dob_str.split("-")
if len(dob_parts) == 3:
return f"{dob_parts[1]}/{dob_parts[2]}/{dob_parts[0]}"
return dob_str
def step1(self):
"""
Enter patient info and click Verify Eligibility.
"""
try:
formatted_dob = self._format_dob(self.dateOfBirth)
today_str = datetime.now().strftime("%m/%d/%Y")
print(f"[CCA step1] Starting — memberId={self.memberId}, DOB={formatted_dob}, DateOfService={today_str}")
# Always navigate fresh to Landing to reset page state
print("[CCA step1] Navigating to eligibility page...")
self.driver.get(LANDING_URL)
# Wait for the page to fully load with the eligibility form
try:
WebDriverWait(self.driver, 15).until(
lambda d: "Verify Patient Eligibility" in d.find_element(By.TAG_NAME, "body").text
)
print("[CCA step1] Eligibility form loaded")
except TimeoutException:
print("[CCA step1] Eligibility form not found after 15s, checking page...")
body_text = self.driver.find_element(By.TAG_NAME, "body").text
print(f"[CCA step1] Page text (first 300): {body_text[:300]}")
time.sleep(1)
# Select "Subscriber ID and date of birth" radio
print("[CCA step1] Selecting 'Subscriber ID and date of birth' option...")
for sel in [
(By.XPATH, "//input[@type='radio' and contains(@id,'SubscriberId')]"),
(By.XPATH, "//input[@type='radio'][following-sibling::*[contains(text(),'Subscriber ID')]]"),
(By.XPATH, "//label[contains(text(),'Subscriber ID')]//input[@type='radio']"),
(By.XPATH, "(//input[@type='radio'])[1]"),
]:
try:
radio = self.driver.find_element(*sel)
if radio.is_displayed():
if not radio.is_selected():
radio.click()
print(f"[CCA step1] Selected radio via {sel}")
else:
print("[CCA step1] Radio already selected")
break
except Exception:
continue
# Enter Subscriber ID
print(f"[CCA step1] Entering Subscriber ID: {self.memberId}")
sub_id_entered = False
for sel in [
(By.ID, "SubscriberId"),
(By.NAME, "SubscriberId"),
(By.XPATH, "//input[contains(@id,'SubscriberId')]"),
(By.XPATH, "//label[contains(text(),'Subscriber ID')]/following::input[1]"),
]:
try:
field = self.driver.find_element(*sel)
if field.is_displayed():
field.click()
field.send_keys(Keys.CONTROL + "a")
field.send_keys(Keys.DELETE)
field.send_keys(self.memberId)
time.sleep(0.3)
print(f"[CCA step1] Subscriber ID entered: '{field.get_attribute('value')}'")
sub_id_entered = True
break
except Exception:
continue
if not sub_id_entered:
return "ERROR: Subscriber ID field not found"
# Enter Date of Birth
print(f"[CCA step1] Entering DOB: {formatted_dob}")
dob_entered = False
for sel in [
(By.ID, "DateOfBirth"),
(By.NAME, "DateOfBirth"),
(By.XPATH, "//input[contains(@id,'DateOfBirth') or contains(@id,'dob')]"),
(By.XPATH, "//label[contains(text(),'Date of Birth')]/following::input[1]"),
]:
try:
field = self.driver.find_element(*sel)
if field.is_displayed():
field.click()
field.send_keys(Keys.CONTROL + "a")
field.send_keys(Keys.DELETE)
field.send_keys(formatted_dob)
time.sleep(0.3)
print(f"[CCA step1] DOB entered: '{field.get_attribute('value')}'")
dob_entered = True
break
except Exception:
continue
if not dob_entered:
return "ERROR: Date of Birth field not found"
# Set Date of Service to today
print(f"[CCA step1] Setting Date of Service: {today_str}")
for sel in [
(By.ID, "DateOfService"),
(By.NAME, "DateOfService"),
(By.XPATH, "//input[contains(@id,'DateOfService')]"),
(By.XPATH, "//label[contains(text(),'Date of Service')]/following::input[1]"),
]:
try:
field = self.driver.find_element(*sel)
if field.is_displayed():
field.click()
field.send_keys(Keys.CONTROL + "a")
field.send_keys(Keys.DELETE)
field.send_keys(today_str)
time.sleep(0.3)
print(f"[CCA step1] Date of Service set: '{field.get_attribute('value')}'")
break
except Exception:
continue
# Click "Verify Eligibility"
print("[CCA step1] Clicking 'Verify Eligibility'...")
clicked = False
for sel in [
(By.XPATH, "//button[contains(text(),'Verify Eligibility')]"),
(By.XPATH, "//input[@value='Verify Eligibility']"),
(By.XPATH, "//a[contains(text(),'Verify Eligibility')]"),
(By.XPATH, "//*[@id='btnVerifyEligibility']"),
]:
try:
btn = self.driver.find_element(*sel)
if btn.is_displayed():
btn.click()
clicked = True
print(f"[CCA step1] Clicked Verify Eligibility via {sel}")
break
except Exception:
continue
if not clicked:
return "ERROR: Could not find 'Verify Eligibility' button"
# Wait for result using WebDriverWait instead of fixed sleep
print("[CCA step1] Waiting for eligibility result...")
try:
WebDriverWait(self.driver, 30).until(
lambda d: "Patient Selected" in d.find_element(By.TAG_NAME, "body").text
or "Patient Information" in d.find_element(By.TAG_NAME, "body").text
or "patient is eligible" in d.find_element(By.TAG_NAME, "body").text.lower()
or "not eligible" in d.find_element(By.TAG_NAME, "body").text.lower()
or "no results" in d.find_element(By.TAG_NAME, "body").text.lower()
or "not found" in d.find_element(By.TAG_NAME, "body").text.lower()
)
print("[CCA step1] Eligibility result appeared")
except TimeoutException:
print("[CCA step1] Timed out waiting for result, checking page...")
time.sleep(1)
# Check for errors
body_text = self.driver.find_element(By.TAG_NAME, "body").text
if "no results" in body_text.lower() or "not found" in body_text.lower() or "no patient" in body_text.lower():
return "ERROR: No patient found with the provided Subscriber ID and DOB"
# Check for error alerts
try:
alerts = self.driver.find_elements(By.XPATH,
"//*[@role='alert'] | //*[contains(@class,'alert-danger')]")
for alert in alerts:
if alert.is_displayed() and alert.text.strip():
return f"ERROR: {alert.text.strip()[:200]}"
except Exception:
pass
return "SUCCESS"
except Exception as e:
print(f"[CCA step1] Exception: {e}")
return f"ERROR: step1 failed: {e}"
def step2(self):
"""
Extract all patient information from the result popup,
capture the eligibility report PDF, and return everything.
"""
try:
print("[CCA step2] Extracting eligibility data...")
time.sleep(1)
patientName = ""
extractedDob = ""
foundMemberId = ""
eligibility = "Unknown"
address = ""
city = ""
zipCode = ""
insurerName = ""
body_text = self.driver.find_element(By.TAG_NAME, "body").text
print(f"[CCA step2] Page text (first 800): {body_text[:800]}")
# --- Eligibility status ---
if "patient is eligible" in body_text.lower():
eligibility = "Eligible"
elif "not eligible" in body_text.lower() or "ineligible" in body_text.lower():
eligibility = "Not Eligible"
# --- Patient name ---
for sel in [
(By.XPATH, "//*[contains(@class,'patient-name') or contains(@class,'PatientName')]"),
(By.XPATH, "//div[contains(@class,'modal')]//strong"),
(By.XPATH, "//div[contains(@class,'modal')]//b"),
(By.XPATH, "//*[contains(text(),'Patient Information')]/following::*[1]"),
]:
try:
el = self.driver.find_element(*sel)
name = el.text.strip()
if name and 2 < len(name) < 100:
patientName = name
print(f"[CCA step2] Patient name via DOM: {patientName}")
break
except Exception:
continue
if not patientName:
name_match = re.search(r'Patient Information\s*\n+\s*([A-Z][A-Za-z\s\-\']+)', body_text)
if name_match:
raw = name_match.group(1).strip().split('\n')[0].strip()
for stop in ['Subscriber', 'Address', 'Date', 'DOB', 'Member']:
if stop in raw:
raw = raw[:raw.index(stop)].strip()
patientName = raw
print(f"[CCA step2] Patient name via regex: {patientName}")
# --- Subscriber ID ---
sub_match = re.search(r'Subscriber\s*ID:?\s*(\d+)', body_text)
if sub_match:
foundMemberId = sub_match.group(1).strip()
print(f"[CCA step2] Subscriber ID: {foundMemberId}")
else:
foundMemberId = self.memberId
# --- Date of Birth ---
dob_match = re.search(r'Date\s*of\s*Birth:?\s*([\d/]+)', body_text)
if dob_match:
extractedDob = dob_match.group(1).strip()
print(f"[CCA step2] DOB: {extractedDob}")
else:
extractedDob = self._format_dob(self.dateOfBirth)
# --- Address, City, State, Zip ---
# The search results table shows: "YVONNE KADLIK\n107 HARTFORD AVE W\nMENDON, MA 01756"
# Try extracting from the result table row (name followed by address lines)
if patientName:
addr_block_match = re.search(
re.escape(patientName) + r'\s*\n\s*(.+?)\s*\n\s*([A-Z][A-Za-z\s]+),\s*([A-Z]{2})\s+(\d{5}(?:-?\d{4})?)',
body_text
)
if addr_block_match:
address = addr_block_match.group(1).strip()
city = addr_block_match.group(2).strip()
state = addr_block_match.group(3).strip()
zipCode = addr_block_match.group(4).strip()
address = f"{address}, {city}, {state} {zipCode}"
print(f"[CCA step2] Address: {address}, City: {city}, State: {state}, Zip: {zipCode}")
# Fallback: look for "Address: ..." in Patient Information section
if not address:
addr_match = re.search(
r'Patient Information.*?Address:?\s+(\d+.+?)(?:Date of Birth|DOB|\n\s*\n)',
body_text, re.DOTALL
)
if addr_match:
raw_addr = addr_match.group(1).strip().replace('\n', ', ')
address = raw_addr
print(f"[CCA step2] Address (from Patient Info): {address}")
if not city:
city_match = re.search(
r'([A-Z][A-Za-z]+),\s*([A-Z]{2})\s+(\d{5}(?:-?\d{4})?)',
address or body_text
)
if city_match:
city = city_match.group(1).strip()
zipCode = city_match.group(3).strip()
print(f"[CCA step2] City: {city}, Zip: {zipCode}")
# --- Insurance provider name ---
# Look for insurer name like "Commonwealth Care Alliance"
insurer_match = re.search(
r'(?:Commonwealth\s+Care\s+Alliance|'
r'Delta\s+Dental|'
r'Tufts\s+Health|'
r'MassHealth|'
r'United\s+Healthcare)',
body_text,
re.IGNORECASE
)
if insurer_match:
insurerName = insurer_match.group(0).strip()
print(f"[CCA step2] Insurer: {insurerName}")
# Also try generic pattern after "View Benefits" section
if not insurerName:
ins_match = re.search(
r'View Eligibility Report\s*\n+\s*(.+?)(?:\n|View Benefits)',
body_text
)
if ins_match:
candidate = ins_match.group(1).strip()
if 3 < len(candidate) < 80 and not candidate.startswith("Start"):
insurerName = candidate
print(f"[CCA step2] Insurer via context: {insurerName}")
# --- PDF capture ---
print("[CCA step2] Clicking 'View Eligibility Report'...")
pdfBase64 = ""
try:
existing_files = set(glob.glob(os.path.join(self.download_dir, "*")))
original_window = self.driver.current_window_handle
original_handles = set(self.driver.window_handles)
view_report_clicked = False
for sel in [
(By.XPATH, "//button[contains(text(),'View Eligibility Report')]"),
(By.XPATH, "//input[@value='View Eligibility Report']"),
(By.XPATH, "//a[contains(text(),'View Eligibility Report')]"),
(By.XPATH, "//*[contains(text(),'View Eligibility Report')]"),
]:
try:
btn = self.driver.find_element(*sel)
if btn.is_displayed():
btn.click()
view_report_clicked = True
print(f"[CCA step2] Clicked 'View Eligibility Report' via {sel}")
break
except Exception:
continue
if not view_report_clicked:
print("[CCA step2] 'View Eligibility Report' button not found")
raise Exception("View Eligibility Report button not found")
# Wait for download to start
time.sleep(3)
# Check for downloaded file (this site downloads rather than opens in-tab)
pdf_path = None
for i in range(15):
time.sleep(1)
current_files = set(glob.glob(os.path.join(self.download_dir, "*")))
new_files = current_files - existing_files
completed = [f for f in new_files
if not f.endswith(".crdownload") and not f.endswith(".tmp")]
if completed:
pdf_path = completed[0]
print(f"[CCA step2] PDF downloaded: {pdf_path}")
break
if pdf_path and os.path.exists(pdf_path):
with open(pdf_path, "rb") as f:
pdfBase64 = base64.b64encode(f.read()).decode()
print(f"[CCA step2] PDF from download: {os.path.basename(pdf_path)} "
f"({os.path.getsize(pdf_path)} bytes), b64 len={len(pdfBase64)}")
try:
os.remove(pdf_path)
except Exception:
pass
else:
# Fallback: check for new window
new_handles = set(self.driver.window_handles) - original_handles
if new_handles:
new_window = new_handles.pop()
self.driver.switch_to.window(new_window)
time.sleep(3)
print(f"[CCA step2] Switched to new window: {self.driver.current_url}")
try:
cdp_result = self.driver.execute_cdp_cmd("Page.printToPDF", {
"printBackground": True,
"preferCSSPageSize": True,
"scale": 0.8,
"paperWidth": 8.5,
"paperHeight": 11,
})
pdf_data = cdp_result.get("data", "")
if len(pdf_data) > 2000:
pdfBase64 = pdf_data
print(f"[CCA step2] PDF from new window, b64 len={len(pdfBase64)}")
except Exception as e:
print(f"[CCA step2] CDP in new window failed: {e}")
try:
self.driver.close()
self.driver.switch_to.window(original_window)
except Exception:
pass
# Final fallback: CDP on main page
if not pdfBase64 or len(pdfBase64) < 2000:
print("[CCA step2] Falling back to CDP PDF from main page...")
try:
try:
self.driver.switch_to.window(original_window)
except Exception:
pass
cdp_result = self.driver.execute_cdp_cmd("Page.printToPDF", {
"printBackground": True,
"preferCSSPageSize": True,
"scale": 0.7,
"paperWidth": 11,
"paperHeight": 17,
})
pdfBase64 = cdp_result.get("data", "")
print(f"[CCA step2] Main page CDP PDF, b64 len={len(pdfBase64)}")
except Exception as e2:
print(f"[CCA step2] Main page CDP failed: {e2}")
except Exception as e:
print(f"[CCA step2] PDF capture failed: {e}")
try:
cdp_result = self.driver.execute_cdp_cmd("Page.printToPDF", {
"printBackground": True,
"preferCSSPageSize": True,
"scale": 0.7,
"paperWidth": 11,
"paperHeight": 17,
})
pdfBase64 = cdp_result.get("data", "")
print(f"[CCA step2] CDP fallback PDF, b64 len={len(pdfBase64)}")
except Exception as e2:
print(f"[CCA step2] CDP fallback also failed: {e2}")
self._close_browser()
result = {
"status": "success",
"patientName": patientName,
"eligibility": eligibility,
"pdfBase64": pdfBase64,
"extractedDob": extractedDob,
"memberId": foundMemberId,
"address": address,
"city": city,
"zipCode": zipCode,
"insurerName": insurerName,
}
print(f"[CCA step2] Result: name={result['patientName']}, "
f"eligibility={result['eligibility']}, "
f"memberId={result['memberId']}, "
f"address={result['address']}, "
f"city={result['city']}, zip={result['zipCode']}, "
f"insurer={result['insurerName']}")
return result
except Exception as e:
print(f"[CCA step2] Exception: {e}")
self._close_browser()
return {
"status": "error",
"patientName": f"{self.firstName} {self.lastName}".strip(),
"eligibility": "Unknown",
"pdfBase64": "",
"extractedDob": self._format_dob(self.dateOfBirth),
"memberId": self.memberId,
"address": "",
"city": "",
"zipCode": "",
"insurerName": "",
"error": str(e),
}