Add United SCO eligibility feature and fix issues

- United SCO: Add complete eligibility check workflow with flexible input
  (supports either Member ID + DOB or First Name + Last Name + DOB)
- Tufts SCO (DentaQuest): Fix Date of Service validation by adding proper
  Tab key navigation between date fields
- Delta MA: Improve patient name extraction with more robust selectors
  and pattern matching for saving names to database
This commit is contained in:
2026-02-03 18:33:26 -05:00
parent 5370a0e445
commit e43329e95f
11 changed files with 2919 additions and 26 deletions

View File

@@ -10,10 +10,12 @@ import os
import time
import helpers_ddma_eligibility as hddma
import helpers_dentaquest_eligibility as hdentaquest
import helpers_unitedsco_eligibility as hunitedsco
# Import session clear functions for startup
from ddma_browser_manager import clear_ddma_session_on_startup
from dentaquest_browser_manager import clear_dentaquest_session_on_startup
from unitedsco_browser_manager import clear_unitedsco_session_on_startup
from dotenv import load_dotenv
load_dotenv()
@@ -25,6 +27,7 @@ print("SELENIUM AGENT STARTING - CLEARING ALL SESSIONS")
print("=" * 50)
clear_ddma_session_on_startup()
clear_dentaquest_session_on_startup()
clear_unitedsco_session_on_startup()
print("=" * 50)
print("SESSION CLEAR COMPLETE - FRESH LOGINS REQUIRED")
print("=" * 50)
@@ -275,6 +278,79 @@ async def dentaquest_session_status(sid: str):
return s
# Endpoint:7 - United SCO eligibility (background, OTP)
async def _unitedsco_worker_wrapper(sid: str, data: dict, url: str):
    """
    Queue-aware runner for a single United SCO eligibility job.

    Waits for the shared semaphore, moves the job from the waiting counter
    to the active counter, awaits hunitedsco.start_unitedsco_run, and
    decrements the active counter again whether the run returns or raises.
    """
    global active_jobs, waiting_jobs

    await semaphore.acquire()
    try:
        # Job leaves the queue and becomes active
        async with lock:
            waiting_jobs -= 1
            active_jobs += 1

        try:
            await hunitedsco.start_unitedsco_run(sid, data, url)
        finally:
            async with lock:
                active_jobs -= 1
    finally:
        semaphore.release()
@app.post("/unitedsco-eligibility")
async def unitedsco_eligibility(request: Request):
    """
    Starts a United SCO eligibility session in the background.

    Body: { "data": { ... } }
    Returns: { status: "started", session_id: "<uuid>" }

    A "url" key in the body is not read: the worker is always started with
    the fixed DentalHub login URL below.

    Raises:
        HTTPException(400): the request body is not a JSON object.
    """
    global waiting_jobs

    body = await request.json()
    if not isinstance(body, dict):
        # A JSON array/scalar would otherwise fail on .get() with a 500
        raise HTTPException(status_code=400, detail="JSON object body required")
    data = body.get("data", {})

    # create session
    sid = hunitedsco.make_session_entry()
    session = hunitedsco.sessions[sid]
    session["type"] = "unitedsco_eligibility"
    session["last_activity"] = time.time()

    async with lock:
        waiting_jobs += 1

    # run in background (queued under semaphore)
    task = asyncio.create_task(
        _unitedsco_worker_wrapper(sid, data, url="https://app.dentalhub.com/app/login")
    )
    # The event loop only keeps weak references to tasks; hold a strong one
    # on the session so the worker cannot be garbage-collected mid-run.
    session["task"] = task

    return {"status": "started", "session_id": sid}
@app.post("/unitedsco-submit-otp")
async def unitedsco_submit_otp(request: Request):
    """
    Hand a user-supplied OTP to a United SCO session.

    Body: { "session_id": "<sid>", "otp": "123456" }
    Responds 400 when either field is missing or when
    hunitedsco.submit_otp reports an error; otherwise returns its result.
    """
    payload = await request.json()
    sid, otp = payload.get("session_id"), payload.get("otp")
    if not (sid and otp):
        raise HTTPException(status_code=400, detail="session_id and otp required")

    outcome = hunitedsco.submit_otp(sid, otp)
    if outcome.get("status") != "error":
        return outcome
    raise HTTPException(status_code=400, detail=outcome.get("message"))
@app.get("/unitedsco-session/{sid}/status")
async def unitedsco_session_status(sid: str):
    """Return the status snapshot of a United SCO session, or 404 if it is unknown."""
    snapshot = hunitedsco.get_session_status(sid)
    if snapshot.get("status") != "not_found":
        return snapshot
    raise HTTPException(status_code=404, detail="session not found")
