Files
DentalManagementMH06/apps/SeleniumService/selenium_MHBatchPaymentCheckWorker.py
Gitead 1edf73fdc8 feat: add new frontend components, MH batch worker, and gitignore rules
- Add all new Frontend source files (pages, components, hooks, utils)
- Add selenium_MHBatchPaymentCheckWorker.py and MHSinglePaymentCheckWorker.py
- Add install-steps-5-13.sh setup script
- Update .gitignore to exclude runtime/sensitive data (backups, uploads,
  chat-history, keys, downloads, generated .d.ts files) while keeping folders
- Add .gitkeep to preserve empty runtime folders in git

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-26 00:23:43 -04:00

464 lines
19 KiB
Python

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import shutil
import glob
import json
import base64
class AutomationMassHealthBatchPaymentCheck:
    """Selenium worker that downloads MassHealth remittance vouchers for a date range.

    The workflow (see ``main_workflow``) signs in through the SSO page, opens
    the Remittance page, searches by RA date range, saves the results page as
    a PDF, collects the voucher numbers from the results table and downloads
    every voucher that is not yet recorded in ``downloaded_vouchers.json``.

    Step methods report their outcome as a string: ``"Success"`` or a message
    starting with ``"ERROR"``.
    """

    # URL prefix the browser is expected to reach once the SSO login completed.
    PORTAL_URL_PREFIX = "https://provider.masshealth-dental.org"
    # Rows of the search results table.
    ROWS_XPATH = "//table//tbody//tr"
    # Upper bound on VIEW MORE clicks so a button that never goes away cannot loop forever.
    MAX_VIEW_MORE_CLICKS = 50
    # File suffixes treated as "download still in progress" (.crdownload is
    # Chrome's partial-download suffix; .tmp is skipped defensively).
    PARTIAL_DOWNLOAD_SUFFIXES = (".crdownload", ".tmp")

    def __init__(self, data):
        """Read credentials and the date range from ``data["data"]`` and prepare folders.

        Args:
            data: Expected to be a dict with a ``"data"`` dict holding
                ``massdhpUsername``, ``massdhpPassword``, ``fromDate`` and
                ``toDate``. Anything else is treated as an empty payload.
        """
        self.headless = False
        self.driver = None
        payload = data.get("data") if isinstance(data, dict) else None
        # A present-but-null "data" value must not crash the lookups below.
        self.data = payload if isinstance(payload, dict) else {}
        self.massdhp_username = self.data.get("massdhpUsername") or ""
        self.massdhp_password = self.data.get("massdhpPassword") or ""
        self.from_date = self.data.get("fromDate") or ""
        self.to_date = self.data.get("toDate") or ""
        # Browser download target, relative to the current working directory.
        self.download_dir = os.path.abspath("downloads")
        os.makedirs(self.download_dir, exist_ok=True)
        # Final location of vouchers and of the results PDF.
        self.voucher_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "Backend", "uploads", "MHVoucher")
        )
        os.makedirs(self.voucher_dir, exist_ok=True)
        self.voucher_log_path = os.path.join(self.voucher_dir, "downloaded_vouchers.json")

    def config_driver(self):
        """Create the Chrome driver with downloads directed to ``self.download_dir``."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")
        # NOTE(review): "plugins.always_open_pdf_externally" is False, which is
        # presumably why PDFs can open in a viewer tab instead of downloading —
        # confirm this is intended for a download worker.
        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)
        s = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=s, options=options)

    def _switch_to_portal_window(self, timeout=30):
        """Poll all windows until one shows the provider portal; leave it focused.

        Returns:
            True when a window whose URL starts with ``PORTAL_URL_PREFIX`` was
            found within ``timeout`` seconds, otherwise False.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            time.sleep(0.5)
            for handle in self.driver.window_handles:
                try:
                    self.driver.switch_to.window(handle)
                    if self.driver.current_url.startswith(self.PORTAL_URL_PREFIX):
                        print(f"[MHBatch login] Redirect complete. URL: {self.driver.current_url}")
                        return True
                except Exception:
                    # A window may close while we iterate; try the next one.
                    continue
        return False

    def login(self):
        """Click SIGN IN, submit the credentials and wait for the portal redirect.

        Returns:
            ``"Success"``, ``"ERROR: Login redirect timed out"`` or
            ``"ERROR:LOGIN FAILED"``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            signin_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']")
                )
            )
            signin_button.click()
            print("[MHBatch login] Clicked SIGN IN button")
            time.sleep(3)

            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)
            print("[MHBatch login] Entered username")

            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)
            print("[MHBatch login] Entered password")

            login_button = wait.until(
                EC.element_to_be_clickable(
                    (By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")
                )
            )
            login_button.click()
            print("[MHBatch login] Clicked Login button")

            # Wait for SSO redirect — same approach as eligibility worker
            print("[MHBatch login] Waiting for SSO redirect to provider.masshealth-dental.org ...")
            if not self._switch_to_portal_window(timeout=30):
                print("[MHBatch login] Redirect timeout — portal window not found.")
                return "ERROR: Login redirect timed out"
            return "Success"
        except Exception as e:
            print(f"[MHBatch login] Error: {e}")
            return "ERROR:LOGIN FAILED"

    def _load_downloaded_vouchers(self) -> set:
        """Load the set of already-downloaded voucher numbers from the JSON log.

        Returns an empty set when the log is missing or unreadable.
        """
        if not os.path.exists(self.voucher_log_path):
            return set()
        try:
            with open(self.voucher_log_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            return set(data.get("downloaded", []))
        except (OSError, ValueError, AttributeError, TypeError) as e:
            print(f"[MHBatch] Warning: could not read voucher log: {e}")
            return set()

    def _save_downloaded_voucher(self, voucher: str):
        """Add a voucher number to the JSON log.

        The log is written to a sibling ``.tmp`` file and then swapped in with
        ``os.replace`` so an interrupted write cannot leave a truncated log
        (which would make every voucher look new on the next run).
        """
        existing = self._load_downloaded_vouchers()
        existing.add(voucher)
        tmp_path = f"{self.voucher_log_path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump({"downloaded": sorted(existing)}, f, indent=2)
            os.replace(tmp_path, self.voucher_log_path)
        except (OSError, TypeError, ValueError) as e:
            print(f"[MHBatch] Warning: could not update voucher log: {e}")

    def _format_date(self, iso_date: str) -> str:
        """Convert YYYY-MM-DD to MM/DD/YYYY.

        Month and day are zero-padded because ``step2_fill_ra_dates`` types the
        digits only and relies on the field mask to place the slashes. Input
        that does not have at least three ``-`` separated parts is returned
        unchanged.
        """
        if not isinstance(iso_date, str):
            return iso_date
        parts = iso_date.split("-")
        if len(parts) < 3:
            return iso_date
        year, month, day = parts[0], parts[1], parts[2]
        return f"{month.zfill(2)}/{day.zfill(2)}/{year}"

    @staticmethod
    def _safe_filename(name):
        """Return ``name`` reduced to letters, digits, ``.``, ``_`` and ``-``.

        Voucher text comes from the web page, so path separators and other
        special characters are replaced before it is used as a file name.
        """
        cleaned = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in name)
        cleaned = cleaned.strip(".")
        return cleaned or "voucher"

    @staticmethod
    def _xpath_literal(text):
        """Quote ``text`` as an XPath string literal, whatever quotes it contains."""
        if "'" not in text:
            return f"'{text}'"
        if '"' not in text:
            return f'"{text}"'
        # XPath 1.0 has no escape character: join the pieces with concat().
        pieces = ", \"'\", ".join(f"'{part}'" for part in text.split("'"))
        return f"concat({pieces})"

    def step1_click_remittance(self):
        """Open the Remittance page via its menu entry.

        Returns:
            ``"Success"`` or ``"ERROR:STEP1:REMITTANCE"``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            remittance = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[@translate='Remittance']")
                )
            )
            self.driver.execute_script("arguments[0].scrollIntoView(true);", remittance)
            # Clicked through JavaScript rather than WebElement.click().
            self.driver.execute_script("arguments[0].click();", remittance)
            print("[MHBatch step1] Clicked Remittance")
            time.sleep(2)
            print(f"[MHBatch step1] URL after click: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[MHBatch step1] Error clicking Remittance: {e}")
            return "ERROR:STEP1:REMITTANCE"

    @staticmethod
    def _type_date(input_el, date_str):
        """Click, select-all, delete, then type date digits only — lets the field insert slashes."""
        input_el.click()
        time.sleep(0.2)
        input_el.send_keys(Keys.CONTROL, "a")
        input_el.send_keys(Keys.DELETE)
        time.sleep(0.2)
        # Type only digits — the field mask will insert slashes automatically
        for ch in date_str.replace("/", ""):
            input_el.send_keys(ch)
            time.sleep(0.05)
        input_el.send_keys(Keys.TAB)
        time.sleep(0.3)

    def step2_fill_ra_dates(self):
        """Type the From/To RA dates into the first two date inputs of the page.

        Returns:
            ``"Success"`` or a message starting with ``"ERROR:STEP2"``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            from_date = self._format_date(self.from_date)
            to_date = self._format_date(self.to_date)
            print(f"[MHBatch step2] Filling RA Date: From={from_date}, To={to_date}")
            date_inputs = wait.until(
                EC.presence_of_all_elements_located(
                    (By.XPATH, "//input[@name='dateInput' and @placeholder='mm/dd/yyyy']")
                )
            )
            if len(date_inputs) < 2:
                return f"ERROR:STEP2: Expected 2 date inputs, found {len(date_inputs)}"

            # From date (first input)
            from_input = date_inputs[0]
            self.driver.execute_script("arguments[0].scrollIntoView(true);", from_input)
            self._type_date(from_input, from_date)
            print(f"[MHBatch step2] Entered From date: {from_date}")

            # To date (second input)
            to_input = date_inputs[1]
            self._type_date(to_input, to_date)
            print(f"[MHBatch step2] Entered To date: {to_date}")
            return "Success"
        except Exception as e:
            print(f"[MHBatch step2] Error: {e}")
            return f"ERROR:STEP2:{e}"

    def step3_click_search(self):
        """Submit the remittance search.

        Returns:
            ``"Success"`` or a message starting with ``"ERROR:STEP3"``.
        """
        wait = WebDriverWait(self.driver, 30)
        try:
            search_btn = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[@ng-click='vm.search()' and @type='submit']")
                )
            )
            self.driver.execute_script("arguments[0].click();", search_btn)
            print("[MHBatch step3] Clicked SEARCH")
            time.sleep(3)
            print(f"[MHBatch step3] URL after search: {self.driver.current_url}")
            return "Success"
        except Exception as e:
            print(f"[MHBatch step3] Error: {e}")
            return f"ERROR:STEP3:{e}"

    def step3b_save_results_pdf(self):
        """Save the search results page as a PDF in the MHVoucher folder.

        Returns:
            The path of the written PDF, or None when printing failed or the
            browser returned no PDF data.
        """
        try:
            time.sleep(2)
            safe_from = self.from_date.replace("-", "")
            safe_to = self.to_date.replace("-", "")
            filename = f"remittance_search_{safe_from}_to_{safe_to}.pdf"
            pdf_path = os.path.join(self.voucher_dir, filename)
            result = self.driver.execute_cdp_cmd("Page.printToPDF", {
                "landscape": True,
                "printBackground": True,
                "preferCSSPageSize": False,
                "paperWidth": 11,
                "paperHeight": 8.5,
                "marginTop": 0.3,
                "marginBottom": 0.3,
                "marginLeft": 0.3,
                "marginRight": 0.3,
                "scale": 0.8,
            })
            pdf_bytes = base64.b64decode(result.get("data", ""))
            if not pdf_bytes:
                # Do not leave a 0-byte file behind and report it as a success.
                print("[MHBatch step3b] Error saving PDF: browser returned no PDF data")
                return None
            with open(pdf_path, "wb") as f:
                f.write(pdf_bytes)
            print(f"[MHBatch step3b] Saved results PDF: {pdf_path}")
            return pdf_path
        except Exception as e:
            print(f"[MHBatch step3b] Error saving PDF: {e}")
            return None

    def _has_no_results(self):
        """Return True if the search results page shows no vouchers.

        True when the page text contains one of the "no results" markers or
        the results table has no rows; False otherwise, including when the
        page could not be inspected.
        """
        try:
            body_text = self.driver.find_element(By.TAG_NAME, "body").text.lower()
            no_result_markers = ["no results", "no records", "no data", "0 results", "nothing found"]
            if any(m in body_text for m in no_result_markers):
                return True
            rows = self.driver.find_elements(By.XPATH, self.ROWS_XPATH)
            return not rows
        except Exception:
            return False

    def _expand_all_results(self):
        """Click VIEW MORE until it is gone, stops adding rows, or the click cap is hit."""
        view_more_xpath = (
            "//a[normalize-space(text())='VIEW MORE'] | //button[normalize-space(text())='VIEW MORE']"
        )
        clicks = 0
        while clicks < self.MAX_VIEW_MORE_CLICKS:
            rows_before = len(self.driver.find_elements(By.XPATH, self.ROWS_XPATH))
            try:
                view_more = self.driver.find_element(By.XPATH, view_more_xpath)
                if not view_more.is_displayed():
                    break
                self.driver.execute_script("arguments[0].click();", view_more)
            except Exception:
                # Button absent (or went stale): nothing more to load.
                break
            clicks += 1
            print("[MHBatch step4] Clicked VIEW MORE")
            time.sleep(2)
            rows_after = len(self.driver.find_elements(By.XPATH, self.ROWS_XPATH))
            if rows_after <= rows_before:
                # The click loaded nothing new; stop instead of clicking forever.
                break
        if clicks == 0:
            print("[MHBatch step4] No VIEW MORE button found")

    def step4_collect_voucher_numbers(self):
        """Collect all voucher numbers + their hrefs from the results table.

        VIEW MORE is clicked until exhausted before the rows are read.

        Returns:
            List of ``(voucher_number, href_or_None)`` tuples in table order
            (top of table first), without duplicates; an empty list on error.
        """
        wait = WebDriverWait(self.driver, 30)
        vouchers = []  # list of (voucher_number, href)
        seen = set()
        try:
            wait.until(EC.presence_of_element_located((By.XPATH, self.ROWS_XPATH)))
            print("[MHBatch step4] Results table loaded")
            self._expand_all_results()

            # Collect all voucher rows; the voucher number is read from the first cell.
            for row in self.driver.find_elements(By.XPATH, self.ROWS_XPATH):
                try:
                    cell = row.find_element(By.XPATH, ".//td[1]")
                    voucher_text = cell.text.strip()
                    if not voucher_text or voucher_text in seen:
                        continue
                    seen.add(voucher_text)
                    href = None
                    try:
                        link = cell.find_element(By.TAG_NAME, "a")
                        href = link.get_attribute("href")
                    except Exception:
                        # No link in the cell: step5 falls back to clicking by text.
                        pass
                    vouchers.append((voucher_text, href))
                    print(f"[MHBatch step4] Found voucher: {voucher_text} href={href}")
                except Exception:
                    # Rows without a first cell (or stale rows) are skipped.
                    continue
            print(f"[MHBatch step4] Total vouchers collected: {len(vouchers)}")
            return vouchers
        except Exception as e:
            print(f"[MHBatch step4] Error: {e}")
            return []

    def _wait_for_new_download(self, existing_files, timeout=30):
        """Wait for a new, completed file to appear in ``download_dir``.

        Args:
            existing_files: Set of paths that were present before the download
                was triggered.
            timeout: Seconds to keep polling; the folder is always checked at
                least once.

        Returns:
            Path of a new file that is not a partial download, or None on
            timeout. With several candidates the alphabetically first is
            returned so the choice is deterministic.
        """
        deadline = time.time() + timeout
        while True:
            current = set(glob.glob(os.path.join(self.download_dir, "*")))
            completed = sorted(
                f for f in current - existing_files
                if not f.endswith(self.PARTIAL_DOWNLOAD_SUFFIXES)
            )
            if completed:
                return completed[0]
            if time.time() >= deadline:
                return None
            time.sleep(1)

    def _close_new_windows(self, known_handles, main_handle):
        """Close windows opened since ``known_handles`` was taken, then refocus ``main_handle``."""
        for handle in self.driver.window_handles:
            if handle not in known_handles:
                self.driver.switch_to.window(handle)
                self.driver.close()
        self.driver.switch_to.window(main_handle)

    def _return_to_results(self, results_url):
        """Best-effort navigation back to the results page between vouchers."""
        try:
            self.driver.get(results_url)
            time.sleep(2)
        except Exception as e:
            print(f"[MHBatch step5] Warning: could not return to results page: {e}")

    def step5_download_vouchers(self, vouchers):
        """Download the vouchers that are not yet in the JSON log.

        Phase 1: check all voucher numbers on the page against the JSON log.
        Phase 2: download only the new ones, in the order given.

        Args:
            vouchers: List of ``(voucher_number, href_or_None)`` tuples.

        Returns:
            Dict with the lists ``downloaded``, ``skipped`` and ``failed``.
        """
        results_url = self.driver.current_url
        # The portal window selected during login is not necessarily the first
        # handle, so remember it instead of assuming window_handles[0].
        main_handle = self.driver.current_window_handle
        downloaded = []
        failed = []

        # --- Phase 1: check all vouchers against log ---
        already_downloaded = self._load_downloaded_vouchers()
        print(f"[MHBatch step5] Webpage has {len(vouchers)} voucher(s), log has {len(already_downloaded)} already downloaded")
        new_vouchers = [(v, href) for v, href in vouchers if v not in already_downloaded]
        skipped = [v for v, href in vouchers if v in already_downloaded]
        print(f"[MHBatch step5] New vouchers to download: {len(new_vouchers)}")
        for v, _ in new_vouchers:
            print(f" -> {v}")
        print(f"[MHBatch step5] Skipping {len(skipped)} already downloaded: {skipped}")
        if not new_vouchers:
            print("[MHBatch step5] Nothing new to download.")
            return {"downloaded": [], "skipped": skipped, "failed": []}

        # --- Phase 2: download new vouchers ---
        for voucher, href in new_vouchers:
            try:
                print(f"[MHBatch step5] Downloading: {voucher}")
                existing_files = set(glob.glob(os.path.join(self.download_dir, "*")))
                known_handles = set(self.driver.window_handles)
                if href:
                    self.driver.get(href)
                    time.sleep(2)
                else:
                    self.driver.get(results_url)
                    time.sleep(2)
                    literal = self._xpath_literal(voucher)
                    link = WebDriverWait(self.driver, 15).until(
                        EC.element_to_be_clickable(
                            (By.XPATH, f"//td[normalize-space(text())={literal}]//a | //a[normalize-space(text())={literal}]")
                        )
                    )
                    self.driver.execute_script("arguments[0].click();", link)
                    time.sleep(2)

                # Close any extra tab (PDF viewer) opened by this voucher only.
                self._close_new_windows(known_handles, main_handle)

                new_file = self._wait_for_new_download(existing_files, timeout=30)
                if not new_file:
                    print(f"[MHBatch step5] Download timed out: {voucher}")
                    failed.append(voucher)
                else:
                    dest = os.path.join(self.voucher_dir, f"{self._safe_filename(voucher)}.pdf")
                    shutil.move(new_file, dest)
                    print(f"[MHBatch step5] Saved: {dest}")
                    self._save_downloaded_voucher(voucher)
                    downloaded.append(voucher)
            except Exception as e:
                print(f"[MHBatch step5] Error on {voucher}: {e}")
                failed.append(voucher)
            # Kept outside the try so a navigation error cannot mark an already
            # saved voucher as failed as well.
            self._return_to_results(results_url)
        return {"downloaded": downloaded, "skipped": skipped, "failed": failed}

    def main_workflow(self, url):
        """Run the whole batch: login, search, save results PDF, download new vouchers.

        Args:
            url: Page to open first; it must show the SIGN IN button used by ``login``.

        Returns:
            Dict with ``status`` (``"success"`` or ``"error"``) and ``message``.
            Results produced after the search also carry ``resultsPdf``,
            ``downloaded``, ``skipped`` and ``failed``. The browser is always
            closed before returning.
        """
        try:
            self.config_driver()
            self.driver.maximize_window()
            self.driver.get(url)
            time.sleep(3)

            steps = (
                self.login,
                self.step1_click_remittance,
                self.step2_fill_ra_dates,
                self.step3_click_search,
            )
            for step in steps:
                outcome = step()
                if outcome.startswith("ERROR"):
                    return {"status": "error", "message": outcome}

            results_pdf = self.step3b_save_results_pdf()
            if self._has_no_results():
                return {
                    "status": "success",
                    "message": "No results found for the selected date range.",
                    "noResults": True,
                    "resultsPdf": results_pdf,
                    "downloaded": [],
                    "skipped": [],
                    "failed": [],
                }

            vouchers = self.step4_collect_voucher_numbers()
            if not vouchers:
                return {
                    "status": "success",
                    "message": "No vouchers found for the selected date range.",
                    "resultsPdf": results_pdf,
                    "downloaded": [],
                    "skipped": [],
                    "failed": [],
                }

            step5_result = self.step5_download_vouchers(vouchers)
            return {
                "status": "success",
                "message": f"Downloaded {len(step5_result['downloaded'])} new, {len(step5_result['skipped'])} skipped (already downloaded), {len(step5_result['failed'])} failed.",
                "resultsPdf": results_pdf,
                "downloaded": step5_result["downloaded"],
                "skipped": step5_result["skipped"],
                "failed": step5_result["failed"],
                "voucherDir": self.voucher_dir,
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception as e:
                    # A failing quit must not replace the workflow result.
                    print(f"[MHBatch] Warning: could not close browser: {e}")
                self.driver = None