from selenium import webdriver from selenium.common import TimeoutException from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait, Select from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager import time import os import base64 class AutomationMassHealthEligibilityCheck: def __init__(self, data): self.headless = False self.driver = None self.extracted_data = {} # Store extracted data self.data = data.get("data") # Flatten values for convenience self.massdhp_username = self.data.get("massdhpUsername", "") self.massdhp_password = self.data.get("massdhpPassword", "") self.dateOfBirth = self.data.get("dateOfBirth", "") self.memberId = self.data.get("memberId", "") # Convert dateOfBirth from YYYY-MM-DD to MMDDYYYY format if self.dateOfBirth and "-" in self.dateOfBirth: parts = self.dateOfBirth.split("-") if len(parts) == 3: year, month, day = parts self.dateOfBirth = f"{month.zfill(2)}{day.zfill(2)}{year}" self.download_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "downloads") os.makedirs(self.download_dir, exist_ok=True) def config_driver(self): options = webdriver.ChromeOptions() if self.headless: options.add_argument("--headless") # Add PDF download preferences prefs = { "download.default_directory": self.download_dir, "plugins.always_open_pdf_externally": False, "download.prompt_for_download": False, "download.directory_upgrade": True } options.add_experimental_option("prefs", prefs) s = Service(ChromeDriverManager().install()) driver = webdriver.Chrome(service=s, options=options) self.driver = driver def _is_maintenance_page(self) -> bool: """Return True if the current page is a server maintenance/capacity error page.""" try: body = self.driver.find_element(By.TAG_NAME, "body").text markers = [ "temporarily unable to service", "maintenance downtime", "capacity problems", "Please try again later", ] body_lower = body.lower() return any(m.lower() in body_lower for m in markers) except Exception: return False def login(self): wait = WebDriverWait(self.driver, 30) try: # Check immediately if the portal is showing a maintenance page if self._is_maintenance_page(): print("[login] Maintenance page detected on initial load") return "ERROR: MassHealth portal is temporarily unavailable (maintenance). Please try again later." # Step 1: Click the SIGN IN button on the initial page signin_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']"))) signin_button.click() # Wait for the new page to load time.sleep(3) # Step 2: Enter email on the new login page email_field = wait.until(EC.presence_of_element_located((By.ID, "User"))) email_field.clear() email_field.send_keys(self.massdhp_username) # Step 3: Enter password password_field = wait.until(EC.presence_of_element_located((By.ID, "Password"))) password_field.clear() password_field.send_keys(self.massdhp_password) # Step 4: Click login button login_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']"))) login_button.click() # Wait for the SSO redirect to complete. On some machines Chrome opens # the dashboard in a NEW tab and closes the SSO tab. Poll ALL open # window handles so we find the portal regardless of which tab it lands on. print("[login] Waiting for SSO redirect to provider.masshealth-dental.org ...") deadline = time.time() + 30 found = False while time.time() < deadline: time.sleep(0.5) for handle in self.driver.window_handles: try: self.driver.switch_to.window(handle) if self.driver.current_url.startswith("https://provider.masshealth-dental.org"): print(f"[login] Redirect complete. URL: {self.driver.current_url}") found = True break except Exception: continue if found: break if not found: print("[login] Redirect timeout — portal window not found in any tab.") return "ERROR: Login redirect timed out — check credentials or portal availability." if self._is_maintenance_page(): print("[login] Maintenance page detected after login submission") return "ERROR: MassHealth portal is temporarily unavailable (maintenance). Please try again later." return "Success" except Exception as e: print(f"Error while logging in: {e}") return "ERROR:LOGIN FAILED" def step1(self): wait = WebDriverWait(self.driver, 30) substep = "init" try: print(f"[step1] current URL after login: {self.driver.current_url}") print(f"[step1] page title: {self.driver.title}") if self._is_maintenance_page(): print("[step1] Maintenance page detected") return "ERROR: MassHealth portal is temporarily unavailable (maintenance). Please try again later." # Dismiss any post-login modal/alert dialogs (session warnings, notices, etc.) for btn_xpath in [ "//button[contains(text(),'OK') or contains(text(),'Ok') or contains(text(),'ok')]", "//button[contains(text(),'Continue') or contains(text(),'Accept') or contains(text(),'Close')]", "//button[contains(text(),'Dismiss') or contains(text(),'Got it')]", "//a[contains(@class,'close') or @data-dismiss='modal']", ]: try: btn = WebDriverWait(self.driver, 3).until(EC.element_to_be_clickable((By.XPATH, btn_xpath))) btn.click() print(f"[step1] Dismissed modal via: {btn_xpath}") time.sleep(1) break except Exception: pass substep = "patient_management" patient_mgmt = wait.until( EC.presence_of_element_located( (By.XPATH, "//strong[@translate='Patient Management']") ) ) self.driver.execute_script("arguments[0].scrollIntoView(true);", patient_mgmt) self.driver.execute_script("arguments[0].click();", patient_mgmt) time.sleep(2) substep = "member_eligibility_link" eligibility_link = wait.until( EC.presence_of_element_located( (By.XPATH, "//a[@translate='Member Eligibility']") ) ) self.driver.execute_script("arguments[0].click();", eligibility_link) time.sleep(2) substep = "provider_dropdown" provider_dropdown = wait.until( EC.presence_of_element_located((By.NAME, "provider")) ) select_provider = Select(provider_dropdown) # Log available options so we can see if the provider name changed options = [o.text for o in select_provider.options] print(f"[step1] provider options: {options}") substep = "select_provider" # Select the first non-empty option (index 0 is usually a blank placeholder) first_option = next( (o for o in select_provider.options if o.get_attribute("value").strip()), select_provider.options[0] ) print(f"[step1] selecting provider: '{first_option.text}'") select_provider.select_by_value(first_option.get_attribute("value")) time.sleep(2) substep = "member_dob" member_dob = wait.until( EC.presence_of_all_elements_located((By.NAME, "dateInput")) )[1] member_dob.clear() member_dob.send_keys(self.dateOfBirth) substep = "member_number" member_number = wait.until( EC.presence_of_element_located((By.NAME, "memberNumber")) ) member_number.clear() member_number.send_keys(self.memberId) substep = "search_button" search_button = wait.until( EC.element_to_be_clickable((By.XPATH, "//button[contains(@class,'btn-primary')]")) ) search_button.click() substep = "wait_results" wait.until( EC.presence_of_element_located( (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']") ) ) self.extracted_data = self._extract_data_from_page() print(f"[step1] data extracted: {self.extracted_data}") return "Success" except Exception as e: print(f"[step1] FAILED at substep='{substep}': {e}") try: print(f"[step1] URL at failure: {self.driver.current_url}") print(f"[step1] Page title: {self.driver.title}") body_text = self.driver.find_element(By.TAG_NAME, "body").text print(f"[step1] Page body (first 500 chars): {body_text[:500]}") except Exception: pass try: ss_path = os.path.join(self.download_dir, f"step1_failure_{substep}_{int(time.time())}.png") self.driver.save_screenshot(ss_path) print(f"[step1] Screenshot saved: {ss_path}") except Exception as ss_err: print(f"[step1] Could not save screenshot: {ss_err}") return f"ERROR:STEP1:{substep}" def _cell_text(self, cell): """Get text from a cell, falling back to JS innerText if .text is empty.""" text = cell.text.strip() if not text: try: text = (self.driver.execute_script("return arguments[0].innerText;", cell) or "").strip() except Exception: pass return text def _normalize_id(self, s): """Strip all non-alphanumeric characters and lowercase for robust ID matching.""" return ''.join(c for c in str(s) if c.isalnum()).lower() def _extract_data_from_page(self): wait = WebDriverWait(self.driver, 15) extracted = {} try: # Wait until at least one table row appears under Eligible or Ineligible — # the h4 header renders before the rows are populated, so we must wait for rows. wait.until( EC.presence_of_element_located( (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']/following::table[1]/tbody/tr") ) ) eligible_rows = self.driver.find_elements( By.XPATH, "//h4[text()='Eligible']/following::table[1]/tbody/tr" ) if eligible_rows: for row in eligible_rows: cells = row.find_elements(By.TAG_NAME, "td") if len(cells) < 3: continue member_number = self._cell_text(cells[2]) norm_cell = self._normalize_id(member_number) norm_self = self._normalize_id(self.memberId) print(f"[eligible] cells count={len(cells)}, memberId check: '{norm_self}' vs '{norm_cell}' (raw: '{member_number}')") if len(cells) >= 5: print(f" cells[3]='{self._cell_text(cells[3])}' cells[4]='{self._cell_text(cells[4])}'", end="") if len(cells) > 5: print(f" cells[5]='{self._cell_text(cells[5])}'", end="") if len(cells) > 6: print(f" cells[6]='{self._cell_text(cells[6])}'", end="") print() if norm_self and norm_cell and (norm_self in norm_cell or norm_cell in norm_self): # name is in cells[4], insurance in cells[6] (fallback to last cell) full_name = self._cell_text(cells[4]) if len(cells) > 4 else "" plan_name = self._cell_text(cells[6]) if len(cells) > 6 else (self._cell_text(cells[-1]) if len(cells) > 4 else "") name_parts = full_name.split() first_name = name_parts[0] if name_parts else "" last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else "" print(f"[eligible] MATCHED → name='{full_name}' plan='{plan_name}'") extracted = { "eligibility": "Y", "firstName": first_name, "lastName": last_name, "insurance": plan_name } return extracted ineligible_rows = self.driver.find_elements( By.XPATH, "//h4[text()='Ineligible']/following::table[1]/tbody/tr" ) if ineligible_rows: for row in ineligible_rows: cells = row.find_elements(By.TAG_NAME, "td") if len(cells) < 3: continue member_number = self._cell_text(cells[2]) norm_cell = self._normalize_id(member_number) norm_self = self._normalize_id(self.memberId) print(f"[ineligible] cells count={len(cells)}, memberId check: '{norm_self}' vs '{norm_cell}' (raw: '{member_number}')") if len(cells) >= 5: print(f" cells[3]='{self._cell_text(cells[3])}' cells[4]='{self._cell_text(cells[4])}'", end="") if len(cells) > 5: print(f" cells[5]='{self._cell_text(cells[5])}'", end="") print() if norm_self and norm_cell and (norm_self in norm_cell or norm_cell in norm_self): full_name = self._cell_text(cells[4]) if len(cells) > 4 else "" plan_name = self._cell_text(cells[5]) if len(cells) > 5 else (self._cell_text(cells[-1]) if len(cells) > 4 else "") name_parts = full_name.split() first_name = name_parts[0] if name_parts else "" last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else "" print(f"[ineligible] MATCHED → name='{full_name}' plan='{plan_name}'") extracted = { "eligibility": "N", "firstName": first_name, "lastName": last_name, "insurance": plan_name } return extracted print(f"[extraction] No matching row found for memberId='{self.memberId}'") return {"eligibility": None} except Exception as e: print("Extraction error:", e) return {"eligibility": None} def _print_current_page_as_pdf(self): """Generate PDF from the currently active tab using CDP.""" safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.") pdf_filename = f"eligibility_{safe_member}.pdf" pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {"printBackground": True}) pdf_bytes = base64.b64decode(pdf_data["data"]) pdf_path = os.path.join(self.download_dir, pdf_filename) with open(pdf_path, "wb") as f: f.write(pdf_bytes) print("PDF saved at:", pdf_path) return pdf_path def step2(self): wait = WebDriverWait(self.driver, 30) try: print(f"Using stored extracted data: {self.extracted_data}") # Step 1: Click Printer Friendly Format download_button = wait.until( EC.element_to_be_clickable( (By.XPATH, "//button[contains(.,'Printer Friendly Format')]") ) ) original_tab = self.driver.current_window_handle download_button.click() # Wait up to 10s for a new tab; if none opens, print the current page try: WebDriverWait(self.driver, 10).until(lambda d: len(d.window_handles) > 1) new_tabs = [tab for tab in self.driver.window_handles if tab != original_tab] self.driver.switch_to.window(new_tabs[0]) wait.until(lambda d: d.execute_script("return document.readyState") == "complete") wait.until(EC.presence_of_element_located((By.TAG_NAME, "body"))) time.sleep(2) print("Printer-friendly tab opened:", self.driver.current_url) except TimeoutException: # Portal did not open a new tab — print the results page directly print("No new tab opened; printing results page directly as PDF") pdf_path = self._print_current_page_as_pdf() result = { "status": "success", "pdf_path": pdf_path, "file_type": "pdf", "message": "PDF captured successfully" } if self.extracted_data: result.update(self.extracted_data) return result except Exception as e: print("PDF capture failed:", e) # Fallback to screenshot (always keep this) try: safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.") screenshot_path = os.path.join(self.download_dir, f"eligibility_{safe_member}.png") self.driver.save_screenshot(screenshot_path) print("Screenshot saved at:", screenshot_path) result = { "status": "success", "pdf_path": screenshot_path, "file_type": "screenshot", "message": "Screenshot captured (PDF failed)" } # Add stored extracted data to result even for screenshots if self.extracted_data: result.update(self.extracted_data) return result except Exception as ss_error: return { "status": "error", "message": f"PDF + Screenshot failed: {ss_error}" } def main_workflow(self, url): try: self.config_driver() self.driver.maximize_window() self.driver.get(url) time.sleep(3) login_result = self.login() if login_result.startswith("ERROR"): return {"status": "error", "message": login_result} step1_result = self.step1() if step1_result.startswith("ERROR"): return {"status": "error", "message": step1_result} step2_result = self.step2() if step2_result.get("status") == "error": return {"status": "error", "message": step2_result.get("message")} return step2_result except Exception as e: return { "status": "error", "message": str(e) } finally: self.driver.quit()