@app.post("/submit-otp")
async def submit_otp(request: Request):
"""
@@ -311,6 +387,44 @@ async def get_status():
"status": "busy" if active_jobs > 0 or waiting_jobs > 0 else "idle"
}
# ✅ Clear session endpoints - called when credentials are deleted
@app.post("/clear-ddma-session")
async def clear_ddma_session():
    """
    Clear the DDMA browser session (called when DDMA credentials are deleted).

    Always answers with a status/message dict; a failure is reported in the
    payload instead of being raised.
    """
    try:
        clear_ddma_session_on_startup()
    except Exception as err:
        return {"status": "error", "message": str(err)}
    return {"status": "success", "message": "DDMA session cleared"}
@app.post("/clear-dentaquest-session")
async def clear_dentaquest_session():
    """
    Clear the DentaQuest browser session (called when DentaQuest credentials are deleted).

    Always answers with a status/message dict; a failure is reported in the
    payload instead of being raised.
    """
    try:
        clear_dentaquest_session_on_startup()
    except Exception as err:
        return {"status": "error", "message": str(err)}
    return {"status": "success", "message": "DentaQuest session cleared"}
@app.post("/clear-unitedsco-session")
async def clear_unitedsco_session():
    """
    Clear the United SCO browser session (called when United SCO credentials are deleted).

    Always answers with a status/message dict; a failure is reported in the
    payload instead of being raised.
    """
    try:
        clear_unitedsco_session_on_startup()
    except Exception as err:
        return {"status": "error", "message": str(err)}
    return {"status": "success", "message": "United SCO session cleared"}
if __name__ == "__main__":
host = os.getenv("HOST")
port = int(os.getenv("PORT"))

View File

@@ -0,0 +1,323 @@
import os
import time
import asyncio
from typing import Dict, Any
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium_UnitedSCO_eligibilityCheckWorker import AutomationUnitedSCOEligibilityCheck
# In-memory session store
sessions: Dict[str, Dict[str, Any]] = {}
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120")) # seconds
def make_session_entry() -> str:
    """Register a fresh session under a random UUID4 string and return that ID."""
    from uuid import uuid4

    sid = str(uuid4())
    entry: Dict[str, Any] = {
        # created -> running -> waiting_for_otp -> otp_submitted -> completed / error
        "status": "created",
        "created_at": time.time(),
        "last_activity": time.time(),
        "bot": None,      # worker instance
        "driver": None,   # selenium webdriver
        "otp_event": asyncio.Event(),
        "otp_value": None,
        "result": None,
        "message": None,
        "type": None,
    }
    sessions[sid] = entry
    return sid
async def cleanup_session(sid: str, message: str | None = None):
    """
    Finalise a session and drop it from the in-memory store.

    Forces any non-final status to "error", records `message` when one is
    given, sets the OTP event so nothing stays blocked on it, and removes
    the entry. The Selenium driver is intentionally left running.
    Calling this for an unknown or already-removed `sid` does nothing.
    """
    session = sessions.get(sid)
    if not session:
        return

    try:
        # Settle the final state
        try:
            is_final = session.get("status") in ("completed", "error", "not_found")
            if not is_final:
                session["status"] = "error"
            if message:
                session["message"] = message
        except Exception:
            pass

        # Release anything awaiting the OTP event
        try:
            otp_event = session.get("otp_event")
            if otp_event and not otp_event.is_set():
                otp_event.set()
        except Exception:
            pass

        # NOTE: the driver is NOT quit here - the browser is kept alive for
        # the next patient; the browser manager owns the persistent instance.
    finally:
        # Always forget the session, even if the steps above failed
        sessions.pop(sid, None)
async def _remove_session_later(sid: str, delay: int = 20):
    """Sleep for `delay` seconds, then run cleanup_session on `sid`."""
    await asyncio.sleep(delay)
    await cleanup_session(sid)
# Seconds a failed session stays in `sessions` so the status endpoint can
# still report the error (mirrors the delay used for completed sessions).
_UNITEDSCO_ERROR_SESSION_TTL = 30

_UNITEDSCO_DASHBOARD_URL = "https://app.dentalhub.com/app/dashboard"

# Any of these elements is treated as proof of a logged-in page.
_UNITEDSCO_LOGGED_IN_XPATH = (
    '//input[@placeholder="Search by member ID"] | //input[contains(@placeholder,"Search")] | //*[contains(@class,"dashboard")]'
)


def _unitedsco_fail(sid: str, s: dict, message: str, result_message: str | None = None) -> dict:
    """Mark the session as failed, schedule its delayed removal, and build the error result."""
    s["status"] = "error"
    s["message"] = message
    # Wake anything awaiting the OTP event so it does not hang
    ev = s.get("otp_event")
    if ev is not None and not ev.is_set():
        ev.set()
    # Keep the entry around briefly instead of popping it right away; a strong
    # reference is stored because the event loop only weakly references tasks.
    s["cleanup_task"] = asyncio.create_task(
        _remove_session_later(sid, _UNITEDSCO_ERROR_SESSION_TTL)
    )
    return {"status": "error", "message": result_message if result_message is not None else message}


async def _unitedsco_type_otp(s: dict, driver, otp_value: str) -> None:
    """Type an OTP received through the API into the page and submit it (best effort)."""
    # The code itself is deliberately not logged
    print("[UnitedSCO OTP] OTP received from app")
    try:
        otp_input = driver.find_element(By.XPATH,
            "//input[contains(@name,'otp') or contains(@name,'code') or @type='tel' or contains(@aria-label,'Verification') or contains(@placeholder,'code')]"
        )
        otp_input.clear()
        otp_input.send_keys(otp_value)
        # Click verify button - use same pattern as Delta MA
        try:
            verify_btn = driver.find_element(By.XPATH, "//button[@type='button' and @aria-label='Verify']")
            verify_btn.click()
            print("[UnitedSCO OTP] Clicked verify button (aria-label)")
        except Exception:
            try:
                # Fallback: try other button patterns
                verify_btn = driver.find_element(By.XPATH, "//button[contains(text(),'Verify') or contains(text(),'Submit') or @type='submit']")
                verify_btn.click()
                print("[UnitedSCO OTP] Clicked verify button (text/type)")
            except Exception:
                otp_input.send_keys("\n")  # Press Enter as fallback
                print("[UnitedSCO OTP] Pressed Enter as fallback")
        print("[UnitedSCO OTP] OTP typed and submitted via app")
        s["otp_value"] = None  # Clear so we don't submit again
        await asyncio.sleep(3)  # Wait for verification
    except Exception as type_err:
        # otp_value is left in place, so the next poll tries again
        print(f"[UnitedSCO OTP] Failed to type OTP from app: {type_err}")


async def _unitedsco_wait_for_otp(sid: str, s: dict, driver) -> dict | None:
    """
    Poll the browser until the OTP step is finished.

    Returns None once a logged-in page is detected, or the error result dict
    when login could not be confirmed after SESSION_OTP_TIMEOUT polls plus one
    final dashboard check.
    """
    max_polls = SESSION_OTP_TIMEOUT
    print(f"[UnitedSCO OTP] Waiting for user to enter OTP (polling browser for {SESSION_OTP_TIMEOUT}s)...")

    for poll in range(max_polls):
        await asyncio.sleep(1)
        s["last_activity"] = time.time()
        try:
            # OTP submitted via API (from app)?
            otp_value = s.get("otp_value")
            if otp_value:
                await _unitedsco_type_otp(s, driver, otp_value)

            # If we're on a dashboard/member page, login probably succeeded
            current_url = driver.current_url.lower()
            print(f"[UnitedSCO OTP Poll {poll+1}/{max_polls}] URL: {current_url[:60]}...")

            if any(marker in current_url for marker in ("member", "dashboard", "eligibility", "home")):
                try:
                    WebDriverWait(driver, 5).until(
                        EC.presence_of_element_located((By.XPATH, _UNITEDSCO_LOGGED_IN_XPATH))
                    )
                    print("[UnitedSCO OTP] Dashboard/search element found - login successful!")
                    return None
                except TimeoutException:
                    print("[UnitedSCO OTP] On member page but search input not found, continuing to poll...")

            # Is the OTP input still on screen?
            try:
                driver.find_element(By.XPATH,
                    "//input[contains(@name,'otp') or contains(@name,'code') or @type='tel' or contains(@aria-label,'Verification') or contains(@placeholder,'code') or contains(@placeholder,'Code')]"
                )
                print(f"[UnitedSCO OTP Poll {poll+1}] OTP input still visible - waiting...")
            except Exception:
                # OTP input gone - login may be in progress or done
                if "login" in current_url:
                    print("[UnitedSCO OTP] OTP input gone, trying to navigate to dashboard...")
                    try:
                        driver.get(_UNITEDSCO_DASHBOARD_URL)
                        await asyncio.sleep(2)
                    except Exception:
                        # `Exception`, not a bare except: task cancellation
                        # raised by the sleep must propagate.
                        pass
        except Exception as poll_err:
            print(f"[UnitedSCO OTP Poll {poll+1}] Error: {poll_err}")

    # Final attempt - navigate to dashboard and check
    try:
        print("[UnitedSCO OTP] Final attempt - navigating to dashboard...")
        driver.get(_UNITEDSCO_DASHBOARD_URL)
        await asyncio.sleep(3)
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, _UNITEDSCO_LOGGED_IN_XPATH))
        )
        print("[UnitedSCO OTP] Dashboard element found - login successful!")
        return None
    except TimeoutException:
        return _unitedsco_fail(sid, s, "OTP timeout - login not completed", "OTP not completed in time")
    except Exception as final_err:
        return _unitedsco_fail(sid, s, f"OTP verification failed: {final_err}")


async def start_unitedsco_run(sid: str, data: dict, url: str):
    """
    Run the United SCO workflow for a session (WITHOUT managing semaphore/counters).
    Called by agent.py inside a wrapper that handles queue/counters.

    Flow: configure the driver, open `url`, call bot.login, handle the OTP
    step when login reports "OTP_REQUIRED", then run bot.step1 and bot.step2.

    Returns:
        The step2 result dict on success, otherwise
        {"status": "error", "message": ...}.

    On failure the session keeps status "error" and its message for
    _UNITEDSCO_ERROR_SESSION_TTL seconds before it is removed, so the status
    endpoint can report what went wrong.

    NOTE(review): bot.login/step1/step2 are called without await, so they run
    synchronously inside this coroutine - confirm that is acceptable for the
    other endpoints served from the same event loop.
    """
    s = sessions.get(sid)
    if not s:
        return {"status": "error", "message": "session not found"}

    s["status"] = "running"
    s["last_activity"] = time.time()

    try:
        bot = AutomationUnitedSCOEligibilityCheck({"data": data})
        bot.config_driver()
        s["bot"] = bot
        s["driver"] = bot.driver
        s["last_activity"] = time.time()

        # Navigate to login URL
        try:
            if not url:
                raise ValueError("URL not provided for United SCO run")
            bot.driver.maximize_window()
            bot.driver.get(url)
            await asyncio.sleep(1)
        except Exception as e:
            return _unitedsco_fail(sid, s, f"Navigation failed: {e}")

        # Login
        try:
            login_result = bot.login(url)
        except WebDriverException as wde:
            return _unitedsco_fail(sid, s, f"Selenium driver error during login: {wde}")
        except Exception as e:
            return _unitedsco_fail(sid, s, f"Unexpected error during login: {e}")

        if login_result == "ALREADY_LOGGED_IN":
            # Session persisted from profile - skip straight to step1
            s["status"] = "running"
            s["message"] = "Session persisted"
            print("[start_unitedsco_run] Session persisted - skipping OTP")

        elif login_result == "OTP_REQUIRED":
            # The OTP is entered in the browser or pushed through submit_otp
            s["status"] = "waiting_for_otp"
            s["message"] = "OTP required for login - please enter OTP in browser"
            s["last_activity"] = time.time()

            failure = await _unitedsco_wait_for_otp(sid, s, s["driver"])
            if failure is not None:
                return failure

            s["status"] = "running"
            s["message"] = "Login successful after OTP"
            print("[UnitedSCO OTP] Proceeding to step1...")

        elif isinstance(login_result, str) and login_result.startswith("ERROR"):
            return _unitedsco_fail(sid, s, login_result)

        elif login_result == "SUCCESS":
            # Login succeeded without OTP
            print("[start_unitedsco_run] Login succeeded without OTP")
            s["status"] = "running"
            s["message"] = "Login succeeded"

        # Step 1
        step1_result = bot.step1()
        if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
            return _unitedsco_fail(sid, s, step1_result)

        # Step 2 (PDF)
        step2_result = bot.step2()
        if isinstance(step2_result, dict) and step2_result.get("status") == "success":
            s["status"] = "completed"
            s["result"] = step2_result
            s["message"] = "completed"
            s["cleanup_task"] = asyncio.create_task(_remove_session_later(sid, 30))
            return step2_result

        if isinstance(step2_result, dict):
            failure_message = step2_result.get("message", "unknown error")
        else:
            failure_message = str(step2_result)
        return _unitedsco_fail(sid, s, failure_message)

    except Exception as e:
        return _unitedsco_fail(sid, s, f"worker exception: {e}")
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
    """
    Store an OTP on a session that is waiting for one and set its OTP event.

    Returns an error dict when the session is unknown or not in the
    "waiting_for_otp" state, otherwise {"status": "ok", ...}.
    """
    session = sessions.get(sid)
    if not session:
        return {"status": "error", "message": "session not found"}

    state = session.get("status")
    if state != "waiting_for_otp":
        return {"status": "error", "message": f"session not waiting for otp (state={state})"}

    session["otp_value"] = otp
    session["last_activity"] = time.time()
    try:
        session["otp_event"].set()
    except Exception:
        # Signalling the event is best effort; the OTP is stored regardless
        pass
    return {"status": "ok", "message": "otp accepted"}
def get_session_status(sid: str) -> Dict[str, Any]:
    """
    Build a status snapshot for `sid`.

    Unknown sessions yield {"status": "not_found"}. The stored result is
    included only once the session status is "completed".
    """
    session = sessions.get(sid)
    if not session:
        return {"status": "not_found"}

    status = session.get("status")
    result = session.get("result") if status == "completed" else None
    return {
        "session_id": sid,
        "status": status,
        "message": session.get("message"),
        "created_at": session.get("created_at"),
        "last_activity": session.get("last_activity"),
        "result": result,
    }

View File

@@ -391,8 +391,8 @@ class AutomationDeltaDentalMAEligibilityCheck:
except:
pass
# 2) Click on patient name to navigate to detailed patient page
print("[DDMA step2] Clicking on patient name to open detailed page...")
# 2) Extract patient name and click to navigate to detailed patient page
print("[DDMA step2] Extracting patient name and finding detail link...")
patient_name_clicked = False
patientName = ""
@@ -400,6 +400,29 @@ class AutomationDeltaDentalMAEligibilityCheck:
current_url_before = self.driver.current_url
print(f"[DDMA step2] Current URL before click: {current_url_before}")
# Try to extract patient name from the first row of search results
# This is more reliable than extracting from link text
name_extraction_selectors = [
"(//tbody//tr)[1]//td[1]", # First column of first row (usually name)
"(//table//tbody//tr)[1]//td[1]", # Alternative table structure
"//table//tr[2]//td[1]", # Skip header row
"(//tbody//tr)[1]//td[contains(@class,'name')]", # Name column by class
"(//tbody//tr)[1]//a", # Link in first row (might contain name)
]
for selector in name_extraction_selectors:
try:
elem = self.driver.find_element(By.XPATH, selector)
text = elem.text.strip()
# Filter out non-name text
if text and len(text) > 1 and len(text) < 100:
if not any(x in text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'date', 'print', 'view', 'details', 'status']):
patientName = text
print(f"[DDMA step2] Extracted patient name from search results: '{patientName}'")
break
except Exception:
continue
# Try to find all links in the first row and print them for debugging
try:
all_links = self.driver.find_elements(By.XPATH, "(//tbody//tr)[1]//a")
@@ -408,6 +431,11 @@ class AutomationDeltaDentalMAEligibilityCheck:
href = link.get_attribute("href") or "no-href"
text = link.text.strip() or "(empty text)"
print(f" Link {i}: href={href[:80]}..., text={text}")
# Also try to get name from link if we haven't found it yet
if not patientName and text and len(text) > 1:
if not any(x in text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'view', 'details']):
patientName = text
print(f"[DDMA step2] Got patient name from link text: '{patientName}'")
except Exception as e:
print(f"[DDMA step2] Error listing links: {e}")
@@ -424,9 +452,14 @@ class AutomationDeltaDentalMAEligibilityCheck:
patient_link = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, selector))
)
patientName = patient_link.text.strip()
link_text = patient_link.text.strip()
href = patient_link.get_attribute("href")
print(f"[DDMA step2] Found patient link: text='{patientName}', href={href}")
print(f"[DDMA step2] Found patient link: text='{link_text}', href={href}")
# Use link text as name if we don't have one yet
if not patientName and link_text and len(link_text) > 1:
if not any(x in link_text.lower() for x in ['active', 'inactive', 'view', 'details']):
patientName = link_text
if href and "member-details" in href:
detail_url = href
@@ -507,30 +540,70 @@ class AutomationDeltaDentalMAEligibilityCheck:
# Try to extract patient name from detailed page if not already found
if not patientName:
detail_name_selectors = [
"//h1",
"//h2",
"//*[contains(@class,'patient-name') or contains(@class,'member-name')]",
"//div[contains(@class,'header')]//span",
"//*[contains(@class,'member-name')]",
"//*[contains(@class,'patient-name')]",
"//h1[not(contains(@class,'page-title'))]",
"//h2[not(contains(@class,'section-title'))]",
"//div[contains(@class,'header')]//span[string-length(text()) > 2]",
"//div[contains(@class,'member-info')]//span",
"//div[contains(@class,'patient-info')]//span",
"//span[contains(@class,'name')]",
]
for selector in detail_name_selectors:
try:
name_elem = self.driver.find_element(By.XPATH, selector)
name_text = name_elem.text.strip()
if name_text and len(name_text) > 1:
if not any(x in name_text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'date', 'print']):
if name_text and len(name_text) > 2 and len(name_text) < 100:
# Filter out common non-name text
skip_words = ['active', 'inactive', 'eligible', 'search', 'date', 'print',
'view', 'details', 'member', 'patient', 'status', 'eligibility',
'welcome', 'home', 'logout', 'menu', 'close', 'expand']
if not any(x in name_text.lower() for x in skip_words):
patientName = name_text
print(f"[DDMA step2] Found patient name on detail page: {patientName}")
break
except:
continue
# As a last resort, try to find name in page text using patterns
if not patientName:
try:
# Look for text that looks like a name (First Last format)
import re
page_text = self.driver.find_element(By.TAG_NAME, "body").text
# Look for "Member Name:" or "Patient Name:" followed by text
name_patterns = [
r'Member Name[:\s]+([A-Z][a-z]+\s+[A-Z][a-z]+)',
r'Patient Name[:\s]+([A-Z][a-z]+\s+[A-Z][a-z]+)',
r'Name[:\s]+([A-Z][a-z]+\s+[A-Z][a-z]+)',
]
for pattern in name_patterns:
match = re.search(pattern, page_text, re.IGNORECASE)
if match:
patientName = match.group(1).strip()
print(f"[DDMA step2] Found patient name via pattern match: {patientName}")
break
except:
pass
else:
print("[DDMA step2] Warning: Could not click on patient, capturing search results page")
# Still try to get patient name from search results
try:
name_elem = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]//td[1]")
patientName = name_elem.text.strip()
except:
pass
if not patientName:
name_selectors = [
"(//tbody//tr)[1]//td[1]", # First column of first row
"(//table//tbody//tr)[1]//td[1]",
"(//tbody//tr)[1]//a", # Link in first row
]
for selector in name_selectors:
try:
name_elem = self.driver.find_element(By.XPATH, selector)
text = name_elem.text.strip()
if text and len(text) > 1 and not any(x in text.lower() for x in ['active', 'inactive', 'view', 'details']):
patientName = text
print(f"[DDMA step2] Got patient name from search results: {patientName}")
break
except:
continue
if not patientName:
print("[DDMA step2] Could not extract patient name")

View File

@@ -285,16 +285,34 @@ class AutomationDentaQuestEligibilityCheck:
def replace_with_sendkeys(el, value):
el.click()
time.sleep(0.05)
time.sleep(0.1)
# Clear existing content
el.send_keys(Keys.CONTROL, "a")
time.sleep(0.05)
el.send_keys(Keys.BACKSPACE)
time.sleep(0.05)
# Type new value
el.send_keys(value)
time.sleep(0.1)
# Fill month
replace_with_sendkeys(month_elem, month_val)
# Tab to day field
month_elem.send_keys(Keys.TAB)
time.sleep(0.1)
# Fill day
replace_with_sendkeys(day_elem, day_val)
# Tab to year field
day_elem.send_keys(Keys.TAB)
time.sleep(0.1)
# Fill year
replace_with_sendkeys(year_elem, year_val)
# Tab out of the field to trigger validation
year_elem.send_keys(Keys.TAB)
time.sleep(0.2)
print(f"[DentaQuest step1] Filled {field_name}: {month_val}/{day_val}/{year_val}")
return True
except Exception as e:
@@ -303,11 +321,11 @@ class AutomationDentaQuestEligibilityCheck:
# 1. Fill Date of Service with TODAY's date using specific data-testid
fill_date_by_testid("member-search_date-of-service", service_month, service_day, service_year, "Date of Service")
time.sleep(0.3)
time.sleep(0.5)
# 2. Fill Date of Birth with patient's DOB using specific data-testid
fill_date_by_testid("member-search_date-of-birth", dob_month, dob_day, dob_year, "Date of Birth")
time.sleep(0.3)
time.sleep(0.5)
# 3. Fill Member ID
member_id_input = wait.until(EC.presence_of_element_located(
@@ -406,6 +424,25 @@ class AutomationDentaQuestEligibilityCheck:
current_url_before = self.driver.current_url
print(f"[DentaQuest step2] Current URL before: {current_url_before}")
# Try to extract patient name from search results first
name_extraction_selectors = [
"(//tbody//tr)[1]//td[1]", # First column of first row
"(//table//tbody//tr)[1]//td[1]",
"//table//tr[2]//td[1]", # Skip header row
"(//tbody//tr)[1]//a", # Link in first row
]
for selector in name_extraction_selectors:
try:
elem = self.driver.find_element(By.XPATH, selector)
text = elem.text.strip()
if text and len(text) > 1 and len(text) < 100:
if not any(x in text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'view', 'details', 'status']):
patientName = text
print(f"[DentaQuest step2] Extracted patient name from search results: '{patientName}'")
break
except:
continue
# Find all links in first row and log them
try:
all_links = self.driver.find_elements(By.XPATH, "(//tbody//tr)[1]//a")

View File

@@ -0,0 +1,637 @@
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
from unitedsco_browser_manager import get_browser_manager
class AutomationUnitedSCOEligibilityCheck:
def __init__(self, data):
    """
    Store the request payload and the download directory.

    `data` is expected to be a dict with the patient/credential fields under
    its "data" key; anything that is not a dict is treated as empty.
    """
    self.headless = False
    self.driver = None

    if isinstance(data, dict):
        self.data = data.get("data", {})
    else:
        self.data = {}
    payload = self.data

    # Flatten values for convenience
    self.memberId = payload.get("memberId", "")
    self.dateOfBirth = payload.get("dateOfBirth", "")
    self.firstName = payload.get("firstName", "")
    self.lastName = payload.get("lastName", "")
    self.unitedsco_username = payload.get("unitedscoUsername", "")
    self.unitedsco_password = payload.get("unitedscoPassword", "")

    # Downloads go to the browser manager's directory
    self.download_dir = get_browser_manager().download_dir
    os.makedirs(self.download_dir, exist_ok=True)
def config_driver(self):
    """Attach the browser manager's persistent driver (keeps device trust tokens) to this worker."""
    manager = get_browser_manager()
    self.driver = manager.get_driver(self.headless)
