# File: DentalManagementMH04/apps/SeleniumService/selenium_eligibilityCheckWorker.py
# Repository listing metadata: 399 lines, 16 KiB, Python, executable file.

from selenium import webdriver
from selenium.common import TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
class AutomationMassHealthEligibilityCheck:
    """Selenium worker that runs a member eligibility check on the MassHealth dental provider portal.

    The workflow (see ``main_workflow``) opens the portal, logs in, searches for
    the member by date of birth and member ID, reads the matching row from the
    "Eligible" / "Ineligible" result tables, and saves the result page as a PDF
    (falling back to a screenshot) in a local ``downloads`` directory.
    """

    def __init__(self, data):
        """Read credentials and member details from ``data["data"]``.

        Args:
            data: Mapping with a ``"data"`` entry holding ``massdhpUsername``,
                ``massdhpPassword``, ``dateOfBirth`` and ``memberId``. Missing
                keys default to ``""``; a missing or null ``"data"`` entry is
                treated as an empty mapping.

        Side effects:
            Creates the ``downloads`` directory under the current working
            directory if it does not exist.
        """
        self.headless = False
        self.driver = None
        self.extracted_data = {}  # Store extracted data
        # A missing/None payload would otherwise crash on the .get() calls below.
        self.data = data.get("data") or {}

        # Flatten values for convenience
        self.massdhp_username = self.data.get("massdhpUsername", "")
        self.massdhp_password = self.data.get("massdhpPassword", "")
        self.dateOfBirth = self._to_portal_date(self.data.get("dateOfBirth", ""))
        self.memberId = self.data.get("memberId", "")

        self.download_dir = os.path.abspath("downloads")
        os.makedirs(self.download_dir, exist_ok=True)

    @staticmethod
    def _to_portal_date(date_of_birth):
        """Convert ``YYYY-MM-DD`` to ``MMDDYYYY``; return any other value unchanged."""
        if date_of_birth and "-" in date_of_birth:
            parts = date_of_birth.split("-")
            if len(parts) == 3:
                year, month, day = parts
                return f"{month.zfill(2)}{day.zfill(2)}{year}"
        return date_of_birth

    def config_driver(self):
        """Create the Chrome WebDriver with download preferences and store it on ``self.driver``."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")

        # Add PDF download preferences
        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)

        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=options)

    def login(self):
        """Sign in to the portal with the stored credentials.

        Returns:
            ``"Success"`` once the login form has been submitted, or
            ``"ERROR:LOGIN FAILED"`` if any step raised. Note that a submitted
            form is not verified here; a rejected login surfaces in ``step1``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            # Step 1: Click the SIGN IN button on the initial page
            signin_button = wait.until(
                EC.element_to_be_clickable(
                    (
                        By.CSS_SELECTOR,
                        "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']",
                    )
                )
            )
            signin_button.click()

            # Wait for the new page to load
            time.sleep(3)

            # Step 2: Enter email on the new login page
            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)

            # Step 3: Enter password
            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)

            # Step 4: Click login button
            login_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")
                )
            )
            login_button.click()
            return "Success"
        except Exception as e:
            print(f"Error while logging in: {e}")
            return "ERROR:LOGIN FAILED"

    def step1(self):
        """Navigate to Member Eligibility, run the search and extract the result.

        On success the extracted fields are stored in ``self.extracted_data``.

        Returns:
            ``"Success"``, or ``"ERROR:STEP1:<substep>"`` naming the sub-step
            that was in progress when an exception was raised.
        """
        wait = WebDriverWait(self.driver, 30)
        substep = "init"
        try:
            print(f"[step1] current URL after login: {self.driver.current_url}")
            print(f"[step1] page title: {self.driver.title}")

            substep = "patient_management"
            patient_mgmt = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[@translate='Patient Management']")
                )
            )
            # Clicks go through JavaScript rather than WebElement.click().
            self.driver.execute_script("arguments[0].scrollIntoView(true);", patient_mgmt)
            self.driver.execute_script("arguments[0].click();", patient_mgmt)
            time.sleep(2)

            substep = "member_eligibility_link"
            eligibility_link = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//a[@translate='Member Eligibility']")
                )
            )
            self.driver.execute_script("arguments[0].click();", eligibility_link)
            time.sleep(2)

            substep = "provider_dropdown"
            provider_dropdown = wait.until(
                EC.presence_of_element_located((By.NAME, "provider"))
            )
            select_provider = Select(provider_dropdown)
            # Query the option elements once; each access to .options hits the browser.
            provider_options = select_provider.options
            # Log available options so we can see if the provider name changed
            options = [o.text for o in provider_options]
            print(f"[step1] provider options: {options}")

            substep = "select_provider"
            # Select the first non-empty option (index 0 is usually a blank placeholder).
            # An <option> without a value attribute yields None, hence the `or ""`.
            first_option = next(
                (o for o in provider_options if (o.get_attribute("value") or "").strip()),
                provider_options[0],
            )
            print(f"[step1] selecting provider: '{first_option.text}'")
            select_provider.select_by_value(first_option.get_attribute("value"))
            time.sleep(2)

            substep = "member_dob"
            # The second "dateInput" field on the page is used for the member's date of birth.
            member_dob = wait.until(
                EC.presence_of_all_elements_located((By.NAME, "dateInput"))
            )[1]
            member_dob.clear()
            member_dob.send_keys(self.dateOfBirth)

            substep = "member_number"
            member_number = wait.until(
                EC.presence_of_element_located((By.NAME, "memberNumber"))
            )
            member_number.clear()
            member_number.send_keys(self.memberId)

            substep = "search_button"
            search_button = wait.until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(@class,'btn-primary')]"))
            )
            search_button.click()

            substep = "wait_results"
            wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']")
                )
            )

            self.extracted_data = self._extract_data_from_page()
            print(f"[step1] data extracted: {self.extracted_data}")
            return "Success"
        except Exception as e:
            print(f"[step1] FAILED at substep='{substep}': {e}")
            print(f"[step1] URL at failure: {self.driver.current_url}")
            return f"ERROR:STEP1:{substep}"

    def _cell_text(self, cell):
        """Get text from a cell, falling back to JS innerText if .text is empty."""
        text = cell.text.strip()
        if not text:
            try:
                text = (self.driver.execute_script("return arguments[0].innerText;", cell) or "").strip()
            except Exception:
                # Best effort only: an unreadable cell is reported as empty text.
                pass
        return text

    def _normalize_id(self, s):
        """Strip all non-alphanumeric characters and lowercase for robust ID matching.

        ``None`` normalizes to ``""`` so that it never takes part in a match
        (``str(None)`` would otherwise yield the token ``"none"``).
        """
        if s is None:
            return ""
        return ''.join(c for c in str(s) if c.isalnum()).lower()

    def _safe_member_id(self):
        """Return the member ID reduced to characters that are safe in a file name."""
        return "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")

    def _with_extracted_data(self, result):
        """Merge the fields stored by ``step1`` into ``result`` and return it."""
        if self.extracted_data:
            result.update(self.extracted_data)
        return result

    def _match_member_row(self, rows, label, eligibility, plan_index):
        """Find the row whose member number matches ``self.memberId``.

        Args:
            rows: ``<tr>`` elements of one result table.
            label: Tag used in log lines (``"eligible"`` / ``"ineligible"``).
            eligibility: Flag stored in the result (``"Y"`` / ``"N"``).
            plan_index: Column holding the plan name; when the row has fewer
                columns the last cell is used instead (rows with more than
                four cells only).

        Returns:
            Dict with ``eligibility``, ``firstName``, ``lastName`` and
            ``insurance`` for the first matching row, or ``None``.
        """
        norm_self = self._normalize_id(self.memberId)
        for row in rows:
            cells = row.find_elements(By.TAG_NAME, "td")
            cell_count = len(cells)
            if cell_count < 3:
                continue

            # Read each needed cell once; every read is a browser round trip.
            texts = [self._cell_text(cell) for cell in cells[:plan_index + 1]]
            member_number = texts[2]
            norm_cell = self._normalize_id(member_number)
            print(f"[{label}] cells count={cell_count}, memberId check: '{norm_self}' vs '{norm_cell}' (raw: '{member_number}')")
            if cell_count >= 5:
                debug_line = f" cells[3]='{texts[3]}' cells[4]='{texts[4]}'"
                for index in range(5, len(texts)):
                    debug_line += f" cells[{index}]='{texts[index]}'"
                print(debug_line)

            # Substring match in either direction tolerates prefixes/suffixes on the ID.
            if not (norm_self and norm_cell and (norm_self in norm_cell or norm_cell in norm_self)):
                continue

            # name is in cells[4], insurance in cells[plan_index] (fallback to last cell)
            full_name = texts[4] if cell_count > 4 else ""
            if cell_count > plan_index:
                plan_name = texts[plan_index]
            elif cell_count > 4:
                plan_name = texts[-1]
            else:
                plan_name = ""

            name_parts = full_name.split()
            first_name = name_parts[0] if name_parts else ""
            last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""
            print(f"[{label}] MATCHED → name='{full_name}' plan='{plan_name}'")
            return {
                "eligibility": eligibility,
                "firstName": first_name,
                "lastName": last_name,
                "insurance": plan_name,
            }
        return None

    def _extract_data_from_page(self):
        """Read the member's row from the result tables on the current page.

        The "Eligible" table is searched first (plan name in column 6), then
        the "Ineligible" table (plan name in column 5).

        Returns:
            The matching row as a dict (see ``_match_member_row``), or
            ``{"eligibility": None}`` when no row matches or extraction fails.
        """
        wait = WebDriverWait(self.driver, 5)
        # (heading text, log label, eligibility flag, plan column index)
        sections = (
            ("Eligible", "eligible", "Y", 6),
            ("Ineligible", "ineligible", "N", 5),
        )
        try:
            # Wait until Eligible or Ineligible header appears
            wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//h4[text()='Eligible' or text()='Ineligible']")
                )
            )

            for heading, label, eligibility, plan_index in sections:
                rows = self.driver.find_elements(
                    By.XPATH,
                    f"//h4[text()='{heading}']/following::table[1]/tbody/tr"
                )
                matched = self._match_member_row(rows, label, eligibility, plan_index)
                if matched is not None:
                    return matched

            print(f"[extraction] No matching row found for memberId='{self.memberId}'")
            return {"eligibility": None}
        except Exception as e:
            print("Extraction error:", e)
            return {"eligibility": None}

    def _print_current_page_as_pdf(self):
        """Generate PDF from the currently active tab using CDP.

        Returns:
            Path of the written ``eligibility_<memberId>.pdf`` file.
        """
        pdf_filename = f"eligibility_{self._safe_member_id()}.pdf"
        pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {"printBackground": True})
        pdf_bytes = base64.b64decode(pdf_data["data"])
        pdf_path = os.path.join(self.download_dir, pdf_filename)
        with open(pdf_path, "wb") as f:
            f.write(pdf_bytes)
        print("PDF saved at:", pdf_path)
        return pdf_path

    def step2(self):
        """Capture the eligibility result as a PDF, falling back to a screenshot.

        Returns:
            Dict with ``status``, ``pdf_path``, ``file_type`` and ``message``
            plus the fields extracted in ``step1``; or a dict with
            ``status == "error"`` when both PDF and screenshot capture fail.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            print(f"Using stored extracted data: {self.extracted_data}")

            # Step 1: Click Printer Friendly Format
            download_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(.,'Printer Friendly Format')]")
                )
            )
            original_tab = self.driver.current_window_handle
            download_button.click()

            # Wait up to 10s for a new tab; if none opens, print the current page
            try:
                WebDriverWait(self.driver, 10).until(lambda d: len(d.window_handles) > 1)
                new_tabs = [tab for tab in self.driver.window_handles if tab != original_tab]
                self.driver.switch_to.window(new_tabs[0])
                wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
                wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
                time.sleep(2)
                print("Printer-friendly tab opened:", self.driver.current_url)
            except TimeoutException:
                # Portal did not open a new tab — print the results page directly
                print("No new tab opened; printing results page directly as PDF")

            # Prints whichever tab is active: the new tab if one opened, else the results page.
            pdf_path = self._print_current_page_as_pdf()
            return self._with_extracted_data({
                "status": "success",
                "pdf_path": pdf_path,
                "file_type": "pdf",
                "message": "PDF captured successfully"
            })
        except Exception as e:
            print("PDF capture failed:", e)

            # Fallback to screenshot (always keep this)
            try:
                screenshot_path = os.path.join(
                    self.download_dir, f"eligibility_{self._safe_member_id()}.png"
                )
                self.driver.save_screenshot(screenshot_path)
                print("Screenshot saved at:", screenshot_path)
                # Add stored extracted data to result even for screenshots
                return self._with_extracted_data({
                    "status": "success",
                    "pdf_path": screenshot_path,
                    "file_type": "screenshot",
                    "message": "Screenshot captured (PDF failed)"
                })
            except Exception as ss_error:
                return {
                    "status": "error",
                    "message": f"PDF + Screenshot failed: {ss_error}"
                }

    def main_workflow(self, url):
        """Run the full check: start the browser, log in, search, capture the result.

        Args:
            url: Address of the portal's initial page.

        Returns:
            The ``step2`` result dict on success, otherwise
            ``{"status": "error", "message": ...}``. The browser is always
            closed before returning.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            self.driver.get(url)
            time.sleep(3)

            login_result = self.login()
            if login_result.startswith("ERROR"):
                return {"status": "error", "message": login_result}

            step1_result = self.step1()
            if step1_result.startswith("ERROR"):
                return {"status": "error", "message": step1_result}

            step2_result = self.step2()
            if step2_result.get("status") == "error":
                return {"status": "error", "message": step2_result.get("message")}

            return step2_result
        except Exception as e:
            return {
                "status": "error",
                "message": str(e)
            }
        finally:
            # An exception raised here would replace the value being returned,
            # so skip quit() when the driver was never created and never let a
            # failing quit() hide the workflow result.
            if self.driver is not None:
                try:
                    self.driver.quit()
                except Exception as quit_error:
                    print(f"Error while closing browser: {quit_error}")