Files
DentalManagementMH04/apps/SeleniumServiceold/selenium_DDMA_eligibilityCheckWorker.py

669 lines
30 KiB
Python
Executable File

from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
from ddma_browser_manager import get_browser_manager
class AutomationDeltaDentalMAEligibilityCheck:
def __init__(self, data):
    """Capture the request payload and make sure the download directory exists.

    Args:
        data: Request envelope. Patient and credential fields are read from
            the dict stored under its ``"data"`` key. A non-dict envelope,
            or a ``"data"`` value that is not a dict, is treated as an
            empty payload.
    """
    self.headless = False
    self.driver = None

    # A plain ``data.get("data", {})`` returns None for ``{"data": None}``,
    # and every ``self.data.get`` below would then raise AttributeError.
    payload = data.get("data") if isinstance(data, dict) else None
    self.data = payload if isinstance(payload, dict) else {}

    def field(key):
        # Explicit nulls behave like missing keys, so the credential
        # values later passed to ``send_keys`` are never None.
        value = self.data.get(key)
        return "" if value is None else value

    # Flatten values for convenience
    self.memberId = field("memberId")
    self.dateOfBirth = field("dateOfBirth")
    self.firstName = field("firstName")
    self.lastName = field("lastName")
    self.massddma_username = field("massddmaUsername")
    self.massddma_password = field("massddmaPassword")

    # Use browser manager's download dir
    self.download_dir = get_browser_manager().download_dir
    os.makedirs(self.download_dir, exist_ok=True)
def config_driver(self):
    """Attach the browser manager's driver to this worker.

    The driver is requested with this worker's ``headless`` flag and stored
    on ``self.driver``.
    """
    # The manager hands out its persistent browser (keeps device trust tokens).
    manager = get_browser_manager()
    self.driver = manager.get_driver(self.headless)
def _force_logout(self):
    """Sign the current portal session out after a credential change.

    Opens the provider portal home page, clicks the first logout control
    that becomes clickable, deletes the browser cookies as a fallback and
    clears the credentials hash kept by the browser manager.

    Returns:
        bool: True when the routine ran to the end, False when an
        unexpected exception interrupted it.
    """
    portal_home = "https://providers.deltadentalma.com/"
    logout_xpaths = (
        "//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
        "//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
        "//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
        "//*[contains(@class, 'logout') or contains(@class, 'signout')]",
    )

    try:
        print("[DDMA login] Forcing logout due to credential change...")
        manager = get_browser_manager()

        # Best effort: click a logout control if one is visible.
        try:
            self.driver.get(portal_home)
            time.sleep(2)
            for xpath in logout_xpaths:
                clickable = EC.element_to_be_clickable((By.XPATH, xpath))
                try:
                    # The click stays inside this try so a TimeoutException
                    # raised by it also just moves on to the next selector.
                    WebDriverWait(self.driver, 3).until(clickable).click()
                    print("[DDMA login] Clicked logout button")
                    time.sleep(2)
                    break
                except TimeoutException:
                    continue
        except Exception as click_error:
            print(f"[DDMA login] Could not click logout button: {click_error}")

        # Cookies are cleared whether or not a button was clicked.
        try:
            self.driver.delete_all_cookies()
            print("[DDMA login] Cleared all cookies")
        except Exception as cookie_error:
            print(f"[DDMA login] Error clearing cookies: {cookie_error}")

        manager.clear_credentials_hash()
        print("[DDMA login] Logout complete")
        return True
    except Exception as logout_error:
        print(f"[DDMA login] Error during forced logout: {logout_error}")
        return False