def _force_logout(self):
    """
    Log the current user out of the United SCO portal.

    Tries to click a logout control on the dashboard, then deletes all
    cookies and clears the stored credentials hash. Returns True when the
    sequence ran through, False if an unexpected error interrupted it.
    """
    try:
        print("[UnitedSCO login] Forcing logout due to credential change...")
        manager = get_browser_manager()

        # Preferred path: click a visible logout control
        try:
            self.driver.get("https://app.dentalhub.com/app/dashboard")
            time.sleep(2)

            candidate_xpaths = (
                "//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                "//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                "//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
                "//*[contains(@class, 'logout') or contains(@class, 'signout')]",
            )
            for xpath in candidate_xpaths:
                try:
                    clickable = EC.element_to_be_clickable((By.XPATH, xpath))
                    button = WebDriverWait(self.driver, 3).until(clickable)
                    button.click()
                    print("[UnitedSCO login] Clicked logout button")
                    time.sleep(2)
                    break
                except TimeoutException:
                    # This selector did not match in time; try the next one
                    continue
        except Exception as click_err:
            print(f"[UnitedSCO login] Could not click logout button: {click_err}")

        # Backup path: drop every cookie
        try:
            self.driver.delete_all_cookies()
            print("[UnitedSCO login] Cleared all cookies")
        except Exception as cookie_err:
            print(f"[UnitedSCO login] Error clearing cookies: {cookie_err}")

        manager.clear_credentials_hash()
        print("[UnitedSCO login] Logout complete")
        return True
    except Exception as logout_err:
        print(f"[UnitedSCO login] Error during forced logout: {logout_err}")
        return False
