Files
DentalManagementMH04/apps/SeleniumService/selenium_DDMA_eligibilityCheckWorker.py
Gitead e26ebf7fd5 feat: fix DDMA eligibility — patient list, name extraction, PDF page, OTP session
- Filter patient list by userId so each user sees only their own patients
- Sort patients by updatedAt DESC so recently checked patients appear first
- Add updatedAt field to Patient model (DB migration via raw SQL + db:generate)
- Fix DDMA name extraction: read from detail page "Name:" label, not search
  results row text which included appended dates
- Fix PDF capture: use driver.get() instead of click() to avoid race condition
  that was saving the search results page instead of the patient detail page
- Strip trailing bare dates from extracted names (e.g. "Rodriguez 04/27/2026")
- Handle "Last, First" comma format and single-word last names in splitName
- Normalize insuranceId consistently in createOrUpdatePatientByInsuranceId
- Fix OTP persistent session: stop clearing LocalStorage/IndexedDB on startup
  (these hold the DDMA device trust token that skips OTP on subsequent logins)
- Increase post-navigation wait time for full page render before PDF generation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-01 21:40:04 -04:00

647 lines
30 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import os
import re
import time

from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

from ddma_browser_manager import get_browser_manager


class AutomationDeltaDentalMAEligibilityCheck:
    """Selenium automation of a Delta Dental of Massachusetts eligibility check.

    The workflow is: obtain the shared browser from the browser manager, log in
    to the provider portal (re-using an existing session when possible), search
    for a member, open the member detail page, extract the patient name and
    save the page as a PDF in the browser manager's download directory.

    Methods report failures through return values (``"ERROR..."`` strings or
    ``{"status": "error", ...}`` dicts) rather than by raising.
    """

    _PORTAL_URL = "https://providers.deltadentalma.com/"
    _MEMBERS_URL = "https://providers.deltadentalma.com/members"
    _MEMBER_SEARCH_XPATH = '//input[@placeholder="Search by member ID"]'
    _NO_RESULTS_XPATH = '//div[@data-testid="member-search-result-no-results"]'
    _RESULT_ROW_XPATH = "//tbody//tr"
    # One OTP locator shared by every OTP check so the checks cannot drift apart.
    _OTP_INPUT_XPATH = (
        "//input[contains(@aria-label,'Verification code') "
        "or contains(@placeholder,'Enter your verification code')]"
    )
    # Words that show up in accessibility/sort-button text or field labels.
    # They are matched at the START of a word only, so surnames that merely
    # contain one of them (e.g. "Carpenter", "House") are not rejected.
    _UI_NOISE_PATTERN = (
        r"\b(?:use\b|click|sort|activate|direction|enter|space|current|table|"
        r"column|filter|search|date|birth|relationship|subscriber|coverage|"
        r"status|period|network|plan|deductible)"
    )

    def __init__(self, data):
        """Read the search criteria and portal credentials from ``data["data"]``.

        Args:
            data: Request payload. Expected to be a dict with a ``"data"`` dict
                holding ``memberId``, ``dateOfBirth``, ``firstName``,
                ``lastName``, ``massddmaUsername`` and ``massddmaPassword``.
                Anything else is treated as an empty payload.
        """
        self.headless = False
        self.driver = None
        payload = data.get("data") if isinstance(data, dict) else None
        # Guard against {"data": None} or a non-dict payload.
        self.data = payload if isinstance(payload, dict) else {}
        # Flatten values for convenience
        self.memberId = self.data.get("memberId", "")
        self.dateOfBirth = self.data.get("dateOfBirth", "")
        self.firstName = self.data.get("firstName", "")
        self.lastName = self.data.get("lastName", "")
        self.massddma_username = self.data.get("massddmaUsername", "")
        self.massddma_password = self.data.get("massddmaPassword", "")
        # Use browser manager's download dir
        self.download_dir = get_browser_manager().download_dir
        os.makedirs(self.download_dir, exist_ok=True)

    def config_driver(self):
        """Fetch the persistent driver from the browser manager."""
        # Use persistent browser from manager (keeps device trust tokens)
        self.driver = get_browser_manager().get_driver(self.headless)

    # ------------------------------------------------------------------
    # Pure helpers (no browser access) — kept static so they are testable.
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_eligibility(text):
        """Map free-form status text to ``"active"`` / ``"inactive"``.

        Negative wording is tested first because "inactive" contains "active"
        and "ineligible" contains "eligible". Unrecognised text is returned
        stripped and lower-cased.
        """
        lowered = (text or "").strip().lower()
        negative_markers = ("inactive", "ineligible", "not active", "not eligible")
        if any(marker in lowered for marker in negative_markers):
            return "inactive"
        if "active" in lowered or "eligible" in lowered:
            return "active"
        return lowered

    @staticmethod
    def _extract_member_id(row_text, default=""):
        """Return the first line of ``row_text`` that looks like a member ID.

        A member ID line is 5+ upper-case letters/digits and does not start
        with ``DOB``. ``default`` is returned when no line qualifies.
        """
        for raw_line in (row_text or "").split('\n'):
            line = raw_line.strip()
            if line and re.match(r'^[A-Z0-9]{5,}$', line) and not line.startswith('DOB'):
                return line
        return default

    @classmethod
    def _looks_like_name(cls, text):
        """Return True if ``text`` is a plausible patient name."""
        candidate = (text or "").strip()
        # Must be 2-60 chars, letters/spaces/hyphens/apostrophes only (no periods)
        if not (2 <= len(candidate) <= 60):
            return False
        if not re.match(r"^[A-Za-z\s\-']+$", candidate):
            return False
        # Must not be UI noise (matched at word starts, see _UI_NOISE_PATTERN)
        return re.search(cls._UI_NOISE_PATTERN, candidate, re.IGNORECASE) is None

    @classmethod
    def _extract_patient_name(cls, page_lines):
        """Find the value of the "Name" label in the detail page text.

        Args:
            page_lines: Non-empty, stripped lines of the page's visible text.

        Returns:
            The first plausible name, or ``""`` when none is found.
        """
        for idx, line in enumerate(page_lines):
            # Case 1: "Name : Value" or "Name: Value" on the same line
            same_line = re.match(
                r'^(?:member\s+)?name\s*[:\-]\s*(.+)$', line, re.IGNORECASE
            )
            if same_line and not re.search(
                r'(provider|group|subscriber|plan)\s+name', line, re.IGNORECASE
            ):
                candidate = same_line.group(1).strip()
                if cls._looks_like_name(candidate):
                    print(f"[DDMA step2] Extracted name from 'Name:' label: '{candidate}'")
                    return candidate
            # Case 2: "Name" or "Name:" alone on a line — scan the next five
            # lines, skipping accessibility text, for the first plausible value.
            elif re.match(r'^(?:member\s+)?name\s*:?\s*$', line, re.IGNORECASE):
                for offset, follower in enumerate(page_lines[idx + 1:idx + 6], start=1):
                    candidate = follower.strip()
                    if cls._looks_like_name(candidate):
                        print(f"[DDMA step2] Extracted name from 'Name' label (line +{offset}): '{candidate}'")
                        return candidate
        return ""

    @staticmethod
    def _clean_patient_name(name):
        """Strip "DOB: mm/dd/yyyy" fragments and a trailing bare date from a name.

        If stripping would leave nothing, the input is returned unchanged.
        """
        if not name:
            return name
        cleaned = re.sub(
            r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', name, flags=re.IGNORECASE
        ).strip()
        cleaned = re.sub(r'\s+\d{1,2}/\d{1,2}/\d{2,4}$', '', cleaned).strip()
        return cleaned if cleaned else name

    @staticmethod
    def _safe_filename_component(value):
        """Reduce ``value`` to characters that are safe inside a file name.

        The member ID can come straight from the request payload, so path
        separators and dots are replaced to keep the PDF inside the download
        directory. Empty input yields ``"unknown"``.
        """
        text = str(value) if value else ""
        return re.sub(r"[^A-Za-z0-9_-]", "_", text) or "unknown"

    # ------------------------------------------------------------------
    # Small browser helpers
    # ------------------------------------------------------------------
    def _element_present(self, xpath, timeout):
        """Return True if ``xpath`` appears within ``timeout`` seconds."""
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.XPATH, xpath))
            )
            return True
        except TimeoutException:
            return False

    def _member_search_present(self, timeout):
        """Return True if the member search input appears within ``timeout`` s."""
        return self._element_present(self._MEMBER_SEARCH_XPATH, timeout)

    def _force_logout(self):
        """Force logout by clearing cookies when credentials change.

        Returns:
            True when the logout sequence ran to completion, False if an
            unexpected error interrupted it.
        """
        try:
            print("[DDMA login] Forcing logout due to credential change...")
            browser_manager = get_browser_manager()
            # Try to click logout button if visible
            try:
                self.driver.get(self._PORTAL_URL)
                time.sleep(2)
                logout_selectors = [
                    "//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                    "//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                    "//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
                    "//*[contains(@class, 'logout') or contains(@class, 'signout')]",
                ]
                for selector in logout_selectors:
                    try:
                        logout_btn = WebDriverWait(self.driver, 3).until(
                            EC.element_to_be_clickable((By.XPATH, selector))
                        )
                        logout_btn.click()
                        print("[DDMA login] Clicked logout button")
                        time.sleep(2)
                        break
                    except TimeoutException:
                        continue
            except Exception as e:
                print(f"[DDMA login] Could not click logout button: {e}")
            # Clear cookies as backup
            try:
                self.driver.delete_all_cookies()
                print("[DDMA login] Cleared all cookies")
            except Exception as e:
                print(f"[DDMA login] Error clearing cookies: {e}")
            browser_manager.clear_credentials_hash()
            print("[DDMA login] Logout complete")
            return True
        except Exception as e:
            print(f"[DDMA login] Error during forced logout: {e}")
            return False

    def login(self, url):
        """Log in to the portal, re-using an existing session when possible.

        Args:
            url: Login page URL.

        Returns:
            One of ``"ALREADY_LOGGED_IN"``, ``"SUCCESS"``, ``"OTP_REQUIRED"``
            or a string starting with ``"ERROR"`` describing the failure.
        """
        wait = WebDriverWait(self.driver, 30)
        browser_manager = get_browser_manager()
        try:
            # Check if credentials changed — force logout first
            if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
                self._force_logout()
                self.driver.get(url)
                time.sleep(2)

            # Check if already on a logged-in page (persistent session from profile)
            try:
                current_url = self.driver.current_url
                print(f"[login] Current URL: {current_url}")
                lowered_url = current_url.lower()
                logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
                is_logged_in_url = any(pattern in lowered_url for pattern in logged_in_patterns)
                if is_logged_in_url and "onboarding" not in lowered_url:
                    print("[login] Already on logged-in page - skipping login entirely")
                    # NOTE(review): when the URL already contains "member" nothing is
                    # returned here; the flow falls through to the reload below, which
                    # then detects the session. Confirm that is intended.
                    if "member" not in lowered_url:
                        if self._member_search_present(5):
                            print("[login] Found member search input - returning ALREADY_LOGGED_IN")
                            return "ALREADY_LOGGED_IN"
                        print(f"[login] Navigating to members page: {self._MEMBERS_URL}")
                        self.driver.get(self._MEMBERS_URL)
                        time.sleep(2)
                        if self._member_search_present(5):
                            print("[login] Member search found - ALREADY_LOGGED_IN")
                            return "ALREADY_LOGGED_IN"
                        print("[login] Could not find member search, will try login")
            except Exception as e:
                print(f"[login] Error checking current state: {e}")

            # Navigate to login URL
            self.driver.get(url)
            time.sleep(2)

            # Check if session redirected us straight to member search
            current_url = self.driver.current_url
            print(f"[login] URL after navigation: {current_url}")
            if "onboarding" not in current_url.lower():
                if self._member_search_present(3):
                    print("[login] Session valid - skipping login")
                    return "ALREADY_LOGGED_IN"
                print("[login] Proceeding with login")

            # Dismiss any "Authentication flow continued in another tab" modal
            modal_dismissed = False
            try:
                ok_button = WebDriverWait(self.driver, 3).until(
                    EC.element_to_be_clickable((By.XPATH, "//button[normalize-space(text())='Ok' or normalize-space(text())='OK']"))
                )
                ok_button.click()
                print("[login] Dismissed authentication modal")
                modal_dismissed = True
                time.sleep(2)
                all_windows = self.driver.window_handles
                print(f"[login] Windows after modal dismiss: {len(all_windows)}")
                if len(all_windows) > 1:
                    original_window = self.driver.current_window_handle
                    for window in all_windows:
                        if window != original_window:
                            self.driver.switch_to.window(window)
                            print("[login] Switched to auth popup window")
                            break
                    if self._element_present(self._OTP_INPUT_XPATH, 10):
                        print("[login] OTP input found in popup -> OTP_REQUIRED")
                        return "OTP_REQUIRED"
                    print("[login] No OTP in popup, checking main window")
                    self.driver.switch_to.window(original_window)
            except TimeoutException:
                pass  # No modal present

            if modal_dismissed:
                time.sleep(2)
                if self._member_search_present(5):
                    print("[login] Already authenticated after modal dismiss")
                    return "ALREADY_LOGGED_IN"

            # Fill login form
            try:
                email_field = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, "//input[@name='username' and @type='text']"))
                )
            except TimeoutException:
                print("[login] Could not find login form - page may have changed")
                return "ERROR: Login form not found"
            email_field.clear()
            email_field.send_keys(self.massddma_username)
            password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='password' and @type='password']")))
            password_field.clear()
            password_field.send_keys(self.massddma_password)

            # Remember me — optional, so use a short wait: the form is already
            # rendered and a missing checkbox should not stall the login for 30 s.
            try:
                remember_me_checkbox = WebDriverWait(self.driver, 5).until(EC.element_to_be_clickable(
                    (By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
                ))
                remember_me_checkbox.click()
            except Exception:
                print("[login] Remember me checkbox not found (continuing).")

            login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")))
            login_button.click()

            # Save credentials hash after login attempt
            if self.massddma_username:
                browser_manager.save_credentials_hash(self.massddma_username)

            # Wait up to 30 seconds for EITHER the OTP input or the member search
            # page, so a trusted device does not sit out the whole OTP timeout.
            try:
                WebDriverWait(self.driver, 30).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.XPATH, self._OTP_INPUT_XPATH)),
                        EC.presence_of_element_located((By.XPATH, self._MEMBER_SEARCH_XPATH)),
                    )
                )
            except TimeoutException:
                print("[login] No OTP input detected in allowed time.")
            else:
                if self.driver.find_elements(By.XPATH, self._OTP_INPUT_XPATH):
                    print("[login] OTP input detected -> OTP_REQUIRED")
                    return "OTP_REQUIRED"
                print("[login] Login successful - now on member search page")
                return "SUCCESS"

            # Check if we're now on the member search page (login succeeded without OTP)
            current_url = self.driver.current_url.lower()
            if "member" in current_url or "dashboard" in current_url:
                if self._member_search_present(5):
                    print("[login] Login successful - now on member search page")
                    return "SUCCESS"

            # Check for error messages
            try:
                error_elem = WebDriverWait(self.driver, 3).until(
                    EC.presence_of_element_located((By.XPATH, "//*[contains(@class,'error') or contains(text(),'invalid') or contains(text(),'failed')]"))
                )
                print(f"[login] Login failed - error detected: {error_elem.text}")
                return f"ERROR:LOGIN FAILED: {error_elem.text}"
            except TimeoutException:
                pass

            final_url = self.driver.current_url.lower()
            if "onboarding" in final_url or "login" in final_url:
                print("[login] Login failed - still on login page")
                return "ERROR:LOGIN FAILED: Still on login page"
            print("[login] Assuming login succeeded (no errors detected)")
            return "SUCCESS"
        except Exception as e:
            print("[login] Exception during login:", e)
            return f"ERROR:LOGIN FAILED: {e}"

    def step1(self):
        """Fill the member search form and submit it.

        Only the member ID and the date of birth are entered into the form.
        NOTE(review): ``firstName`` / ``lastName`` are logged as search fields
        but never typed into the page — confirm whether the portal has name
        inputs that should be filled.

        Returns:
            ``"Success"`` when the search was submitted and results are not
            flagged as empty, otherwise a string starting with ``"ERROR"``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            fields = []
            if self.memberId:
                fields.append(f"ID: {self.memberId}")
            if self.firstName:
                fields.append(f"FirstName: {self.firstName}")
            if self.lastName:
                fields.append(f"LastName: {self.lastName}")
            if self.dateOfBirth:
                fields.append(f"DOB: {self.dateOfBirth}")
            print(f"[DDMA step1] Starting search with: {', '.join(fields)}")

            def replace_with_sendkeys(el, value):
                # The DOB segments are contenteditable spans, so select-all +
                # backspace is used to clear them before typing.
                el.click()
                el.send_keys(Keys.CONTROL, "a")
                el.send_keys(Keys.BACKSPACE)
                el.send_keys(value)

            # 1. Fill Member ID if provided
            if self.memberId:
                try:
                    member_id_input = wait.until(EC.presence_of_element_located(
                        (By.XPATH, self._MEMBER_SEARCH_XPATH)
                    ))
                    member_id_input.clear()
                    member_id_input.send_keys(self.memberId)
                    print(f"[DDMA step1] Entered Member ID: {self.memberId}")
                    time.sleep(0.2)
                except Exception as e:
                    print(f"[DDMA step1] Warning: Could not fill Member ID: {e}")

            # 2. Fill DOB if provided (expects "YYYY-MM-DD")
            if self.dateOfBirth:
                try:
                    dob_parts = self.dateOfBirth.split("-")
                    year = dob_parts[0]
                    month = dob_parts[1].zfill(2)
                    day = dob_parts[2].zfill(2)
                    dob_container = wait.until(
                        EC.presence_of_element_located(
                            (By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
                        )
                    )
                    month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
                    day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
                    year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
                    replace_with_sendkeys(month_elem, month)
                    time.sleep(0.05)
                    replace_with_sendkeys(day_elem, day)
                    time.sleep(0.05)
                    replace_with_sendkeys(year_elem, year)
                    print(f"[DDMA step1] Filled DOB: {month}/{day}/{year}")
                except Exception as e:
                    print(f"[DDMA step1] Warning: Could not fill DOB: {e}")

            time.sleep(0.3)

            # Click Search button
            continue_btn = wait.until(EC.element_to_be_clickable(
                (By.XPATH, '//button[@data-testid="member-search_search-button"]')
            ))
            continue_btn.click()
            print("[DDMA step1] Clicked Search button")

            # Wait for either results row or no-results message (up to 15s)
            try:
                WebDriverWait(self.driver, 15).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.XPATH, self._RESULT_ROW_XPATH)),
                        EC.presence_of_element_located((By.XPATH, self._NO_RESULTS_XPATH)),
                    )
                )
            except TimeoutException:
                pass  # proceed and let step2 handle missing results

            # Check for no-results error
            try:
                if self.driver.find_elements(By.XPATH, self._NO_RESULTS_XPATH):
                    print("[DDMA step1] Error: No results found")
                    return "ERROR: INVALID SEARCH CRITERIA"
            except WebDriverException:
                pass

            print("[DDMA step1] Search completed successfully")
            return "Success"
        except Exception as e:
            print(f"[DDMA step1] Exception: {e}")
            return f"ERROR:STEP1 - {e}"

    def _read_eligibility_status(self):
        """Read the eligibility status from the search results, or ``"unknown"``."""
        try:
            status_link = WebDriverWait(self.driver, 3).until(EC.presence_of_element_located((
                By.XPATH,
                "(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
            )))
            status = status_link.text.strip().lower()
            print(f"[DDMA step2] Found eligibility status: {status}")
            return status
        except Exception:
            pass
        # Fallback: any element on the page carrying a status word.
        try:
            alt_status = self.driver.find_element(By.XPATH, "//*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]")
            status = self._normalize_eligibility(alt_status.text)
            print(f"[DDMA step2] Found eligibility via alternative: {status}")
            return status
        except Exception:
            return "unknown"

    def _find_detail_url(self):
        """Return the member-details URL from the first result row, or None."""
        link_selectors = [
            "(//table//tbody//tr)[1]//td[1]//a",
            "(//tbody//tr)[1]//a[contains(@href, 'member-details')]",
            "(//tbody//tr)[1]//a[contains(@href, 'member')]",
            "//a[contains(@href, 'member-details')]",
        ]
        for selector in link_selectors:
            try:
                link_el = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                href = link_el.get_attribute("href")
                if href and "member-details" in href:
                    print(f"[DDMA step2] Found detail URL: {href}")
                    return href
            except Exception:
                continue
        return None

    def _read_name_from_detail_page(self, detail_url):
        """Open ``detail_url`` and return the patient name found there (or "")."""
        # Always navigate via driver.get() — this blocks until the new page loads,
        # unlike click() which returns immediately and causes a race condition.
        print(f"[DDMA step2] Navigating to detail page: {detail_url}")
        self.driver.get(detail_url)

        # Confirm we actually landed on the detail page (not redirected away)
        try:
            WebDriverWait(self.driver, 15).until(
                lambda d: "member-details" in d.current_url
            )
            print(f"[DDMA step2] Confirmed on detail page: {self.driver.current_url}")
        except Exception:
            print(f"[DDMA step2] Warning: URL after navigation: {self.driver.current_url}")

        # Wait for meaningful content on the detail page
        for selector in [
            "//*[contains(text(),'Date of Birth') or contains(text(),'Address') or contains(text(),'Member ID')]",
            "//table", "//h1", "//h2",
        ]:
            try:
                WebDriverWait(self.driver, 15).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                print(f"[DDMA step2] Detail page content loaded: {selector}")
                break
            except Exception:
                continue
        time.sleep(3)  # Let JavaScript finish rendering all sections

        # Get full page text as lines
        try:
            raw_text = self.driver.execute_script("return document.body.innerText;")
            page_lines = [ln.strip() for ln in raw_text.split('\n') if ln.strip()]
        except Exception as e:
            print(f"[DDMA step2] Could not get page text: {e}")
            page_lines = []
        return self._extract_patient_name(page_lines)

    def _save_page_pdf(self, pdf_id):
        """Print the current page to PDF via CDP and return the saved path.

        Raises:
            RuntimeError: if Chrome returns no PDF data.
        """
        pdf_options = {
            "landscape": False,
            "displayHeaderFooter": False,
            "printBackground": True,
            "preferCSSPageSize": True,
            "paperWidth": 8.5,
            "paperHeight": 11,
            "marginTop": 0.4,
            "marginBottom": 0.4,
            "marginLeft": 0.4,
            "marginRight": 0.4,
            "scale": 0.9,
        }
        result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
        pdf_data = base64.b64decode(result.get('data', ''))
        if not pdf_data:
            # An empty file must not be reported as a successful capture.
            raise RuntimeError("Page.printToPDF returned no data")
        safe_id = self._safe_filename_component(pdf_id)
        pdf_path = os.path.join(self.download_dir, f"eligibility_{safe_id}.pdf")
        with open(pdf_path, "wb") as f:
            f.write(pdf_data)
        return pdf_path

    def _purge_download_dir(self):
        """Remove every file and symlink directly inside the download directory.

        NOTE(review): this deletes ALL files there, not only this member's PDF —
        confirm no other job's output can be present when step2 fails.
        """
        try:
            dl = os.path.abspath(self.download_dir)
            if os.path.isdir(dl):
                for name in os.listdir(dl):
                    item = os.path.join(dl, name)
                    try:
                        if os.path.isfile(item) or os.path.islink(item):
                            os.remove(item)
                    except Exception as rm_err:
                        print(f"[cleanup] failed to remove {item}: {rm_err}")
        except Exception as cleanup_exc:
            print(f"[cleanup] unexpected error: {cleanup_exc}")

    def step2(self):
        """Navigate to patient detail page and generate PDF.

        Returns:
            On success a dict with ``status``, ``eligibility``, ``ss_path``,
            ``pdf_path``, ``patientName`` and ``memberId``. On failure
            ``{"status": "error", "message": ...}``; the download directory is
            emptied before returning.
        """
        try:
            # Wait for results table, then pause for full render
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, self._RESULT_ROW_XPATH))
                )
                time.sleep(2)  # Let the row content fully render after table appears
            except TimeoutException:
                print("[DDMA step2] Warning: Results table not found within timeout")

            foundMemberId = self.memberId or ""
            patientName = ""

            # Extract member ID from search results row
            try:
                first_row = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]")
                row_text = first_row.text.strip()
                print(f"[DDMA step2] First row text: {row_text[:150]}...")
                row_member_id = self._extract_member_id(row_text, "")
                if row_member_id:
                    foundMemberId = row_member_id
                    print(f"[DDMA step2] Extracted Member ID: {foundMemberId}")
            except Exception as e:
                print(f"[DDMA step2] Error reading first row: {e}")

            eligibilityText = self._read_eligibility_status()

            # Find the member-details URL from the first row
            print("[DDMA step2] Looking for patient detail link...")
            detail_url = self._find_detail_url()
            if detail_url:
                patientName = self._read_name_from_detail_page(detail_url)
            else:
                print("[DDMA step2] Warning: Could not find patient detail link")

            # Clean patient name — strip any remaining date artifacts
            patientName = self._clean_patient_name(patientName)
            if not patientName:
                print("[DDMA step2] Could not extract patient name from detail page")

            # Wait for the page to be fully ready before capturing PDF
            try:
                WebDriverWait(self.driver, 30).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
            except Exception:
                pass
            # Extra wait for lazy-loaded sections (benefits summary, member history, etc.)
            time.sleep(4)

            # Generate PDF via Chrome DevTools Protocol
            print("[DDMA step2] Generating PDF of patient detail page...")
            pdf_path = self._save_page_pdf(foundMemberId or self.memberId or "unknown")
            print(f"[DDMA step2] PDF saved at: {pdf_path}")

            # Close the browser window after PDF (session preserved in profile)
            try:
                get_browser_manager().quit_driver()
                print("[step2] Browser closed - session preserved in profile")
            except Exception as e:
                print(f"[step2] Error closing browser: {e}")

            print(f"[DDMA step2] Final — PatientName: '{patientName}', MemberID: '{foundMemberId}'")
            return {
                "status": "success",
                "eligibility": eligibilityText,
                "ss_path": pdf_path,   # kept for backward compatibility
                "pdf_path": pdf_path,  # explicit pdf_path
                "patientName": patientName,
                "memberId": foundMemberId,
            }
        except Exception as e:
            print("ERROR in step2:", e)
            # Cleanup download dir on error
            self._purge_download_dir()
            return {"status": "error", "message": str(e)}

    def main_workflow(self, url):
        """Run login, search and PDF capture; always return a status dict.

        Args:
            url: Login page URL passed through to :meth:`login`.

        Returns:
            The dict from :meth:`step2` on success, ``{"status":
            "otp_required", ...}`` when an OTP prompt is shown, or
            ``{"status": "error", "message": ...}`` on any failure.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            time.sleep(3)
            login_result = self.login(url)
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}
            if login_result == "OTP_REQUIRED":
                return {"status": "otp_required", "message": "OTP required after login"}
            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}
            step2_result = self.step2()
            if step2_result.get("status") == "error":
                return {"status": "error", "message": step2_result.get("message")}
            return step2_result
        except Exception as e:
            return {"status": "error", "message": str(e)}
        # NOTE: the driver is deliberately not quit here; step2 closes the
        # browser through the browser manager after a successful PDF capture.