- Route DDMA eligibility through InProcessQueue (concurrency=1) so it queues behind other selenium jobs instead of running concurrently - New ddmaEligibilityProcessor: starts Python session, polls for OTP/ completion via socket events, saves PDF and updates patient DB - Frontend ddma-buton-modal now uses shared app socket + job:update pattern (drops private socket connection) - SeleniumService: upgrade ddma_browser_manager with credential hash tracking, anti-detection options, and startup session clearing; upgrade DDMA worker with firstName/lastName support, PDF via printToPDF, force-logout on credential change; upgrade helpers with dual OTP strategy (app API + browser polling); add /clear-ddma-session endpoint; reduce fixed sleeps with smart WebDriverWait Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
669 lines
30 KiB
Python
Executable File
669 lines
30 KiB
Python
Executable File
from selenium import webdriver
|
|
from selenium.common.exceptions import WebDriverException, TimeoutException
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
import time
|
|
import os
|
|
import base64
|
|
|
|
from ddma_browser_manager import get_browser_manager
|
|
|
|
class AutomationDeltaDentalMAEligibilityCheck:
|
|
def __init__(self, data):
|
|
self.headless = False
|
|
self.driver = None
|
|
|
|
self.data = data.get("data", {}) if isinstance(data, dict) else {}
|
|
|
|
# Flatten values for convenience
|
|
self.memberId = self.data.get("memberId", "")
|
|
self.dateOfBirth = self.data.get("dateOfBirth", "")
|
|
self.firstName = self.data.get("firstName", "")
|
|
self.lastName = self.data.get("lastName", "")
|
|
self.massddma_username = self.data.get("massddmaUsername", "")
|
|
self.massddma_password = self.data.get("massddmaPassword", "")
|
|
|
|
# Use browser manager's download dir
|
|
self.download_dir = get_browser_manager().download_dir
|
|
os.makedirs(self.download_dir, exist_ok=True)
|
|
|
|
def config_driver(self):
|
|
# Use persistent browser from manager (keeps device trust tokens)
|
|
self.driver = get_browser_manager().get_driver(self.headless)
|
|
|
|
def _force_logout(self):
|
|
"""Force logout by clearing cookies when credentials change."""
|
|
try:
|
|
print("[DDMA login] Forcing logout due to credential change...")
|
|
browser_manager = get_browser_manager()
|
|
|
|
# Try to click logout button if visible
|
|
try:
|
|
self.driver.get("https://providers.deltadentalma.com/")
|
|
time.sleep(2)
|
|
|
|
logout_selectors = [
|
|
"//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
|
|
"//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
|
|
"//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
|
|
"//*[contains(@class, 'logout') or contains(@class, 'signout')]"
|
|
]
|
|
|
|
for selector in logout_selectors:
|
|
try:
|
|
logout_btn = WebDriverWait(self.driver, 3).until(
|
|
EC.element_to_be_clickable((By.XPATH, selector))
|
|
)
|
|
logout_btn.click()
|
|
print("[DDMA login] Clicked logout button")
|
|
time.sleep(2)
|
|
break
|
|
except TimeoutException:
|
|
continue
|
|
except Exception as e:
|
|
print(f"[DDMA login] Could not click logout button: {e}")
|
|
|
|
# Clear cookies as backup
|
|
try:
|
|
self.driver.delete_all_cookies()
|
|
print("[DDMA login] Cleared all cookies")
|
|
except Exception as e:
|
|
print(f"[DDMA login] Error clearing cookies: {e}")
|
|
|
|
browser_manager.clear_credentials_hash()
|
|
print("[DDMA login] Logout complete")
|
|
return True
|
|
except Exception as e:
|
|
print(f"[DDMA login] Error during forced logout: {e}")
|
|
return False
|
|
|
|
def login(self, url):
|
|
wait = WebDriverWait(self.driver, 30)
|
|
browser_manager = get_browser_manager()
|
|
|
|
try:
|
|
# Check if credentials changed — force logout first
|
|
if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
|
|
self._force_logout()
|
|
self.driver.get(url)
|
|
time.sleep(2)
|
|
|
|
# Check if already on a logged-in page (persistent session from profile)
|
|
try:
|
|
current_url = self.driver.current_url
|
|
print(f"[login] Current URL: {current_url}")
|
|
|
|
logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
|
|
is_logged_in_url = any(pattern in current_url.lower() for pattern in logged_in_patterns)
|
|
|
|
if is_logged_in_url and "onboarding" not in current_url.lower():
|
|
print("[login] Already on logged-in page - skipping login entirely")
|
|
if "member" not in current_url.lower():
|
|
try:
|
|
member_search = WebDriverWait(self.driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
print("[login] Found member search input - returning ALREADY_LOGGED_IN")
|
|
return "ALREADY_LOGGED_IN"
|
|
except TimeoutException:
|
|
members_url = "https://providers.deltadentalma.com/members"
|
|
print(f"[login] Navigating to members page: {members_url}")
|
|
self.driver.get(members_url)
|
|
time.sleep(2)
|
|
|
|
try:
|
|
member_search = WebDriverWait(self.driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
print("[login] Member search found - ALREADY_LOGGED_IN")
|
|
return "ALREADY_LOGGED_IN"
|
|
except TimeoutException:
|
|
print("[login] Could not find member search, will try login")
|
|
except Exception as e:
|
|
print(f"[login] Error checking current state: {e}")
|
|
|
|
# Navigate to login URL
|
|
self.driver.get(url)
|
|
time.sleep(2)
|
|
|
|
# Check if session redirected us straight to member search
|
|
try:
|
|
current_url = self.driver.current_url
|
|
print(f"[login] URL after navigation: {current_url}")
|
|
|
|
if "onboarding" not in current_url.lower():
|
|
member_search = WebDriverWait(self.driver, 3).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
if member_search:
|
|
print("[login] Session valid - skipping login")
|
|
return "ALREADY_LOGGED_IN"
|
|
except TimeoutException:
|
|
print("[login] Proceeding with login")
|
|
|
|
# Dismiss any "Authentication flow continued in another tab" modal
|
|
modal_dismissed = False
|
|
try:
|
|
ok_button = WebDriverWait(self.driver, 3).until(
|
|
EC.element_to_be_clickable((By.XPATH, "//button[normalize-space(text())='Ok' or normalize-space(text())='OK']"))
|
|
)
|
|
ok_button.click()
|
|
print("[login] Dismissed authentication modal")
|
|
modal_dismissed = True
|
|
time.sleep(2)
|
|
|
|
all_windows = self.driver.window_handles
|
|
print(f"[login] Windows after modal dismiss: {len(all_windows)}")
|
|
|
|
if len(all_windows) > 1:
|
|
original_window = self.driver.current_window_handle
|
|
for window in all_windows:
|
|
if window != original_window:
|
|
self.driver.switch_to.window(window)
|
|
print("[login] Switched to auth popup window")
|
|
break
|
|
|
|
try:
|
|
otp_candidate = WebDriverWait(self.driver, 10).until(
|
|
EC.presence_of_element_located(
|
|
(By.XPATH, "//input[contains(@aria-lable,'Verification code') or contains(@placeholder,'Enter your verification code') or contains(@aria-label,'Verification code')]")
|
|
)
|
|
)
|
|
if otp_candidate:
|
|
print("[login] OTP input found in popup -> OTP_REQUIRED")
|
|
return "OTP_REQUIRED"
|
|
except TimeoutException:
|
|
print("[login] No OTP in popup, checking main window")
|
|
self.driver.switch_to.window(original_window)
|
|
|
|
except TimeoutException:
|
|
pass # No modal present
|
|
|
|
if modal_dismissed:
|
|
time.sleep(2)
|
|
try:
|
|
member_search = WebDriverWait(self.driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
if member_search:
|
|
print("[login] Already authenticated after modal dismiss")
|
|
return "ALREADY_LOGGED_IN"
|
|
except TimeoutException:
|
|
pass
|
|
|
|
# Fill login form
|
|
try:
|
|
email_field = WebDriverWait(self.driver, 10).until(
|
|
EC.element_to_be_clickable((By.XPATH, "//input[@name='username' and @type='text']"))
|
|
)
|
|
except TimeoutException:
|
|
print("[login] Could not find login form - page may have changed")
|
|
return "ERROR: Login form not found"
|
|
|
|
email_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='username' and @type='text']")))
|
|
email_field.clear()
|
|
email_field.send_keys(self.massddma_username)
|
|
|
|
password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='password' and @type='password']")))
|
|
password_field.clear()
|
|
password_field.send_keys(self.massddma_password)
|
|
|
|
# Remember me
|
|
try:
|
|
remember_me_checkbox = wait.until(EC.element_to_be_clickable(
|
|
(By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
|
|
))
|
|
remember_me_checkbox.click()
|
|
except Exception:
|
|
print("[login] Remember me checkbox not found (continuing).")
|
|
|
|
login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")))
|
|
login_button.click()
|
|
|
|
# Save credentials hash after login attempt
|
|
if self.massddma_username:
|
|
browser_manager.save_credentials_hash(self.massddma_username)
|
|
|
|
# OTP detection — wait up to 30 seconds for OTP input
|
|
try:
|
|
otp_candidate = WebDriverWait(self.driver, 30).until(
|
|
EC.presence_of_element_located(
|
|
(By.XPATH, "//input[contains(@aria-lable,'Verification code') or contains(@placeholder,'Enter your verification code')]")
|
|
)
|
|
)
|
|
if otp_candidate:
|
|
print("[login] OTP input detected -> OTP_REQUIRED")
|
|
return "OTP_REQUIRED"
|
|
except TimeoutException:
|
|
print("[login] No OTP input detected in allowed time.")
|
|
# Check if we're now on the member search page (login succeeded without OTP)
|
|
try:
|
|
current_url = self.driver.current_url.lower()
|
|
if "member" in current_url or "dashboard" in current_url:
|
|
member_search = WebDriverWait(self.driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
|
|
)
|
|
print("[login] Login successful - now on member search page")
|
|
return "SUCCESS"
|
|
except TimeoutException:
|
|
pass
|
|
|
|
# Check for error messages
|
|
try:
|
|
error_elem = WebDriverWait(self.driver, 3).until(
|
|
EC.presence_of_element_located((By.XPATH, "//*[contains(@class,'error') or contains(text(),'invalid') or contains(text(),'failed')]"))
|
|
)
|
|
print(f"[login] Login failed - error detected: {error_elem.text}")
|
|
return f"ERROR:LOGIN FAILED: {error_elem.text}"
|
|
except TimeoutException:
|
|
pass
|
|
|
|
if "onboarding" in self.driver.current_url.lower() or "login" in self.driver.current_url.lower():
|
|
print("[login] Login failed - still on login page")
|
|
return "ERROR:LOGIN FAILED: Still on login page"
|
|
|
|
print("[login] Assuming login succeeded (no errors detected)")
|
|
return "SUCCESS"
|
|
|
|
except Exception as e:
|
|
print("[login] Exception during login:", e)
|
|
return f"ERROR:LOGIN FAILED: {e}"
|
|
|
|
def step1(self):
|
|
"""Fill search form with all available fields (flexible search)."""
|
|
wait = WebDriverWait(self.driver, 30)
|
|
|
|
try:
|
|
fields = []
|
|
if self.memberId:
|
|
fields.append(f"ID: {self.memberId}")
|
|
if self.firstName:
|
|
fields.append(f"FirstName: {self.firstName}")
|
|
if self.lastName:
|
|
fields.append(f"LastName: {self.lastName}")
|
|
if self.dateOfBirth:
|
|
fields.append(f"DOB: {self.dateOfBirth}")
|
|
print(f"[DDMA step1] Starting search with: {', '.join(fields)}")
|
|
|
|
def replace_with_sendkeys(el, value):
|
|
el.click()
|
|
el.send_keys(Keys.CONTROL, "a")
|
|
el.send_keys(Keys.BACKSPACE)
|
|
el.send_keys(value)
|
|
|
|
# 1. Fill Member ID if provided
|
|
if self.memberId:
|
|
try:
|
|
member_id_input = wait.until(EC.presence_of_element_located(
|
|
(By.XPATH, '//input[@placeholder="Search by member ID"]')
|
|
))
|
|
member_id_input.clear()
|
|
member_id_input.send_keys(self.memberId)
|
|
print(f"[DDMA step1] Entered Member ID: {self.memberId}")
|
|
time.sleep(0.2)
|
|
except Exception as e:
|
|
print(f"[DDMA step1] Warning: Could not fill Member ID: {e}")
|
|
|
|
# 2. Fill DOB if provided
|
|
if self.dateOfBirth:
|
|
try:
|
|
dob_parts = self.dateOfBirth.split("-")
|
|
year = dob_parts[0]
|
|
month = dob_parts[1].zfill(2)
|
|
day = dob_parts[2].zfill(2)
|
|
|
|
dob_container = wait.until(
|
|
EC.presence_of_element_located(
|
|
(By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
|
|
)
|
|
)
|
|
|
|
month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
|
|
day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
|
|
year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
|
|
|
|
replace_with_sendkeys(month_elem, month)
|
|
time.sleep(0.05)
|
|
replace_with_sendkeys(day_elem, day)
|
|
time.sleep(0.05)
|
|
replace_with_sendkeys(year_elem, year)
|
|
print(f"[DDMA step1] Filled DOB: {month}/{day}/{year}")
|
|
except Exception as e:
|
|
print(f"[DDMA step1] Warning: Could not fill DOB: {e}")
|
|
|
|
# 3. Fill First Name if provided
|
|
if self.firstName:
|
|
try:
|
|
first_name_input = wait.until(EC.presence_of_element_located(
|
|
(By.XPATH, '//input[@placeholder="First name - 1 char minimum" or contains(@placeholder,"first name") or contains(@name,"firstName")]')
|
|
))
|
|
first_name_input.clear()
|
|
first_name_input.send_keys(self.firstName)
|
|
print(f"[DDMA step1] Entered First Name: {self.firstName}")
|
|
time.sleep(0.2)
|
|
except Exception as e:
|
|
print(f"[DDMA step1] Warning: Could not fill First Name: {e}")
|
|
|
|
# 4. Fill Last Name if provided
|
|
if self.lastName:
|
|
try:
|
|
last_name_input = wait.until(EC.presence_of_element_located(
|
|
(By.XPATH, '//input[@placeholder="Last name - 2 char minimum" or contains(@placeholder,"last name") or contains(@name,"lastName")]')
|
|
))
|
|
last_name_input.clear()
|
|
last_name_input.send_keys(self.lastName)
|
|
print(f"[DDMA step1] Entered Last Name: {self.lastName}")
|
|
time.sleep(0.2)
|
|
except Exception as e:
|
|
print(f"[DDMA step1] Warning: Could not fill Last Name: {e}")
|
|
|
|
time.sleep(0.3)
|
|
|
|
# Click Search button
|
|
continue_btn = wait.until(EC.element_to_be_clickable(
|
|
(By.XPATH, '//button[@data-testid="member-search_search-button"]')
|
|
))
|
|
continue_btn.click()
|
|
print("[DDMA step1] Clicked Search button")
|
|
|
|
# Wait for either results row or no-results message (up to 15s)
|
|
try:
|
|
WebDriverWait(self.driver, 15).until(
|
|
EC.any_of(
|
|
EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
|
|
EC.presence_of_element_located((By.XPATH, '//div[@data-testid="member-search-result-no-results"]')),
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
pass # proceed and let step2 handle missing results
|
|
|
|
# Check for no-results error
|
|
try:
|
|
error_msg = self.driver.find_element(By.XPATH, '//div[@data-testid="member-search-result-no-results"]')
|
|
if error_msg:
|
|
print("[DDMA step1] Error: No results found")
|
|
return "ERROR: INVALID SEARCH CRITERIA"
|
|
except Exception:
|
|
pass
|
|
|
|
print("[DDMA step1] Search completed successfully")
|
|
return "Success"
|
|
|
|
except Exception as e:
|
|
print(f"[DDMA step1] Exception: {e}")
|
|
return f"ERROR:STEP1 - {e}"
|
|
|
|
def step2(self):
|
|
"""Navigate to patient detail page and generate PDF."""
|
|
wait = WebDriverWait(self.driver, 90)
|
|
|
|
try:
|
|
import re
|
|
|
|
# Wait for results table
|
|
try:
|
|
WebDriverWait(self.driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
|
|
)
|
|
except TimeoutException:
|
|
print("[DDMA step2] Warning: Results table not found within timeout")
|
|
|
|
eligibilityText = "unknown"
|
|
foundMemberId = ""
|
|
patientName = ""
|
|
|
|
# Extract data from first result row
|
|
try:
|
|
first_row = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]")
|
|
row_text = first_row.text.strip()
|
|
print(f"[DDMA step2] First row text: {row_text[:150]}...")
|
|
|
|
if row_text:
|
|
lines = row_text.split('\n')
|
|
|
|
# Extract patient name (first line, strip DOB if present)
|
|
if lines:
|
|
potential_name = lines[0].strip()
|
|
potential_name = re.sub(r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', potential_name, flags=re.IGNORECASE).strip()
|
|
if potential_name and not potential_name.startswith('DOB') and not potential_name.isdigit():
|
|
patientName = potential_name
|
|
print(f"[DDMA step2] Extracted patient name: '{patientName}'")
|
|
|
|
# Extract Member ID
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line and re.match(r'^[A-Z0-9]{5,}$', line) and not line.startswith('DOB'):
|
|
foundMemberId = line
|
|
print(f"[DDMA step2] Extracted Member ID: {foundMemberId}")
|
|
break
|
|
|
|
if not foundMemberId and self.memberId:
|
|
foundMemberId = self.memberId
|
|
print(f"[DDMA step2] Using input Member ID: {foundMemberId}")
|
|
|
|
except Exception as e:
|
|
print(f"[DDMA step2] Error extracting data from row: {e}")
|
|
if self.memberId:
|
|
foundMemberId = self.memberId
|
|
|
|
# Extract eligibility status
|
|
try:
|
|
short_wait = WebDriverWait(self.driver, 3)
|
|
status_link = short_wait.until(EC.presence_of_element_located((
|
|
By.XPATH,
|
|
"(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
|
|
)))
|
|
eligibilityText = status_link.text.strip().lower()
|
|
print(f"[DDMA step2] Found eligibility status: {eligibilityText}")
|
|
except Exception:
|
|
try:
|
|
alt_status = self.driver.find_element(By.XPATH, "//*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]")
|
|
eligibilityText = alt_status.text.strip().lower()
|
|
if "active" in eligibilityText or "eligible" in eligibilityText:
|
|
eligibilityText = "active"
|
|
elif "inactive" in eligibilityText:
|
|
eligibilityText = "inactive"
|
|
print(f"[DDMA step2] Found eligibility via alternative: {eligibilityText}")
|
|
except Exception:
|
|
pass
|
|
|
|
# Navigate to detailed patient page
|
|
print("[DDMA step2] Navigating to patient detail page...")
|
|
patient_name_clicked = False
|
|
detail_url = None
|
|
|
|
patient_link_selectors = [
|
|
"(//table//tbody//tr)[1]//td[1]//a",
|
|
"(//tbody//tr)[1]//a[contains(@href, 'member-details')]",
|
|
"(//tbody//tr)[1]//a[contains(@href, 'member')]",
|
|
]
|
|
|
|
for selector in patient_link_selectors:
|
|
try:
|
|
patient_link = WebDriverWait(self.driver, 5).until(
|
|
EC.presence_of_element_located((By.XPATH, selector))
|
|
)
|
|
link_text = patient_link.text.strip()
|
|
href = patient_link.get_attribute("href")
|
|
print(f"[DDMA step2] Found patient link: text='{link_text}', href={href}")
|
|
|
|
if link_text and not patientName:
|
|
patientName = link_text
|
|
|
|
if href and "member-details" in href:
|
|
detail_url = href
|
|
patient_name_clicked = True
|
|
break
|
|
except Exception as e:
|
|
print(f"[DDMA step2] Selector '{selector}' failed: {e}")
|
|
continue
|
|
|
|
if not detail_url:
|
|
try:
|
|
all_links = self.driver.find_elements(By.XPATH, "//a[contains(@href, 'member-details')]")
|
|
if all_links:
|
|
detail_url = all_links[0].get_attribute("href")
|
|
patient_name_clicked = True
|
|
print(f"[DDMA step2] Fallback member-details link: {detail_url}")
|
|
except Exception as e:
|
|
print(f"[DDMA step2] Could not find member-details link: {e}")
|
|
|
|
if patient_name_clicked and detail_url:
|
|
print(f"[DDMA step2] Navigating directly to: {detail_url}")
|
|
self.driver.get(detail_url)
|
|
|
|
# Wait for page to be ready
|
|
try:
|
|
WebDriverWait(self.driver, 30).until(
|
|
lambda d: d.execute_script("return document.readyState") == "complete"
|
|
)
|
|
except Exception:
|
|
print("[DDMA step2] Warning: document.readyState did not become 'complete'")
|
|
|
|
# Wait for meaningful content to appear
|
|
content_selectors = [
|
|
"//div[contains(@class,'member') or contains(@class,'detail') or contains(@class,'patient')]",
|
|
"//h1", "//h2", "//table",
|
|
"//*[contains(text(),'Member ID') or contains(text(),'Name') or contains(text(),'Date of Birth')]",
|
|
]
|
|
for selector in content_selectors:
|
|
try:
|
|
WebDriverWait(self.driver, 10).until(
|
|
EC.presence_of_element_located((By.XPATH, selector))
|
|
)
|
|
print(f"[DDMA step2] Content loaded: {selector}")
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
time.sleep(1) # Brief settle for any late-rendering elements
|
|
|
|
# Try to extract patient name from detail page if not already found
|
|
if not patientName:
|
|
for selector in ["//h1", "//h2", "//*[contains(@class,'patient-name') or contains(@class,'member-name')]"]:
|
|
try:
|
|
name_elem = self.driver.find_element(By.XPATH, selector)
|
|
name_text = name_elem.text.strip()
|
|
if name_text and len(name_text) > 1:
|
|
if not any(x in name_text.lower() for x in ['active', 'inactive', 'eligible', 'search', 'date', 'print']):
|
|
patientName = name_text
|
|
print(f"[DDMA step2] Found patient name on detail page: {patientName}")
|
|
break
|
|
except Exception:
|
|
continue
|
|
else:
|
|
print("[DDMA step2] Warning: Could not navigate to patient detail page")
|
|
if not patientName:
|
|
try:
|
|
name_elem = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]//td[1]")
|
|
patientName = name_elem.text.strip()
|
|
except Exception:
|
|
pass
|
|
|
|
if not patientName:
|
|
print("[DDMA step2] Could not extract patient name")
|
|
|
|
# Clean patient name
|
|
if patientName:
|
|
cleaned = re.sub(r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', '', patientName, flags=re.IGNORECASE).strip()
|
|
if cleaned:
|
|
patientName = cleaned
|
|
|
|
# Wait for page ready before PDF
|
|
try:
|
|
WebDriverWait(self.driver, 30).until(
|
|
lambda d: d.execute_script("return document.readyState") == "complete"
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# Generate PDF via Chrome DevTools Protocol
|
|
print("[DDMA step2] Generating PDF of patient detail page...")
|
|
pdf_options = {
|
|
"landscape": False,
|
|
"displayHeaderFooter": False,
|
|
"printBackground": True,
|
|
"preferCSSPageSize": True,
|
|
"paperWidth": 8.5,
|
|
"paperHeight": 11,
|
|
"marginTop": 0.4,
|
|
"marginBottom": 0.4,
|
|
"marginLeft": 0.4,
|
|
"marginRight": 0.4,
|
|
"scale": 0.9,
|
|
}
|
|
|
|
result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
|
|
pdf_data = base64.b64decode(result.get('data', ''))
|
|
pdf_id = foundMemberId or self.memberId or "unknown"
|
|
pdf_path = os.path.join(self.download_dir, f"eligibility_{pdf_id}.pdf")
|
|
with open(pdf_path, "wb") as f:
|
|
f.write(pdf_data)
|
|
|
|
print(f"[DDMA step2] PDF saved at: {pdf_path}")
|
|
|
|
# Close the browser window after PDF (session preserved in profile)
|
|
try:
|
|
from ddma_browser_manager import get_browser_manager
|
|
get_browser_manager().quit_driver()
|
|
print("[step2] Browser closed - session preserved in profile")
|
|
except Exception as e:
|
|
print(f"[step2] Error closing browser: {e}")
|
|
|
|
print(f"[DDMA step2] Final — PatientName: '{patientName}', MemberID: '{foundMemberId}'")
|
|
|
|
return {
|
|
"status": "success",
|
|
"eligibility": eligibilityText,
|
|
"ss_path": pdf_path, # kept for backward compatibility
|
|
"pdf_path": pdf_path, # explicit pdf_path
|
|
"patientName": patientName,
|
|
"memberId": foundMemberId,
|
|
}
|
|
|
|
except Exception as e:
|
|
print("ERROR in step2:", e)
|
|
# Cleanup download dir on error
|
|
try:
|
|
dl = os.path.abspath(self.download_dir)
|
|
if os.path.isdir(dl):
|
|
for name in os.listdir(dl):
|
|
item = os.path.join(dl, name)
|
|
try:
|
|
if os.path.isfile(item) or os.path.islink(item):
|
|
os.remove(item)
|
|
except Exception as rm_err:
|
|
print(f"[cleanup] failed to remove {item}: {rm_err}")
|
|
except Exception as cleanup_exc:
|
|
print(f"[cleanup] unexpected error: {cleanup_exc}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
def main_workflow(self, url):
|
|
try:
|
|
self.config_driver()
|
|
self.driver.maximize_window()
|
|
time.sleep(3)
|
|
|
|
login_result = self.login(url)
|
|
if login_result.startswith("ERROR"):
|
|
return {"status": "error", "message": login_result}
|
|
if login_result == "OTP_REQUIRED":
|
|
return {"status": "otp_required", "message": "OTP required after login"}
|
|
|
|
step1_result = self.step1()
|
|
if step1_result.startswith("ERROR"):
|
|
return {"status": "error", "message": step1_result}
|
|
|
|
step2_result = self.step2()
|
|
if step2_result.get("status") == "error":
|
|
return {"status": "error", "message": step2_result.get("message")}
|
|
|
|
return step2_result
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
# NOTE: Do NOT quit driver — keep browser alive for next patient
|