def login(self, url):
# Log in to the DentalHub portal at `url` and report the outcome as a string:
#   "ALREADY_LOGGED_IN" - a non-login app.dentalhub.com page was detected
#   "OTP_REQUIRED"      - an OTP/verification input is present
#   "SUCCESS"           - login finished without an OTP prompt
#   "ERROR..."          - any failure; the text after the prefix is the reason
# Exceptions raised inside the body are caught by the outer handler at the
# bottom and returned as an "ERROR:LOGIN FAILED: ..." string.
# NOTE(review): `wait` is assigned but not used anywhere in this method.
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
# Check if credentials have changed - if so, force logout first
# NOTE(review): the source shown here has lost its indentation, so it cannot
# be told whether the get()/sleep() below belong to this `if` - confirm.
if self.unitedsco_username and browser_manager.credentials_changed(self.unitedsco_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# First check if we're already on a logged-in page (from previous run)
try:
current_url = self.driver.current_url
print(f"[UnitedSCO login] Current URL: {current_url}")
# Check if we're already on dentalhub dashboard (not the login page)
if "app.dentalhub.com" in current_url and "login" not in current_url.lower():
try:
# Look for dashboard element or member search
# (the located element is only used as a presence check)
dashboard_elem = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH,
'//input[contains(@placeholder,"Search")] | //*[contains(@class,"dashboard")] | '
'//a[contains(@href,"member")] | //nav'))
)
print("[UnitedSCO login] Already logged in - on dashboard")
return "ALREADY_LOGGED_IN"
except TimeoutException:
# No logged-in marker within 3s: fall through to the normal login flow
pass
except Exception as e:
print(f"[UnitedSCO login] Error checking current state: {e}")
# Navigate to login URL
self.driver.get(url)
time.sleep(3)
current_url = self.driver.current_url
print(f"[UnitedSCO login] After navigation URL: {current_url}")
# If already on dentalhub dashboard (not login page), we're logged in
# (URL check only - no element is verified on this path)
if "app.dentalhub.com" in current_url and "login" not in current_url.lower():
print("[UnitedSCO login] Already on dashboard")
return "ALREADY_LOGGED_IN"
# Check for OTP input first (in case we're on B2C OTP page)
try:
otp_input = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='tel' or contains(@placeholder,'code') or contains(@aria-label,'Verification')]"))
)
print("[UnitedSCO login] OTP input found")
return "OTP_REQUIRED"
except TimeoutException:
pass
# Step 1: Click the LOGIN button on the initial dentalhub page
# This redirects to Azure B2C login
if "app.dentalhub.com" in current_url:
try:
login_btn = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.XPATH,
"//button[contains(text(),'LOGIN') or contains(text(),'Log In') or contains(text(),'Login')]"))
)
login_btn.click()
print("[UnitedSCO login] Clicked LOGIN button on dentalhub.com")
time.sleep(5) # Wait for redirect to B2C login page
except TimeoutException:
print("[UnitedSCO login] No LOGIN button found on dentalhub page, proceeding...")
# Now we should be on the Azure B2C login page (dentalhubauth.b2clogin.com)
current_url = self.driver.current_url
print(f"[UnitedSCO login] After LOGIN click URL: {current_url}")
# Step 2: Fill in credentials on B2C login page
# (also entered for any URL that merely contains "login")
if "b2clogin.com" in current_url or "login" in current_url.lower():
print("[UnitedSCO login] On B2C login page - filling credentials")
try:
# Find email field by id="signInName" (Azure B2C specific)
email_field = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//input[@id='signInName' or @name='signInName' or @name='Email address' or @type='email']"))
)
email_field.clear()
email_field.send_keys(self.unitedsco_username)
# NOTE(review): the username is written to stdout here
print(f"[UnitedSCO login] Entered username: {self.unitedsco_username}")
# Find password field by id="password"
password_field = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@id='password' or @type='password']"))
)
password_field.clear()
password_field.send_keys(self.unitedsco_password)
print("[UnitedSCO login] Entered password")
# Click "Sign in" button (id="next" on B2C page)
signin_button = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@id='next'] | //button[@type='submit' and contains(text(),'Sign')]"))
)
signin_button.click()
print("[UnitedSCO login] Clicked Sign in button")
# Save credentials hash after login attempt
# (saved before the outcome of the sign-in is known)
if self.unitedsco_username:
browser_manager.save_credentials_hash(self.unitedsco_username)
time.sleep(5) # Wait for login to process
# Check for OTP input after login
try:
otp_input = WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='tel' or contains(@placeholder,'code') or contains(@placeholder,'Code') or "
"contains(@aria-label,'Verification') or contains(@aria-label,'verification') or "
"contains(@aria-label,'Code') or contains(@aria-label,'code') or "
"contains(@placeholder,'verification') or contains(@placeholder,'Verification') or "
"contains(@name,'otp') or contains(@name,'code') or contains(@id,'otp') or contains(@id,'code')]"
))
)
print("[UnitedSCO login] OTP input detected -> OTP_REQUIRED")
return "OTP_REQUIRED"
except TimeoutException:
print("[UnitedSCO login] No OTP input detected")
# Check if login succeeded (redirected back to dentalhub dashboard)
current_url_after_login = self.driver.current_url.lower()
print(f"[UnitedSCO login] After login URL: {current_url_after_login}")
if "app.dentalhub.com" in current_url_after_login and "login" not in current_url_after_login:
print("[UnitedSCO login] Login successful - redirected to dashboard")
return "SUCCESS"
# Check for error messages on B2C page
try:
error_elem = self.driver.find_element(By.XPATH,
"//*[contains(@class,'error') or contains(@class,'alert')]")
error_text = error_elem.text
if error_text:
print(f"[UnitedSCO login] Error on page: {error_text}")
return f"ERROR: {error_text}"
# NOTE(review): bare except - it also catches KeyboardInterrupt/SystemExit
except:
pass
# Still on B2C page - might need OTP or login failed
if "b2clogin.com" in current_url_after_login:
print("[UnitedSCO login] Still on B2C page - checking for OTP or error")
# Give it more time for OTP
try:
otp_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='tel' or contains(@id,'code') or contains(@name,'code')]"))
)
print("[UnitedSCO login] OTP input found on second check")
return "OTP_REQUIRED"
except TimeoutException:
return "ERROR: Login failed - still on B2C page"
except TimeoutException as te:
print(f"[UnitedSCO login] Login form elements not found: {te}")
return "ERROR: Login form not found"
except Exception as form_err:
print(f"[UnitedSCO login] Error filling form: {form_err}")
return f"ERROR: {form_err}"
# If we got here without going through login, we're already logged in
# NOTE(review): this return also appears reachable after credentials were
# submitted when none of the checks above returned (no OTP input, URL neither
# a non-login app.dentalhub.com page nor b2clogin.com, no error text) -
# confirm that reporting SUCCESS in that case is intended.
return "SUCCESS"
except Exception as e:
print(f"[UnitedSCO login] Exception: {e}")
return f"ERROR:LOGIN FAILED: {e}"
def _format_dob(self, dob_str):
    """Convert a date of birth from YYYY-MM-DD to MM/DD/YYYY.

    Args:
        dob_str: Date of birth as text. Year-first dates are converted,
            including non-padded ones ("1990-1-5") and ones followed by a
            time component ("1990-01-05T00:00:00Z").

    Returns:
        The zero-padded "MM/DD/YYYY" string. A value that is empty, has no
        "-" separator, or is not in year-month-day order is returned
        unchanged, so already formatted input ("01/05/1990") passes through.
    """
    if not dob_str or "-" not in dob_str:
        return dob_str

    # Keep only the date portion; a timestamp suffix would otherwise end up
    # inside the "day" field of the result.
    date_part = dob_str.strip().split("T", 1)[0].split(" ", 1)[0]
    parts = date_part.split("-")
    if len(parts) != 3:
        return dob_str

    year, month, day = parts
    year_first = (
        len(year) == 4
        and year.isdecimal()
        and month.isdecimal()
        and day.isdecimal()
    )
    if not year_first:
        # e.g. MM-DD-YYYY: blindly reordering the parts would produce garbage.
        return dob_str

    return f"{int(month):02d}/{int(day):02d}/{year}"