def login(self, url):
    """Make sure the browser holds an authenticated provider-portal session.

    Args:
        url: Login page URL opened when no live session is detected.

    Returns:
        str: One of

        * ``"ALREADY_LOGGED_IN"`` - the member search box was reachable
          without submitting credentials;
        * ``"OTP_REQUIRED"`` - a verification-code input is showing;
        * ``"SUCCESS"`` - credentials were submitted and no failure was
          detected;
        * a string starting with ``"ERROR"`` - the login form was missing,
          the page showed an error, the browser is still on a
          login/onboarding URL, or an exception was raised.
    """
    member_search_box = (By.XPATH, '//input[@placeholder="Search by member ID"]')
    # One locator for both OTP checks. The post-submit check used to test
    # only the misspelled attribute "aria-lable", so an OTP input that is
    # identified solely by its aria-label was never detected there.
    otp_input = (
        By.XPATH,
        "//input[contains(@aria-label,'Verification code')"
        " or contains(@placeholder,'Enter your verification code')]",
    )
    username_input = (By.XPATH, "//input[@name='username' and @type='text']")
    password_input = (By.XPATH, "//input[@name='password' and @type='password']")

    def is_present(locator, timeout):
        # True when the element appears in the DOM within `timeout` seconds.
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located(locator)
            )
            return True
        except TimeoutException:
            return False

    wait = WebDriverWait(self.driver, 30)
    browser_manager = get_browser_manager()
    try:
        # Check if credentials changed — force logout first
        # NOTE(review): the reload below is read as part of this branch;
        # confirm against the original indentation.
        if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
            self._force_logout()
            self.driver.get(url)
            time.sleep(2)

        # Check if already on a logged-in page (persistent session from profile)
        try:
            current_url = self.driver.current_url
            print(f"[login] Current URL: {current_url}")
            lowered_url = current_url.lower()
            logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
            is_logged_in_url = any(pattern in lowered_url for pattern in logged_in_patterns)
            if is_logged_in_url and "onboarding" not in lowered_url:
                print("[login] Already on logged-in page - skipping login entirely")
                if "member" not in lowered_url:
                    if is_present(member_search_box, 5):
                        print("[login] Found member search input - returning ALREADY_LOGGED_IN")
                        return "ALREADY_LOGGED_IN"
                    members_url = "https://providers.deltadentalma.com/members"
                    print(f"[login] Navigating to members page: {members_url}")
                    self.driver.get(members_url)
                    time.sleep(2)
                    if is_present(member_search_box, 5):
                        print("[login] Member search found - ALREADY_LOGGED_IN")
                        return "ALREADY_LOGGED_IN"
                    print("[login] Could not find member search, will try login")
        except Exception as e:
            # The pre-check is best effort; fall through to a normal login.
            print(f"[login] Error checking current state: {e}")

        # Navigate to login URL
        self.driver.get(url)
        time.sleep(2)

        # Check if session redirected us straight to member search
        current_url = self.driver.current_url
        print(f"[login] URL after navigation: {current_url}")
        if "onboarding" not in current_url.lower():
            if is_present(member_search_box, 3):
                print("[login] Session valid - skipping login")
                return "ALREADY_LOGGED_IN"
            print("[login] Proceeding with login")

        # Dismiss any "Authentication flow continued in another tab" modal
        modal_dismissed = False
        try:
            ok_button = WebDriverWait(self.driver, 3).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[normalize-space(text())='Ok' or normalize-space(text())='OK']")
                )
            )
            ok_button.click()
            print("[login] Dismissed authentication modal")
            modal_dismissed = True
            time.sleep(2)

            all_windows = self.driver.window_handles
            print(f"[login] Windows after modal dismiss: {len(all_windows)}")
            if len(all_windows) > 1:
                original_window = self.driver.current_window_handle
                for window in all_windows:
                    if window != original_window:
                        self.driver.switch_to.window(window)
                        print("[login] Switched to auth popup window")
                        break
                if is_present(otp_input, 10):
                    print("[login] OTP input found in popup -> OTP_REQUIRED")
                    return "OTP_REQUIRED"
                print("[login] No OTP in popup, checking main window")
                self.driver.switch_to.window(original_window)
        except TimeoutException:
            pass  # No modal present

        if modal_dismissed:
            time.sleep(2)
            if is_present(member_search_box, 5):
                print("[login] Already authenticated after modal dismiss")
                return "ALREADY_LOGGED_IN"

        # Fill login form
        try:
            email_field = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable(username_input)
            )
        except TimeoutException:
            print("[login] Could not find login form - page may have changed")
            return "ERROR: Login form not found"
        # The clickable element found above is reused; it was previously
        # looked up a second time with an identical XPath.
        email_field.clear()
        email_field.send_keys(self.massddma_username)

        password_field = wait.until(EC.presence_of_element_located(password_input))
        password_field.clear()
        password_field.send_keys(self.massddma_password)

        # Remember me is optional, so it gets a short wait rather than the
        # 30 s budget: the form fields are already on screen at this point.
        try:
            remember_me_checkbox = WebDriverWait(self.driver, 5).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
                )
            )
            remember_me_checkbox.click()
        except Exception:
            print("[login] Remember me checkbox not found (continuing).")

        login_button = wait.until(
            EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']"))
        )
        login_button.click()

        # Save credentials hash after login attempt
        if self.massddma_username:
            browser_manager.save_credentials_hash(self.massddma_username)

        # OTP detection — wait up to 30 seconds for OTP input
        if is_present(otp_input, 30):
            print("[login] OTP input detected -> OTP_REQUIRED")
            return "OTP_REQUIRED"
        print("[login] No OTP input detected in allowed time.")

        # Check if we're now on the member search page (login succeeded without OTP)
        landed_url = self.driver.current_url.lower()
        if "member" in landed_url or "dashboard" in landed_url:
            if is_present(member_search_box, 5):
                print("[login] Login successful - now on member search page")
                return "SUCCESS"

        # Check for error messages
        try:
            error_elem = WebDriverWait(self.driver, 3).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//*[contains(@class,'error') or contains(text(),'invalid') or contains(text(),'failed')]")
                )
            )
            error_text = error_elem.text
            print(f"[login] Login failed - error detected: {error_text}")
            return f"ERROR:LOGIN FAILED: {error_text}"
        except TimeoutException:
            pass

        final_url = self.driver.current_url.lower()
        if "onboarding" in final_url or "login" in final_url:
            print("[login] Login failed - still on login page")
            return "ERROR:LOGIN FAILED: Still on login page"

        print("[login] Assuming login succeeded (no errors detected)")
        return "SUCCESS"
    except Exception as e:
        print("[login] Exception during login:", e)
        return f"ERROR:LOGIN FAILED: {e}"
