Files
DentalManagementMH06/apps/SeleniumService/selenium_eligibilityCheckWorker.py
Gitead a03f3f25dd fix: handle new-tab SSO redirect for MassHealth eligibility and history
After login, Chrome on some machines opens the portal dashboard in a new
tab and closes the SSO tab. Poll all window handles until the portal URL
is found, then switch to it — works for both same-tab and new-tab redirects.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 08:52:11 -04:00

480 lines
20 KiB
Python
Executable File

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
class AutomationMassHealthEligibilityCheck:
    """Selenium workflow that looks up one member's eligibility on the MassHealth dental portal.

    The workflow (see ``main_workflow``) starts Chrome, signs in through the
    SSO page, runs a Member Eligibility search for the configured member,
    scrapes the matching result row and finally captures the result page as
    a PDF (or a screenshot if the PDF capture fails).

    Methods report failures by returning strings starting with ``"ERROR"``
    (``login`` / ``step1``) or dicts with ``"status": "error"`` (``step2`` /
    ``main_workflow``) rather than by raising.
    """

    # URL prefix the browser lands on once the SSO login has completed.
    PORTAL_URL_PREFIX = "https://provider.masshealth-dental.org"

    # Message returned whenever a maintenance/capacity page is detected.
    MAINTENANCE_ERROR = (
        "ERROR: MassHealth portal is temporarily unavailable (maintenance). "
        "Please try again later."
    )

    def __init__(self, data):
        """Read credentials and member details from ``data["data"]``.

        Args:
            data: Mapping with a ``"data"`` entry holding the keys
                ``massdhpUsername``, ``massdhpPassword``, ``dateOfBirth``
                and ``memberId``. Missing keys default to ``""``.
        """
        self.headless = False
        self.driver = None
        self.extracted_data = {}  # Filled by step1 with the scraped row.
        # A missing or None payload would otherwise crash on the .get calls below.
        self.data = data.get("data") or {}
        # Flatten values for convenience.
        self.massdhp_username = self.data.get("massdhpUsername", "")
        self.massdhp_password = self.data.get("massdhpPassword", "")
        self.memberId = self.data.get("memberId", "")
        # The search form is fed MMDDYYYY; callers supply YYYY-MM-DD.
        self.dateOfBirth = self._format_date_of_birth(self.data.get("dateOfBirth", ""))
        self.download_dir = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "downloads"
        )
        os.makedirs(self.download_dir, exist_ok=True)

    @staticmethod
    def _format_date_of_birth(value):
        """Convert ``YYYY-MM-DD`` to ``MMDDYYYY``; return anything else unchanged.

        Month and day are zero-padded to two digits. Values without a dash,
        or that do not split into exactly three dash-separated parts, are
        returned as given.
        """
        if value and "-" in value:
            parts = value.split("-")
            if len(parts) == 3:
                year, month, day = parts
                return f"{month.zfill(2)}{day.zfill(2)}{year}"
        return value

    def _safe_member_id(self):
        """Return ``memberId`` reduced to characters that are safe in a file name."""
        return "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")

    def config_driver(self):
        """Create the Chrome driver and store it on ``self.driver``."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")
        # PDF/download preferences: save into our download dir without prompting.
        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)
        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=options)

    def _is_maintenance_page(self) -> bool:
        """Return True if the current page is a server maintenance/capacity error page."""
        markers = (
            "temporarily unable to service",
            "maintenance downtime",
            "capacity problems",
            "Please try again later",
        )
        try:
            body_lower = self.driver.find_element(By.TAG_NAME, "body").text.lower()
            return any(marker.lower() in body_lower for marker in markers)
        except Exception:
            # Best effort: if the body cannot be read, assume it is not a maintenance page.
            return False

    def _switch_to_portal_window(self, timeout=30):
        """Poll every open window until one shows the provider portal.

        On some machines Chrome opens the dashboard in a NEW tab and closes
        the SSO tab, so all window handles are scanned rather than only the
        current one. On success the driver is left switched to the portal
        window.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            True if a window whose URL starts with ``PORTAL_URL_PREFIX`` was
            found, False if the timeout elapsed first.
        """
        # monotonic() so that wall-clock adjustments cannot distort the deadline.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            time.sleep(0.5)
            for handle in self.driver.window_handles:
                try:
                    self.driver.switch_to.window(handle)
                    if self.driver.current_url.startswith(self.PORTAL_URL_PREFIX):
                        print(f"[login] Redirect complete. URL: {self.driver.current_url}")
                        return True
                except Exception:
                    # The handle may have been closed mid-scan; try the next one.
                    continue
        return False

    def login(self):
        """Sign in through the SSO page and land on the provider portal.

        Returns:
            ``"Success"``, or a string starting with ``"ERROR"`` describing
            why the login did not complete.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            # Check immediately if the portal is showing a maintenance page.
            if self._is_maintenance_page():
                print("[login] Maintenance page detected on initial load")
                return self.MAINTENANCE_ERROR

            # Step 1: Click the SIGN IN button on the initial page.
            signin_button = wait.until(
                EC.element_to_be_clickable(
                    (
                        By.CSS_SELECTOR,
                        "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']",
                    )
                )
            )
            signin_button.click()
            # Wait for the new page to load.
            time.sleep(3)

            # Step 2: Enter email on the new login page.
            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)

            # Step 3: Enter password.
            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)

            # Step 4: Click login button.
            login_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")
                )
            )
            login_button.click()

            # Wait for the SSO redirect to complete, whichever tab it lands on.
            print("[login] Waiting for SSO redirect to provider.masshealth-dental.org ...")
            if not self._switch_to_portal_window(timeout=30):
                print("[login] Redirect timeout — portal window not found in any tab.")
                return "ERROR: Login redirect timed out — check credentials or portal availability."

            if self._is_maintenance_page():
                print("[login] Maintenance page detected after login submission")
                return self.MAINTENANCE_ERROR
            return "Success"
        except Exception as e:
            print(f"Error while logging in: {e}")
            return "ERROR:LOGIN FAILED"

    def step1(self):
        """Run the Member Eligibility search and scrape the result.

        Navigates Patient Management -> Member Eligibility, selects the first
        provider with a non-empty value, submits date of birth and member
        number, waits for the results and stores the scraped row in
        ``self.extracted_data``.

        Returns:
            ``"Success"``, the maintenance error string, or
            ``"ERROR:STEP1:<substep>"`` naming the sub-step that failed.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "init"  # Updated as we go so failures can name where they happened.
        try:
            print(f"[step1] current URL after login: {self.driver.current_url}")
            print(f"[step1] page title: {self.driver.title}")
            if self._is_maintenance_page():
                print("[step1] Maintenance page detected")
                return self.MAINTENANCE_ERROR

            # Dismiss any post-login modal/alert dialogs (session warnings, notices, etc.).
            # Only the first dialog that can be clicked is dismissed.
            for btn_xpath in [
                "//button[contains(text(),'OK') or contains(text(),'Ok') or contains(text(),'ok')]",
                "//button[contains(text(),'Continue') or contains(text(),'Accept') or contains(text(),'Close')]",
                "//button[contains(text(),'Dismiss') or contains(text(),'Got it')]",
                "//a[contains(@class,'close') or @data-dismiss='modal']",
            ]:
                try:
                    btn = WebDriverWait(self.driver, 3).until(
                        EC.element_to_be_clickable((By.XPATH, btn_xpath))
                    )
                    btn.click()
                    print(f"[step1] Dismissed modal via: {btn_xpath}")
                    time.sleep(1)
                    break
                except Exception:
                    # Deliberately best effort: no such dialog is the normal case.
                    pass

            substep = "patient_management"
            patient_mgmt = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[@translate='Patient Management']")
                )
            )
            # Clicks go through JavaScript rather than WebElement.click().
            self.driver.execute_script("arguments[0].scrollIntoView(true);", patient_mgmt)
            self.driver.execute_script("arguments[0].click();", patient_mgmt)
            time.sleep(2)

            substep = "member_eligibility_link"
            eligibility_link = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//a[@translate='Member Eligibility']")
                )
            )
            self.driver.execute_script("arguments[0].click();", eligibility_link)
            time.sleep(2)

            substep = "provider_dropdown"
            provider_dropdown = wait.until(
                EC.presence_of_element_located((By.NAME, "provider"))
            )
            select_provider = Select(provider_dropdown)
            # Log available options so we can see if the provider name changed.
            options = [o.text for o in select_provider.options]
            print(f"[step1] provider options: {options}")

            substep = "select_provider"
            # Select the first non-empty option (index 0 is usually a blank placeholder).
            provider_options = select_provider.options
            first_option = next(
                (o for o in provider_options if (o.get_attribute("value") or "").strip()),
                None,
            )
            if first_option is None:
                # Fall back to the first option; an empty dropdown raises here
                # and is reported as a failure of this sub-step.
                first_option = provider_options[0]
            print(f"[step1] selecting provider: '{first_option.text}'")
            select_provider.select_by_value(first_option.get_attribute("value"))
            time.sleep(2)

            substep = "member_dob"
            # The second element named "dateInput" is used as the member's date of birth.
            member_dob = wait.until(
                EC.presence_of_all_elements_located((By.NAME, "dateInput"))
            )[1]
            member_dob.clear()
            member_dob.send_keys(self.dateOfBirth)

            substep = "member_number"
            member_number = wait.until(
                EC.presence_of_element_located((By.NAME, "memberNumber"))
            )
            member_number.clear()
            member_number.send_keys(self.memberId)

            substep = "search_button"
            search_button = wait.until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(@class,'btn-primary')]"))
            )
            search_button.click()

            substep = "wait_results"
            wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']")
                )
            )
            self.extracted_data = self._extract_data_from_page()
            print(f"[step1] data extracted: {self.extracted_data}")
            return "Success"
        except Exception as e:
            print(f"[step1] FAILED at substep='{substep}': {e}")
            # Diagnostics are best effort; they must never mask the original failure.
            try:
                print(f"[step1] URL at failure: {self.driver.current_url}")
                print(f"[step1] Page title: {self.driver.title}")
                body_text = self.driver.find_element(By.TAG_NAME, "body").text
                print(f"[step1] Page body (first 500 chars): {body_text[:500]}")
            except Exception:
                pass
            try:
                ss_path = os.path.join(
                    self.download_dir, f"step1_failure_{substep}_{int(time.time())}.png"
                )
                self.driver.save_screenshot(ss_path)
                print(f"[step1] Screenshot saved: {ss_path}")
            except Exception as ss_err:
                print(f"[step1] Could not save screenshot: {ss_err}")
            return f"ERROR:STEP1:{substep}"

    def _cell_text(self, cell):
        """Get text from a cell, falling back to JS innerText if .text is empty."""
        text = cell.text.strip()
        if not text:
            try:
                text = (
                    self.driver.execute_script("return arguments[0].innerText;", cell) or ""
                ).strip()
            except Exception:
                pass
        return text

    def _normalize_id(self, s):
        """Strip all non-alphanumeric characters and lowercase for robust ID matching."""
        return "".join(c for c in str(s) if c.isalnum()).lower()

    def _find_member_row(self, heading, eligibility, plan_index):
        """Search the table under an ``<h4>`` heading for this member's row.

        Args:
            heading: Exact heading text, ``"Eligible"`` or ``"Ineligible"``.
            eligibility: Flag stored in the result (``"Y"`` or ``"N"``).
            plan_index: Cell index holding the plan name in this table; when
                the row is shorter, the last cell is used instead.

        Returns:
            A dict with ``eligibility``, ``firstName``, ``lastName`` and
            ``insurance`` for the first matching row, or ``None`` if no row
            matches.
        """
        tag = heading.lower()
        rows = self.driver.find_elements(
            By.XPATH, f"//h4[text()='{heading}']/following::table[1]/tbody/tr"
        )
        norm_self = self._normalize_id(self.memberId)
        for row in rows:
            cells = row.find_elements(By.TAG_NAME, "td")
            if len(cells) < 3:
                continue
            member_number = self._cell_text(cells[2])
            norm_cell = self._normalize_id(member_number)
            print(
                f"[{tag}] cells count={len(cells)}, memberId check: "
                f"'{norm_self}' vs '{norm_cell}' (raw: '{member_number}')"
            )
            if len(cells) >= 5:
                print(
                    f" cells[3]='{self._cell_text(cells[3])}' cells[4]='{self._cell_text(cells[4])}'",
                    end="",
                )
                for idx in range(5, plan_index + 1):
                    if len(cells) > idx:
                        print(f" cells[{idx}]='{self._cell_text(cells[idx])}'", end="")
                print()
            # Containment in either direction counts as a match, so IDs shown
            # with extra prefix/suffix characters are still recognised.
            if norm_self and norm_cell and (norm_self in norm_cell or norm_cell in norm_self):
                full_name = self._cell_text(cells[4]) if len(cells) > 4 else ""
                if len(cells) > plan_index:
                    plan_name = self._cell_text(cells[plan_index])
                elif len(cells) > 4:
                    plan_name = self._cell_text(cells[-1])
                else:
                    plan_name = ""
                name_parts = full_name.split()
                first_name = name_parts[0] if name_parts else ""
                last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""
                print(f"[{tag}] MATCHED → name='{full_name}' plan='{plan_name}'")
                return {
                    "eligibility": eligibility,
                    "firstName": first_name,
                    "lastName": last_name,
                    "insurance": plan_name,
                }
        return None

    def _extract_data_from_page(self):
        """Scrape eligibility, name and plan for ``memberId`` from the results page.

        The Eligible table is searched first, then the Ineligible table.

        Returns:
            The dict produced by ``_find_member_row`` for the first match, or
            ``{"eligibility": None}`` when no row matches or scraping fails.
        """
        wait = WebDriverWait(self.driver, 15)
        try:
            # Wait until at least one table row appears under Eligible or Ineligible —
            # the h4 header renders before the rows are populated, so we must wait for rows.
            wait.until(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        "//h4[text()='Eligible' or text()='Ineligible']/following::table[1]/tbody/tr",
                    )
                )
            )
            # Name is in cells[4]; the plan is in cells[6] for eligible rows
            # and cells[5] for ineligible rows.
            for heading, flag, plan_index in (("Eligible", "Y", 6), ("Ineligible", "N", 5)):
                match = self._find_member_row(heading, flag, plan_index)
                if match is not None:
                    return match
            print(f"[extraction] No matching row found for memberId='{self.memberId}'")
            return {"eligibility": None}
        except Exception as e:
            print("Extraction error:", e)
            return {"eligibility": None}

    def _print_current_page_as_pdf(self):
        """Generate PDF from the currently active tab using CDP.

        Returns:
            Path of the written file, ``eligibility_<memberId>.pdf`` inside
            ``download_dir``.
        """
        pdf_filename = f"eligibility_{self._safe_member_id()}.pdf"
        pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {"printBackground": True})
        pdf_bytes = base64.b64decode(pdf_data["data"])
        pdf_path = os.path.join(self.download_dir, pdf_filename)
        with open(pdf_path, "wb") as f:
            f.write(pdf_bytes)
        print("PDF saved at:", pdf_path)
        return pdf_path

    def step2(self):
        """Capture the eligibility result as a PDF, falling back to a screenshot.

        Clicks "Printer Friendly Format"; if that opens a new tab the tab is
        printed, otherwise the results page itself is printed.

        Returns:
            A dict with ``status``, ``pdf_path``, ``file_type`` and
            ``message`` merged with ``self.extracted_data``, or
            ``{"status": "error", "message": ...}`` if both the PDF and the
            screenshot capture fail.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            print(f"Using stored extracted data: {self.extracted_data}")
            # Step 1: Click Printer Friendly Format.
            download_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(.,'Printer Friendly Format')]")
                )
            )
            # Snapshot the handles first: the SSO login can leave more than one
            # tab open, so "more than one handle" does not prove that the click
            # opened anything. Only a handle absent from this snapshot does.
            handles_before = set(self.driver.window_handles)
            download_button.click()

            # Wait up to 10s for a new tab; if none opens, print the current page.
            try:
                new_tabs = WebDriverWait(self.driver, 10).until(
                    lambda d: [h for h in d.window_handles if h not in handles_before]
                )
                self.driver.switch_to.window(new_tabs[0])
                wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
                time.sleep(2)
                print("Printer-friendly tab opened:", self.driver.current_url)
            except TimeoutException:
                # Portal did not open a new tab — print the results page directly.
                print("No new tab opened; printing results page directly as PDF")

            pdf_path = self._print_current_page_as_pdf()
            result = {
                "status": "success",
                "pdf_path": pdf_path,
                "file_type": "pdf",
                "message": "PDF captured successfully",
            }
            if self.extracted_data:
                result.update(self.extracted_data)
            return result
        except Exception as e:
            print("PDF capture failed:", e)
            # Fallback to screenshot (always keep this).
            try:
                screenshot_path = os.path.join(
                    self.download_dir, f"eligibility_{self._safe_member_id()}.png"
                )
                self.driver.save_screenshot(screenshot_path)
                print("Screenshot saved at:", screenshot_path)
                result = {
                    "status": "success",
                    "pdf_path": screenshot_path,
                    "file_type": "screenshot",
                    "message": "Screenshot captured (PDF failed)",
                }
                # Add stored extracted data to result even for screenshots.
                if self.extracted_data:
                    result.update(self.extracted_data)
                return result
            except Exception as ss_error:
                return {
                    "status": "error",
                    "message": f"PDF + Screenshot failed: {ss_error}",
                }

    def main_workflow(self, url):
        """Run the whole check: start the browser, log in, search, capture.

        Args:
            url: Address of the portal landing page to open first.

        Returns:
            The ``step2`` result dict on success, otherwise
            ``{"status": "error", "message": ...}``. The browser is always
            closed before returning.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            self.driver.get(url)
            time.sleep(3)

            login_result = self.login()
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}

            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}

            step2_result = self.step2()
            if step2_result.get("status") == "error":
                return {"status": "error", "message": step2_result.get("message")}
            return step2_result
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
            }
        finally:
            # The driver is still None when config_driver() itself failed; an
            # unguarded quit() would then raise and replace the error result.
            if self.driver is not None:
                try:
                    self.driver.quit()
                except Exception as quit_err:
                    print(f"[main_workflow] driver.quit() failed: {quit_err}")