def step1(self):
    """
    Navigate to the Eligibility page, fill the Patient Information form and
    continue past the Practitioner & Location page.

    FLEXIBLE INPUT SUPPORT:
    - If Member ID is provided: Fill Subscriber ID + DOB (+ optional First/Last Name)
    - If no Member ID but First/Last Name provided: Fill First Name + Last Name + DOB

    Workflow:
    1. Navigate directly to eligibility page
    2. Fill available fields based on input
    3. Select Payer: "UnitedHealthcare Massachusetts" from ng-select dropdown
    4. Click Continue
    5. Handle Practitioner & Location page - click paymentGroupId dropdown,
       select Summit Dental Care
    6. Click Continue again

    Returns:
        str: "Success" when the form was submitted and none of the known
        error texts is displayed afterwards; otherwise a message that starts
        with "ERROR".
    """
    def _clean(value):
        # None / whitespace-only means "not provided"; padding is dropped so
        # the text typed into the form is exactly what was validated.
        return value.strip() if value else ""

    def _fill(element_id, text, label):
        # Clear one form input and type into it; False when that fails.
        try:
            field = self.driver.find_element(By.ID, element_id)
            field.clear()
            field.send_keys(text)
            print(f"[UnitedSCO step1] Entered {label}: {text}")
            return True
        except Exception as e:
            print(f"[UnitedSCO step1] Error entering {label}: {e}")
            return False

    try:
        member_id = _clean(self.memberId)
        first_name = _clean(self.firstName)
        last_name = _clean(self.lastName)

        # Determine which input mode to use
        has_member_id = bool(member_id)
        has_name = bool(first_name and last_name)

        if has_member_id:
            print(f"[UnitedSCO step1] Using Member ID mode: ID={member_id}, DOB={self.dateOfBirth}")
        elif has_name:
            print(f"[UnitedSCO step1] Using Name mode: {first_name} {last_name}, DOB={self.dateOfBirth}")
        else:
            print("[UnitedSCO step1] ERROR: Need either Member ID or First Name + Last Name")
            return "ERROR: Missing required input (Member ID or Name)"

        # Imported only once the input is known to be usable; a failure here
        # is reported through the STEP1 error path below.
        from selenium.webdriver.common.action_chains import ActionChains

        # Navigate directly to eligibility page
        print("[UnitedSCO step1] Navigating to eligibility page...")
        self.driver.get("https://app.dentalhub.com/app/patient/eligibility")
        time.sleep(3)

        current_url = self.driver.current_url
        print(f"[UnitedSCO step1] Current URL: {current_url}")

        # Step 1.1: Fill the Patient Information form
        print("[UnitedSCO step1] Filling Patient Information form...")

        # Wait for form to load
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.ID, "subscriberId_Front"))
            )
            print("[UnitedSCO step1] Patient Information form loaded")
        except TimeoutException:
            print("[UnitedSCO step1] Patient Information form not found")
            return "ERROR: Patient Information form not found"

        # Subscriber ID (id='subscriberId_Front'); fatal only when there is
        # no name to fall back on, mirroring the name-field handling below.
        if has_member_id:
            entered = _fill("subscriberId_Front", member_id, "Subscriber ID")
            if not entered and not has_name:
                return "ERROR: Could not enter Subscriber ID"

        # First Name (id='firstName_Back'); fatal only when relying on name
        if first_name:
            entered = _fill("firstName_Back", first_name, "First Name")
            if not entered and not has_member_id:
                return "ERROR: Could not enter First Name"

        # Last Name (id='lastName_Back'); fatal only when relying on name
        if last_name:
            entered = _fill("lastName_Back", last_name, "Last Name")
            if not entered and not has_member_id:
                return "ERROR: Could not enter Last Name"

        # Date of Birth (id='dateOfBirth_Back', format: MM/DD/YYYY) - always required
        try:
            dob_formatted = self._format_dob(self.dateOfBirth)
        except Exception as e:
            print(f"[UnitedSCO step1] Error entering DOB: {e}")
            return "ERROR: Could not enter Date of Birth"
        if not dob_formatted:
            print("[UnitedSCO step1] Error entering DOB: no date of birth provided")
            return "ERROR: Could not enter Date of Birth"
        if not _fill("dateOfBirth_Back", dob_formatted, "DOB"):
            return "ERROR: Could not enter Date of Birth"

        time.sleep(1)

        # Step 1.2: Select Payer - UnitedHealthcare Massachusetts
        print("[UnitedSCO step1] Selecting Payer...")
        try:
            # Click the Payer ng-select dropdown
            payer_ng_select = self.driver.find_element(
                By.XPATH,
                "//label[contains(text(),'Payer')]/following-sibling::ng-select"
            )
            payer_ng_select.click()
            time.sleep(1)

            # Find and click "UnitedHealthcare Massachusetts" option
            payer_options = self.driver.find_elements(
                By.XPATH,
                "//ng-dropdown-panel//div[contains(@class,'ng-option')]"
            )
            for opt in payer_options:
                if "UnitedHealthcare Massachusetts" in opt.text:
                    opt.click()
                    print("[UnitedSCO step1] Selected Payer: UnitedHealthcare Massachusetts")
                    break
            else:
                # Not fatal (payer might be pre-selected) but worth a trace.
                print("[UnitedSCO step1] Payer option 'UnitedHealthcare Massachusetts' not found in dropdown")

            # Press Escape to close any dropdown
            ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
            time.sleep(1)
        except Exception as e:
            print(f"[UnitedSCO step1] Error selecting Payer: {e}")
            # Try to continue anyway - payer might be pre-selected
            ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()

        # Step 1.3: Click Continue button (Step 1 - Patient Info)
        try:
            continue_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Continue')]"))
            )
            continue_btn.click()
            print("[UnitedSCO step1] Clicked Continue button (Patient Info)")
            time.sleep(4)
        except Exception as e:
            print(f"[UnitedSCO step1] Error clicking Continue: {e}")
            return "ERROR: Could not click Continue button"

        # Step 1.4: Handle Practitioner & Location page
        print("[UnitedSCO step1] Handling Practitioner & Location page...")
        try:
            # Click Practitioner Taxonomy dropdown (id='paymentGroupId')
            taxonomy_input = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.ID, "paymentGroupId"))
            )
            taxonomy_input.click()
            print("[UnitedSCO step1] Clicked Practitioner Taxonomy dropdown")
            time.sleep(1)

            # Select "Summit Dental Care" option
            summit_option = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((
                    By.XPATH,
                    "//ng-dropdown-panel//div[contains(@class,'ng-option') and contains(.,'Summit Dental Care')]"
                ))
            )
            summit_option.click()
            print("[UnitedSCO step1] Selected: Summit Dental Care")

            # Press Escape to close dropdown
            ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
            time.sleep(1)
        except TimeoutException:
            print("[UnitedSCO step1] Practitioner Taxonomy not found or already selected")
        except Exception as e:
            print(f"[UnitedSCO step1] Practitioner Taxonomy handling: {e}")

        # Step 1.5: Click Continue button (Step 2 - Practitioner)
        # Best effort: a failure here is logged and the error check below
        # still runs.
        try:
            continue_btn2 = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Continue')]"))
            )
            continue_btn2.click()
            print("[UnitedSCO step1] Clicked Continue button (Practitioner)")
            time.sleep(5)
        except Exception as e:
            print(f"[UnitedSCO step1] Error clicking Continue on Practitioner page: {e}")

        # Check for errors
        error_selectors = (
            "//*[contains(text(),'No results')]",
            "//*[contains(text(),'not found')]",
            "//*[contains(text(),'Invalid')]",
        )
        for sel in error_selectors:
            try:
                error_elem = self.driver.find_element(By.XPATH, sel)
                if error_elem.is_displayed():
                    error_text = error_elem.text
                    print(f"[UnitedSCO step1] Error found: {error_text}")
                    return f"ERROR: {error_text}"
            except Exception:
                # Selector did not match (or the element went away) - try the next one.
                continue

        print("[UnitedSCO step1] Patient search completed successfully")
        return "Success"

    except Exception as e:
        print(f"[UnitedSCO step1] Exception: {e}")
        return f"ERROR:STEP1 - {e}"