def step1(self):
    """Fill the member search form with every available field and search.

    Member ID, date of birth, first name and last name are each entered
    only when set on the instance. A field that cannot be filled is logged
    as a warning and the search is still attempted.

    Returns:
        str: ``"Success"`` when the search ran and no "no results" element
        is present, ``"ERROR: INVALID SEARCH CRITERIA"`` when it is, or
        ``"ERROR:STEP1 - <detail>"`` when an exception was raised.
    """
    wait = WebDriverWait(self.driver, 30)
    no_results_banner = (By.XPATH, '//div[@data-testid="member-search-result-no-results"]')

    def replace_with_sendkeys(el, value):
        # The DOB segments are contenteditable spans, so the old value is
        # removed with select-all + backspace instead of ``clear()``.
        el.click()
        el.send_keys(Keys.CONTROL, "a")
        el.send_keys(Keys.BACKSPACE)
        el.send_keys(value)

    def fill_text_input(label, xpath, value):
        # Type `value` into the input matched by `xpath`; failures are
        # logged and swallowed so the remaining fields are still tried.
        try:
            box = wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
            box.clear()
            box.send_keys(value)
            print(f"[DDMA step1] Entered {label}: {value}")
            time.sleep(0.2)
        except Exception as e:
            print(f"[DDMA step1] Warning: Could not fill {label}: {e}")

    try:
        fields = []
        if self.memberId:
            fields.append(f"ID: {self.memberId}")
        if self.firstName:
            fields.append(f"FirstName: {self.firstName}")
        if self.lastName:
            fields.append(f"LastName: {self.lastName}")
        if self.dateOfBirth:
            fields.append(f"DOB: {self.dateOfBirth}")
        print(f"[DDMA step1] Starting search with: {', '.join(fields)}")

        # 1. Fill Member ID if provided
        if self.memberId:
            fill_text_input("Member ID", '//input[@placeholder="Search by member ID"]', self.memberId)

        # 2. Fill DOB if provided
        if self.dateOfBirth:
            try:
                # Expected shape is YYYY-MM-DD. A trailing time part
                # ("...T00:00:00Z" or " 00:00:00") is dropped first so it
                # does not end up inside the day segment.
                date_part = str(self.dateOfBirth).replace("T", " ").split()[0]
                year, month, day = date_part.split("-")
                month = month.zfill(2)
                day = day.zfill(2)

                dob_container = wait.until(
                    EC.presence_of_element_located(
                        (By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
                    )
                )
                month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
                day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
                year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")

                replace_with_sendkeys(month_elem, month)
                time.sleep(0.05)
                replace_with_sendkeys(day_elem, day)
                time.sleep(0.05)
                replace_with_sendkeys(year_elem, year)
                print(f"[DDMA step1] Filled DOB: {month}/{day}/{year}")
            except Exception as e:
                print(f"[DDMA step1] Warning: Could not fill DOB: {e}")

        # 3. Fill First Name if provided
        if self.firstName:
            fill_text_input(
                "First Name",
                '//input[@placeholder="First name - 1 char minimum" or contains(@placeholder,"first name") or contains(@name,"firstName")]',
                self.firstName,
            )

        # 4. Fill Last Name if provided
        if self.lastName:
            fill_text_input(
                "Last Name",
                '//input[@placeholder="Last name - 2 char minimum" or contains(@placeholder,"last name") or contains(@name,"lastName")]',
                self.lastName,
            )

        time.sleep(0.3)

        # Click Search button
        search_button = wait.until(EC.element_to_be_clickable(
            (By.XPATH, '//button[@data-testid="member-search_search-button"]')
        ))
        search_button.click()
        print("[DDMA step1] Clicked Search button")

        # Wait for either results row or no-results message (up to 15s)
        try:
            WebDriverWait(self.driver, 15).until(
                EC.any_of(
                    EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
                    EC.presence_of_element_located(no_results_banner),
                )
            )
        except TimeoutException:
            pass  # proceed and let step2 handle missing results

        # ``find_elements`` returns an empty list when the banner is absent.
        # A real driver failure therefore reaches the outer handler instead
        # of being swallowed and reported as "Success".
        if self.driver.find_elements(*no_results_banner):
            print("[DDMA step1] Error: No results found")
            return "ERROR: INVALID SEARCH CRITERIA"

        print("[DDMA step1] Search completed successfully")
        return "Success"
    except Exception as e:
        print(f"[DDMA step1] Exception: {e}")
        return f"ERROR:STEP1 - {e}"
def step2(self):
    """Collect the first search result and save the member page as a PDF.

    Reads the patient name, member ID and eligibility status from the first
    row of the results table, opens that row's ``member-details`` link when
    one is found, prints the current page to PDF through the Chrome
    DevTools Protocol and then asks the browser manager to quit the driver.

    Returns:
        dict: On success ``{"status": "success", "eligibility": ...,
        "ss_path": ..., "pdf_path": ..., "patientName": ...,
        "memberId": ...}``. On any failure ``{"status": "error",
        "message": <text>}``; in that case the files in the download
        directory are removed first.
    """
    import re

    # Matches a "DOB: mm/dd/yyyy" fragment attached to a patient name.
    dob_fragment = re.compile(r'\s*DOB[:\s]*\d{1,2}/\d{1,2}/\d{2,4}\s*', re.IGNORECASE)

    def normalize_status(raw_text):
        # Negative wording is tested first: "inactive" contains "active"
        # and "ineligible" contains "eligible", so testing the positive
        # words first reported inactive members as active.
        text = raw_text.strip().lower()
        if any(marker in text for marker in ("inactive", "ineligible", "not active", "not eligible")):
            return "inactive"
        if "active" in text or "eligible" in text:
            return "active"
        return text

    def document_ready(timeout=30):
        # True once document.readyState reports "complete".
        try:
            WebDriverWait(self.driver, timeout).until(
                lambda d: d.execute_script("return document.readyState") == "complete"
            )
            return True
        except Exception:
            print("[DDMA step2] Warning: document.readyState did not become 'complete'")
            return False

    try:
        # Wait for results table
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
            )
        except TimeoutException:
            print("[DDMA step2] Warning: Results table not found within timeout")

        eligibilityText = "unknown"
        foundMemberId = ""
        patientName = ""

        # Extract data from first result row
        try:
            first_row = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]")
            row_text = first_row.text.strip()
            print(f"[DDMA step2] First row text: {row_text[:150]}...")
            lines = [part.strip() for part in row_text.split('\n')] if row_text else []

            # Extract patient name (first line, strip DOB if present)
            if lines:
                potential_name = dob_fragment.sub('', lines[0]).strip()
                if potential_name and not potential_name.startswith('DOB') and not potential_name.isdigit():
                    patientName = potential_name
                    print(f"[DDMA step2] Extracted patient name: '{patientName}'")

            # Extract Member ID: first line that is purely A-Z/0-9, 5+ chars
            for line in lines:
                if re.match(r'^[A-Z0-9]{5,}$', line) and not line.startswith('DOB'):
                    foundMemberId = line
                    print(f"[DDMA step2] Extracted Member ID: {foundMemberId}")
                    break
        except Exception as e:
            print(f"[DDMA step2] Error extracting data from row: {e}")

        # Fall back to the ID the caller searched with, whichever way the
        # row parsing ended.
        if not foundMemberId and self.memberId:
            foundMemberId = self.memberId
            print(f"[DDMA step2] Using input Member ID: {foundMemberId}")

        # Extract eligibility status
        try:
            status_link = WebDriverWait(self.driver, 3).until(EC.presence_of_element_located((
                By.XPATH,
                "(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]"
            )))
            eligibilityText = status_link.text.strip().lower()
            print(f"[DDMA step2] Found eligibility status: {eligibilityText}")
        except Exception:
            try:
                alt_status = self.driver.find_element(
                    By.XPATH,
                    "//*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]"
                )
                eligibilityText = normalize_status(alt_status.text)
                print(f"[DDMA step2] Found eligibility via alternative: {eligibilityText}")
            except Exception as status_err:
                # Best effort: the status stays "unknown".
                print(f"[DDMA step2] Eligibility status not found: {status_err}")

        # Navigate to detailed patient page
        print("[DDMA step2] Navigating to patient detail page...")
        detail_url = None
        patient_link_selectors = [
            "(//table//tbody//tr)[1]//td[1]//a",
            "(//tbody//tr)[1]//a[contains(@href, 'member-details')]",
            "(//tbody//tr)[1]//a[contains(@href, 'member')]",
        ]
        for selector in patient_link_selectors:
            try:
                patient_link = WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located((By.XPATH, selector))
                )
                link_text = patient_link.text.strip()
                href = patient_link.get_attribute("href")
                print(f"[DDMA step2] Found patient link: text='{link_text}', href={href}")
                if link_text and not patientName:
                    patientName = link_text
                if href and "member-details" in href:
                    detail_url = href
                    break
            except Exception as e:
                print(f"[DDMA step2] Selector '{selector}' failed: {e}")
                continue

        if not detail_url:
            try:
                all_links = self.driver.find_elements(By.XPATH, "//a[contains(@href, 'member-details')]")
                if all_links:
                    detail_url = all_links[0].get_attribute("href")
                    print(f"[DDMA step2] Fallback member-details link: {detail_url}")
            except Exception as e:
                print(f"[DDMA step2] Could not find member-details link: {e}")

        if detail_url:
            print(f"[DDMA step2] Navigating directly to: {detail_url}")
            self.driver.get(detail_url)

            # Wait for page to be ready
            document_ready()

            # Wait for meaningful content to appear
            content_selectors = [
                "//div[contains(@class,'member') or contains(@class,'detail') or contains(@class,'patient')]",
                "//h1", "//h2", "//table",
                "//*[contains(text(),'Member ID') or contains(text(),'Name') or contains(text(),'Date of Birth')]",
            ]
            for selector in content_selectors:
                try:
                    WebDriverWait(self.driver, 10).until(
                        EC.presence_of_element_located((By.XPATH, selector))
                    )
                    print(f"[DDMA step2] Content loaded: {selector}")
                    break
                except Exception:
                    continue
            time.sleep(1)  # Brief settle for any late-rendering elements

            # Try to extract patient name from detail page if not already found
            if not patientName:
                ui_words = ['active', 'inactive', 'eligible', 'search', 'date', 'print']
                for selector in ["//h1", "//h2", "//*[contains(@class,'patient-name') or contains(@class,'member-name')]"]:
                    try:
                        name_text = self.driver.find_element(By.XPATH, selector).text.strip()
                    except Exception:
                        continue
                    # Headings made of UI vocabulary are not patient names.
                    if len(name_text) > 1 and not any(word in name_text.lower() for word in ui_words):
                        patientName = name_text
                        print(f"[DDMA step2] Found patient name on detail page: {patientName}")
                        break
        else:
            print("[DDMA step2] Warning: Could not navigate to patient detail page")
            if not patientName:
                try:
                    name_elem = self.driver.find_element(By.XPATH, "(//tbody//tr)[1]//td[1]")
                    patientName = name_elem.text.strip()
                except Exception as name_err:
                    print(f"[DDMA step2] Could not read name from results row: {name_err}")

        if not patientName:
            print("[DDMA step2] Could not extract patient name")

        # Clean patient name
        if patientName:
            cleaned = dob_fragment.sub('', patientName).strip()
            if cleaned:
                patientName = cleaned

        # Wait for page ready before PDF
        document_ready()

        # Generate PDF via Chrome DevTools Protocol
        print("[DDMA step2] Generating PDF of patient detail page...")
        pdf_options = {
            "landscape": False,
            "displayHeaderFooter": False,
            "printBackground": True,
            "preferCSSPageSize": True,
            "paperWidth": 8.5,
            "paperHeight": 11,
            "marginTop": 0.4,
            "marginBottom": 0.4,
            "marginLeft": 0.4,
            "marginRight": 0.4,
            "scale": 0.9,
        }
        result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
        encoded_pdf = (result or {}).get('data', '')
        if not encoded_pdf:
            # An empty payload would otherwise be written as a zero-byte
            # PDF and reported as success.
            raise RuntimeError("Page.printToPDF returned no data")
        pdf_data = base64.b64decode(encoded_pdf)

        pdf_id = foundMemberId or self.memberId or "unknown"
        # The ID can come straight from the request payload; keep it from
        # introducing path separators into the file name.
        safe_pdf_id = re.sub(r'[^A-Za-z0-9_-]', '_', str(pdf_id))
        pdf_path = os.path.join(self.download_dir, f"eligibility_{safe_pdf_id}.pdf")
        with open(pdf_path, "wb") as f:
            f.write(pdf_data)
        print(f"[DDMA step2] PDF saved at: {pdf_path}")

        # Close the browser window after PDF (session preserved in profile)
        try:
            get_browser_manager().quit_driver()
            print("[step2] Browser closed - session preserved in profile")
        except Exception as e:
            print(f"[step2] Error closing browser: {e}")

        print(f"[DDMA step2] Final — PatientName: '{patientName}', MemberID: '{foundMemberId}'")
        return {
            "status": "success",
            "eligibility": eligibilityText,
            "ss_path": pdf_path,   # kept for backward compatibility
            "pdf_path": pdf_path,  # explicit pdf_path
            "patientName": patientName,
            "memberId": foundMemberId,
        }
    except Exception as e:
        print("ERROR in step2:", e)
        # Cleanup download dir on error
        # NOTE(review): this removes every file in the download directory,
        # not only files written by this run — confirm nothing else
        # keeps files there.
        try:
            dl = os.path.abspath(self.download_dir)
            if os.path.isdir(dl):
                for name in os.listdir(dl):
                    item = os.path.join(dl, name)
                    try:
                        if os.path.isfile(item) or os.path.islink(item):
                            os.remove(item)
                    except Exception as rm_err:
                        print(f"[cleanup] failed to remove {item}: {rm_err}")
        except Exception as cleanup_exc:
            print(f"[cleanup] unexpected error: {cleanup_exc}")
        return {"status": "error", "message": str(e)}
def main_workflow(self, url):
try:
self.config_driver()
self.driver.maximize_window()
time.sleep(3)
login_result = self.login(url)
if login_result.startswith("ERROR"):
return {"status": "error", "message": login_result}
if login_result == "OTP_REQUIRED":
return {"status": "otp_required", "message": "OTP required after login"}
step1_result = self.step1()
if step1_result.startswith("ERROR"):
return {"status": "error", "message": step1_result}
step2_result = self.step2()
if step2_result.get("status") == "error":
return {"status": "error", "message": step2_result.get("message")}
return step2_result
except Exception as e:
return {"status": "error", "message": str(e)}
# NOTE: Do NOT quit driver — keep browser alive for next patient