# - Insurance Forms modal: split into Insurance Claim / PreAuth tabs
# - PreAuth tab: same patient info + service lines, no toggle/direct combos
# - Excluded Recalls & New Patients, Composite Fillings (Front/Back), Pedo from PreAuth combos
# - Extractions: replaced Simple/Surg/Baby Teeth EXT with Full Bony EXT (D7240)
# - MH PreAuth button: rewritten selenium worker to use masshealth-dental.org, selects Dental Prior Authorization (2nd option), skips Date of Service field
# - agent.py: convert pdf_path to pdf_url for /claim-pre-auth endpoint
# - nginx + Express: raise body size limit to 50mb (fix 413 errors)
# - DB schema: appointmentId optional on Claim, add preAuthNumber field, add PREAUTH status
# - Backend: create PREAUTH claim record on preauth submit, save preAuthNumber on completion
# - Claims table: add PreAuth No column (blue) next to Claim No
#
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
951 lines
40 KiB
Python
Executable File
951 lines
40 KiB
Python
Executable File
from selenium import webdriver
|
|
from selenium.common import TimeoutException
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.support.ui import WebDriverWait, Select
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
import re
|
|
import time
|
|
import os
|
|
import base64
|
|
import tempfile
|
|
from datetime import datetime
|
|
|
|
|
|
class AutomationMassHealthPreAuth:
    """Selenium worker that files a dental prior authorization on the MassHealth portal.

    The constructor normalizes the incoming payload into plain attributes;
    ``main_workflow`` then drives the browser through login, form filling,
    attachment upload, submission and confirmation capture.  Every step method
    returns the string ``"Success"`` or an ``"ERROR:..."`` code instead of
    raising, so the workflow can stop at the first failing step.
    """

    # Date layouts accepted for incoming date strings, tried in this order.
    _DATE_FORMATS = ("%Y-%m-%d", "%m-%d-%Y", "%m/%d/%Y")

    # Sets an input's value from a script argument (never from interpolated
    # source text) and fires the events the page's Angular bindings listen for.
    _SET_VALUE_JS = (
        "arguments[0].value = arguments[1];"
        " arguments[0].dispatchEvent(new Event('input', { bubbles: true }));"
        " arguments[0].dispatchEvent(new Event('change', { bubbles: true }));"
    )

    # ------------------------------------------------------------------
    # Pure helpers (no browser access)
    # ------------------------------------------------------------------

    @classmethod
    def _parse_date(cls, value):
        """Return a ``datetime`` for *value*, or ``None`` if it cannot be parsed.

        Anything after a ``T`` (ISO timestamp suffix) is discarded before the
        known layouts are tried.
        """
        if not value:
            return None
        raw = str(value).strip()
        if "T" in raw:
            raw = raw.split("T")[0]
        for fmt in cls._DATE_FORMATS:
            try:
                return datetime.strptime(raw, fmt)
            except ValueError:
                continue
        return None

    @classmethod
    def _redact_for_log(cls, value):
        """Return a copy of *value* that is safe to print.

        Password fields are masked and base64 file bodies are replaced by their
        length, so credentials and document contents never reach the logs.
        """
        if isinstance(value, dict):
            cleaned = {}
            for key, item in value.items():
                key_text = str(key).lower()
                if "password" in key_text:
                    cleaned[key] = "***REDACTED***"
                elif key_text == "bufferbase64":
                    size = len(item) if isinstance(item, (str, bytes)) else 0
                    cleaned[key] = f"<{size} base64 chars>"
                else:
                    cleaned[key] = cls._redact_for_log(item)
            return cleaned
        if isinstance(value, (list, tuple)):
            return [cls._redact_for_log(item) for item in value]
        return value

    @staticmethod
    def _xpath_literal(text):
        """Quote *text* as an XPath 1.0 string literal.

        XPath has no escape character, so a value containing both quote kinds
        has to be assembled with ``concat()``.
        """
        if "'" not in text:
            return f"'{text}'"
        if '"' not in text:
            return f'"{text}"'
        pieces = [f"'{piece}'" for piece in text.split("'")]
        return "concat(" + ", \"'\", ".join(pieces) + ")"

    @staticmethod
    def _find_preauth_number(text, exclude=()):
        """Pick the most plausible authorization number out of *text*.

        Preference order: the number directly following the phrase
        "assigned the number", then the first 15-digit run, then the first
        9-14 digit run.  Values listed in *exclude* (e.g. member ID, NPI) are
        skipped so they are not mistaken for the authorization number.
        """
        if not text:
            return None
        skip = {str(item).strip() for item in exclude if item}

        phrase_hit = re.search(r"assigned the number\D{0,40}([0-9]{9,15})", text)
        if phrase_hit and phrase_hit.group(1) not in skip:
            return phrase_hit.group(1)

        for pattern in (r"[0-9]{15}", r"[0-9]{9,14}"):
            for found in re.finditer(pattern, text):
                if found.group(0) not in skip:
                    return found.group(0)
        return None

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------

    def __init__(self, data):
        """Normalize the request payload.

        *data* is expected to be a dict that may carry ``data`` (pre-flattened
        values), ``claim`` (claim record with ``serviceLines`` and
        ``npiProvider``) and ``pdfs`` / ``images`` (attachments with
        ``originalname`` and ``bufferBase64``).  Anything else yields empty
        defaults rather than an error.
        """
        self.headless = False
        self.driver = None
        self.extracted_data = {}

        # Log a redacted copy: the payload carries the portal password and
        # full attachment bodies.
        print(f"DEBUG: Received data = {self._redact_for_log(data)}")

        payload = data if isinstance(data, dict) else {}
        claim = payload.get("claim") if isinstance(payload.get("claim"), dict) else None

        self.upload_files = (payload.get("pdfs", []) or []) + (payload.get("images", []) or [])

        flattened = payload.get("data")
        self.data = flattened if isinstance(flattened, dict) else None

        claim_lines = claim.get("serviceLines", []) if claim else []
        has_lines = isinstance(claim_lines, list) and len(claim_lines) > 0

        if not self.data and claim is not None:
            first_line = claim_lines[0] if has_lines and isinstance(claim_lines[0], dict) else None

            # Split "First Rest Of Name" into first / last as a fallback for
            # payloads that only carry a combined patient name.
            name_parts = (claim.get("patientName") or "").strip().split()
            first_name = name_parts[0] if name_parts else ""
            last_name = " ".join(name_parts[1:]) if len(name_parts) > 1 else ""

            line_values = first_line or {}
            self.data = {
                "massdhpUsername": claim.get("massdhpUsername", ""),
                "massdhpPassword": claim.get("massdhpPassword", ""),
                "memberId": claim.get("memberId", ""),
                "dateOfBirth": claim.get("dateOfBirth", ""),
                "submissionDate": claim.get("serviceDate", ""),
                "firstName": claim.get("firstName", "") or first_name,
                "lastName": claim.get("lastName", "") or last_name,
                "procedureCode": line_values.get("procedureCode", ""),
                "toothNumber": line_values.get("toothNumber", ""),
                "toothSurface": line_values.get("toothSurface", ""),
            }

        if not self.data:
            self.data = {}

        print(f"DEBUG: Extracted data = {self._redact_for_log(self.data)}")

        # Extract service lines data for multiple rows
        self.serviceLines = claim_lines if has_lines else []

        print(f"DEBUG: Found {len(self.serviceLines)} service lines")
        for i, line in enumerate(self.serviceLines):
            print(f"DEBUG: Service line {i+1}: {line}")

        # Flatten values for convenience
        self.massdhp_username = self.data.get("massdhpUsername", "")
        self.massdhp_password = self.data.get("massdhpPassword", "")
        self.dateOfBirth = self.data.get("dateOfBirth", "")
        self.originalDateOfBirth = self.data.get("dateOfBirth", "")
        self.memberId = self.data.get("memberId", "")
        self.submissionDate = self.data.get("submissionDate", "")
        self.firstName = self.data.get("firstName", "")
        self.lastName = self.data.get("lastName", "")
        self.procedureCode = self.data.get("procedureCode", "")
        self.toothNumber = self.data.get("toothNumber", "")
        self.toothSurface = self.data.get("toothSurface", "")

        print(f"DEBUG: submissionDate = '{self.submissionDate}'")
        print(f"DEBUG: firstName = '{self.firstName}', lastName = '{self.lastName}'")
        print(f"DEBUG: procedureCode = '{self.procedureCode}'")

        # Extract rendering provider info from npiProvider field.  str() guards
        # against a numeric NPI, which has no .strip().
        npi_provider = claim.get("npiProvider") if claim else None
        if not isinstance(npi_provider, dict):
            npi_provider = {}
        self.rendering_provider_name = str(npi_provider.get("providerName") or "").strip()
        self.rendering_provider_npi = str(npi_provider.get("npiNumber") or "").strip()

        print(f"DEBUG: rendering_provider_name = '{self.rendering_provider_name}'")
        print(f"DEBUG: rendering_provider_npi = '{self.rendering_provider_npi}'")

        # Convert dateOfBirth to MMDDYYYY format; unparseable values are kept as received.
        parsed_dob = self._parse_date(self.dateOfBirth)
        if parsed_dob:
            self.dateOfBirth = parsed_dob.strftime("%m%d%Y")

        self.download_dir = os.path.abspath("downloads")
        os.makedirs(self.download_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Browser setup and navigation
    # ------------------------------------------------------------------

    def config_driver(self):
        """Create the Chrome driver and store it on ``self.driver``."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument("--headless")

        options.add_argument("--enable-print-browser")

        prefs = {
            "download.default_directory": self.download_dir,
            "plugins.always_open_pdf_externally": False,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
        }
        options.add_experimental_option("prefs", prefs)

        service = Service(ChromeDriverManager().install())
        self.driver = webdriver.Chrome(service=service, options=options)

    def login(self):
        """Open the provider sign-in page and submit the stored credentials.

        Returns "Success" once the login form has been submitted, or
        "ERROR:LOGIN FAILED" if any element could not be found or used.
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            signin_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "a.btn.btn-block.btn-primary[href='https://connectsso.masshealth-dental.org/mhprovider/index.html']")))
            signin_button.click()
            time.sleep(2)

            email_field = wait.until(EC.presence_of_element_located((By.ID, "User")))
            email_field.clear()
            email_field.send_keys(self.massdhp_username)

            password_field = wait.until(EC.presence_of_element_located((By.ID, "Password")))
            password_field.clear()
            password_field.send_keys(self.massdhp_password)

            login_button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, "input[type='submit'][name='submit'][value='Login']")))
            login_button.click()

            return "Success"
        except Exception as e:
            print(f"Error while logging in: {e}")
            return "ERROR:LOGIN FAILED"

    def navigate_to_submit_claims(self):
        """Open the "Submit Claims & Prior Authorizations" page via the nav menu.

        Returns "Success" or "ERROR:NAVIGATION_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            claims_dropdown = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//strong[contains(@class,'navtitle') and contains(text(),'Claims/Prior Authorizations')]")
                )
            )
            # Clicks go through JavaScript here, after scrolling the item into view.
            self.driver.execute_script("arguments[0].scrollIntoView(true);", claims_dropdown)
            self.driver.execute_script("arguments[0].click();", claims_dropdown)
            time.sleep(1)

            submit_claims_link = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//a[contains(@class,'navitem subnav') and contains(text(),'Submit Claims & Prior Authorizations')]")
                )
            )
            time.sleep(1)
            self.driver.execute_script("arguments[0].scrollIntoView(true);", submit_claims_link)
            self.driver.execute_script("arguments[0].click();", submit_claims_link)
            time.sleep(1)

            return "Success"
        except Exception as e:
            print(f"Error navigating to Submit Claims: {e}")
            return "ERROR:NAVIGATION_FAILED"

    # ------------------------------------------------------------------
    # Form header
    # ------------------------------------------------------------------

    def select_dental_prior_authorization(self):
        """Select 'Dental Prior Authorization' (2nd option) from the submission type dropdown.

        Known label variants are tried first; if none matches, the second
        option with a non-placeholder value is clicked.  Returns "Success" or
        "ERROR:SELECTION FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            submission_dropdown = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//select[contains(@ng-model,'vm.newClaim.type')]")
                )
            )
            time.sleep(1)

            select = Select(submission_dropdown)

            # Try by visible text first (most reliable)
            target_texts = [
                "Dental Prior Authorization",
                "Dental Pre-Authorization",
                "Prior Authorization",
                "Pre-Authorization",
            ]
            for text in target_texts:
                try:
                    select.select_by_visible_text(text)
                    print(f"DEBUG: Selected preauth option by text: '{text}'")
                    time.sleep(1)
                    return "Success"
                except Exception:
                    continue

            # Fallback: select by index (2nd option = index 1, skipping the placeholder at index 0)
            options = select.options
            print(f"DEBUG: Available options: {[o.text for o in options]}")
            non_empty = [o for o in options if o.get_attribute("value") and o.get_attribute("value") not in ("", "string:")]
            if len(non_empty) >= 2:
                non_empty[1].click()
                print(f"DEBUG: Selected 2nd non-empty option: '{non_empty[1].text}'")
                time.sleep(1)
                return "Success"

            raise Exception("Could not find preauthorization option in dropdown")
        except Exception as e:
            print(f"Error selecting Dental Prior Authorization: {e}")
            return "ERROR:SELECTION FAILED"

    def fill_date_of_service(self):
        """Fill the Date of Service input with the submission date (today if unset).

        A recognised date is written as mm/dd/yyyy; an unrecognised value is
        passed through unchanged.  Returns "Success" or "ERROR:DATE_FILL_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            raw_date = self.submissionDate if self.submissionDate else datetime.now().strftime("%Y-%m-%d")
            parsed = self._parse_date(raw_date)
            formatted_date = parsed.strftime("%m/%d/%Y") if parsed else raw_date
            print(f"DEBUG: Using service date = '{formatted_date}'")

            date_input = wait.until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//input[@name='dateOfService' and @ng-model='date' and @placeholder='mm/dd/yyyy']")
                )
            )
            time.sleep(1)

            # The value travels as a script argument so quotes in it cannot
            # break or inject into the executed JavaScript.
            self.driver.execute_script(self._SET_VALUE_JS, date_input, formatted_date)
            time.sleep(1)
            date_input.send_keys(Keys.TAB)
            time.sleep(1)

            return "Success"
        except Exception as e:
            print(f"Error filling Date of Service: {e}")
            return "ERROR:DATE_FILL_FAILED"

    def fill_member_eligibility(self):
        """Enter the member's date of birth and ID, then run the member search.

        The DOB field receives mm/dd/yyyy, or an empty string when the stored
        DOB cannot be parsed.  Returns "Success" or
        "ERROR:MEMBER_ELIGIBILITY_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print(f"DEBUG: Filling member eligibility with DOB: '{self.originalDateOfBirth}'")

            formatted_dob = ""
            dob = self._parse_date(self.originalDateOfBirth)
            if dob:
                formatted_dob = dob.strftime("%m/%d/%Y")
                print(f"DEBUG: Formatted DOB = '{formatted_dob}'")

            dob_input = wait.until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//input[@name='dateInput' and @placeholder='mm/dd/yyyy' and @ng-model='date']")
                )
            )
            time.sleep(1)
            self.driver.execute_script(self._SET_VALUE_JS, dob_input, formatted_dob)
            time.sleep(1)
            dob_input.send_keys(Keys.TAB)
            time.sleep(1)

            if self.memberId:
                print(f"DEBUG: Filling member ID: '{self.memberId}'")
                member_input = wait.until(
                    EC.visibility_of_element_located(
                        (By.XPATH, "//input[@placeholder='Member Number' and @ng-model='vm.number']")
                    )
                )
                time.sleep(1)
                member_input.click()
                time.sleep(1)
                member_input.clear()
                time.sleep(1)
                member_input.send_keys(self.memberId)
                time.sleep(1)
                member_input.send_keys(Keys.TAB)
                time.sleep(1)

            print("DEBUG: Clicking SEARCH button")
            search_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[@ng-click='vm.searchPrep()' and contains(text(),'SEARCH')]")
                )
            )
            search_button.click()
            time.sleep(2)

            return "Success"
        except Exception as e:
            print(f"Error filling Member Eligibility: {e}")
            return "ERROR:MEMBER_ELIGIBILITY_FAILED"

    def select_place_of_service_office(self):
        """Choose "Office" in the place-of-treatment dropdown.

        Returns "Success" or "ERROR:SELECTION FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            pos_dropdown = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//select[contains(@ng-model,'vm.newClaim.placeOfTreatmentCode')]")
                )
            )
            time.sleep(1)

            Select(pos_dropdown).select_by_visible_text("Office")
            time.sleep(2)

            return "Success"
        except Exception as e:
            print(f"Error selecting Office place of service: {e}")
            return "ERROR:SELECTION FAILED"

    def select_office_summit_framingham(self):
        """Select the first real (non-placeholder) entry of the office dropdown.

        Returns "Success" even when no usable entry exists (that case is only
        logged), or "ERROR:SELECTION FAILED" when the dropdown cannot be used.
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            office_dropdown = wait.until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//select[@id='selectOffice']")
                )
            )
            time.sleep(1)

            select = Select(office_dropdown)
            chosen = False
            for position, opt in enumerate(select.options):
                if opt.get_attribute("value") not in (None, "", "string:") and opt.text.strip():
                    select.select_by_index(position)
                    print(f"DEBUG: Selected office option: '{opt.text.strip()}'")
                    chosen = True
                    break
            if not chosen:
                print("DEBUG: No selectable office option found; leaving dropdown unchanged")
            time.sleep(2)

            return "Success"
        except Exception as e:
            print(f"Error selecting service office: {e}")
            return "ERROR:SELECTION FAILED"

    def select_dentist(self):
        """Pick the rendering provider from the dentist type-ahead.

        The last name is typed as the search keyword and the suggestion
        containing "Last, First - NPI" is clicked.  If no suggestion can be
        clicked, Arrow-Down + Enter accepts the first one.  Returns "Success"
        or "ERROR:SELECTION_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        provider_name = self.rendering_provider_name or "Mary Scannell"
        provider_npi = self.rendering_provider_npi or ""

        parts = provider_name.strip().split()
        if len(parts) >= 2:
            last_name = parts[-1]
            first_names = " ".join(parts[:-1])
            search_keyword = last_name
            display_name = f"{last_name}, {first_names} - {provider_npi}"
        else:
            search_keyword = provider_name
            display_name = f"{provider_name} - {provider_npi}"

        print(f"DEBUG: Searching dentist: keyword='{search_keyword}', expected='{display_name}'")

        # Quote once; names such as O'Brien would otherwise yield invalid XPath.
        needle = self._xpath_literal(display_name)

        try:
            dentist_input = wait.until(
                EC.visibility_of_element_located(
                    (By.XPATH, "//input[@id='inputProvider' and @placeholder='Enter a dentist']")
                )
            )
            time.sleep(1)

            dentist_input.clear()
            dentist_input.send_keys(search_keyword)
            time.sleep(3)

            option_xpath = f"//a[@ng-bind-html and contains(., {needle})]"

            try:
                option = wait.until(EC.element_to_be_clickable((By.XPATH, option_xpath)))
                print("DEBUG: Found dentist option, clicking...")
                self.driver.execute_script("arguments[0].scrollIntoView(true);", option)
                time.sleep(1)
                option.click()
                time.sleep(2)
                return "Success"
            except Exception:
                fallback_selectors = [
                    f"//ul[contains(@class,'dropdown-menu')]//a[contains(., {needle})]",
                    f"//ul[contains(@class,'dropdown-menu')]//li[contains(., {needle})]",
                ]
                for selector in fallback_selectors:
                    try:
                        option = wait.until(EC.element_to_be_clickable((By.XPATH, selector)))
                        self.driver.execute_script("arguments[0].scrollIntoView(true);", option)
                        time.sleep(1)
                        option.click()
                        time.sleep(2)
                        return "Success"
                    except Exception:
                        continue

                print("DEBUG: Using Arrow Down + Enter fallback")
                dentist_input.send_keys(Keys.ARROW_DOWN)
                time.sleep(1)
                dentist_input.send_keys(Keys.ENTER)
                time.sleep(2)
                return "Success"

        except Exception as e:
            print(f"Error selecting dentist {display_name}: {e}")
            return "ERROR:SELECTION_FAILED"

    # ------------------------------------------------------------------
    # Service lines
    # ------------------------------------------------------------------

    def click_plus_button(self):
        """Click the "+" button that appends a new service-line row.

        The button is located by attribute scan, then by XPath selectors, then
        by a JavaScript search; the click itself falls back from a native
        click to a JS click to an ActionChains click.  Returns "Success" or
        "ERROR:PLUS_BUTTON_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print("DEBUG: Clicking plus button to add new service line")
            time.sleep(3)

            plus_button = None

            try:
                all_plus_buttons = self.driver.find_elements(By.XPATH, "//button[contains(text(), '+')]")
                for btn in all_plus_buttons:
                    ng_click = btn.get_attribute("ng-click")
                    if ng_click and "serviceLineAdd" in ng_click:
                        plus_button = btn
                        break
            except Exception as e:
                print(f"DEBUG: Error searching for + buttons: {e}")

            if not plus_button:
                selectors = [
                    "//button[@ng-click='vm.serviceLineAdd()' and contains(@class,'btn') and text()='+']",
                    "//button[contains(@class,'btn') and @ng-click='vm.serviceLineAdd()']",
                    "//button[text()='+']",
                ]
                for selector in selectors:
                    try:
                        plus_button = wait.until(EC.element_to_be_clickable((By.XPATH, selector)))
                        break
                    except Exception:
                        continue

            if not plus_button:
                plus_button = self.driver.execute_script("""
                    var buttons = document.getElementsByTagName('button');
                    for (var i = 0; i < buttons.length; i++) {
                        if (buttons[i].textContent.trim() === '+' &&
                            buttons[i].getAttribute('ng-click') &&
                            buttons[i].getAttribute('ng-click').includes('serviceLineAdd')) {
                            return buttons[i];
                        }
                    }
                    return null;
                """)
                if not plus_button:
                    raise Exception("Could not find plus button with any method")

            time.sleep(1)
            self.driver.execute_script("arguments[0].scrollIntoView(true);", plus_button)
            time.sleep(1)

            try:
                plus_button.click()
            except Exception:
                try:
                    self.driver.execute_script("arguments[0].click();", plus_button)
                except Exception:
                    from selenium.webdriver.common.action_chains import ActionChains
                    ActionChains(self.driver).move_to_element(plus_button).click().perform()

            time.sleep(2)
            print("DEBUG: Successfully clicked plus button")
            return "Success"
        except Exception as e:
            print(f"Error clicking plus button: {e}")
            return "ERROR:PLUS_BUTTON_FAILED"

    def _row_input(self, wait, xpath, row_number, label):
        """Return the *row_number*-th (1-based) input matching *xpath*.

        Raises:
            Exception: if fewer than *row_number* matching inputs exist.
        """
        candidates = wait.until(EC.presence_of_all_elements_located((By.XPATH, xpath)))
        if len(candidates) < row_number:
            raise Exception(f"Only {len(candidates)} {label} inputs found, need row {row_number}")
        return candidates[row_number - 1]

    def fill_service_line(self, row_number=1, procedure_code="", tooth_number="", tooth_surface="", billed_amount=""):
        """Fill one service-line row; empty/falsy values leave their field untouched.

        Args:
            row_number: 1-based index of the row to fill.
            procedure_code: procedure code typed into the code type-ahead.
            tooth_number: tooth identifier (upper-cased before typing).
            tooth_surface: surface letters (upper-cased before typing).
            billed_amount: amount typed into the billed-amount field.

        Returns "Success" or "ERROR:SERVICE_LINE_FILL_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print(f"DEBUG: Filling service line {row_number} - Procedure: {procedure_code}, Tooth: {tooth_number}, Surface: {tooth_surface}, Amount: {billed_amount}")

            if procedure_code:
                procedure_input = self._row_input(
                    wait,
                    "//input[@ng-model='serviceLine.procedure' and @placeholder='Code']",
                    row_number,
                    "procedure",
                )
                time.sleep(1)
                procedure_input.clear()
                procedure_input.send_keys(str(procedure_code))
                time.sleep(3)

                # Accept the first type-ahead suggestion; if no dropdown shows
                # up, TAB out of the field instead.
                try:
                    dropdown = wait.until(
                        EC.visibility_of_element_located(
                            (By.XPATH, "//ul[contains(@class,'dropdown-menu') and not(contains(@class,'ng-hide'))]")
                        )
                    )
                    first_option = dropdown.find_element(By.XPATH, ".//li")
                    if first_option:
                        first_option.click()
                        time.sleep(2)
                except Exception:
                    procedure_input.send_keys(Keys.TAB)
                    time.sleep(1)

            if tooth_number:
                tooth_input = self._row_input(
                    wait,
                    "//input[@ng-model='serviceLine.toothCode' and @placeholder='Tooth']",
                    row_number,
                    "tooth",
                )
                time.sleep(1)
                tooth_input.clear()
                # str(): the value may arrive as a number, which has no .upper().
                tooth_input.send_keys(str(tooth_number).upper())
                time.sleep(2)
                tooth_input.send_keys(Keys.TAB)
                time.sleep(1)

            if tooth_surface:
                surface_input = self._row_input(
                    wait,
                    "//input[@ng-model='serviceLine.surface' and @placeholder='Surface']",
                    row_number,
                    "surface",
                )
                time.sleep(1)
                surface_input.clear()
                surface_input.send_keys(str(tooth_surface).upper())
                time.sleep(2)
                surface_input.send_keys(Keys.TAB)
                time.sleep(1)

            if billed_amount:
                billed_amount_input = self._row_input(
                    wait,
                    "//input[@ng-model='serviceLine.billedAmount' and @placeholder='00.00' and @type='number']",
                    row_number,
                    "billed amount",
                )
                time.sleep(1)
                billed_amount_input.clear()
                billed_amount_input.send_keys(str(billed_amount))
                time.sleep(2)
                self.driver.execute_script("arguments[0].dispatchEvent(new Event('input', { bubbles: true })); arguments[0].dispatchEvent(new Event('change', { bubbles: true }));", billed_amount_input)
                time.sleep(1)
                billed_amount_input.send_keys(Keys.TAB)
                time.sleep(2)

            print(f"DEBUG: Service line {row_number} filled successfully")
            return "Success"
        except Exception as e:
            print(f"Error filling service line {row_number}: {e}")
            return "ERROR:SERVICE_LINE_FILL_FAILED"

    # ------------------------------------------------------------------
    # Attachments and submission
    # ------------------------------------------------------------------

    def add_file_attachment(self):
        """Attach every uploaded file by pushing it into the page's Angular scope.

        Each file's base64 body is handed to the page as-is; nothing is written
        to disk.  Entries that are not dicts or carry no ``bufferBase64`` are
        skipped.  Returns "Success", "ERROR:NO_FILES" or "ERROR:UPLOAD_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print("DEBUG: Adding file attachment")
            if not self.upload_files:
                print("DEBUG: No uploaded files available for attachment")
                return "ERROR:NO_FILES"

            for file_obj in self.upload_files:
                if not isinstance(file_obj, dict):
                    continue

                base64_data = file_obj.get("bufferBase64", "")
                file_name = file_obj.get("originalname", "file.pdf")
                if not base64_data:
                    continue

                safe_file_name = os.path.basename(file_name)
                if not safe_file_name.lower().endswith((".pdf", ".jpg", ".jpeg", ".png", ".webp")):
                    safe_file_name += ".bin"

                # Decode only to validate the payload; a malformed body raises
                # and is reported as an upload failure below.
                base64.b64decode(base64_data)

                print(f"DEBUG: Uploading file: {safe_file_name}")

                injection_result = self.driver.execute_script("""
                    try {
                        var fileInput = document.getElementById('fileAttachment');
                        if (!fileInput) return 'FILE_INPUT_NOT_FOUND';
                        var scope = angular.element(fileInput).scope();
                        if (!scope || !scope.vm) return 'SCOPE_NOT_FOUND';
                        var fileAttachment = {
                            name: arguments[0],
                            type: scope.vm.fileType || 'RR',
                            data: arguments[1],
                            updatedDate: new Date()
                        };
                        if (!scope.vm.displayedArray) {
                            scope.vm.displayedArray = [];
                        }
                        scope.vm.displayedArray.push(fileAttachment);
                        scope.$apply();
                        return 'FILE_INJECTED:' + scope.vm.displayedArray.length;
                    } catch (err) {
                        return 'ERROR:' + err.message;
                    }
                """, safe_file_name, base64_data)
                print(f"DEBUG: Injection result = {injection_result}")

                if not injection_result.startswith('FILE_INJECTED'):
                    print(f"DEBUG: Injection failed: {injection_result}")
                    return "ERROR:UPLOAD_FAILED"

                # Wait until the attachments table shows at least one row.
                wait.until(
                    EC.presence_of_element_located(
                        (By.XPATH, "//table[contains(@class, 'table-striped') and @ng-if='vm.displayedArray && vm.displayedArray.length']//tbody/tr")
                    )
                )
                time.sleep(1)

            return "Success"
        except Exception as e:
            print(f"Error adding file attachment: {e}")
            return "ERROR:UPLOAD_FAILED"

    def select_radiology_reports(self):
        """Set the attachment document type to "Radiology Reports".

        Returns "Success" or "ERROR:RADIOLOGY_REPORTS_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            doc_type_dropdown = wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//select[@ng-model='vm.fileType' and @ng-options=\"item.value as item.name for item in vm.documentTypes\"]")
                )
            )
            time.sleep(1)

            Select(doc_type_dropdown).select_by_visible_text("Radiology Reports")
            self.driver.execute_script(
                "arguments[0].dispatchEvent(new Event('input', { bubbles: true }));"
                "arguments[0].dispatchEvent(new Event('change', { bubbles: true }));",
                doc_type_dropdown
            )
            # Proceed only once the dropdown reports a non-empty value.
            wait.until(lambda d: (doc_type_dropdown.get_attribute("value") or "").strip() != "")
            time.sleep(2)

            return "Success"
        except Exception as e:
            print(f"Error selecting Radiology Reports: {e}")
            return "ERROR:RADIOLOGY_REPORTS_FAILED"

    def click_submit_button(self):
        """Click SUBMIT and pause for the confirmation page.

        Returns "Success", "ERROR:SUBMIT_BUTTON_DISABLED" or
        "ERROR:SUBMIT_FAILED".
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print("DEBUG: Clicking SUBMIT button")

            submit_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[@ng-click=\"vm.submitForm('File Custom Pop Up Error')\" and contains(., 'SUBMIT')]")
                )
            )

            if submit_button.get_attribute("disabled"):
                print("DEBUG: SUBMIT button is disabled")
                return "ERROR:SUBMIT_BUTTON_DISABLED"

            submit_button.click()
            print("DEBUG: SUBMIT button clicked successfully")
            time.sleep(6)
            print(f"DEBUG: Current URL after submit = {self.driver.current_url}")

            return "Success"
        except Exception as e:
            print(f"Error clicking SUBMIT button: {e}")
            return "ERROR:SUBMIT_FAILED"

    # ------------------------------------------------------------------
    # Confirmation capture
    # ------------------------------------------------------------------

    def extract_preauth_number(self):
        """Extract preauthorization number from MassHealth confirmation page.

        Elements mentioning "assigned the number" are inspected first; if they
        yield nothing, the whole page text is scanned.  The member ID and the
        rendering NPI are never returned as the authorization number.

        Returns the number as a string, or ``None`` if none was found or an
        error occurred.
        """
        wait = WebDriverWait(self.driver, 30)

        try:
            print("DEBUG: Extracting preauth number from confirmation page")
            wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
            time.sleep(2)

            page_text = self.driver.find_element(By.TAG_NAME, "body").text
            print(f"DEBUG: Page text snippet: {page_text[:500]}")

            known_ids = (self.memberId, self.rendering_provider_npi)

            selectors = [
                "//*[contains(text(), 'assigned the number')]",
                "//p[contains(text(), 'assigned the number')]",
                "//div[contains(text(), 'assigned the number')]",
                "//span[contains(text(), 'assigned the number')]",
                "//h1[contains(text(), 'Submission Success')]/following-sibling::*",
            ]

            for selector in selectors:
                try:
                    for element in self.driver.find_elements(By.XPATH, selector):
                        preauth_number = self._find_preauth_number(element.text.strip(), known_ids)
                        if preauth_number:
                            print(f"DEBUG: Found preauth number: {preauth_number}")
                            return preauth_number
                except Exception as e:
                    print(f"DEBUG: Selector {selector} failed: {e}")
                    continue

            # Last resort: scan the complete document text (including text
            # that body.text omits because it is not rendered).
            full_text = self.driver.execute_script(
                "return document.body ? (document.body.textContent || '') : '';"
            )
            preauth_number = self._find_preauth_number(full_text, known_ids)

            if preauth_number:
                print(f"DEBUG: Found preauth number via JavaScript: {preauth_number}")
                return preauth_number

            print("DEBUG: Could not extract preauth number from page")
            return None

        except Exception as e:
            print(f"Error extracting preauth number: {e}")
            return None

    def save_confirmation_pdf(self):
        """Save the confirmation page as a PDF, or as a screenshot if printing fails.

        Returns a dict with ``status``; on success it also carries
        ``pdf_path``, ``file_type`` ("pdf" or "screenshot"), ``preAuthNumber``
        and ``message``.  Failures of the initial page-load waits propagate to
        the caller.
        """
        wait = WebDriverWait(self.driver, 30)

        wait.until(lambda d: d.execute_script("return document.readyState") == "complete")
        wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
        time.sleep(2)

        print(f"DEBUG: Capturing confirmation from: {self.driver.current_url}")

        preauth_number = self.extract_preauth_number()

        # Build a filesystem-safe file stem from member ID, number and time.
        allowed_extra = "-_."
        safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in allowed_extra)
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        safe_preauth = ""
        if preauth_number:
            safe_preauth = "_" + "".join(c for c in str(preauth_number) if c.isalnum() or c in allowed_extra)[:20]
        file_stem = f"preauth_confirmation_{safe_member}{safe_preauth}_{timestamp}"

        try:
            pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {
                "printBackground": True,
                "paperWidth": 8.5,
                "paperHeight": 11,
                "marginTop": 0.4,
                "marginBottom": 0.4,
                "marginLeft": 0.4,
                "marginRight": 0.4,
            })
            pdf_bytes = base64.b64decode(pdf_data["data"])
            pdf_path = os.path.join(self.download_dir, f"{file_stem}.pdf")
            with open(pdf_path, "wb") as f:
                f.write(pdf_bytes)
            print(f"DEBUG: PDF saved at: {pdf_path}")
            return {
                "status": "success",
                "pdf_path": pdf_path,
                "file_type": "pdf",
                "preAuthNumber": preauth_number,
                "message": "PreAuth confirmation PDF captured successfully",
            }
        except Exception as e:
            print(f"PDF capture failed, falling back to screenshot: {e}")

        try:
            screenshot_path = os.path.join(self.download_dir, f"{file_stem}.png")
            # Grow the window to the page height so the screenshot covers it.
            total_height = self.driver.execute_script("return document.body.scrollHeight")
            self.driver.set_window_size(1280, max(total_height, 800))
            time.sleep(1)
            self.driver.save_screenshot(screenshot_path)
            print(f"DEBUG: Screenshot saved at: {screenshot_path}")
            return {
                "status": "success",
                "pdf_path": screenshot_path,
                "file_type": "screenshot",
                "preAuthNumber": preauth_number,
                "message": "Full-page screenshot captured (PDF failed)",
            }
        except Exception as ss_error:
            return {
                "status": "error",
                "message": f"PDF + Screenshot failed: {ss_error}",
            }

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def main_workflow(self, url):
        """Run the complete prior-authorization submission against *url*.

        Returns a dict with ``status`` ("success" / "error") and ``message``;
        on success also ``pdf_path``, ``file_type`` and ``preAuthNumber``.
        The browser is always closed before returning.
        """
        try:
            self.config_driver()
            self.driver.get(url)
            time.sleep(2)

            # Steps before the service lines, in portal order.  The prior
            # authorization form has no Date of Service field, so
            # fill_date_of_service is intentionally not part of this list.
            header_steps = (
                self.login,
                self.navigate_to_submit_claims,
                # Select "Dental Prior Authorization" (2nd option) instead of "Dental Claim"
                self.select_dental_prior_authorization,
                self.select_place_of_service_office,
                self.select_office_summit_framingham,
                self.select_dentist,
                self.fill_member_eligibility,
            )
            for step in header_steps:
                outcome = step()
                if outcome != "Success":
                    return {"status": "error", "message": outcome}

            # Fill service lines
            if not self.serviceLines:
                print("DEBUG: No service lines found, using fallback values")
                service_result = self.fill_service_line(
                    row_number=1,
                    procedure_code=self.procedureCode,
                    tooth_number=self.toothNumber,
                    tooth_surface=self.toothSurface,
                )
                if service_result != "Success":
                    return {"status": "error", "message": service_result}
            else:
                for row_num, service_line in enumerate(self.serviceLines, start=1):
                    print(f"DEBUG: Processing service line {row_num} of {len(self.serviceLines)}")

                    # The form starts with one row; every further line needs a new row.
                    if row_num > 1:
                        plus_result = self.click_plus_button()
                        if plus_result != "Success":
                            return {"status": "error", "message": plus_result}

                    service_result = self.fill_service_line(
                        row_number=row_num,
                        procedure_code=service_line.get("procedureCode", ""),
                        tooth_number=service_line.get("toothNumber", ""),
                        tooth_surface=service_line.get("toothSurface", ""),
                        billed_amount=service_line.get("totalBilled", ""),
                    )
                    if service_result != "Success":
                        return {"status": "error", "message": f"Service line {row_num} failed: {service_result}"}

            radiology_result = self.select_radiology_reports()
            if radiology_result != "Success":
                return {"status": "error", "message": radiology_result}

            if self.upload_files:
                file_result = self.add_file_attachment()
                if file_result != "Success":
                    return {"status": "error", "message": file_result}
            else:
                print("DEBUG: No files to attach, skipping attachment step")

            submit_result = self.click_submit_button()
            if submit_result != "Success":
                return {"status": "error", "message": submit_result}

            pdf_result = self.save_confirmation_pdf()
            if pdf_result.get("status") == "error":
                return {"status": "error", "message": pdf_result.get("message")}

            time.sleep(3)

            return {
                "status": "success",
                "message": "PreAuth automation completed successfully.",
                "pdf_path": pdf_result.get("pdf_path"),
                "file_type": pdf_result.get("file_type"),
                "preAuthNumber": pdf_result.get("preAuthNumber"),
            }
        except Exception as e:
            print(f"PreAuth automation error: {e}")
            return {"status": "error", "message": str(e)}
        finally:
            if self.driver:
                self.driver.quit()
|