from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64


class AutomationCMSPEligibilityHistoryRemainingCheck:
    """Drive a Chrome session through a provider portal and capture three PDFs.

    The workflow (see ``main_workflow``) logs in, searches for a member by
    date of birth and member number, reads the eligibility row for that
    member, and then saves the eligibility page, the service history page
    and the accumulator page as PDF files under ``download_dir``.
    """

    def __init__(self, data):
        """Read credentials and member details from ``data["data"]``.

        Args:
            data: Mapping whose ``"data"`` entry is a mapping that may hold
                ``massdhpUsername``, ``massdhpPassword``, ``dateOfBirth``,
                ``memberId``, ``firstName`` and ``lastName``. Missing keys
                default to an empty string.

        Side effects:
            Creates a ``downloads`` directory (relative to the current
            working directory) if it does not exist.
        """
        self.headless = False
        self.driver = None
        self.extracted_data = {}
        self.eligibility_tab = None
        self.data = data.get("data")
        self.massdhp_username = self.data.get("massdhpUsername", "")
        self.massdhp_password = self.data.get("massdhpPassword", "")
        self.dateOfBirth = self.data.get("dateOfBirth", "")
        self.memberId = self.data.get("memberId", "")
        self.firstName = self.data.get("firstName", "")
        self.lastName = self.data.get("lastName", "")
        # Convert dateOfBirth from YYYY-MM-DD to MMDDYYYY
        if self.dateOfBirth and "-" in self.dateOfBirth:
            parts = self.dateOfBirth.split("-")
            if len(parts) == 3:
                year, month, day = parts
                self.dateOfBirth = f"{month.zfill(2)}{day.zfill(2)}{year}"
        self.download_dir = os.path.abspath("downloads")
        os.makedirs(self.download_dir, exist_ok=True)

    # ── driver ──────────────────────────────────────────────────────────────────
    def config_driver(self):
        """Create the Chrome driver and store it on ``self.driver``."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")
        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)
        s = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=s, options=options)

    # ── login (same MassHealth credentials) ─────────────────────────────────────
    def login(self):
        """Click the sign-in link, fill in the credentials and submit.

        Returns:
            ``"Success"`` when the login form was submitted, otherwise
            ``"ERROR:LOGIN FAILED"``. Submission is not verified beyond the
            click on the login button.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            signin_button = wait.until(
                EC.element_to_be_clickable(
                    (
                        By.CSS_SELECTOR,
                        "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']",
                    )
                )
            )
            signin_button.click()
            time.sleep(3)

            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)

            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)

            login_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")
                )
            )
            login_button.click()
            return "Success"
        except Exception as e:
            print(f"[login] Error: {e}")
            return "ERROR:LOGIN FAILED"

    # ── step 1 — search member, extract eligibility data ────────────────────────
    def step1(self):
        """Open the member eligibility search, run it and extract the result row.

        ``substep`` tracks the stage being attempted so that a failure can be
        reported as ``"ERROR:STEP1:<substep>"``.

        Returns:
            ``"Success"`` or ``"ERROR:STEP1:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "init"
        try:
            substep = "patient_management"
            patient_mgmt = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[@translate='Patient Management']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", patient_mgmt)
            self.driver.execute_script("arguments[0].click();", patient_mgmt)
            time.sleep(2)

            substep = "member_eligibility_link"
            eligibility_link = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//a[@translate='Member Eligibility']")
                )
            )
            self.driver.execute_script("arguments[0].click();", eligibility_link)
            time.sleep(2)

            substep = "provider_dropdown"
            provider_dropdown = wait.until(
                EC.presence_of_element_located((By.NAME, "provider"))
            )
            select_provider = Select(provider_dropdown)
            # Pick the first option with a non-blank value; fall back to the
            # very first option. get_attribute may return None when the
            # attribute is absent, hence the `or ""` guard.
            first_option = next(
                (
                    o
                    for o in select_provider.options
                    if (o.get_attribute("value") or "").strip()
                ),
                select_provider.options[0],
            )
            select_provider.select_by_value(first_option.get_attribute("value"))
            time.sleep(2)

            substep = "member_dob"
            # The second element named "dateInput" is used as the member DOB field.
            member_dob = wait.until(
                EC.presence_of_all_elements_located((By.NAME, "dateInput"))
            )[1]
            member_dob.clear()
            member_dob.send_keys(self.dateOfBirth)

            substep = "member_number"
            member_number = wait.until(
                EC.presence_of_element_located((By.NAME, "memberNumber"))
            )
            member_number.clear()
            member_number.send_keys(self.memberId)

            substep = "search_button"
            search_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(@class,'btn-primary')]")
                )
            )
            search_button.click()

            substep = "wait_results"
            wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']")
                )
            )

            self.extracted_data = self._extract_data_from_page()
            print(f"[step1] data extracted: {self.extracted_data}")
            return "Success"
        except Exception as e:
            print(f"[step1] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP1:{substep}"

    # ── helpers ─────────────────────────────────────────────────────────────────
    def _cell_text(self, cell):
        """Return the stripped text of ``cell``, falling back to ``innerText``.

        The JavaScript fallback is used only when ``cell.text`` is empty; if
        the script call fails the empty string is returned.
        """
        text = cell.text.strip()
        if not text:
            try:
                text = (
                    self.driver.execute_script("return arguments[0].innerText;", cell) or ""
                ).strip()
            except Exception:
                # Best effort: keep the empty text if the script cannot run.
                pass
        return text

    def _normalize_id(self, s):
        """Return ``s`` lower-cased with every non-alphanumeric character removed."""
        return "".join(c for c in str(s) if c.isalnum()).lower()

    def _safe_member_token(self):
        """Return ``memberId`` reduced to letters, digits and ``-``, ``_``, ``.``."""
        return "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")

    def _wait_for_document_ready(self, wait):
        """Block until ``document.readyState`` of the focused page is ``complete``."""
        wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

    def _focus_print_tab(self, origin_handle, wait, label):
        """Switch to a newly opened tab, if one appears within 10 seconds.

        Args:
            origin_handle: Handle of the tab that triggered the print action.
            wait: ``WebDriverWait`` used for the page-load waits in the new tab.
            label: Word inserted into the message printed on timeout.

        Any ``TimeoutException`` raised here (no new tab, or the new tab not
        finishing loading) is reported and swallowed so the caller prints
        whichever page currently has focus.
        """
        try:
            WebDriverWait(self.driver, 10).until(lambda d: len(d.window_handles) > 1)
            new_tabs = [t for t in self.driver.window_handles if t != origin_handle]
            self.driver.switch_to.window(new_tabs[0])
            self._wait_for_document_ready(wait)
            wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            time.sleep(2)
        except TimeoutException:
            print(f"No new tab for {label}; printing current page directly")

    def _save_page_pdf(self, prefix):
        """Print the focused page to PDF via CDP and write it to ``download_dir``.

        Args:
            prefix: Middle part of the file name, ``cmsp_<prefix>_<member>.pdf``.

        Returns:
            Absolute path of the written PDF file.
        """
        pdf_filename = f"cmsp_{prefix}_{self._safe_member_token()}.pdf"
        pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {"printBackground": True})
        pdf_bytes = base64.b64decode(pdf_data["data"])
        pdf_path = os.path.join(self.download_dir, pdf_filename)
        with open(pdf_path, "wb") as f:
            f.write(pdf_bytes)
        return pdf_path

    def _restore_eligibility_tab(self):
        """Best-effort: close a stray popup and refocus the eligibility tab.

        Never raises, so that the caller can re-raise its original error
        without it being replaced by a cleanup failure.
        """
        if not self.eligibility_tab:
            return
        try:
            if self.driver.current_window_handle != self.eligibility_tab:
                self.driver.close()
        except Exception as cleanup_error:
            print(f"[step2_eligibility_pdf] could not close popup tab: {cleanup_error}")
        try:
            self.driver.switch_to.window(self.eligibility_tab)
        except Exception as cleanup_error:
            print(f"[step2_eligibility_pdf] could not restore results tab: {cleanup_error}")

    def _extract_data_from_page(self):
        """Find the result row matching ``memberId`` and return its details.

        Rows are read from the first table following the ``Eligible`` heading
        and then the one following the ``Ineligible`` heading. A row matches
        when the normalized text of its third cell contains, or is contained
        in, the normalized ``memberId``.

        Returns:
            A dict with ``eligibility`` (``"Y"`` or ``"N"``), ``firstName``,
            ``lastName`` and ``insurance`` for the first matching row, or
            ``{"eligibility": None}`` when nothing matches or an error occurs.
        """
        wait = WebDriverWait(self.driver, 15)
        try:
            wait.until(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        "//h4[text()='Eligible' or text()='Ineligible']/following::table[1]/tbody/tr",
                    )
                )
            )
            norm_self = self._normalize_id(self.memberId)
            for status_label, elig_flag in [("Eligible", "Y"), ("Ineligible", "N")]:
                rows = self.driver.find_elements(
                    By.XPATH,
                    f"//h4[text()='{status_label}']/following::table[1]/tbody/tr",
                )
                for row in rows:
                    cells = row.find_elements(By.TAG_NAME, "td")
                    if len(cells) < 3:
                        continue
                    norm_cell = self._normalize_id(self._cell_text(cells[2]))
                    if norm_self and norm_cell and (
                        norm_self in norm_cell or norm_cell in norm_self
                    ):
                        full_name = self._cell_text(cells[4]) if len(cells) > 4 else ""
                        # Plan name: seventh cell when present, otherwise the
                        # last cell of rows that have more than four cells.
                        plan_name = (
                            self._cell_text(cells[6])
                            if len(cells) > 6
                            else (self._cell_text(cells[-1]) if len(cells) > 4 else "")
                        )
                        name_parts = full_name.split()
                        return {
                            "eligibility": elig_flag,
                            "firstName": name_parts[0] if name_parts else "",
                            "lastName": " ".join(name_parts[1:]) if len(name_parts) > 1 else "",
                            "insurance": plan_name,
                        }
            return {"eligibility": None}
        except Exception as e:
            print("Extraction error:", e)
            return {"eligibility": None}

    # ── step 2 — print eligibility PDF, return to results tab ───────────────────
    def step2_eligibility_pdf(self):
        """Save the eligibility page as a PDF and return to the results tab.

        Clicks "Printer Friendly Format", prints the popup tab (or the current
        page when no popup appears), then closes the popup.

        Returns:
            Path of the saved PDF.

        Raises:
            Exception: Any failure is logged, tab cleanup is attempted, and
                the original exception is re-raised.
        """
        wait = WebDriverWait(self.driver, 30)
        self.eligibility_tab = self.driver.current_window_handle
        try:
            download_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(.,'Printer Friendly Format')]")
                )
            )
            download_button.click()

            self._focus_print_tab(self.eligibility_tab, wait, "eligibility")

            pdf_path = self._save_page_pdf("eligibility")
            print("Eligibility PDF saved at:", pdf_path)

            if self.driver.current_window_handle != self.eligibility_tab:
                self.driver.close()
                self.driver.switch_to.window(self.eligibility_tab)
                time.sleep(1)
            return pdf_path
        except Exception as e:
            print(f"[step2_eligibility_pdf] failed: {e}")
            # Cleanup must not raise, otherwise it would replace the error
            # being propagated by the bare `raise` below.
            self._restore_eligibility_tab()
            raise

    # ── step 3 — click member ID link → member details ──────────────────────────
    def step3_click_member_id(self):
        """Click the member number link in the results.

        Returns:
            ``"Success"`` or ``"ERROR:STEP3:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "ng_bind_link"
        try:
            member_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a[ng-bind='member.memberNumber']")
                )
            )
            self.driver.execute_script("arguments[0].click();", member_link)
            time.sleep(2)
            print(f"[step3] clicked member ID link, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step3] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP3:{substep}"

    # ── step 4 — click "VIEW SERVICE HISTORY" → service history page ─────────────
    def step4_view_service_history(self):
        """Click the service history button on the member details page.

        Returns:
            ``"Success"`` or ``"ERROR:STEP4:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "service_history_link"
        try:
            history_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a.btn.btn-primary[href*='service-history']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", history_link)
            self.driver.execute_script("arguments[0].click();", history_link)
            time.sleep(2)
            print(f"[step4] navigated to service history, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step4] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP4:{substep}"

    # ── step 5 — print history PDF via CDP, navigate back to member details ──────
    # We do NOT click the "Printer Friendly Format" button here because on the
    # service history page it calls window.print() (native dialog) which freezes
    # Chrome. Instead we capture the page directly via CDP and then go back.
    def step5_history_pdf(self):
        """Save the service history page as a PDF, then navigate back.

        Returns:
            Path of the saved PDF, or ``None`` when any step fails.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            # Wait for Angular to finish rendering the service history data
            self._wait_for_document_ready(wait)
            time.sleep(3)

            pdf_path = self._save_page_pdf("history")
            print("History PDF saved at:", pdf_path)

            # Go back to member details page where "View Accumulator" lives
            self.driver.back()
            time.sleep(2)
            self._wait_for_document_ready(wait)
            print(f"[step5] returned to member details, URL: {self.driver.current_url}")
            return pdf_path
        except Exception as e:
            print(f"[step5_history_pdf] failed: {e}")
            return None

    # ── step 6 — click "View Accumulator" ───────────────────────────────────────
    def step6_click_view_accumulator(self):
        """Click the accumulator button on the member details page.

        Returns:
            ``"Success"`` or ``"ERROR:STEP6:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "view_accumulator_link"
        try:
            # ng-if="vm.showAccumulator" is the stable Angular condition on this link
            accumulator_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a.btn.btn-primary[ng-if='vm.showAccumulator']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", accumulator_link)
            self.driver.execute_script("arguments[0].click();", accumulator_link)
            time.sleep(2)
            print(f"[step6] navigated to accumulator, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step6] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP6:{substep}"

    # ── step 7 — print accumulator PDF ──────────────────────────────────────────
    def step7_accumulator_pdf(self):
        """Save the accumulator page as a PDF.

        Clicks the print button and prints the popup tab, or the current page
        when no popup appears. The popup is left open; ``main_workflow`` quits
        the driver afterwards.

        Returns:
            Path of the saved PDF, or ``None`` when any step fails.
        """
        wait = WebDriverWait(self.driver, 30)
        current_tab = self.driver.current_window_handle
        try:
            # The print button is located by its ng-click="vm.printResults()" attribute
            print_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "button.btn.btn-primary[ng-click='vm.printResults()']")
                )
            )
            print_button.click()

            self._focus_print_tab(current_tab, wait, "accumulator")

            pdf_path = self._save_page_pdf("accumulator")
            print("Accumulator PDF saved at:", pdf_path)
            return pdf_path
        except Exception as e:
            print(f"[step7_accumulator_pdf] failed: {e}")
            return None

    # ── main workflow ────────────────────────────────────────────────────────────
    def main_workflow(self, url):
        """Run the whole flow against ``url`` and always quit the driver.

        Returns:
            A dict whose ``status`` is:

            * ``"error"`` — login or step 1 failed, or an exception escaped
              (including one re-raised by ``step2_eligibility_pdf``);
            * ``"partial"`` — step 3, 4 or 6 failed after the eligibility PDF
              was saved; the PDF paths collected so far are included;
            * ``"success"`` — all steps ran; the dict also carries the three
              PDF paths and any fields from ``extracted_data``. The history
              and accumulator paths may be ``None`` because steps 5 and 7
              report failure by returning ``None``.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            self.driver.get(url)
            time.sleep(3)

            login_result = self.login()
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}

            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}

            eligibility_pdf_path = self.step2_eligibility_pdf()

            step3_result = self.step3_click_member_id()
            if step3_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step3_result,
                    "pdf_path": eligibility_pdf_path,
                    "file_type": "pdf",
                }

            step4_result = self.step4_view_service_history()
            if step4_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step4_result,
                    "pdf_path": eligibility_pdf_path,
                    "file_type": "pdf",
                }

            history_pdf_path = self.step5_history_pdf()

            step6_result = self.step6_click_view_accumulator()
            if step6_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step6_result,
                    "pdf_path": eligibility_pdf_path,
                    "history_pdf_path": history_pdf_path,
                    "file_type": "pdf",
                }

            accumulator_pdf_path = self.step7_accumulator_pdf()

            result = {
                "status": "success",
                "pdf_path": eligibility_pdf_path,
                "history_pdf_path": history_pdf_path,
                "accumulator_pdf_path": accumulator_pdf_path,
                "file_type": "pdf",
                "message": "Eligibility, service history, and accumulator PDFs captured",
            }
            if self.extracted_data:
                result.update(self.extracted_data)
            return result
        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            if self.driver:
                self.driver.quit()