def step2(self):
    """
    Capture the eligibility result for the selected patient and save it as a PDF.

    At this point we should be on the "Selected Patient" page after step1.

    Workflow based on actual DOM testing:
    1. Extract eligibility status and Member ID from the page
    2. Click the "Eligibility" button (id='eligibility-link')
    3. Generate PDF using Chrome DevTools Protocol (same as other insurances)

    Returns:
        dict: on success
            {"status": "success", "eligibility": "active" | "inactive" | "unknown",
             "ss_path": <pdf path>, "pdf_path": <pdf path>,
             "patientName": <str>, "memberId": <str or None>};
        on failure {"status": "error", "message": "STEP2 FAILED: ..."}.
    """
    import re

    try:
        print("[UnitedSCO step2] Starting eligibility capture")

        # Wait for page to load
        time.sleep(3)

        current_url = self.driver.current_url
        print(f"[UnitedSCO step2] Current URL: {current_url}")

        # 1) Extract eligibility status and Member ID from the Selected Patient page
        eligibilityText = "unknown"
        # Skip missing names so a Member-ID-only lookup does not report the
        # literal text "None None" as the patient name.
        name_parts = [
            part.strip()
            for part in (self.firstName, self.lastName)
            if part and part.strip()
        ]
        patientName = " ".join(name_parts)
        foundMemberId = self.memberId  # Use provided memberId as default

        # Extract eligibility status
        try:
            # NOTE(review): this locator only matches the positive wording
            # "Member Eligible"; confirm how the portal labels an ineligible
            # member and extend it if the wording differs.
            status_elem = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((
                    By.XPATH,
                    "//*[contains(text(),'Member Eligible')]"
                ))
            )
            status_text = status_elem.text.strip().lower()
            print(f"[UnitedSCO step2] Found status: {status_text}")

            # Negative wording must be tested first: "ineligible" and
            # "not eligible" both contain the substring "eligible".
            if "ineligible" in status_text or "not eligible" in status_text:
                eligibilityText = "inactive"
            elif "eligible" in status_text:
                eligibilityText = "active"
        except TimeoutException:
            print("[UnitedSCO step2] Eligibility status badge not found")
        except Exception as e:
            print(f"[UnitedSCO step2] Error extracting status: {e}")

        print(f"[UnitedSCO step2] Eligibility status: {eligibilityText}")

        # Extract Member ID from the page (for database storage)
        try:
            page_text = self.driver.find_element(By.TAG_NAME, "body").text
            # Look for "Member ID" followed by a number
            member_id_match = re.search(r'Member ID\s*[\n:]\s*(\d+)', page_text)
            if member_id_match:
                foundMemberId = member_id_match.group(1)
                print(f"[UnitedSCO step2] Extracted Member ID from page: {foundMemberId}")
        except Exception as e:
            print(f"[UnitedSCO step2] Could not extract Member ID: {e}")

        # 2) Click the "Eligibility" button (id='eligibility-link')
        print("[UnitedSCO step2] Looking for 'Eligibility' button...")
        try:
            eligibility_btn = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.ID, "eligibility-link"))
            )
            eligibility_btn.click()
            print("[UnitedSCO step2] Clicked 'Eligibility' button")
            time.sleep(5)
        except TimeoutException:
            print("[UnitedSCO step2] Eligibility button not found, trying alternative selectors...")
            try:
                # Alternative: find button with text "Eligibility"
                eligibility_btn = self.driver.find_element(
                    By.XPATH,
                    "//button[normalize-space(text())='Eligibility']"
                )
                eligibility_btn.click()
                print("[UnitedSCO step2] Clicked 'Eligibility' button (alternative)")
                time.sleep(5)
            except Exception as e:
                print(f"[UnitedSCO step2] Could not click Eligibility button: {e}")

        # Wait for page to fully load
        try:
            WebDriverWait(self.driver, 30).until(
                lambda d: d.execute_script("return document.readyState") == "complete"
            )
        except Exception:
            # Best effort: print whatever is rendered after the timeout.
            pass

        time.sleep(2)

        print(f"[UnitedSCO step2] Final URL: {self.driver.current_url}")

        # 3) Generate PDF using Chrome DevTools Protocol (same as other insurances)
        print("[UnitedSCO step2] Generating PDF...")

        pdf_options = {
            "landscape": False,
            "displayHeaderFooter": False,
            "printBackground": True,
            "preferCSSPageSize": True,
            "paperWidth": 8.5,
            "paperHeight": 11,
            "marginTop": 0.4,
            "marginBottom": 0.4,
            "marginLeft": 0.4,
            "marginRight": 0.4,
            "scale": 0.9,
        }

        # Use foundMemberId for the filename, falling back to the name. The
        # value originates from request data, so anything that is not a plain
        # filename character is replaced before it is placed in a path.
        raw_identifier = foundMemberId if foundMemberId else "_".join(name_parts)
        file_identifier = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(raw_identifier)) or "unknown"

        result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
        pdf_data = base64.b64decode(result.get('data', ''))
        if not pdf_data:
            # Do not report success for a zero-byte PDF.
            raise RuntimeError("Page.printToPDF returned no data")

        pdf_path = os.path.join(
            self.download_dir,
            f"unitedsco_eligibility_{file_identifier}_{int(time.time())}.pdf"
        )
        with open(pdf_path, "wb") as f:
            f.write(pdf_data)

        print(f"[UnitedSCO step2] PDF saved: {pdf_path}")

        # Keep browser alive for next patient
        print("[UnitedSCO step2] Eligibility capture complete - session preserved")

        return {
            "status": "success",
            "eligibility": eligibilityText,
            "ss_path": pdf_path,
            "pdf_path": pdf_path,
            "patientName": patientName,
            "memberId": foundMemberId  # Return the Member ID found on the page
        }

    except Exception as e:
        print(f"[UnitedSCO step2] Exception: {e}")
        return {"status": "error", "message": f"STEP2 FAILED: {str(e)}"}
