- Filter patient list by userId so each user sees only their own patients - Sort patients by updatedAt DESC so recently checked patients appear first - Add updatedAt field to Patient model (DB migration via raw SQL + db:generate) - Fix DDMA name extraction: read from detail page "Name:" label, not search results row text which included appended dates - Fix PDF capture: use driver.get() instead of click() to avoid race condition that was saving the search results page instead of the patient detail page - Strip trailing bare dates from extracted names (e.g. "Rodriguez 04/27/2026") - Handle "Last, First" comma format and single-word last names in splitName - Normalize insuranceId consistently in createOrUpdatePatientByInsuranceId - Fix OTP persistent session: stop clearing LocalStorage/IndexedDB on startup (these hold the DDMA device trust token that skips OTP on subsequent logins) - Increase post-navigation wait time for full page render before PDF generation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
647 lines
30 KiB
Python
Executable File
647 lines
30 KiB
Python
Executable File
import base64
import os
import re
import time

from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

from ddma_browser_manager import get_browser_manager
|
||
|
||
class AutomationDeltaDentalMAEligibilityCheck:
    """Run one eligibility check against the Delta Dental MA provider portal.

    ``main_workflow`` chains the steps: obtain the driver from the browser
    manager, log in, search for the member (``step1``), then open the member
    detail page and save it as a PDF (``step2``).
    """

    PORTAL_URL = "https://providers.deltadentalma.com/"
    MEMBERS_URL = "https://providers.deltadentalma.com/members"
    MEMBER_SEARCH_XPATH = '//input[@placeholder="Search by member ID"]'

    # One XPath for every OTP check. It keeps the historical misspelled
    # attribute ("aria-lable") alongside the correct "aria-label" so that the
    # popup check and the post-login check recognise exactly the same inputs.
    OTP_INPUT_XPATH = (
        "//input[contains(@aria-lable,'Verification code') "
        "or contains(@placeholder,'Enter your verification code') "
        "or contains(@aria-label,'Verification code')]"
    )

    # UI noise words that appear in accessibility/sort-button text or field
    # labels. NOTE(review): matching is by substring, so a real surname that
    # contains one of these (e.g. "Kaplan" contains "plan") is rejected too.
    _UI_NOISE = (
        'click', 'sort', 'activate', 'direction', 'enter', 'space',
        'current', 'use ', 'table', 'column', 'filter', 'search',
        'date', 'birth', 'relationship', 'subscriber', 'coverage',
        'status', 'period', 'network', 'plan', 'deductible',
    )

    def __init__(self, data):
        """Store the request payload and prepare the download directory.

        Args:
            data: Expected to be a dict with a nested ``"data"`` dict holding
                ``memberId``, ``dateOfBirth``, ``firstName``, ``lastName``,
                ``massddmaUsername`` and ``massddmaPassword``. Anything that
                is not a dict is treated as an empty payload.
        """
        self.headless = False
        self.driver = None

        self.data = data.get("data", {}) if isinstance(data, dict) else {}

        # Flatten values for convenience
        self.memberId = self.data.get("memberId", "")
        self.dateOfBirth = self.data.get("dateOfBirth", "")
        self.firstName = self.data.get("firstName", "")
        self.lastName = self.data.get("lastName", "")
        self.massddma_username = self.data.get("massddmaUsername", "")
        self.massddma_password = self.data.get("massddmaPassword", "")

        # Use browser manager's download dir
        self.download_dir = get_browser_manager().download_dir
        os.makedirs(self.download_dir, exist_ok=True)

    def config_driver(self):
        """Fetch the driver from the browser manager and keep it on ``self``."""
        # Use persistent browser from manager (keeps device trust tokens)
        self.driver = get_browser_manager().get_driver(self.headless)

    # ------------------------------------------------------------------
    # Pure helpers (no browser access)
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_eligibility(text):
        """Map free-form status text to ``"active"`` / ``"inactive"``.

        Negative wording is tested first: "inactive" contains "active" and
        "ineligible" contains "eligible", so testing the positive words first
        would classify an inactive member as active. Text matching neither
        group is returned stripped and lower-cased.
        """
        lowered = text.strip().lower()
        if "inactive" in lowered or "ineligible" in lowered or "not eligible" in lowered:
            return "inactive"
        if "active" in lowered or "eligible" in lowered:
            return "active"
        return lowered

    @staticmethod
    def _member_id_from_row_text(row_text, default):
        """Return the first line of ``row_text`` that looks like a member ID.

        A member ID is taken to be a line made only of upper-case letters and
        digits, at least five characters long. ``default`` is returned when no
        line qualifies.
        """
        if not row_text:
            return default
        for raw_line in row_text.split('\n'):
            line = raw_line.strip()
            if line and re.match(r'^[A-Z0-9]{5,}$', line) and not line.startswith('DOB'):
                return line
        return default

    @staticmethod
    def _clean_patient_name(name):
        """Strip ``DOB: mm/dd/yyyy`` fragments and a trailing bare date.

        Falls back to the input unchanged if stripping would leave nothing.
        """
        cleaned = re.sub(
            r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', name, flags=re.IGNORECASE
        ).strip()
        cleaned = re.sub(r'\s+\d{1,2}/\d{1,2}/\d{2,4}$', '', cleaned).strip()
        return cleaned if cleaned else name

    @staticmethod
    def _safe_filename_part(value):
        """Return ``value`` reduced to characters that are safe in a file name.

        Anything other than ASCII letters, digits, ``_`` and ``-`` becomes
        ``_`` so an ID taken from request data cannot inject path separators.
        """
        sanitized = re.sub(r'[^A-Za-z0-9_-]', '_', str(value))
        return sanitized or "unknown"

    @classmethod
    def _looks_like_name(cls, text):
        """Return True if text is a plausible patient name."""
        candidate = text.strip()
        # Must be 2–60 chars, letters/spaces/hyphens/apostrophes only (no periods)
        if not candidate or not (2 <= len(candidate) <= 60):
            return False
        if not re.match(r"^[A-Za-z\s\-']+$", candidate):
            return False
        # Must not be UI noise
        lowered = candidate.lower()
        return not any(word in lowered for word in cls._UI_NOISE)

    @classmethod
    def _extract_patient_name(cls, page_lines):
        """Find the patient name next to a "Name" label in the page text.

        Args:
            page_lines: Non-empty, stripped lines of the detail page text.

        Returns:
            The extracted name, or ``""`` when no plausible name is found.
        """
        for i, line in enumerate(page_lines):
            # Case 1: "Name : Value" or "Name: Value" on the same line
            same_line = re.match(
                r'^(?:member\s+)?name\s*[:\-]\s*(.+)$', line, re.IGNORECASE
            )
            if same_line and not re.search(
                r'(provider|group|subscriber|plan)\s+name', line, re.IGNORECASE
            ):
                # Remove appended dates before validating; the validator
                # rejects digits, so otherwise "Rodriguez 04/27/2026" would
                # never yield a name.
                candidate = cls._clean_patient_name(same_line.group(1).strip())
                if cls._looks_like_name(candidate):
                    print(f"[DDMA step2] Extracted name from 'Name:' label: '{candidate}'")
                    return candidate

            # Case 2: "Name" or "Name:" alone on a line — scan forward past
            # accessibility text to the first plausible name value.
            elif re.match(r'^(?:member\s+)?name\s*:?\s*$', line, re.IGNORECASE):
                for j in range(i + 1, min(i + 6, len(page_lines))):
                    candidate = page_lines[j].strip()
                    if cls._looks_like_name(candidate):
                        print(f"[DDMA step2] Extracted name from 'Name' label (line +{j-i}): '{candidate}'")
                        return candidate
        return ""

    # ------------------------------------------------------------------
    # Login
    # ------------------------------------------------------------------

    def _member_search_present(self, timeout):
        """Return True if the member-search input appears within ``timeout`` seconds."""
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located((By.XPATH, self.MEMBER_SEARCH_XPATH))
            )
            return True
        except TimeoutException:
            return False

    def _force_logout(self):
        """Force logout by clearing cookies when credentials change.

        Returns:
            True when the sequence completed, False if an unexpected error
            interrupted it. Failing to find a logout button or to clear
            cookies is logged and does not count as failure.
        """
        try:
            print("[DDMA login] Forcing logout due to credential change...")
            browser_manager = get_browser_manager()

            # Try to click logout button if visible
            try:
                self.driver.get(self.PORTAL_URL)
                time.sleep(2)

                logout_selectors = [
                    "//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                    "//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
                    "//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
                    "//*[contains(@class, 'logout') or contains(@class, 'signout')]",
                ]

                for selector in logout_selectors:
                    try:
                        logout_btn = WebDriverWait(self.driver, 3).until(
                            EC.element_to_be_clickable((By.XPATH, selector))
                        )
                        logout_btn.click()
                        print("[DDMA login] Clicked logout button")
                        time.sleep(2)
                        break
                    except TimeoutException:
                        continue
            except Exception as e:
                print(f"[DDMA login] Could not click logout button: {e}")

            # Clear cookies as backup
            try:
                self.driver.delete_all_cookies()
                print("[DDMA login] Cleared all cookies")
            except Exception as e:
                print(f"[DDMA login] Error clearing cookies: {e}")

            browser_manager.clear_credentials_hash()
            print("[DDMA login] Logout complete")
            return True
        except Exception as e:
            print(f"[DDMA login] Error during forced logout: {e}")
            return False

    def _resume_existing_session(self):
        """Return True if the browser is already on a usable logged-in page.

        If the current URL looks logged-in but is not a member page, the
        members page is opened before checking for the member-search input.
        """
        current_url = self.driver.current_url
        print(f"[login] Current URL: {current_url}")
        lowered = current_url.lower()

        logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
        is_logged_in_url = any(pattern in lowered for pattern in logged_in_patterns)
        if not is_logged_in_url or "onboarding" in lowered:
            return False

        print("[login] Already on logged-in page - skipping login entirely")
        if "member" not in lowered:
            if self._member_search_present(5):
                print("[login] Found member search input - returning ALREADY_LOGGED_IN")
                return True
            print(f"[login] Navigating to members page: {self.MEMBERS_URL}")
            self.driver.get(self.MEMBERS_URL)
            time.sleep(2)

        if self._member_search_present(5):
            print("[login] Member search found - ALREADY_LOGGED_IN")
            return True
        print("[login] Could not find member search, will try login")
        return False

    def _handle_auth_modal(self):
        """Dismiss the "Authentication flow continued in another tab" modal.

        Returns:
            ``"OTP_REQUIRED"`` if an OTP input shows up in a popup window,
            ``"ALREADY_LOGGED_IN"`` if the member search is reachable after
            the modal is dismissed, otherwise ``None`` (no modal, or the
            normal login form should be used).
        """
        try:
            ok_button = WebDriverWait(self.driver, 3).until(
                EC.element_to_be_clickable((By.XPATH, "//button[normalize-space(text())='Ok' or normalize-space(text())='OK']"))
            )
        except TimeoutException:
            return None  # No modal present

        ok_button.click()
        print("[login] Dismissed authentication modal")
        time.sleep(2)

        all_windows = self.driver.window_handles
        print(f"[login] Windows after modal dismiss: {len(all_windows)}")

        if len(all_windows) > 1:
            original_window = self.driver.current_window_handle
            for window in all_windows:
                if window != original_window:
                    self.driver.switch_to.window(window)
                    print("[login] Switched to auth popup window")
                    break

            try:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, self.OTP_INPUT_XPATH))
                )
                print("[login] OTP input found in popup -> OTP_REQUIRED")
                return "OTP_REQUIRED"
            except TimeoutException:
                print("[login] No OTP in popup, checking main window")
                self.driver.switch_to.window(original_window)

        time.sleep(2)
        if self._member_search_present(5):
            print("[login] Already authenticated after modal dismiss")
            return "ALREADY_LOGGED_IN"
        return None

    def login(self, url):
        """Log in to the portal, reusing an existing session when possible.

        Args:
            url: Login URL of the portal.

        Returns:
            ``"ALREADY_LOGGED_IN"``, ``"SUCCESS"``, ``"OTP_REQUIRED"``, or a
            string starting with ``"ERROR"`` that describes the failure.
        """
        wait = WebDriverWait(self.driver, 30)
        browser_manager = get_browser_manager()

        try:
            # Check if credentials changed — force logout first
            if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
                self._force_logout()
                self.driver.get(url)
                time.sleep(2)

            # Check if already on a logged-in page (persistent session from profile)
            try:
                if self._resume_existing_session():
                    return "ALREADY_LOGGED_IN"
            except Exception as e:
                print(f"[login] Error checking current state: {e}")

            # Navigate to login URL
            self.driver.get(url)
            time.sleep(2)

            # Check if session redirected us straight to member search
            current_url = self.driver.current_url
            print(f"[login] URL after navigation: {current_url}")
            if "onboarding" not in current_url.lower():
                if self._member_search_present(3):
                    print("[login] Session valid - skipping login")
                    return "ALREADY_LOGGED_IN"
                print("[login] Proceeding with login")

            modal_result = self._handle_auth_modal()
            if modal_result:
                return modal_result

            # Fill login form
            try:
                email_field = WebDriverWait(self.driver, 10).until(
                    EC.element_to_be_clickable((By.XPATH, "//input[@name='username' and @type='text']"))
                )
            except TimeoutException:
                print("[login] Could not find login form - page may have changed")
                return "ERROR: Login form not found"

            email_field.clear()
            email_field.send_keys(self.massddma_username)

            password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='password' and @type='password']")))
            password_field.clear()
            password_field.send_keys(self.massddma_password)

            # Remember me (best effort — login continues without it)
            try:
                remember_me_checkbox = wait.until(EC.element_to_be_clickable(
                    (By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
                ))
                remember_me_checkbox.click()
            except Exception:
                print("[login] Remember me checkbox not found (continuing).")

            login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")))
            login_button.click()

            # Save credentials hash after login attempt
            if self.massddma_username:
                browser_manager.save_credentials_hash(self.massddma_username)

            # OTP detection — wait up to 30 seconds for OTP input
            try:
                WebDriverWait(self.driver, 30).until(
                    EC.presence_of_element_located((By.XPATH, self.OTP_INPUT_XPATH))
                )
                print("[login] OTP input detected -> OTP_REQUIRED")
                return "OTP_REQUIRED"
            except TimeoutException:
                print("[login] No OTP input detected in allowed time.")

            # Check if we're now on the member search page (login succeeded without OTP)
            current_url = self.driver.current_url.lower()
            if "member" in current_url or "dashboard" in current_url:
                if self._member_search_present(5):
                    print("[login] Login successful - now on member search page")
                    return "SUCCESS"

            # Check for error messages
            try:
                error_elem = WebDriverWait(self.driver, 3).until(
                    EC.presence_of_element_located((By.XPATH, "//*[contains(@class,'error') or contains(text(),'invalid') or contains(text(),'failed')]"))
                )
                print(f"[login] Login failed - error detected: {error_elem.text}")
                return f"ERROR:LOGIN FAILED: {error_elem.text}"
            except TimeoutException:
                pass

            if "onboarding" in self.driver.current_url.lower() or "login" in self.driver.current_url.lower():
                print("[login] Login failed - still on login page")
                return "ERROR:LOGIN FAILED: Still on login page"

            print("[login] Assuming login succeeded (no errors detected)")
            return "SUCCESS"

        except Exception as e:
            print("[login] Exception during login:", e)
            return f"ERROR:LOGIN FAILED: {e}"

    # ------------------------------------------------------------------
    # Step 1: member search
    # ------------------------------------------------------------------

    def step1(self):
        """Fill the member search form and submit it.

        Member ID and date of birth are typed when provided. First and last
        name are only included in the log line; this method does not enter
        them into the form.

        Returns:
            ``"Success"``, ``"ERROR: INVALID SEARCH CRITERIA"`` when the
            portal shows its no-results element, or ``"ERROR:STEP1 - ..."``
            on an unexpected exception.
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            fields = []
            if self.memberId:
                fields.append(f"ID: {self.memberId}")
            if self.firstName:
                fields.append(f"FirstName: {self.firstName}")
            if self.lastName:
                fields.append(f"LastName: {self.lastName}")
            if self.dateOfBirth:
                fields.append(f"DOB: {self.dateOfBirth}")
            print(f"[DDMA step1] Starting search with: {', '.join(fields)}")

            def replace_with_sendkeys(el, value):
                # Select-all + backspace, then type: the DOB segments are
                # contenteditable spans located below, not <input> elements.
                el.click()
                el.send_keys(Keys.CONTROL, "a")
                el.send_keys(Keys.BACKSPACE)
                el.send_keys(value)

            # 1. Fill Member ID if provided
            if self.memberId:
                try:
                    member_id_input = wait.until(EC.presence_of_element_located(
                        (By.XPATH, self.MEMBER_SEARCH_XPATH)
                    ))
                    member_id_input.clear()
                    member_id_input.send_keys(self.memberId)
                    print(f"[DDMA step1] Entered Member ID: {self.memberId}")
                    time.sleep(0.2)
                except Exception as e:
                    print(f"[DDMA step1] Warning: Could not fill Member ID: {e}")

            # 2. Fill DOB if provided (split on "-" as year, month, day)
            if self.dateOfBirth:
                try:
                    dob_parts = self.dateOfBirth.split("-")
                    year = dob_parts[0]
                    month = dob_parts[1].zfill(2)
                    day = dob_parts[2].zfill(2)

                    dob_container = wait.until(
                        EC.presence_of_element_located(
                            (By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
                        )
                    )

                    month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
                    day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
                    year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")

                    replace_with_sendkeys(month_elem, month)
                    time.sleep(0.05)
                    replace_with_sendkeys(day_elem, day)
                    time.sleep(0.05)
                    replace_with_sendkeys(year_elem, year)
                    print(f"[DDMA step1] Filled DOB: {month}/{day}/{year}")
                except Exception as e:
                    print(f"[DDMA step1] Warning: Could not fill DOB: {e}")

            time.sleep(0.3)

            # Click Search button
            continue_btn = wait.until(EC.element_to_be_clickable(
                (By.XPATH, '//button[@data-testid="member-search_search-button"]')
            ))
            continue_btn.click()
            print("[DDMA step1] Clicked Search button")

            no_results_xpath = '//div[@data-testid="member-search-result-no-results"]'

            # Wait for either results row or no-results message (up to 15s)
            try:
                WebDriverWait(self.driver, 15).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
                        EC.presence_of_element_located((By.XPATH, no_results_xpath)),
                    )
                )
            except TimeoutException:
                pass  # proceed and let step2 handle missing results

            # Check for no-results error; find_elements returns [] when the
            # element is absent, so no exception needs to be swallowed.
            if self.driver.find_elements(By.XPATH, no_results_xpath):
                print("[DDMA step1] Error: No results found")
                return "ERROR: INVALID SEARCH CRITERIA"

            print("[DDMA step1] Search completed successfully")
            return "Success"

        except Exception as e:
            print(f"[DDMA step1] Exception: {e}")
            return f"ERROR:STEP1 - {e}"

    # ------------------------------------------------------------------
    # Step 2: detail page + PDF
    # ------------------------------------------------------------------

    def _read_member_id_from_row(self):
        """Read the member ID from the first results row, else fall back to the input ID."""
        fallback = self.memberId or ""
        try:
            first_row = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]")
            row_text = first_row.text.strip()
            print(f"[DDMA step2] First row text: {row_text[:150]}...")
            found = self._member_id_from_row_text(row_text, None)
            if found is not None:
                print(f"[DDMA step2] Extracted Member ID: {found}")
                return found
        except Exception as e:
            print(f"[DDMA step2] Error reading first row: {e}")
        return fallback

    def _read_eligibility(self):
        """Return the eligibility text from the results page, or ``"unknown"``."""
        try:
            status_link = WebDriverWait(self.driver, 3).until(EC.presence_of_element_located((
                By.XPATH,
                "(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
            )))
            eligibility = status_link.text.strip().lower()
            print(f"[DDMA step2] Found eligibility status: {eligibility}")
            return eligibility
        except Exception:
            pass

        # Fallback: any element whose text mentions a status keyword.
        try:
            alt_status = self.driver.find_element(By.XPATH, "//*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]")
            eligibility = self._normalize_eligibility(alt_status.text)
            print(f"[DDMA step2] Found eligibility via alternative: {eligibility}")
            return eligibility
        except Exception:
            return "unknown"

    def _find_detail_url(self):
        """Return the first ``member-details`` link found on the results page, or None."""
        link_selectors = [
            "(//table//tbody//tr)[1]//td[1]//a",
            "(//tbody//tr)[1]//a[contains(@href, 'member-details')]",
            "(//tbody//tr)[1]//a[contains(@href, 'member')]",
            "//a[contains(@href, 'member-details')]",
        ]
        for selector in link_selectors:
            try:
                link_el = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                href = link_el.get_attribute("href")
                if href and "member-details" in href:
                    print(f"[DDMA step2] Found detail URL: {href}")
                    return href
            except Exception:
                continue
        return None

    def _open_detail_page(self, detail_url):
        """Navigate to the detail page and return its visible text as stripped lines."""
        # Always navigate via driver.get() — this blocks until the new page loads,
        # unlike click() which returns immediately and causes a race condition.
        print(f"[DDMA step2] Navigating to detail page: {detail_url}")
        self.driver.get(detail_url)

        # Confirm we actually landed on the detail page (not redirected away)
        try:
            WebDriverWait(self.driver, 15).until(
                lambda d: "member-details" in d.current_url
            )
            print(f"[DDMA step2] Confirmed on detail page: {self.driver.current_url}")
        except Exception:
            print(f"[DDMA step2] Warning: URL after navigation: {self.driver.current_url}")

        # Wait for meaningful content on the detail page
        content_selectors = [
            "//*[contains(text(),'Date of Birth') or contains(text(),'Address') or contains(text(),'Member ID')]",
            "//table", "//h1", "//h2",
        ]
        for selector in content_selectors:
            try:
                WebDriverWait(self.driver, 15).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                print(f"[DDMA step2] Detail page content loaded: {selector}")
                break
            except Exception:
                continue

        time.sleep(3)  # Let JavaScript finish rendering all sections

        # Get full page text as lines
        try:
            raw_lines = self.driver.execute_script(
                "return document.body.innerText;"
            ).split('\n')
            return [text.strip() for text in raw_lines if text.strip()]
        except Exception as e:
            print(f"[DDMA step2] Could not get page text: {e}")
            return []

    def _cleanup_download_dir(self):
        """Delete every file and symlink directly inside the download directory."""
        try:
            dl = os.path.abspath(self.download_dir)
            if os.path.isdir(dl):
                for name in os.listdir(dl):
                    item = os.path.join(dl, name)
                    try:
                        if os.path.isfile(item) or os.path.islink(item):
                            os.remove(item)
                    except Exception as rm_err:
                        print(f"[cleanup] failed to remove {item}: {rm_err}")
        except Exception as cleanup_exc:
            print(f"[cleanup] unexpected error: {cleanup_exc}")

    def step2(self):
        """Navigate to patient detail page and generate PDF.

        Returns:
            On success a dict with ``status`` ``"success"``, ``eligibility``,
            ``ss_path``/``pdf_path`` (same path), ``patientName`` and
            ``memberId``. On failure ``{"status": "error", "message": ...}``.
            The failure path also empties the download directory.
        """
        try:
            # Wait for results table, then pause for full render
            try:
                WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
                )
                time.sleep(2)  # Let the row content fully render after table appears
            except TimeoutException:
                print("[DDMA step2] Warning: Results table not found within timeout")

            # Extract eligibility status and member ID from search results row
            foundMemberId = self._read_member_id_from_row()
            eligibilityText = self._read_eligibility()
            patientName = ""

            # Find the member-details URL from the first row
            print("[DDMA step2] Looking for patient detail link...")
            detail_url = self._find_detail_url()

            if detail_url:
                page_lines = self._open_detail_page(detail_url)
                patientName = self._extract_patient_name(page_lines)
            else:
                print("[DDMA step2] Warning: Could not find patient detail link")

            # Clean patient name — strip any remaining date artifacts
            if patientName:
                patientName = self._clean_patient_name(patientName)

            if not patientName:
                print("[DDMA step2] Could not extract patient name from detail page")

            # Wait for the page to be fully ready before capturing PDF
            try:
                WebDriverWait(self.driver, 30).until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
            except Exception:
                pass

            # Extra wait for lazy-loaded sections (benefits summary, member history, etc.)
            time.sleep(4)

            # Generate PDF via Chrome DevTools Protocol
            print("[DDMA step2] Generating PDF of patient detail page...")
            pdf_options = {
                "landscape": False,
                "displayHeaderFooter": False,
                "printBackground": True,
                "preferCSSPageSize": True,
                "paperWidth": 8.5,
                "paperHeight": 11,
                "marginTop": 0.4,
                "marginBottom": 0.4,
                "marginLeft": 0.4,
                "marginRight": 0.4,
                "scale": 0.9,
            }

            result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
            pdf_data = base64.b64decode(result.get('data', ''))
            if not pdf_data:
                # An empty payload would otherwise be saved as a 0-byte file
                # and reported as a successful capture.
                raise RuntimeError("Page.printToPDF returned no data")

            pdf_id = self._safe_filename_part(foundMemberId or self.memberId or "unknown")
            pdf_path = os.path.join(self.download_dir, f"eligibility_{pdf_id}.pdf")
            with open(pdf_path, "wb") as f:
                f.write(pdf_data)

            print(f"[DDMA step2] PDF saved at: {pdf_path}")

            # Close the browser window after PDF (session preserved in profile)
            try:
                get_browser_manager().quit_driver()
                print("[step2] Browser closed - session preserved in profile")
            except Exception as e:
                print(f"[step2] Error closing browser: {e}")

            print(f"[DDMA step2] Final — PatientName: '{patientName}', MemberID: '{foundMemberId}'")

            return {
                "status": "success",
                "eligibility": eligibilityText,
                "ss_path": pdf_path,  # kept for backward compatibility
                "pdf_path": pdf_path,  # explicit pdf_path
                "patientName": patientName,
                "memberId": foundMemberId,
            }

        except Exception as e:
            print("ERROR in step2:", e)
            # Cleanup download dir on error
            self._cleanup_download_dir()
            return {"status": "error", "message": str(e)}

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def main_workflow(self, url):
        """Run login, search and PDF capture; always return a result dict.

        Args:
            url: Login URL passed through to ``login``.

        Returns:
            The ``step2`` result dict on success,
            ``{"status": "otp_required", ...}`` when an OTP is needed, or
            ``{"status": "error", "message": ...}`` for any failure.
        """
        # NOTE: this method never quits the driver itself. On success step2
        # closes the browser through the browser manager; on every other path
        # the browser is left running.
        try:
            self.config_driver()
            self.driver.maximize_window()
            time.sleep(3)

            login_result = self.login(url)
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}
            if login_result == "OTP_REQUIRED":
                return {"status": "otp_required", "message": "OTP required after login"}

            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}

            step2_result = self.step2()
            if step2_result.get("status") == "error":
                return {"status": "error", "message": step2_result.get("message")}

            return step2_result
        except Exception as e:
            return {"status": "error", "message": str(e)}
|