# Change note (from the commit that produced this version):
#   Step 7 previously waited up to 60 s for the vm.hasResults button, which caused a long freeze.
#   It now caps that wait at 15 s and then always proceeds, capturing either the data or the
#   no-results state. An extra 5 s sleep ensures Angular finishes rendering the table rows
#   before the CDP print.
#   Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
# Source listing metadata: 472 lines, 21 KiB, Python.
import base64
import os
import time

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait, Select
from webdriver_manager.chrome import ChromeDriverManager


class AutomationCMSPEligibilityHistoryRemainingCheck:
    """Drive a Chrome session through a member lookup and save the resulting pages as PDFs.

    The workflow (see ``main_workflow``) logs in, searches for one member by date
    of birth and member ID, reads the eligibility row from the results table, and
    then prints four pages to PDF through the Chrome DevTools ``Page.printToPDF``
    command: eligibility results, member details, service history and accumulator.

    Step methods that navigate return the string ``"Success"`` or an
    ``"ERROR:..."`` marker; step methods that produce a PDF return the saved path
    (or ``None`` on failure, except ``step2_eligibility_pdf`` which re-raises).
    """

    def __init__(self, data):
        """Read credentials and member identifiers from ``data["data"]``.

        Args:
            data: Mapping whose ``"data"`` entry holds the keys
                ``massdhpUsername``, ``massdhpPassword``, ``dateOfBirth``,
                ``memberId``, ``firstName`` and ``lastName``. Missing keys
                default to ``""``; a missing or empty ``"data"`` entry is
                treated as an empty mapping instead of raising.

        Side effects:
            Creates a ``downloads`` directory under the current working
            directory if it does not exist.
        """
        self.headless = False
        self.driver = None
        self.extracted_data = {}
        self.eligibility_tab = None

        # ``or {}`` guards against a payload without a "data" entry, which would
        # otherwise fail on the first ``self.data.get`` below.
        self.data = data.get("data") or {}

        self.massdhp_username = self.data.get("massdhpUsername", "")
        self.massdhp_password = self.data.get("massdhpPassword", "")
        self.dateOfBirth = self.data.get("dateOfBirth", "")
        self.memberId = self.data.get("memberId", "")
        self.firstName = self.data.get("firstName", "")
        self.lastName = self.data.get("lastName", "")

        # Convert dateOfBirth from YYYY-MM-DD to MMDDYYYY
        if self.dateOfBirth and "-" in self.dateOfBirth:
            parts = self.dateOfBirth.split("-")
            if len(parts) == 3:
                year, month, day = parts
                self.dateOfBirth = f"{month.zfill(2)}{day.zfill(2)}{year}"

        self.download_dir = os.path.abspath("downloads")
        os.makedirs(self.download_dir, exist_ok=True)

    # ── driver ──────────────────────────────────────────────────────────────────

    def config_driver(self):
        """Create the Chrome WebDriver and store it on ``self.driver``.

        Adds ``--headless`` when ``self.headless`` is true and points Chrome's
        download directory at ``self.download_dir``.
        """
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")

        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)

        s = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=s, options=options)

    # ── login (same MassHealth credentials) ─────────────────────────────────────

    def login(self):
        """Click the sign-in link, fill in the username/password form and submit it.

        Returns:
            ``"Success"`` once the login button has been clicked, or
            ``"ERROR:LOGIN FAILED"`` if any element wait or interaction raised.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            signin_button = wait.until(
                EC.element_to_be_clickable(
                    (
                        By.CSS_SELECTOR,
                        "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']",
                    )
                )
            )
            signin_button.click()
            time.sleep(3)

            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)

            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)

            login_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")
                )
            )
            login_button.click()
            return "Success"
        except Exception as e:
            print(f"[login] Error: {e}")
            return "ERROR:LOGIN FAILED"

    # ── step 1 — search member, extract eligibility data ────────────────────────

    def step1(self):
        """Open the Member Eligibility search, submit the member, and extract the result row.

        ``substep`` tracks the stage currently executing so a failure can be
        reported as ``"ERROR:STEP1:<substep>"``.

        Returns:
            ``"Success"`` after ``self.extracted_data`` has been populated, or
            the error marker described above.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "init"
        try:
            substep = "patient_management"
            patient_mgmt = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[@translate='Patient Management']")
                )
            )
            # Clicks go through JavaScript rather than WebElement.click().
            self.driver.execute_script("arguments[0].scrollIntoView(true);", patient_mgmt)
            self.driver.execute_script("arguments[0].click();", patient_mgmt)
            time.sleep(2)

            substep = "member_eligibility_link"
            eligibility_link = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//a[@translate='Member Eligibility']")
                )
            )
            self.driver.execute_script("arguments[0].click();", eligibility_link)
            time.sleep(2)

            substep = "provider_dropdown"
            provider_dropdown = wait.until(
                EC.presence_of_element_located((By.NAME, "provider"))
            )
            select_provider = Select(provider_dropdown)
            # Pick the first option with a non-blank value; fall back to the very
            # first option when every value is blank.
            first_option = next(
                (o for o in select_provider.options if o.get_attribute("value").strip()),
                select_provider.options[0],
            )
            select_provider.select_by_value(first_option.get_attribute("value"))
            time.sleep(2)

            substep = "member_dob"
            # The second element named "dateInput" is used as the member DOB field.
            member_dob = wait.until(
                EC.presence_of_all_elements_located((By.NAME, "dateInput"))
            )[1]
            member_dob.clear()
            member_dob.send_keys(self.dateOfBirth)

            substep = "member_number"
            member_number = wait.until(
                EC.presence_of_element_located((By.NAME, "memberNumber"))
            )
            member_number.clear()
            member_number.send_keys(self.memberId)

            substep = "search_button"
            search_button = wait.until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(@class,'btn-primary')]"))
            )
            search_button.click()

            substep = "wait_results"
            wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']")
                )
            )

            self.extracted_data = self._extract_data_from_page()
            print(f"[step1] data extracted: {self.extracted_data}")
            return "Success"
        except Exception as e:
            print(f"[step1] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP1:{substep}"

    # ── helpers ─────────────────────────────────────────────────────────────────

    def _cell_text(self, cell):
        """Return the stripped text of ``cell``, falling back to its ``innerText``.

        The JavaScript fallback is best-effort: if it raises for any reason the
        empty string is returned rather than propagating the error.
        """
        text = cell.text.strip()
        if not text:
            try:
                text = (
                    self.driver.execute_script("return arguments[0].innerText;", cell) or ""
                ).strip()
            except Exception:
                # Deliberately best-effort; an unreadable cell is treated as empty.
                pass
        return text

    def _normalize_id(self, s):
        """Return ``s`` as a lowercase string with every non-alphanumeric character removed."""
        return "".join(c for c in str(s) if c.isalnum()).lower()

    @staticmethod
    def _wait_for_document_ready(wait):
        """Block on ``wait`` until ``document.readyState`` equals ``"complete"``."""
        wait.until(lambda d: d.execute_script("return document.readyState") == "complete")

    def _print_page_to_pdf(self, prefix):
        """Print the current page to ``<download_dir>/<prefix>_<member>.pdf`` via CDP.

        The member ID is reduced to alphanumerics plus ``-``, ``_`` and ``.`` so
        it is safe to embed in a file name.

        Args:
            prefix: File-name prefix, e.g. ``"cmsp_eligibility"``.

        Returns:
            The path of the written PDF file.
        """
        safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")
        pdf_filename = f"{prefix}_{safe_member}.pdf"
        pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {"printBackground": True})
        pdf_bytes = base64.b64decode(pdf_data["data"])
        pdf_path = os.path.join(self.download_dir, pdf_filename)
        with open(pdf_path, "wb") as f:
            f.write(pdf_bytes)
        return pdf_path

    def _extract_data_from_page(self):
        """Find this member's row in the Eligible/Ineligible tables and summarise it.

        A row matches when the normalised text of its third cell contains, or is
        contained in, the normalised ``self.memberId``.

        Returns:
            ``{"eligibility": "Y"|"N", "firstName", "lastName", "insurance"}``
            for the first matching row, or ``{"eligibility": None}`` when no row
            matches or any error occurs.
        """
        wait = WebDriverWait(self.driver, 15)
        try:
            wait.until(
                EC.presence_of_element_located(
                    (
                        By.XPATH,
                        "//h4[text()='Eligible' or text()='Ineligible']/following::table[1]/tbody/tr",
                    )
                )
            )
            # Loop-invariant: normalise our own member ID once, not once per row.
            norm_self = self._normalize_id(self.memberId)
            for status_label, elig_flag in [("Eligible", "Y"), ("Ineligible", "N")]:
                rows = self.driver.find_elements(
                    By.XPATH,
                    f"//h4[text()='{status_label}']/following::table[1]/tbody/tr",
                )
                for row in rows:
                    cells = row.find_elements(By.TAG_NAME, "td")
                    if len(cells) < 3:
                        continue
                    norm_cell = self._normalize_id(self._cell_text(cells[2]))
                    if not (norm_self and norm_cell):
                        continue
                    if norm_self not in norm_cell and norm_cell not in norm_self:
                        continue

                    full_name = self._cell_text(cells[4]) if len(cells) > 4 else ""
                    # Plan name is read from the seventh cell when present,
                    # otherwise from the last cell of rows with more than four cells.
                    if len(cells) > 6:
                        plan_name = self._cell_text(cells[6])
                    elif len(cells) > 4:
                        plan_name = self._cell_text(cells[-1])
                    else:
                        plan_name = ""
                    name_parts = full_name.split()
                    return {
                        "eligibility": elig_flag,
                        "firstName": name_parts[0] if name_parts else "",
                        "lastName": " ".join(name_parts[1:]) if len(name_parts) > 1 else "",
                        "insurance": plan_name,
                    }
            return {"eligibility": None}
        except Exception as e:
            print("Extraction error:", e)
            return {"eligibility": None}

    # ── step 2 — print eligibility PDF, return to results tab ───────────────────

    def step2_eligibility_pdf(self):
        """Click "Printer Friendly Format", print the resulting page to PDF, and return to the results tab.

        If no new tab appears within 10 s the current page is printed instead.

        Returns:
            Path of the saved eligibility PDF.

        Raises:
            Exception: Any failure is logged, the extra tab (if one is focused)
                is closed on a best-effort basis, and the original exception is
                re-raised.
        """
        wait = WebDriverWait(self.driver, 30)
        self.eligibility_tab = self.driver.current_window_handle
        try:
            download_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(.,'Printer Friendly Format')]")
                )
            )
            download_button.click()

            try:
                WebDriverWait(self.driver, 10).until(lambda d: len(d.window_handles) > 1)
                new_tabs = [t for t in self.driver.window_handles if t != self.eligibility_tab]
                self.driver.switch_to.window(new_tabs[0])
                self._wait_for_document_ready(wait)
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
                time.sleep(2)
            except TimeoutException:
                print("No new tab for eligibility; printing current page directly")

            pdf_path = self._print_page_to_pdf("cmsp_eligibility")
            print("Eligibility PDF saved at:", pdf_path)

            if self.driver.current_window_handle != self.eligibility_tab:
                self.driver.close()
                self.driver.switch_to.window(self.eligibility_tab)
                time.sleep(1)

            return pdf_path
        except Exception as e:
            print(f"[step2_eligibility_pdf] failed: {e}")
            # Cleanup must never mask the original failure, so the window-handle
            # lookup lives inside the guarded block as well.
            try:
                if self.eligibility_tab and self.driver.current_window_handle != self.eligibility_tab:
                    self.driver.close()
                    self.driver.switch_to.window(self.eligibility_tab)
            except Exception as cleanup_error:
                print(f"[step2_eligibility_pdf] cleanup failed: {cleanup_error}")
            raise

    # ── step 3 — click member ID link → member details ──────────────────────────

    def step3_click_member_id(self):
        """Click the member-number link in the results table.

        Returns:
            ``"Success"`` or ``"ERROR:STEP3:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "ng_bind_link"
        try:
            member_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a[ng-bind='member.memberNumber']")
                )
            )
            self.driver.execute_script("arguments[0].click();", member_link)
            time.sleep(2)
            print(f"[step3] clicked member ID link, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step3] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP3:{substep}"

    # ── step 3b — print member details page as PDF via CDP ───────────────────────

    def step3b_member_details_pdf(self):
        """Print the current (member details) page to PDF.

        Returns:
            Path of the saved PDF, or ``None`` if anything failed.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            self._wait_for_document_ready(wait)
            time.sleep(2)

            pdf_path = self._print_page_to_pdf("cmsp_member_details")
            print("Member details PDF saved at:", pdf_path)
            return pdf_path
        except Exception as e:
            print(f"[step3b_member_details_pdf] failed: {e}")
            return None

    # ── step 4 — click "VIEW SERVICE HISTORY" → service history page ─────────────

    def step4_view_service_history(self):
        """Click the service-history button on the member details page.

        Returns:
            ``"Success"`` or ``"ERROR:STEP4:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "service_history_link"
        try:
            history_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a.btn.btn-primary[href*='service-history']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", history_link)
            self.driver.execute_script("arguments[0].click();", history_link)
            time.sleep(2)
            print(f"[step4] navigated to service history, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step4] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP4:{substep}"

    # ── step 5 — print history PDF via CDP, navigate back to member details ──────
    # We do NOT click the "Printer Friendly Format" button here because on the
    # service history page it calls window.print() (native dialog) which freezes
    # Chrome. Instead we capture the page directly via CDP and then go back.

    def step5_history_pdf(self):
        """Print the service history page to PDF, then navigate back one page.

        Returns:
            Path of the saved PDF, or ``None`` if anything failed (including the
            navigation back).
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            # Wait for Angular to finish rendering the service history data
            self._wait_for_document_ready(wait)
            time.sleep(3)

            pdf_path = self._print_page_to_pdf("cmsp_history")
            print("History PDF saved at:", pdf_path)

            # Go back to member details page where "View Accumulator" lives
            self.driver.back()
            time.sleep(2)
            self._wait_for_document_ready(wait)
            print(f"[step5] returned to member details, URL: {self.driver.current_url}")

            return pdf_path
        except Exception as e:
            print(f"[step5_history_pdf] failed: {e}")
            return None

    # ── step 6 — click "View Accumulator" ───────────────────────────────────────

    def step6_click_view_accumulator(self):
        """Click the "View Accumulator" button on the member details page.

        Returns:
            ``"Success"`` or ``"ERROR:STEP6:<substep>"``.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "view_accumulator_link"
        try:
            # ng-if="vm.showAccumulator" is the stable Angular condition on this link
            accumulator_link = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a.btn.btn-primary[ng-if='vm.showAccumulator']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", accumulator_link)
            self.driver.execute_script("arguments[0].click();", accumulator_link)
            time.sleep(2)
            print(f"[step6] navigated to accumulator, URL: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[step6] FAILED at substep='{substep}': {e}")
            return f"ERROR:STEP6:{substep}"

    # ── step 7 — print accumulator PDF ──────────────────────────────────────────

    def step7_accumulator_pdf(self):
        """Print the accumulator page to PDF, whether or not results are shown.

        Returns:
            Path of the saved PDF, or ``None`` if anything failed.
        """
        try:
            # Wait for the page shell to load
            self._wait_for_document_ready(WebDriverWait(self.driver, 30))

            # Try to detect when Angular data is ready (button has ng-if="vm.hasResults").
            # Cap at 15 s — if the patient has no accumulator data the button never
            # appears and we still want to capture whatever the page shows.
            try:
                WebDriverWait(self.driver, 15).until(
                    EC.visibility_of_element_located(
                        (By.CSS_SELECTOR, "button.btn.btn-primary[ng-click='vm.printResults()']")
                    )
                )
                print("[step7] vm.hasResults is true — data loaded")
            except TimeoutException:
                print("[step7] print button not visible after 15 s — patient may have no accumulator data, printing anyway")

            # Extra pause so Angular finishes rendering table rows
            time.sleep(5)

            pdf_path = self._print_page_to_pdf("cmsp_accumulator")
            print("Accumulator PDF saved at:", pdf_path)
            return pdf_path
        except Exception as e:
            print(f"[step7_accumulator_pdf] failed: {e}")
            return None

    # ── main workflow ────────────────────────────────────────────────────────────

    def main_workflow(self, url):
        """Run every step in order against ``url`` and always quit the driver afterwards.

        Args:
            url: Page to open before logging in.

        Returns:
            A dict with a ``"status"`` of:

            * ``"error"`` - login or step 1 failed, or an exception escaped a
              step; ``"message"`` carries the marker or exception text.
            * ``"partial"`` - a navigation step (3, 4 or 6) failed; the PDF
              paths gathered so far are included.
            * ``"success"`` - all steps ran; includes the four PDF paths (any of
              the last three may be ``None``) plus the fields extracted in
              step 1.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            self.driver.get(url)
            time.sleep(3)

            login_result = self.login()
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}

            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}

            eligibility_pdf_path = self.step2_eligibility_pdf()

            step3_result = self.step3_click_member_id()
            if step3_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step3_result,
                    "pdf_path": eligibility_pdf_path,
                    "file_type": "pdf",
                }

            member_details_pdf_path = self.step3b_member_details_pdf()

            step4_result = self.step4_view_service_history()
            if step4_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step4_result,
                    "pdf_path": eligibility_pdf_path,
                    "member_details_pdf_path": member_details_pdf_path,
                    "file_type": "pdf",
                }

            history_pdf_path = self.step5_history_pdf()

            step6_result = self.step6_click_view_accumulator()
            if step6_result.startswith("ERROR"):
                return {
                    "status": "partial",
                    "message": step6_result,
                    "pdf_path": eligibility_pdf_path,
                    "member_details_pdf_path": member_details_pdf_path,
                    "history_pdf_path": history_pdf_path,
                    "file_type": "pdf",
                }

            accumulator_pdf_path = self.step7_accumulator_pdf()

            result = {
                "status": "success",
                "pdf_path": eligibility_pdf_path,
                "member_details_pdf_path": member_details_pdf_path,
                "history_pdf_path": history_pdf_path,
                "accumulator_pdf_path": accumulator_pdf_path,
                "file_type": "pdf",
                "message": "Eligibility, member details, service history, and accumulator PDFs captured",
            }
            if self.extracted_data:
                result.update(self.extracted_data)
            return result

        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            if self.driver:
                self.driver.quit()
|