def main_workflow(self, url):
    """Run driver setup, login, step1 and step2 in order.

    Returns a dict: the step2 result when every stage passes, an
    "otp_required" status when login asks for an OTP, or an "error" status
    carrying the failing stage's message (or the text of any exception).
    """
    def _is_error(outcome):
        # Stages signal failure with a string that begins with "ERROR".
        return isinstance(outcome, str) and outcome.startswith("ERROR")

    try:
        self.config_driver()

        login_outcome = self.login(url)
        print(f"[main_workflow] Login result: {login_outcome}")
        if login_outcome == "OTP_REQUIRED":
            return {"status": "otp_required", "message": "OTP required after login"}
        if _is_error(login_outcome):
            return {"status": "error", "message": login_outcome}

        search_outcome = self.step1()
        print(f"[main_workflow] Step1 result: {search_outcome}")
        if _is_error(search_outcome):
            return {"status": "error", "message": search_outcome}

        capture_outcome = self.step2()
        print(f"[main_workflow] Step2 result: {capture_outcome}")
        return capture_outcome
    except Exception as exc:
        return {"status": "error", "message": str(exc)}

View File

@@ -0,0 +1,277 @@
"""
Minimal browser manager for United SCO - only handles persistent profile and keeping browser alive.
Clears session cookies on startup (after PC restart) to force fresh login.
Tracks credentials to detect changes mid-session.
"""
import os
import shutil
import hashlib
import threading
import subprocess
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Chrome needs DISPLAY to be set (e.g. when started from SSH or in the
# background); fall back to ":0" when it is unset or empty.
os.environ["DISPLAY"] = os.environ.get("DISPLAY") or ":0"
class UnitedSCOBrowserManager:
"""
Singleton that manages a persistent Chrome browser instance for United SCO.
- Uses --user-data-dir for persistent profile (device trust tokens)
- Clears session cookies on startup (after PC restart)
- Tracks credentials to detect changes mid-session
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._driver = None
cls._instance.profile_dir = os.path.abspath("chrome_profile_unitedsco")
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
cls._instance._needs_session_clear = False # Flag to clear session on next driver creation
os.makedirs(cls._instance.profile_dir, exist_ok=True)
os.makedirs(cls._instance.download_dir, exist_ok=True)
return cls._instance
def clear_session_on_startup(self):
"""
Clear session cookies from Chrome profile on startup.
This forces a fresh login after PC restart.
Preserves device trust tokens (LocalStorage, IndexedDB) to avoid OTPs.
"""
print("[UnitedSCO BrowserManager] Clearing session on startup...")
try:
# Clear the credentials tracking file
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
print("[UnitedSCO BrowserManager] Cleared credentials tracking file")
# Clear session-related files from Chrome profile
# These are the files that store login session cookies
session_files = [
"Cookies",
"Cookies-journal",
"Login Data",
"Login Data-journal",
"Web Data",
"Web Data-journal",
]
for filename in session_files:
filepath = os.path.join(self.profile_dir, "Default", filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[UnitedSCO BrowserManager] Removed {filename}")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Could not remove {filename}: {e}")
# Also try root level (some Chrome versions)
for filename in session_files:
filepath = os.path.join(self.profile_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[UnitedSCO BrowserManager] Removed root {filename}")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Could not remove root {filename}: {e}")
# Clear Session Storage (contains login state)
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
if os.path.exists(session_storage_dir):
try:
shutil.rmtree(session_storage_dir)
print("[UnitedSCO BrowserManager] Cleared Session Storage")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Could not clear Session Storage: {e}")
# Clear Local Storage (may contain auth tokens)
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
if os.path.exists(local_storage_dir):
try:
shutil.rmtree(local_storage_dir)
print("[UnitedSCO BrowserManager] Cleared Local Storage")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Could not clear Local Storage: {e}")
# Clear IndexedDB (may contain auth tokens)
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
if os.path.exists(indexeddb_dir):
try:
shutil.rmtree(indexeddb_dir)
print("[UnitedSCO BrowserManager] Cleared IndexedDB")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Could not clear IndexedDB: {e}")
# Set flag to clear session via JavaScript after browser opens
self._needs_session_clear = True
print("[UnitedSCO BrowserManager] Session cleared - will require fresh login")
except Exception as e:
print(f"[UnitedSCO BrowserManager] Error clearing session: {e}")
def _hash_credentials(self, username: str) -> str:
"""Create a hash of the username to track credential changes."""
return hashlib.sha256(username.encode()).hexdigest()[:16]
def get_last_credentials_hash(self) -> str | None:
"""Get the hash of the last-used credentials."""
try:
if os.path.exists(self._credentials_file):
with open(self._credentials_file, 'r') as f:
return f.read().strip()
except Exception:
pass
return None
def save_credentials_hash(self, username: str):
"""Save the hash of the current credentials."""
try:
cred_hash = self._hash_credentials(username)
with open(self._credentials_file, 'w') as f:
f.write(cred_hash)
except Exception as e:
print(f"[UnitedSCO BrowserManager] Failed to save credentials hash: {e}")
def credentials_changed(self, username: str) -> bool:
"""Check if the credentials have changed since last login."""
last_hash = self.get_last_credentials_hash()
if last_hash is None:
return False # No previous credentials, not a change
current_hash = self._hash_credentials(username)
changed = last_hash != current_hash
if changed:
print(f"[UnitedSCO BrowserManager] Credentials changed - logout required")
return changed
def clear_credentials_hash(self):
"""Clear the saved credentials hash (used after logout)."""
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
except Exception as e:
print(f"[UnitedSCO BrowserManager] Failed to clear credentials hash: {e}")
def _kill_existing_chrome_for_profile(self):
"""Kill any existing Chrome processes using this profile."""
try:
# Find and kill Chrome processes using this profile
result = subprocess.run(
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
capture_output=True, text=True
)
if result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
try:
subprocess.run(["kill", "-9", pid], check=False)
except:
pass
time.sleep(1)
except Exception as e:
pass
# Remove SingletonLock if exists
lock_file = os.path.join(self.profile_dir, "SingletonLock")
try:
if os.path.islink(lock_file) or os.path.exists(lock_file):
os.remove(lock_file)
except:
pass
def get_driver(self, headless=False):
"""Get or create the persistent browser instance."""
with self._lock:
if self._driver is None:
print("[UnitedSCO BrowserManager] Driver is None, creating new driver")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
elif not self._is_alive():
print("[UnitedSCO BrowserManager] Driver not alive, recreating")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
else:
print("[UnitedSCO BrowserManager] Reusing existing driver")
return self._driver
def _is_alive(self):
"""Check if browser is still responsive."""
try:
if self._driver is None:
return False
url = self._driver.current_url
return True
except Exception as e:
return False
def _create_driver(self, headless=False):
"""Create browser with persistent profile."""
if self._driver:
try:
self._driver.quit()
except:
pass
self._driver = None
time.sleep(1)
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless")
# Persistent profile - THIS IS THE KEY for device trust
options.add_argument(f"--user-data-dir={self.profile_dir}")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
prefs = {
"download.default_directory": self.download_dir,
"plugins.always_open_pdf_externally": True,
"download.prompt_for_download": False,
"download.directory_upgrade": True
}
options.add_experimental_option("prefs", prefs)
service = Service(ChromeDriverManager().install())
self._driver = webdriver.Chrome(service=service, options=options)
self._driver.maximize_window()
# Reset the session clear flag (file-based clearing is done on startup)
self._needs_session_clear = False
def quit_driver(self):
"""Quit browser (only call on shutdown)."""
with self._lock:
if self._driver:
try:
self._driver.quit()
except:
pass
self._driver = None
# Also clean up any orphaned processes
self._kill_existing_chrome_for_profile()
# Module-level cache for the shared manager instance
_manager = None
def get_browser_manager():
    """Return the shared UnitedSCOBrowserManager, creating it on first use."""
    global _manager
    if _manager is not None:
        return _manager
    _manager = UnitedSCOBrowserManager()
    return _manager
def clear_unitedsco_session_on_startup():
    """Startup hook called by agent.py: clear the United SCO browser session."""
    get_browser_manager().clear_session_on_startup()