feat: DDMA claim submission with OTP, PDF, claim number extraction

- Add full DDMA claim Selenium flow (steps 1-8): search patient, open
  member page, create claim, fill form, attach files, next, submit,
  extract claim number and save confirmation PDF
- Add fee schedule price-mismatch dialog for all claim buttons (MH,
  CCA, DDMA, United, Tufts, Save) with optional price update to JSON
- Add OTP modal for DDMA claim when session expires, mirroring
  eligibility OTP flow
- Close Chrome after claim submission via quit_driver() (session
  preserved in profile)
- Move Map Price button between Direct Submission and procedure table,
  right-aligned above Billed Amount column
- Add fee-schedule update-price backend route
- Add DDMA claim processor with claimNumber/pdf_url result handling

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Gitead
2026-05-24 13:35:04 -04:00
parent 5ceecbeb7f
commit cd1381e9c6
13 changed files with 2139 additions and 22 deletions

View File

@@ -19,6 +19,8 @@ import helpers_unitedsco_eligibility as hunitedsco
import helpers_dentaquest_eligibility as hdentaquest
import helpers_cca_eligibility as hcca
import helpers_cca_claim as hcca_claim
import helpers_cca_preauth as hcca_preauth
import helpers_ddma_claim as hddma_claim
# Import startup session-clear functions
from ddma_browser_manager import clear_ddma_session_on_startup
@@ -584,6 +586,89 @@ async def cca_claim(request: Request):
return {"status": "started", "session_id": sid}
async def _ddma_claim_worker_wrapper(sid: str, data: dict, url: str):
"""Background worker for DDMA claim submission."""
global active_jobs, waiting_jobs
async with semaphore:
async with lock:
waiting_jobs -= 1
active_jobs += 1
try:
await hddma_claim.start_ddma_claim_run(sid, data, url)
finally:
async with lock:
active_jobs -= 1
@app.post("/ddma-claim")
async def ddma_claim(request: Request):
"""
Starts a DDMA claim submission session in the background.
Logs in, searches patient, opens Member Information page, clicks Create claim,
fills service date and procedure code.
Body: { "claim": { "massddmaUsername": "...", "massddmaPassword": "...", ... } }
Returns: { status: "started", session_id: "<uuid>" }
"""
global waiting_jobs
body = await request.json()
sid = hddma_claim.make_session_entry()
hddma_claim.sessions[sid]["type"] = "ddma_claim"
hddma_claim.sessions[sid]["last_activity"] = time.time()
async with lock:
waiting_jobs += 1
asyncio.create_task(_ddma_claim_worker_wrapper(
sid, body,
url="https://providers.deltadentalma.com/onboarding/start/"
))
return {"status": "started", "session_id": sid}
async def _cca_preauth_worker_wrapper(sid: str, data: dict, url: str):
"""Background worker for CCA pre-authorization submission."""
global active_jobs, waiting_jobs
async with semaphore:
async with lock:
waiting_jobs -= 1
active_jobs += 1
try:
await hcca_preauth.start_cca_preauth_run(sid, data, url)
finally:
async with lock:
active_jobs -= 1
@app.post("/cca-preauth")
async def cca_preauth(request: Request):
"""
Starts a CCA pre-authorization session in the background.
Logs in, navigates to Authorization Entry, fills the form and submits.
Body: { "claim": { "cca_username": "...", "cca_password": "...", ... } }
Returns: { status: "started", session_id: "<uuid>" }
"""
global waiting_jobs
body = await request.json()
sid = hcca_preauth.make_session_entry()
hcca_preauth.sessions[sid]["type"] = "cca_preauth"
hcca_preauth.sessions[sid]["last_activity"] = time.time()
async with lock:
waiting_jobs += 1
asyncio.create_task(_cca_preauth_worker_wrapper(
sid, body,
url="https://pwp.sciondental.com/PWP/Landing"
))
return {"status": "started", "session_id": sid}
@app.post("/submit-otp")
async def submit_otp(request: Request):
"""
@@ -605,6 +690,8 @@ async def submit_otp(request: Request):
res = hunitedsco.submit_otp(sid, otp)
elif sid in hdentaquest.sessions:
res = hdentaquest.submit_otp(sid, otp)
elif sid in hddma_claim.sessions:
res = hddma_claim.submit_otp(sid, otp)
else:
raise HTTPException(status_code=404, detail="session not found")
@@ -628,6 +715,10 @@ async def session_status(sid: str):
s = hcca.get_session_status(sid)
elif sid in hcca_claim.sessions:
s = hcca_claim.get_session_status(sid)
elif sid in hcca_preauth.sessions:
s = hcca_preauth.get_session_status(sid)
elif sid in hddma_claim.sessions:
s = hddma_claim.get_session_status(sid)
else:
s = {"status": "not_found"}
if s.get("status") == "not_found":

View File

@@ -0,0 +1,367 @@
import os
import time
import asyncio
from typing import Dict, Any
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium_DDMA_claimSubmitWorker import AutomationDDMAClaimSubmit
from ddma_browser_manager import get_browser_manager
sessions: Dict[str, Dict[str, Any]] = {}
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120"))
def make_session_entry() -> str:
import uuid
sid = str(uuid.uuid4())
sessions[sid] = {
"status": "created",
"created_at": time.time(),
"last_activity": time.time(),
"bot": None,
"driver": None,
"otp_event": asyncio.Event(),
"otp_value": None,
"result": None,
"message": None,
}
return sid
async def cleanup_session(sid: str, message: str | None = None):
s = sessions.get(sid)
if not s:
return
try:
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
ev = s.get("otp_event")
if ev and not ev.is_set():
ev.set()
finally:
sessions.pop(sid, None)
print(f"[helpers_ddma_claim] cleaned session {sid}")
async def _remove_session_later(sid: str, delay: int = 20):
await asyncio.sleep(delay)
await cleanup_session(sid)
async def start_ddma_claim_run(sid: str, data: dict, url: str):
"""
Run the DDMA claim workflow — mirrors helpers_ddma_eligibility exactly for
login/OTP, then continues into claim-specific steps.
OTP handling uses two complementary strategies:
1. Accept OTP submitted from the app (via /submit-otp → otp_value)
2. Poll the browser URL/DOM to detect when user enters OTP directly
"""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
s["status"] = "running"
s["last_activity"] = time.time()
try:
bot = AutomationDDMAClaimSubmit(data)
bot.config_driver()
s["bot"] = bot
s["driver"] = bot.driver
s["last_activity"] = time.time()
try:
bot.driver.maximize_window()
bot.driver.get(url)
await asyncio.sleep(1)
except Exception as e:
s["status"] = "error"
s["message"] = f"Navigation failed: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
# --- Login ---
try:
login_result = bot.login(url)
except WebDriverException as wde:
s["status"] = "error"
s["message"] = f"Selenium driver error during login: {wde}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
except Exception as e:
s["status"] = "error"
s["message"] = f"Unexpected error during login: {e}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
# ── Already logged in (persistent session) ────────────────────────────
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
print("[DDMA Claim] Session persisted — skipping OTP")
s["status"] = "running"
s["message"] = "Session persisted"
# ── OTP required ──────────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
s["status"] = "waiting_for_otp"
s["message"] = "OTP required for login — please enter OTP"
s["last_activity"] = time.time()
driver = s["driver"]
max_polls = SESSION_OTP_TIMEOUT
login_success = False
print(f"[DDMA Claim OTP] Polling for OTP completion (up to {SESSION_OTP_TIMEOUT}s)...")
for poll in range(max_polls):
await asyncio.sleep(1)
s["last_activity"] = time.time()
try:
# a) App submitted OTP via /submit-otp endpoint
otp_value = s.get("otp_value")
if otp_value:
print(f"[DDMA Claim OTP poll {poll+1}] OTP received from app, typing it in...")
try:
otp_input = driver.find_element(By.XPATH,
"//input[contains(@aria-label,'Verification') or "
"contains(@placeholder,'verification') or @type='tel' or "
"contains(@aria-lable,'Verification code') or "
"contains(@placeholder,'Enter your verification code')]"
)
otp_input.clear()
otp_input.send_keys(otp_value)
try:
verify_btn = driver.find_element(By.XPATH,
"//button[@type='button' and @aria-label='Verify']")
verify_btn.click()
except Exception:
otp_input.send_keys("\n")
print("[DDMA Claim OTP] OTP typed and submitted via app")
s["otp_value"] = None
await asyncio.sleep(3)
except Exception as type_err:
print(f"[DDMA Claim OTP] Failed to type OTP from app: {type_err}")
# b) Check URL — if past OTP page, login succeeded
current_url = driver.current_url.lower()
print(f"[DDMA Claim OTP poll {poll+1}/{max_polls}] URL: {current_url[:70]}...")
if "member" in current_url or "dashboard" in current_url or "eligibility" in current_url:
try:
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DDMA Claim OTP] Member search found — login successful!")
login_success = True
break
except TimeoutException:
print("[DDMA Claim OTP] On member page but search input not found, continuing...")
# Check if OTP input still visible (user hasn't finished)
try:
driver.find_element(By.XPATH,
"//input[contains(@aria-label,'Verification') or "
"contains(@placeholder,'verification') or @type='tel']"
)
print(f"[DDMA Claim OTP poll {poll+1}] OTP input still visible — waiting...")
except Exception:
# OTP input gone — try navigating to members
if "onboarding" in current_url or "start" in current_url:
print("[DDMA Claim OTP] OTP input gone, navigating to members page...")
try:
driver.get("https://providers.deltadentalma.com/members")
await asyncio.sleep(2)
except Exception:
pass
except Exception as poll_err:
print(f"[DDMA Claim OTP poll {poll+1}] Error: {poll_err}")
if not login_success:
# Final attempt — navigate directly to members page
try:
print("[DDMA Claim OTP] Final attempt — navigating to members page...")
driver.get("https://providers.deltadentalma.com/members")
await asyncio.sleep(3)
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DDMA Claim OTP] Member search found — login successful!")
login_success = True
except TimeoutException:
s["status"] = "error"
s["message"] = "OTP timeout — login not completed"
await cleanup_session(sid)
return {"status": "error", "message": "OTP not completed in time"}
except Exception as final_err:
s["status"] = "error"
s["message"] = f"OTP verification failed: {final_err}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
if login_success:
s["status"] = "running"
s["message"] = "Login successful after OTP"
print("[DDMA Claim OTP] Proceeding to claim steps...")
# ── Login succeeded without OTP ───────────────────────────────────────
elif isinstance(login_result, str) and login_result == "SUCCESS":
print("[DDMA Claim] Login succeeded without OTP")
s["status"] = "running"
s["message"] = "Login succeeded"
# ── Login error ───────────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = login_result
await cleanup_session(sid)
return {"status": "error", "message": login_result}
# --- Step 1: Search patient ---
step1_result = bot.step1_search_patient()
print(f"[DDMA Claim] step1 result: {step1_result}")
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step1_result
await cleanup_session(sid)
return {"status": "error", "message": step1_result}
# --- Step 2: Open member information page ---
step2_result = bot.step2_open_member_page()
print(f"[DDMA Claim] step2 result: {step2_result}")
if isinstance(step2_result, str) and step2_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step2_result
await cleanup_session(sid)
return {"status": "error", "message": step2_result}
# --- Step 3: Click Create claim ---
step3_result = bot.step3_click_create_claim()
print(f"[DDMA Claim] step3 result: {step3_result}")
if isinstance(step3_result, str) and step3_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step3_result
await cleanup_session(sid)
return {"status": "error", "message": step3_result}
# --- Step 4: Fill service date + procedure code + tooth/arch/surface ---
step4_result = bot.step4_fill_claim_form()
print(f"[DDMA Claim] step4 result: {step4_result}")
if isinstance(step4_result, str) and step4_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step4_result
await cleanup_session(sid)
return {"status": "error", "message": step4_result}
# --- Step 5: Attach files ---
step5_result = bot.step5_attach_files()
print(f"[DDMA Claim] step5 result: {step5_result}")
if isinstance(step5_result, str) and step5_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step5_result
await cleanup_session(sid)
return {"status": "error", "message": step5_result}
# --- Step 6: Click Next step ---
step6_result = bot.step6_click_next()
print(f"[DDMA Claim] step6 result: {step6_result}")
if isinstance(step6_result, str) and step6_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step6_result
await cleanup_session(sid)
return {"status": "error", "message": step6_result}
# --- Step 7: Acknowledge + Submit claim ---
step7_result = bot.step7_submit_claim()
print(f"[DDMA Claim] step7 result: {step7_result}")
if isinstance(step7_result, str) and step7_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step7_result
await cleanup_session(sid)
return {"status": "error", "message": step7_result}
# --- Step 8: Extract claim number + save confirmation PDF ---
step8_result = bot.step8_save_confirmation_pdf()
print(f"[DDMA Claim] step8 result: {step8_result}")
if isinstance(step8_result, str) and step8_result.startswith("ERROR"):
# Non-fatal: claim was submitted; log but don't abort
print(f"[DDMA Claim] step8 warning (non-fatal): {step8_result}")
step8_result = {}
# Build pdf_url from pdf_path so the backend can fetch it
pdf_path = step8_result.get("pdf_path") if isinstance(step8_result, dict) else None
pdf_url = None
if pdf_path:
import os as _os
filename = _os.path.basename(pdf_path)
port = _os.getenv("PORT", "5002")
url_host = _os.getenv("HOST", "localhost")
pdf_url = f"http://{url_host}:{port}/downloads/{filename}"
print(f"[DDMA Claim] pdf_url: {pdf_url}")
claim_number = step8_result.get("claimNumber") if isinstance(step8_result, dict) else None
result = {
"status": "success",
"message": "DDMA claim submitted successfully",
"claimNumber": claim_number,
"pdf_url": pdf_url,
}
s["status"] = "completed"
s["result"] = result
s["message"] = "completed"
# Close the browser window after claim (session preserved in profile)
try:
from ddma_browser_manager import get_browser_manager
get_browser_manager().quit_driver()
print("[DDMA Claim] Browser closed - session preserved in profile")
except Exception as close_err:
print(f"[DDMA Claim] Could not close browser (non-fatal): {close_err}")
asyncio.create_task(_remove_session_later(sid, 60))
return result
except Exception as e:
if s:
s["status"] = "error"
s["message"] = f"worker exception: {e}"
await cleanup_session(sid)
return {"status": "error", "message": f"worker exception: {e}"}
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
if s.get("status") != "waiting_for_otp":
return {"status": "error", "message": f"session not waiting for otp (state={s.get('status')})"}
s["otp_value"] = otp
s["last_activity"] = time.time()
try:
s["otp_event"].set()
except Exception:
pass
return {"status": "ok", "message": "otp accepted"}
def get_session_status(sid: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "not_found"}
return {
"session_id": sid,
"status": s.get("status"),
"message": s.get("message"),
"created_at": s.get("created_at"),
"last_activity": s.get("last_activity"),
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
}

View File

@@ -0,0 +1,947 @@
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import os
import json
import base64
from datetime import date
from ddma_browser_manager import get_browser_manager
MEMBERS_URL = "https://providers.deltadentalma.com/members"
# Absolute path to the Backend app root (two levels up from this file's SeleniumService dir)
_SERVICE_DIR = os.path.dirname(os.path.abspath(__file__))
_BACKEND_CWD = os.path.normpath(os.path.join(_SERVICE_DIR, "..", "Backend"))
class AutomationDDMAClaimSubmit:
def __init__(self, data):
self.headless = False
self.driver = None
raw = data if isinstance(data, dict) else {}
claim = raw.get("claim", {}) or {}
patient_name = (claim.get("patientName") or "").strip()
parts = patient_name.split()
first_name = parts[0] if parts else ""
last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
self.massddma_username = claim.get("massddmaUsername", "") or raw.get("massddmaUsername", "")
self.massddma_password = claim.get("massddmaPassword", "") or raw.get("massddmaPassword", "")
self.memberId = claim.get("memberId", "")
self.dateOfBirth = claim.get("dateOfBirth", "")
self.serviceDate = claim.get("serviceDate", "")
self.firstName = claim.get("firstName", "") or first_name
self.lastName = claim.get("lastName", "") or last_name
self.serviceLines = claim.get("serviceLines", []) or []
self.claimFiles = claim.get("claimFiles", []) or []
self.download_dir = get_browser_manager().download_dir
print(f"[DDMA Claim] Init — member={self.memberId}, "
f"patient={self.firstName} {self.lastName}, "
f"lines={len(self.serviceLines)}")
def config_driver(self):
self.driver = get_browser_manager().get_driver(self.headless)
# ------------------------------------------------------------------ #
# Login — identical to DDMA eligibility worker #
# ------------------------------------------------------------------ #
def _force_logout(self):
try:
print("[DDMA Claim login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
try:
self.driver.get("https://providers.deltadentalma.com/")
time.sleep(2)
for selector in [
"//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
]:
try:
btn = WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
btn.click()
time.sleep(2)
break
except TimeoutException:
continue
except Exception as e:
print(f"[DDMA Claim login] Could not click logout: {e}")
try:
self.driver.delete_all_cookies()
except Exception:
pass
browser_manager.clear_credentials_hash()
except Exception as e:
print(f"[DDMA Claim login] Error during forced logout: {e}")
def login(self, url):
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# Check if already on a logged-in page
try:
current_url = self.driver.current_url
print(f"[DDMA Claim login] Current URL: {current_url}")
logged_in_patterns = ["member", "dashboard", "eligibility", "search", "patients"]
is_logged_in_url = any(p in current_url.lower() for p in logged_in_patterns)
if is_logged_in_url and "onboarding" not in current_url.lower():
# Navigate to members page to confirm
if "member" not in current_url.lower():
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DDMA Claim login] Already logged in")
return "ALREADY_LOGGED_IN"
except TimeoutException:
self.driver.get(MEMBERS_URL)
time.sleep(2)
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DDMA Claim login] Already logged in — member search found")
return "ALREADY_LOGGED_IN"
except TimeoutException:
print("[DDMA Claim login] Could not find member search, will try login")
except Exception as e:
print(f"[DDMA Claim login] Error checking current state: {e}")
self.driver.get(url)
time.sleep(2)
# Check if session redirected us straight to member search
try:
current_url = self.driver.current_url
if "onboarding" not in current_url.lower():
member_search = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
if member_search:
print("[DDMA Claim login] Session valid — skipping login")
return "ALREADY_LOGGED_IN"
except TimeoutException:
print("[DDMA Claim login] Proceeding with login form")
# Dismiss any "Authentication flow continued in another tab" modal
modal_dismissed = False
try:
ok_button = WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable((By.XPATH,
"//button[normalize-space(text())='Ok' or normalize-space(text())='OK']"
))
)
ok_button.click()
print("[DDMA Claim login] Dismissed authentication modal")
modal_dismissed = True
time.sleep(2)
all_windows = self.driver.window_handles
if len(all_windows) > 1:
original_window = self.driver.current_window_handle
for window in all_windows:
if window != original_window:
self.driver.switch_to.window(window)
break
try:
otp_candidate = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[contains(@aria-label,'Verification code') or "
"contains(@placeholder,'Enter your verification code')]"
))
)
if otp_candidate:
print("[DDMA Claim login] OTP required (popup)")
return "OTP_REQUIRED"
except TimeoutException:
self.driver.switch_to.window(original_window)
except TimeoutException:
pass
if modal_dismissed:
time.sleep(2)
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DDMA Claim login] Already authenticated after modal dismiss")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
# Fill login form
try:
email_field = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, "//input[@name='username' and @type='text']"))
)
except TimeoutException:
return "ERROR: Login form not found"
email_field.clear()
email_field.send_keys(self.massddma_username)
password_field = wait.until(
EC.presence_of_element_located((By.XPATH, "//input[@name='password' and @type='password']"))
)
password_field.clear()
password_field.send_keys(self.massddma_password)
try:
remember_me = wait.until(EC.element_to_be_clickable(
(By.XPATH, "//label[.//span[contains(text(),'Remember me')]]")
))
remember_me.click()
except Exception:
print("[DDMA Claim login] Remember me checkbox not found (continuing)")
login_button = wait.until(EC.element_to_be_clickable(
(By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")
))
login_button.click()
if self.massddma_username:
browser_manager.save_credentials_hash(self.massddma_username)
# OTP detection
try:
otp_candidate = WebDriverWait(self.driver, 30).until(
EC.presence_of_element_located((By.XPATH,
"//input[contains(@aria-lable,'Verification code') or "
"contains(@placeholder,'Enter your verification code')]"
))
)
if otp_candidate:
print("[DDMA Claim login] OTP required")
return "OTP_REQUIRED"
except TimeoutException:
try:
current_url = self.driver.current_url.lower()
if "member" in current_url or "dashboard" in current_url:
print("[DDMA Claim login] Login succeeded without OTP")
return "SUCCESS"
except Exception:
pass
if "onboarding" in self.driver.current_url.lower() or "login" in self.driver.current_url.lower():
return "ERROR: LOGIN FAILED: Still on login/onboarding page"
print("[DDMA Claim login] Assuming login succeeded")
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim login] Exception: {e}")
return f"ERROR:LOGIN FAILED: {e}"
# ------------------------------------------------------------------ #
# Step 1 — Navigate directly to /members then search patient #
# (same as eligibility — bypasses onboarding date/location screen) #
# ------------------------------------------------------------------ #
def step1_search_patient(self):
"""Search for the patient — identical to DDMA eligibility step1.
Does NOT navigate: login already left the browser on the search page."""
wait = WebDriverWait(self.driver, 30)
def replace_with_sendkeys(el, value):
el.click()
el.send_keys(Keys.CONTROL, "a")
el.send_keys(Keys.BACKSPACE)
el.send_keys(value)
try:
print(f"[DDMA Claim step1] Current URL: {self.driver.current_url}")
print(f"[DDMA Claim step1] Waiting for member search input...")
# Fill Member ID
if self.memberId:
try:
member_id_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="Search by member ID"]')
))
member_id_input.clear()
member_id_input.send_keys(self.memberId)
print(f"[DDMA Claim step1] Entered Member ID: {self.memberId}")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA Claim step1] Warning: Could not fill Member ID: {e}")
# Fill DOB
if self.dateOfBirth:
try:
dob_parts = self.dateOfBirth.split("-")
year = dob_parts[0]
month = dob_parts[1].zfill(2)
day = dob_parts[2].zfill(2)
dob_container = wait.until(EC.presence_of_element_located(
(By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
))
month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
replace_with_sendkeys(month_elem, month)
time.sleep(0.05)
replace_with_sendkeys(day_elem, day)
time.sleep(0.05)
replace_with_sendkeys(year_elem, year)
print(f"[DDMA Claim step1] Filled DOB: {month}/{day}/{year}")
except Exception as e:
print(f"[DDMA Claim step1] Warning: Could not fill DOB: {e}")
time.sleep(0.3)
# Click Search
search_btn = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//button[@data-testid="member-search_search-button"]')
))
search_btn.click()
print("[DDMA Claim step1] Clicked Search button")
# Wait for the member search result page to load
WebDriverWait(self.driver, 15).until(
EC.any_of(
EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
EC.presence_of_element_located((By.XPATH, '//div[@data-testid="member-search-result-no-results"]')),
)
)
time.sleep(4)
try:
no_results = self.driver.find_element(By.XPATH, '//div[@data-testid="member-search-result-no-results"]')
if no_results:
return "ERROR: No patient found with given search criteria"
except Exception:
pass
print("[DDMA Claim step1] Search completed")
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step1] Exception: {e}")
return f"ERROR: step1 failed: {e}"
# ------------------------------------------------------------------ #
# Step 2 — Click patient name → Member Information page #
# ------------------------------------------------------------------ #
def step2_open_member_page(self):
"""Navigate to member detail page — same approach as DDMA eligibility step2."""
try:
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
)
time.sleep(2)
except TimeoutException:
print("[DDMA Claim step2] Warning: Results table not found within timeout")
# Find member-details URL from first row — identical to eligibility step2
detail_url = None
for selector in [
"(//table//tbody//tr)[1]//td[1]//a",
"(//tbody//tr)[1]//a[contains(@href,'member-details')]",
"(//tbody//tr)[1]//a[contains(@href,'member')]",
"//a[contains(@href,'member-details')]",
]:
try:
link_el = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, selector))
)
href = link_el.get_attribute("href")
if href and "member-details" in href:
detail_url = href
print(f"[DDMA Claim step2] Found detail URL: {href}")
break
except Exception:
continue
if not detail_url:
return "ERROR: step2 failed: could not find member-details link"
self.driver.get(detail_url)
print(f"[DDMA Claim step2] Navigating to: {detail_url}")
try:
WebDriverWait(self.driver, 15).until(
lambda d: "member-details" in d.current_url
)
print(f"[DDMA Claim step2] Member Information page loaded: {self.driver.current_url}")
except TimeoutException:
print(f"[DDMA Claim step2] Warning — URL: {self.driver.current_url}")
try:
WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.XPATH, "//button[@aria-label='Create claim']"))
)
print("[DDMA Claim step2] 'Create claim' button found")
except TimeoutException:
print("[DDMA Claim step2] Warning: 'Create claim' button not found")
time.sleep(2)
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step2] Exception: {e}")
return f"ERROR: step2 failed: {e}"
# ------------------------------------------------------------------ #
# Step 3 — Click "Create claim" button #
# ------------------------------------------------------------------ #
def step3_click_create_claim(self):
"""Click the 'Create claim' button on the Member Information page."""
try:
print(f"[DDMA Claim step3] Current URL: {self.driver.current_url}")
handles_before = set(self.driver.window_handles)
self.driver.execute_script("window.scrollTo(0, 0);")
time.sleep(0.5)
# Log all buttons on page for debugging
all_btns = self.driver.find_elements(By.XPATH, "//button")
print(f"[DDMA Claim step3] Buttons on page: {[b.get_attribute('aria-label') or b.text for b in all_btns]}")
btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@aria-label='Create claim' and @data-react-aria-pressable='true']"
))
)
print(f"[DDMA Claim step3] Found 'Create claim' button, displayed={btn.is_displayed()}, enabled={btn.is_enabled()}")
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
time.sleep(0.5)
# Try all click methods in sequence until one causes navigation
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", btn)
print("[DDMA Claim step3] Dispatched pointer+click events on 'Create claim'")
time.sleep(2)
# Switch to new tab if one opened
handles_after = set(self.driver.window_handles)
new_handles = handles_after - handles_before
if new_handles:
self.driver.switch_to.window(new_handles.pop())
print(f"[DDMA Claim step3] Switched to new tab")
print(f"[DDMA Claim step3] Post-click URL: {self.driver.current_url}")
# Wait for claim form — just log, don't fail
try:
WebDriverWait(self.driver, 20).until(
EC.any_of(
EC.presence_of_element_located((By.XPATH, "//input[contains(@id,'procedureCode')]")),
EC.presence_of_element_located((By.XPATH, "//span[@data-type='month' and @contenteditable='true']")),
EC.presence_of_element_located((By.XPATH,
"//*[contains(text(),'date of service') or contains(text(),'Date of service') "
"or contains(text(),'Procedure code')]"
)),
)
)
print(f"[DDMA Claim step3] Claim form loaded")
except TimeoutException:
page_text = self.driver.execute_script("return document.body.innerText;")[:400]
print(f"[DDMA Claim step3] Claim form not detected — page: {page_text}")
time.sleep(1)
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step3] Exception: {e}")
return f"ERROR: step3 failed: {e}"
# ------------------------------------------------------------------ #
# Step 4 — Fill service date and procedure code #
# ------------------------------------------------------------------ #
def _parse_service_date(self):
s = str(self.serviceDate or "").strip()
if not s:
return None, None, None
if "-" in s:
parts = s.split("-")
if len(parts) == 3 and len(parts[0]) == 4:
return parts[1].zfill(2), parts[2].zfill(2), parts[0]
if len(parts) == 3 and len(parts[2]) == 4:
return parts[0].zfill(2), parts[1].zfill(2), parts[2]
if "/" in s:
parts = s.split("/")
if len(parts) == 3:
return parts[0].zfill(2), parts[1].zfill(2), parts[2]
return None, None, None
def _fill_spinbutton(self, label_fragment, value):
for sel in [
f"//span[@contenteditable='true' and contains(@aria-label,'{label_fragment}')]",
f"//span[@data-type='{label_fragment}' and @contenteditable='true']",
]:
try:
elem = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, sel))
)
elem.click()
elem.send_keys(Keys.CONTROL, "a")
elem.send_keys(Keys.BACKSPACE)
elem.send_keys(value)
time.sleep(0.1)
print(f"[DDMA Claim step4] Filled spinbutton '{label_fragment}' = {value!r}")
return True
except Exception:
continue
print(f"[DDMA Claim step4] Warning: spinbutton '{label_fragment}' not found")
return False
def _fill_combobox(self, inp, value, label="field"):
"""Type value into a combobox and select the first matching dropdown option."""
try:
inp.click()
inp.send_keys(Keys.CONTROL + "a")
inp.send_keys(Keys.DELETE)
inp.send_keys(str(value))
time.sleep(0.5)
# Try to click matching option in listbox
listbox_id = inp.get_attribute("aria-controls") or ""
try:
if listbox_id:
option = WebDriverWait(self.driver, 4).until(
EC.element_to_be_clickable((By.XPATH,
f"//*[@id='{listbox_id}']//*[@role='option'][1]"
))
)
else:
option = WebDriverWait(self.driver, 4).until(
EC.element_to_be_clickable((By.XPATH,
f"//*[@role='listbox']//*[@role='option' and contains(normalize-space(.),'{value}')]"
f" | //*[@role='listbox']//*[@role='option'][1]"
))
)
option.click()
print(f"[DDMA Claim step4] {label}: selected '{value}'")
except TimeoutException:
# No dropdown — press Enter or Tab to confirm free text
inp.send_keys(Keys.TAB)
print(f"[DDMA Claim step4] {label}: typed '{value}' (no dropdown)")
except Exception as e:
print(f"[DDMA Claim step4] Warning: could not fill {label}: {e}")
def _fill_text_input(self, inp, value, label="field"):
"""Clear and type value into a plain text input."""
try:
inp.click()
inp.send_keys(Keys.CONTROL + "a")
inp.send_keys(Keys.DELETE)
inp.send_keys(str(value))
time.sleep(0.1)
print(f"[DDMA Claim step4] {label}: typed '{value}'")
except Exception as e:
print(f"[DDMA Claim step4] Warning: could not fill {label}: {e}")
def step4_fill_claim_form(self):
"""Fill service date then all procedure line fields."""
try:
month, day, year = self._parse_service_date()
# ── Service date (once, at the top of the form) ──────────────────
if month and day and year:
print(f"[DDMA Claim step4] Filling service date: {month}/{day}/{year}")
try:
dos_container = WebDriverWait(self.driver, 8).until(
EC.presence_of_element_located((By.XPATH,
"//*[@data-testid and contains(@data-testid,'date-of-service')] | "
"//*[contains(@aria-label,'Select date of service')]/ancestor::div[1]"
))
)
month_el = dos_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_el = dos_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_el = dos_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
for elem, val in [(month_el, month), (day_el, day), (year_el, year)]:
elem.click()
elem.send_keys(Keys.CONTROL, "a")
elem.send_keys(Keys.BACKSPACE)
elem.send_keys(val)
time.sleep(0.05)
print("[DDMA Claim step4] Service date filled")
except Exception:
self._fill_spinbutton("month", month)
self._fill_spinbutton("day", day)
self._fill_spinbutton("year", year)
else:
print(f"[DDMA Claim step4] No valid service date: {self.serviceDate!r}")
time.sleep(0.3)
active_lines = [ln for ln in self.serviceLines if str(ln.get("procedureCode") or "").strip()]
print(f"[DDMA Claim step4] {len(active_lines)} service line(s)")
for idx, line in enumerate(active_lines):
code = str(line.get("procedureCode") or "").strip().upper()
tooth = str(line.get("toothNumber") or line.get("tooth") or "").strip()
arch = str(line.get("arch") or "").strip()
quad = str(line.get("quad") or line.get("quadrant") or "").strip()
surface = str(line.get("toothSurface") or line.get("surface") or "").strip().upper()
billed = str(line.get("totalBilled") or line.get("billedAmount") or line.get("fee") or "").strip()
billed = billed.replace("$", "").strip()
print(f"[DDMA Claim step4] Line {idx}: code={code} tooth={tooth!r} arch={arch!r} "
f"quad={quad!r} surface={surface!r} billed={billed!r}")
# ── Click "Add a procedure" for lines after the first ─────────
if idx > 0:
try:
add_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Add a procedure')]] | "
"//button[contains(normalize-space(text()),'Add a procedure')] | "
"//*[contains(text(),'Add a procedure') and @role='button']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", add_btn)
add_btn.click()
print(f"[DDMA Claim step4] Clicked 'Add a procedure' for line {idx}")
time.sleep(1)
except Exception as e:
print(f"[DDMA Claim step4] Could not click 'Add a procedure': {e}")
# ── Procedure code ────────────────────────────────────────────
if code:
try:
proc_inputs = self.driver.find_elements(By.XPATH,
"//input[contains(@id,'procedureCode') and contains(@id,'-input')]"
)
proc_inp = proc_inputs[idx] if idx < len(proc_inputs) else proc_inputs[-1]
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", proc_inp)
self._fill_combobox(proc_inp, code, f"procedureCode[{idx}]")
time.sleep(0.5)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill procedure code: {e}")
# ── Tooth ─────────────────────────────────────────────────────
if tooth:
try:
tooth_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Tooth']")
if idx < len(tooth_inputs):
self._fill_combobox(tooth_inputs[idx], tooth, f"tooth[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill tooth: {e}")
# ── Arch ──────────────────────────────────────────────────────
if arch:
try:
arch_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Arch']")
if idx < len(arch_inputs):
self._fill_combobox(arch_inputs[idx], arch, f"arch[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill arch: {e}")
# ── Quad ──────────────────────────────────────────────────────
if quad:
try:
quad_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Quad']")
if idx < len(quad_inputs):
self._fill_combobox(quad_inputs[idx], quad, f"quad[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill quad: {e}")
# ── Surface (free-text combobox — type directly) ──────────────
if surface:
try:
surface_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Surface']")
if idx < len(surface_inputs):
surf_inp = surface_inputs[idx]
surf_inp.click()
surf_inp.send_keys(Keys.CONTROL + "a")
surf_inp.send_keys(Keys.DELETE)
surf_inp.send_keys(surface)
# Dismiss any listbox with Escape so it doesn't block next field
time.sleep(0.3)
surf_inp.send_keys(Keys.ESCAPE)
print(f"[DDMA Claim step4] surface[{idx}]: typed '{surface}'")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill surface: {e}")
# ── Billed amount ─────────────────────────────────────────────
if billed:
try:
billed_inputs = self.driver.find_elements(By.XPATH,
"//input[@aria-label='Enter billed amount']"
)
if idx < len(billed_inputs):
self._fill_text_input(billed_inputs[idx], billed, f"billedAmount[{idx}]")
time.sleep(0.2)
except Exception as e:
print(f"[DDMA Claim step4] Could not fill billed amount: {e}")
print("[DDMA Claim step4] Done")
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step4] Exception: {e}")
return f"ERROR: step4 failed: {e}"
# ------------------------------------------------------------------ #
# Step 5 — Attach files #
# ------------------------------------------------------------------ #
def step5_attach_files(self):
"""For each claimFile with a filePath, click 'Add a file' and upload it."""
if not self.claimFiles:
print("[DDMA Claim step5] No files to attach")
return "SUCCESS"
attached = 0
for cf in self.claimFiles:
relative_path = cf.get("filePath") or ""
if not relative_path:
print(f"[DDMA Claim step5] Skipping file with no filePath: {cf}")
continue
# Build absolute path — filePath is like /uploads/patients/Name/file.pdf
abs_path = os.path.normpath(os.path.join(_BACKEND_CWD, relative_path.lstrip("/")))
if not os.path.isfile(abs_path):
print(f"[DDMA Claim step5] File not found on disk: {abs_path}")
continue
print(f"[DDMA Claim step5] Attaching: {abs_path}")
try:
# Click "Add a file" button/link
add_file_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Add a file')]] | "
"//button[contains(normalize-space(text()),'Add a file')] | "
"//*[contains(text(),'Add a file') and (@role='button' or self::label)]"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", add_file_btn)
ActionChains(self.driver).move_to_element(add_file_btn).click().perform()
time.sleep(1)
# Find the file input that appeared (may be hidden)
file_input = WebDriverWait(self.driver, 8).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='file']"))
)
# Make hidden inputs interactable
self.driver.execute_script("arguments[0].style.display='block';", file_input)
file_input.send_keys(abs_path)
time.sleep(1.5)
print(f"[DDMA Claim step5] Attached: {os.path.basename(abs_path)}")
attached += 1
except Exception as e:
print(f"[DDMA Claim step5] Could not attach {abs_path}: {e}")
print(f"[DDMA Claim step5] Attached {attached}/{len(self.claimFiles)} file(s)")
return "SUCCESS"
# ------------------------------------------------------------------ #
# Step 6 — Click Next step #
# ------------------------------------------------------------------ #
def step6_click_next(self):
"""Click the 'Next step' button (React Aria — dispatches pointer events directly)."""
try:
print(f"[DDMA Claim step6] Current URL: {self.driver.current_url}")
btn = WebDriverWait(self.driver, 15).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@data-testid='next-step-btn'] | "
"//button[@aria-label='Next step']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
time.sleep(0.5)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", btn)
print(f"[DDMA Claim step6] Clicked 'Next step'")
time.sleep(2)
print(f"[DDMA Claim step6] URL after Next: {self.driver.current_url}")
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step6] Exception: {e}")
return f"ERROR: step6 failed: {e}"
# ------------------------------------------------------------------ #
# Step 7 — Claims summary: check acknowledgement + submit #
# ------------------------------------------------------------------ #
def step7_submit_claim(self):
"""On the claims summary page, tick the acknowledgement checkbox then click Submit claim."""
try:
print(f"[DDMA Claim step7] Current URL: {self.driver.current_url}")
# Wait for the acknowledgement checkbox to appear
checkbox = WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='checkbox'] | "
"//*[@role='checkbox'] | "
"//label[contains(.,'submitting this claim')]//input | "
"//*[contains(@aria-label,'submitting this claim')]"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", checkbox)
time.sleep(0.3)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", checkbox)
print("[DDMA Claim step7] Checked acknowledgement checkbox")
time.sleep(0.5)
# Click Submit claim button
submit_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Submit claim')]] | "
"//button[contains(normalize-space(text()),'Submit claim')] | "
"//button[@aria-label='Submit claim']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", submit_btn)
time.sleep(0.3)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", submit_btn)
print("[DDMA Claim step7] Clicked 'Submit claim'")
time.sleep(2)
print(f"[DDMA Claim step7] URL after submit: {self.driver.current_url}")
return "SUCCESS"
except Exception as e:
print(f"[DDMA Claim step7] Exception: {e}")
return f"ERROR: step7 failed: {e}"
def step8_save_confirmation_pdf(self):
"""Wait for the Thank-you page, extract the claim number, save page as PDF."""
import re
try:
# Wait for confirmation page
WebDriverWait(self.driver, 30).until(
lambda d: "thank" in d.page_source.lower() or "submitted claim" in d.page_source.lower()
)
time.sleep(2)
print(f"[DDMA Claim step8] Confirmation page URL: {self.driver.current_url}")
# Extract claim number from page text
claim_number = None
try:
body_text = self.driver.find_element(By.TAG_NAME, "body").text
match = re.search(r'submitted claim\s+(\d{10,})', body_text, re.IGNORECASE)
if match:
claim_number = match.group(1)
print(f"[DDMA Claim step8] Extracted claim number: {claim_number}")
else:
# Fallback: any long digit sequence
match = re.search(r'\b(\d{12,})\b', body_text)
if match:
claim_number = match.group(1)
print(f"[DDMA Claim step8] Extracted claim number (fallback): {claim_number}")
except Exception as e:
print(f"[DDMA Claim step8] Could not extract claim number: {e}")
# Save page as PDF via Chrome DevTools Protocol
# Use the shared 'downloads/' dir that agent.py serves as static files
shared_downloads = os.path.join(_SERVICE_DIR, "downloads")
os.makedirs(shared_downloads, exist_ok=True)
safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")
safe_claim = ("_" + claim_number[:20]) if claim_number else ""
timestamp = time.strftime("%Y%m%d_%H%M%S")
pdf_filename = f"ddma_claim_confirmation_{safe_member}{safe_claim}_{timestamp}.pdf"
pdf_path = os.path.join(shared_downloads, pdf_filename)
try:
pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {
"printBackground": True,
"paperWidth": 8.5,
"paperHeight": 11,
"marginTop": 0.4,
"marginBottom": 0.4,
"marginLeft": 0.4,
"marginRight": 0.4,
})
pdf_bytes = base64.b64decode(pdf_data["data"])
with open(pdf_path, "wb") as f:
f.write(pdf_bytes)
print(f"[DDMA Claim step8] PDF saved: {pdf_path}")
except Exception as e:
print(f"[DDMA Claim step8] PDF capture failed: {e}")
return f"ERROR: step8 PDF failed: {e}"
return {
"status": "success",
"pdf_path": pdf_path,
"claimNumber": claim_number,
}
except Exception as e:
print(f"[DDMA Claim step8] Exception: {e}")
return f"ERROR: step8 failed: {e}"
# ------------------------------------------------------------------ #
# Fee schedule helpers #
# ------------------------------------------------------------------ #
def _load_ddma_fee_schedule(self):
base = os.path.dirname(os.path.abspath(__file__))
json_path = os.path.join(base, "..", "Frontend", "src", "assets", "data", "procedureCodesDDMA.json")
try:
with open(json_path) as f:
rows = json.load(f)
fee_map = {}
for row in rows:
code = str(row.get("Procedure Code", "")).strip().upper()
if code:
fee_map[code] = row
print(f"[DDMA Claim] Loaded {len(fee_map)} fee codes")
return fee_map
except Exception as e:
print(f"[DDMA Claim] Could not load fee schedule: {e}")
return {}
def _get_patient_age(self):
if not self.dateOfBirth:
return None
try:
parts = self.dateOfBirth.split("-")
dob = date(int(parts[0]), int(parts[1]), int(parts[2]))
today = date.today()
return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
except Exception:
return None
def _get_fee(self, code, fee_map, age):
row = fee_map.get(str(code).strip().upper(), {})
if not row:
return ""
if "Price" in row:
val = str(row["Price"])
elif age is not None and age <= 21:
val = str(row.get("PriceLTEQ21") or row.get("PriceGT21") or "")
else:
val = str(row.get("PriceGT21") or row.get("PriceLTEQ21") or "")
return "" if val in ("NC", "IC", "None", "") else val