feat: add CCA claim submission with Selenium automation
- Add CCA claim submit Selenium worker (login, fill form, attach docs, submit, capture dashboard PDF) - Add CCA fee schedule (procedureCodesMH.json renamed, procedureCodesCCA.json added with D6010) - Add backend route /api/claims/cca-claim, processor, and Selenium client - Wire CCA claim handler in claims-page with job tracking and PDF preview popup - Add insurance type dropdown in claim form (same options as eligibility page) - Auto-populate insurance type from patient.insuranceProvider in claim form and patient edit form - Map fee schedule by insurance type in Map Price button and combo buttons - Fix CCA login speed (remove fixed sleeps, use readyState check) - Fix CCA claim DOB format bug (was sending MM-DD-YYYY, now sends YYYY-MM-DD) - Fix npiProviderId not saved for CCA claims - Change Add Service → CCA Claim button (blue), MH → MH Claim Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ import helpers_deltains_eligibility as hdeltains
|
||||
import helpers_unitedsco_eligibility as hunitedsco
|
||||
import helpers_dentaquest_eligibility as hdentaquest
|
||||
import helpers_cca_eligibility as hcca
|
||||
import helpers_cca_claim as hcca_claim
|
||||
|
||||
# Import startup session-clear functions
|
||||
from ddma_browser_manager import clear_ddma_session_on_startup
|
||||
@@ -542,6 +543,47 @@ async def cca_eligibility(request: Request):
|
||||
return {"status": "started", "session_id": sid}
|
||||
|
||||
|
||||
async def _cca_claim_worker_wrapper(sid: str, data: dict, url: str):
|
||||
"""Background worker for CCA claim submission."""
|
||||
global active_jobs, waiting_jobs
|
||||
async with semaphore:
|
||||
async with lock:
|
||||
waiting_jobs -= 1
|
||||
active_jobs += 1
|
||||
try:
|
||||
await hcca_claim.start_cca_claim_run(sid, data, url)
|
||||
finally:
|
||||
async with lock:
|
||||
active_jobs -= 1
|
||||
|
||||
|
||||
@app.post("/cca-claim")
|
||||
async def cca_claim(request: Request):
|
||||
"""
|
||||
Starts a CCA claim submission session in the background.
|
||||
Logs in, navigates Claims > Submit Claims, opens claim entry page.
|
||||
Body: { "claim": { "cca_username": "...", "cca_password": "...", ... } }
|
||||
Returns: { status: "started", session_id: "<uuid>" }
|
||||
"""
|
||||
global waiting_jobs
|
||||
|
||||
body = await request.json()
|
||||
|
||||
sid = hcca_claim.make_session_entry()
|
||||
hcca_claim.sessions[sid]["type"] = "cca_claim"
|
||||
hcca_claim.sessions[sid]["last_activity"] = time.time()
|
||||
|
||||
async with lock:
|
||||
waiting_jobs += 1
|
||||
|
||||
asyncio.create_task(_cca_claim_worker_wrapper(
|
||||
sid, body,
|
||||
url="https://pwp.sciondental.com/PWP/Landing"
|
||||
))
|
||||
|
||||
return {"status": "started", "session_id": sid}
|
||||
|
||||
|
||||
@app.post("/submit-otp")
|
||||
async def submit_otp(request: Request):
|
||||
"""
|
||||
@@ -584,6 +626,8 @@ async def session_status(sid: str):
|
||||
s = hdentaquest.get_session_status(sid)
|
||||
elif sid in hcca.sessions:
|
||||
s = hcca.get_session_status(sid)
|
||||
elif sid in hcca_claim.sessions:
|
||||
s = hcca_claim.get_session_status(sid)
|
||||
else:
|
||||
s = {"status": "not_found"}
|
||||
if s.get("status") == "not_found":
|
||||
|
||||
216
apps/SeleniumService/helpers_cca_claim.py
Normal file
216
apps/SeleniumService/helpers_cca_claim.py
Normal file
@@ -0,0 +1,216 @@
|
||||
import os
|
||||
import time
|
||||
import asyncio
|
||||
from typing import Dict, Any
|
||||
from selenium.common.exceptions import WebDriverException
|
||||
|
||||
from selenium_CCA_claimSubmitWorker import AutomationCCAClaimSubmit
|
||||
from cca_browser_manager import get_browser_manager
|
||||
|
||||
sessions: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
|
||||
def make_session_entry() -> str:
|
||||
import uuid
|
||||
sid = str(uuid.uuid4())
|
||||
sessions[sid] = {
|
||||
"status": "created",
|
||||
"created_at": time.time(),
|
||||
"last_activity": time.time(),
|
||||
"bot": None,
|
||||
"driver": None,
|
||||
"result": None,
|
||||
"message": None,
|
||||
}
|
||||
return sid
|
||||
|
||||
|
||||
async def cleanup_session(sid: str, message: str | None = None):
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return
|
||||
try:
|
||||
if s.get("status") not in ("completed", "error"):
|
||||
s["status"] = "error"
|
||||
if message:
|
||||
s["message"] = message
|
||||
finally:
|
||||
sessions.pop(sid, None)
|
||||
|
||||
|
||||
async def _remove_session_later(sid: str, delay: int = 30):
|
||||
await asyncio.sleep(delay)
|
||||
await cleanup_session(sid)
|
||||
|
||||
|
||||
def _close_browser(bot):
|
||||
try:
|
||||
bm = get_browser_manager()
|
||||
try:
|
||||
bm.save_cookies()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
bm.quit_driver()
|
||||
print("[CCA Claim] Browser closed")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim] Could not close browser: {e}")
|
||||
|
||||
|
||||
async def start_cca_claim_run(sid: str, data: dict, url: str):
|
||||
"""
|
||||
Run the CCA claim submission workflow:
|
||||
1. Login to ScionDental portal
|
||||
2. Navigate Claims > Submit Claims to open claim entry page
|
||||
"""
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return {"status": "error", "message": "session not found"}
|
||||
|
||||
s["status"] = "running"
|
||||
s["last_activity"] = time.time()
|
||||
bot = None
|
||||
|
||||
try:
|
||||
bot = AutomationCCAClaimSubmit(data)
|
||||
bot.config_driver()
|
||||
|
||||
s["bot"] = bot
|
||||
s["driver"] = bot.driver
|
||||
s["last_activity"] = time.time()
|
||||
|
||||
try:
|
||||
bot.driver.maximize_window()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# --- Login ---
|
||||
try:
|
||||
login_result = bot.login(url)
|
||||
except WebDriverException as wde:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"Selenium driver error during login: {wde}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
except Exception as e:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"Unexpected error during login: {e}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
|
||||
print("[CCA Claim] Session persisted - skipping login")
|
||||
get_browser_manager().save_cookies()
|
||||
|
||||
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
|
||||
s["status"] = "error"
|
||||
s["message"] = login_result
|
||||
s["result"] = {"status": "error", "message": login_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": login_result}
|
||||
|
||||
elif isinstance(login_result, str) and login_result == "SUCCESS":
|
||||
print("[CCA Claim] Login succeeded")
|
||||
get_browser_manager().save_cookies()
|
||||
|
||||
# --- Navigate to Submit Claims ---
|
||||
step1_result = bot.step1_navigate_to_submit_claims()
|
||||
print(f"[CCA Claim] step1 result: {step1_result}")
|
||||
|
||||
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
|
||||
s["status"] = "error"
|
||||
s["message"] = step1_result
|
||||
s["result"] = {"status": "error", "message": step1_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": step1_result}
|
||||
|
||||
# --- Fill patient eligibility form & verify ---
|
||||
step2_result = bot.step2_fill_patient_eligibility()
|
||||
print(f"[CCA Claim] step2 result: {step2_result}")
|
||||
|
||||
if isinstance(step2_result, str) and step2_result.startswith("ERROR"):
|
||||
# Keep browser open — log page state for debugging
|
||||
try:
|
||||
url = bot.driver.current_url
|
||||
body = bot.driver.find_element(__import__('selenium').webdriver.common.by.By.TAG_NAME, "body").text[:600]
|
||||
print(f"[CCA Claim] step2 error — URL: {url}")
|
||||
print(f"[CCA Claim] step2 error — page text: {body}")
|
||||
except Exception:
|
||||
pass
|
||||
s["status"] = "error"
|
||||
s["message"] = step2_result
|
||||
s["result"] = {"status": "error", "message": step2_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": step2_result}
|
||||
|
||||
# --- Fill Services grid, attach docs, submit ---
|
||||
step3_result = bot.step3_fill_services_and_submit()
|
||||
print(f"[CCA Claim] step3 result: {step3_result}")
|
||||
|
||||
if isinstance(step3_result, str) and step3_result.startswith("ERROR"):
|
||||
try:
|
||||
url = bot.driver.current_url
|
||||
body = bot.driver.find_element(__import__('selenium').webdriver.common.by.By.TAG_NAME, "body").text[:600]
|
||||
print(f"[CCA Claim] step3 error — URL: {url}")
|
||||
print(f"[CCA Claim] step3 error — page text: {body}")
|
||||
except Exception:
|
||||
pass
|
||||
s["status"] = "error"
|
||||
s["message"] = step3_result
|
||||
s["result"] = {"status": "error", "message": step3_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": step3_result}
|
||||
|
||||
# --- Capture confirmation PDF + claim number ---
|
||||
step4_result = bot.step4_capture_confirmation()
|
||||
print(f"[CCA Claim] step4 claimNumber={step4_result.get('claimNumber')}, "
|
||||
f"pdfLen={len(step4_result.get('pdfBase64',''))}")
|
||||
|
||||
_close_browser(bot)
|
||||
|
||||
result = {
|
||||
"status": "success",
|
||||
"message": "CCA claim submitted successfully",
|
||||
"claimNumber": step4_result.get("claimNumber"),
|
||||
"pdfBase64": step4_result.get("pdfBase64", ""),
|
||||
"pdfFilename": step4_result.get("pdfFilename", ""),
|
||||
}
|
||||
s["status"] = "completed"
|
||||
s["result"] = result
|
||||
s["message"] = "completed"
|
||||
asyncio.create_task(_remove_session_later(sid, 60))
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
if s:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"worker exception: {e}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
if bot:
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": f"worker exception: {e}"}
|
||||
|
||||
|
||||
def get_session_status(sid: str) -> Dict[str, Any]:
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return {"status": "not_found"}
|
||||
return {
|
||||
"session_id": sid,
|
||||
"status": s.get("status"),
|
||||
"message": s.get("message"),
|
||||
"created_at": s.get("created_at"),
|
||||
"last_activity": s.get("last_activity"),
|
||||
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
|
||||
}
|
||||
977
apps/SeleniumService/selenium_CCA_claimSubmitWorker.py
Normal file
977
apps/SeleniumService/selenium_CCA_claimSubmitWorker.py
Normal file
@@ -0,0 +1,977 @@
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import base64
|
||||
import tempfile
|
||||
from datetime import date
|
||||
|
||||
from cca_browser_manager import get_browser_manager
|
||||
|
||||
LANDING_URL = "https://pwp.sciondental.com/PWP/Landing"
|
||||
CLAIM_ENTRY_URL = "https://pwp.sciondental.com/PWP/Dental/ClaimEntry"
|
||||
|
||||
|
||||
class AutomationCCAClaimSubmit:
|
||||
def __init__(self, data):
|
||||
self.headless = False
|
||||
self.driver = None
|
||||
|
||||
raw = data if isinstance(data, dict) else {}
|
||||
claim = raw.get("claim", {}) or {}
|
||||
|
||||
patient_name = (claim.get("patientName") or "").strip()
|
||||
parts = patient_name.split()
|
||||
first_name = parts[0] if parts else ""
|
||||
last_name = " ".join(parts[1:]) if len(parts) > 1 else ""
|
||||
|
||||
self.cca_username = claim.get("cca_username", "") or raw.get("cca_username", "")
|
||||
self.cca_password = claim.get("cca_password", "") or raw.get("cca_password", "")
|
||||
self.memberId = claim.get("memberId", "")
|
||||
self.dateOfBirth = claim.get("dateOfBirth", "")
|
||||
self.serviceDate = claim.get("serviceDate", "")
|
||||
self.firstName = claim.get("firstName", "") or first_name
|
||||
self.lastName = claim.get("lastName", "") or last_name
|
||||
self.serviceLines = claim.get("serviceLines", []) or []
|
||||
# Files: list of {originalname, bufferBase64, mimetype}
|
||||
files_raw = raw.get("files", []) or []
|
||||
self.uploadFiles = [f for f in files_raw if f.get("bufferBase64")]
|
||||
|
||||
print(f"[CCA Claim] Init — member={self.memberId}, "
|
||||
f"patient={self.firstName} {self.lastName}, "
|
||||
f"lines={len(self.serviceLines)}, files={len(self.uploadFiles)}")
|
||||
|
||||
def config_driver(self):
|
||||
self.driver = get_browser_manager().get_driver(self.headless)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Login (same logic as eligibility worker) #
|
||||
# ------------------------------------------------------------------ #
|
||||
def _page_has_logged_in_content(self):
|
||||
try:
|
||||
body = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
return any(x in body for x in [
|
||||
"Verify Patient Eligibility", "Patient Management",
|
||||
"Submit a Claim", "Claim Inquiry", "Submit Claims",
|
||||
])
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def login(self, url):
|
||||
browser_manager = get_browser_manager()
|
||||
try:
|
||||
if self.cca_username and browser_manager.credentials_changed(self.cca_username):
|
||||
try:
|
||||
self.driver.delete_all_cookies()
|
||||
except Exception:
|
||||
pass
|
||||
browser_manager.clear_credentials_hash()
|
||||
self.driver.get(url)
|
||||
time.sleep(2)
|
||||
|
||||
try:
|
||||
current_url = self.driver.current_url
|
||||
if ("sciondental.com" in current_url
|
||||
and "login" not in current_url.lower()
|
||||
and self._page_has_logged_in_content()):
|
||||
print("[CCA Claim login] Already logged in")
|
||||
return "ALREADY_LOGGED_IN"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print("[CCA Claim login] Checking session at landing page...")
|
||||
self.driver.get(LANDING_URL)
|
||||
try:
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
lambda d: "sciondental.com" in d.current_url
|
||||
and d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
except TimeoutException:
|
||||
pass
|
||||
|
||||
if self._page_has_logged_in_content():
|
||||
print("[CCA Claim login] Session still valid")
|
||||
return "ALREADY_LOGGED_IN"
|
||||
|
||||
print("[CCA Claim login] Session expired — logging in...")
|
||||
self.driver.get(url)
|
||||
try:
|
||||
WebDriverWait(self.driver, 15).until(
|
||||
EC.presence_of_element_located((By.XPATH, "//input")))
|
||||
except TimeoutException:
|
||||
pass
|
||||
|
||||
# Username
|
||||
username_field = None
|
||||
for sel in [
|
||||
(By.ID, "Username"),
|
||||
(By.NAME, "Username"),
|
||||
(By.XPATH, "//input[@type='text']"),
|
||||
(By.XPATH, "//input[@type='email']"),
|
||||
]:
|
||||
try:
|
||||
f = WebDriverWait(self.driver, 6).until(EC.element_to_be_clickable(sel))
|
||||
username_field = f
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not username_field:
|
||||
if self._page_has_logged_in_content():
|
||||
return "ALREADY_LOGGED_IN"
|
||||
return "ERROR: Could not find username field"
|
||||
|
||||
username_field.click()
|
||||
username_field.send_keys(Keys.CONTROL + "a")
|
||||
username_field.send_keys(Keys.DELETE)
|
||||
username_field.send_keys(self.cca_username)
|
||||
time.sleep(0.3)
|
||||
|
||||
# Password
|
||||
pw_field = None
|
||||
for sel in [
|
||||
(By.ID, "Password"),
|
||||
(By.NAME, "Password"),
|
||||
(By.XPATH, "//input[@type='password']"),
|
||||
]:
|
||||
try:
|
||||
f = WebDriverWait(self.driver, 6).until(EC.element_to_be_clickable(sel))
|
||||
pw_field = f
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not pw_field:
|
||||
return "ERROR: Password field not found"
|
||||
|
||||
pw_field.click()
|
||||
pw_field.send_keys(Keys.CONTROL + "a")
|
||||
pw_field.send_keys(Keys.DELETE)
|
||||
pw_field.send_keys(self.cca_password)
|
||||
time.sleep(0.3)
|
||||
|
||||
# Submit
|
||||
submitted = False
|
||||
for sel in [
|
||||
(By.XPATH, "//button[@type='submit']"),
|
||||
(By.XPATH, "//input[@type='submit']"),
|
||||
(By.XPATH, "//button[contains(text(),'Sign In') or contains(text(),'Log In') or contains(text(),'Login')]"),
|
||||
]:
|
||||
try:
|
||||
btn = self.driver.find_element(*sel)
|
||||
if btn.is_displayed():
|
||||
btn.click()
|
||||
submitted = True
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not submitted:
|
||||
pw_field.send_keys(Keys.RETURN)
|
||||
|
||||
if self.cca_username:
|
||||
browser_manager.save_credentials_hash(self.cca_username)
|
||||
|
||||
try:
|
||||
WebDriverWait(self.driver, 20).until(
|
||||
lambda d: any(x in d.find_element(By.TAG_NAME, "body").text
|
||||
for x in ["Verify Patient Eligibility", "Patient Management",
|
||||
"Submit a Claim", "Claim Inquiry", "Submit Claims"]))
|
||||
print("[CCA Claim login] Login succeeded")
|
||||
return "SUCCESS"
|
||||
except TimeoutException:
|
||||
pass
|
||||
|
||||
body_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
if "invalid" in body_text.lower():
|
||||
return "ERROR: Invalid username or password"
|
||||
return "ERROR: Login did not succeed — portal content not found after submit"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim login] Exception: {e}")
|
||||
return f"ERROR:LOGIN FAILED: {e}"
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Step 1 — Navigate to Claims > Submit Claims #
|
||||
# ------------------------------------------------------------------ #
|
||||
def step1_navigate_to_submit_claims(self):
|
||||
"""
|
||||
Click the 'Claims' navbar dropdown then choose 'Submit Claims'.
|
||||
Returns 'SUCCESS' when the claim-entry page is loaded, else 'ERROR:...'.
|
||||
"""
|
||||
try:
|
||||
# Navigate directly to the claim entry URL (no menu navigation needed)
|
||||
print(f"[CCA Claim step1] Navigating directly to {CLAIM_ENTRY_URL}")
|
||||
self.driver.get(CLAIM_ENTRY_URL)
|
||||
|
||||
# Wait for the Subscriber ID field — confirms we're on the right page
|
||||
try:
|
||||
WebDriverWait(self.driver, 20).until(
|
||||
EC.presence_of_element_located((By.ID, "tbSubscriberId"))
|
||||
)
|
||||
print(f"[CCA Claim step1] Claim entry page loaded — URL: {self.driver.current_url}")
|
||||
except TimeoutException:
|
||||
# May have been redirected to login — check and re-login if needed
|
||||
print(f"[CCA Claim step1] tbSubscriberId not found, URL: {self.driver.current_url}")
|
||||
if "login" in self.driver.current_url.lower() or not self._page_has_logged_in_content():
|
||||
return "ERROR: Session expired — redirected to login"
|
||||
print("[CCA Claim step1] Continuing despite timeout")
|
||||
|
||||
time.sleep(1)
|
||||
return "SUCCESS"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step1] Exception: {e}")
|
||||
return f"ERROR: step1 failed: {e}"
|
||||
|
||||
def _set_ng_value(self, field, value: str):
|
||||
"""Set value on an AngularJS ng-model input and trigger digest."""
|
||||
try:
|
||||
self.driver.execute_script(
|
||||
"""
|
||||
var el = arguments[0], val = arguments[1];
|
||||
var setter = Object.getOwnPropertyDescriptor(
|
||||
window.HTMLInputElement.prototype, 'value').set;
|
||||
setter.call(el, val);
|
||||
el.dispatchEvent(new Event('input', {bubbles:true}));
|
||||
el.dispatchEvent(new Event('change', {bubbles:true}));
|
||||
""",
|
||||
field, value
|
||||
)
|
||||
except Exception:
|
||||
field.click()
|
||||
field.send_keys(Keys.CONTROL + "a")
|
||||
field.send_keys(Keys.DELETE)
|
||||
field.send_keys(value)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Step 2 — Expand panel, fill Patient/Provider info, Verify Eligibility
|
||||
# ------------------------------------------------------------------ #
|
||||
def _js_set(self, element_id: str, value: str) -> bool:
|
||||
"""Set an input value via JavaScript — works regardless of CSS visibility."""
|
||||
try:
|
||||
el = self.driver.find_element(By.ID, element_id)
|
||||
self.driver.execute_script(
|
||||
"var el=arguments[0], v=arguments[1];"
|
||||
"var s=Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype,'value').set;"
|
||||
"s.call(el,v);"
|
||||
"el.dispatchEvent(new Event('input',{bubbles:true}));"
|
||||
"el.dispatchEvent(new Event('change',{bubbles:true}));",
|
||||
el, value
|
||||
)
|
||||
actual = el.get_attribute("value")
|
||||
print(f"[CCA Claim step2] JS set {element_id!r} = {actual!r}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step2] JS set {element_id!r} failed: {e}")
|
||||
return False
|
||||
|
||||
def _keys_set(self, element_id: str, value: str) -> bool:
|
||||
"""Set an input via send_keys — needed for masked date fields."""
|
||||
try:
|
||||
el = self.driver.find_element(By.ID, element_id)
|
||||
el.click()
|
||||
el.send_keys(Keys.CONTROL + "a")
|
||||
el.send_keys(Keys.DELETE)
|
||||
el.send_keys(value)
|
||||
time.sleep(0.3)
|
||||
actual = el.get_attribute("value")
|
||||
print(f"[CCA Claim step2] keys set {element_id!r} = {actual!r}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step2] keys set {element_id!r} failed: {e}")
|
||||
return False
|
||||
|
||||
def step2_fill_patient_eligibility(self):
|
||||
"""
|
||||
Fill Subscriber ID, Date of Birth, Date of Service then click Verify Eligibility.
|
||||
All panels and tabs are already open on the claim entry page.
|
||||
Returns 'SUCCESS' or 'ERROR:...'.
|
||||
"""
|
||||
try:
|
||||
formatted_dob = self._format_dob(self.dateOfBirth)
|
||||
formatted_dos = self._format_dob(self.serviceDate) if self.serviceDate else ""
|
||||
print(f"[CCA Claim step2] memberId={self.memberId!r}, DOB={formatted_dob!r}, DOS={formatted_dos!r}")
|
||||
|
||||
# Wait for form to be ready
|
||||
print("[CCA Claim step2] Waiting for tbSubscriberId in DOM...")
|
||||
try:
|
||||
WebDriverWait(self.driver, 20).until(
|
||||
EC.presence_of_element_located((By.ID, "tbSubscriberId"))
|
||||
)
|
||||
print("[CCA Claim step2] tbSubscriberId present in DOM")
|
||||
except Exception:
|
||||
print("[CCA Claim step2] tbSubscriberId not in DOM after 20s")
|
||||
return "ERROR: Subscriber ID field not found in DOM"
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# --- Subscriber ID (JS set) ---
|
||||
if not self._js_set("tbSubscriberId", self.memberId):
|
||||
return "ERROR: Could not set Subscriber ID"
|
||||
|
||||
# --- Date of Birth (send_keys for date mask) ---
|
||||
if not self._keys_set("tbDateOfBirth", formatted_dob):
|
||||
return "ERROR: Could not set Date of Birth"
|
||||
|
||||
# --- Date of Service (send_keys for date mask) ---
|
||||
if formatted_dos:
|
||||
self._keys_set("tbDateOfService", formatted_dos)
|
||||
|
||||
# --- Click Verify Eligibility ---
|
||||
print("[CCA Claim step2] Clicking 'Verify Eligibility'...")
|
||||
verified = False
|
||||
for sel in [
|
||||
(By.ID, "ButtonVerifyMemberEligibility"),
|
||||
(By.XPATH, "//button[@id='ButtonVerifyMemberEligibility']"),
|
||||
(By.XPATH, "//button[contains(text(),'Verify Eligibility')]"),
|
||||
]:
|
||||
try:
|
||||
btn = WebDriverWait(self.driver, 8).until(EC.element_to_be_clickable(sel))
|
||||
btn.click()
|
||||
verified = True
|
||||
print(f"[CCA Claim step2] Clicked 'Verify Eligibility' via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not verified:
|
||||
return "ERROR: 'Verify Eligibility' button not found"
|
||||
|
||||
# Wait for eligibility result
|
||||
print("[CCA Claim step2] Waiting for eligibility result...")
|
||||
try:
|
||||
WebDriverWait(self.driver, 25).until(
|
||||
lambda d: any(x in d.find_element(By.TAG_NAME, "body").text.lower() for x in [
|
||||
"eligible", "not eligible", "ineligible",
|
||||
"patient name", "member name", "verified",
|
||||
])
|
||||
)
|
||||
print("[CCA Claim step2] Eligibility result appeared")
|
||||
except TimeoutException:
|
||||
print(f"[CCA Claim step2] Timed out waiting for eligibility result — URL: {self.driver.current_url}")
|
||||
|
||||
time.sleep(1)
|
||||
return "SUCCESS"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step2] Exception: {e}")
|
||||
return f"ERROR: step2 failed: {e}"
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Step 4 — Extract claim number + capture confirmation PDF #
|
||||
# ------------------------------------------------------------------ #
|
||||
def extract_claim_number(self):
|
||||
"""Extract claim/reference number from the CCA submission confirmation page."""
|
||||
try:
|
||||
WebDriverWait(self.driver, 15).until(
|
||||
lambda d: d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
time.sleep(1)
|
||||
|
||||
body_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
print(f"[CCA Claim step4] Confirmation page text (first 600): {body_text[:600]}")
|
||||
|
||||
# Try DOM selectors first
|
||||
for sel in [
|
||||
(By.XPATH, "//*[contains(text(),'Claim Number') or contains(text(),'claim number')]/following-sibling::*[1]"),
|
||||
(By.XPATH, "//*[contains(text(),'Claim Number') or contains(text(),'claim number')]/following::*[1]"),
|
||||
(By.XPATH, "//*[contains(text(),'Reference Number') or contains(text(),'Confirmation Number')]/following-sibling::*[1]"),
|
||||
(By.XPATH, "//*[contains(text(),'assigned the number')]/following::*[1]"),
|
||||
(By.XPATH, "//*[@id and contains(@id,'ClaimNumber')]"),
|
||||
(By.XPATH, "//*[@id and contains(@id,'ConfirmationNumber')]"),
|
||||
]:
|
||||
try:
|
||||
el = self.driver.find_element(*sel)
|
||||
text = el.text.strip()
|
||||
if text and re.search(r'\d{6,}', text):
|
||||
print(f"[CCA Claim step4] Claim number via DOM: {text}")
|
||||
return re.search(r'[\w\-]{6,}', text).group(0)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Regex scan over full page text
|
||||
# Pattern: "assigned the number XXXXXXXXX" or "Claim Number: XXXXXXX"
|
||||
for pattern in [
|
||||
r'assigned\s+the\s+number\s+([\w\-]{6,30})',
|
||||
r'[Cc]laim\s+[Nn]umber[:\s]+([A-Z0-9\-]{6,30})',
|
||||
r'[Cc]onfirmation\s+[Nn]umber[:\s]+([A-Z0-9\-]{6,30})',
|
||||
r'[Rr]eference\s+[Nn]umber[:\s]+([A-Z0-9\-]{6,30})',
|
||||
r'(\d{15})', # MassHealth-style 15-digit
|
||||
r'(\d{9,14})', # 9-14 digit fallback
|
||||
]:
|
||||
m = re.search(pattern, body_text)
|
||||
if m:
|
||||
print(f"[CCA Claim step4] Claim number via regex ({pattern}): {m.group(1)}")
|
||||
return m.group(1)
|
||||
|
||||
# JavaScript fallback
|
||||
claim_number = self.driver.execute_script(r"""
|
||||
var els = document.querySelectorAll('body, p, div, span, td, label, h1, h2, h3, strong, b');
|
||||
for (var i = 0; i < els.length; i++) {
|
||||
var t = (els[i].textContent || '').trim();
|
||||
var m = t.match(/assigned\s+the\s+number\s+([\w\-]{6,30})/i)
|
||||
|| t.match(/[Cc]laim\s+[Nn]umber[:\s]+([\w\-]{6,30})/)
|
||||
|| t.match(/(\d{15})/)
|
||||
|| t.match(/(\d{9,14})/);
|
||||
if (m) return m[1];
|
||||
}
|
||||
return null;
|
||||
""")
|
||||
if claim_number:
|
||||
print(f"[CCA Claim step4] Claim number via JS: {claim_number}")
|
||||
return claim_number
|
||||
|
||||
print("[CCA Claim step4] Could not extract claim number")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step4] extract_claim_number exception: {e}")
|
||||
return None
|
||||
|
||||
def step4_capture_confirmation(self):
|
||||
"""
|
||||
On the Claims Dashboard page, extract the Encounter ID from the first row
|
||||
and capture the page as PDF.
|
||||
Returns dict: { claimNumber, pdfBase64, pdfFilename }.
|
||||
"""
|
||||
try:
|
||||
WebDriverWait(self.driver, 15).until(
|
||||
lambda d: d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
time.sleep(1)
|
||||
print(f"[CCA Claim step4] Capturing dashboard — URL: {self.driver.current_url}")
|
||||
|
||||
# Extract Encounter ID from the LAST row (newest claim is at the bottom)
|
||||
claim_number = None
|
||||
|
||||
# Try table cells first (Encounter ID is the first column)
|
||||
for sel in [
|
||||
(By.XPATH, "//table//tbody//tr[last()]/td[1]"),
|
||||
(By.XPATH, "//*[contains(@class,'wbx-table-results')]//tr[last()]/td[1]"),
|
||||
(By.XPATH, "//table//tbody//tr[last()-0]/td[1]"),
|
||||
]:
|
||||
try:
|
||||
cell = WebDriverWait(self.driver, 8).until(
|
||||
EC.presence_of_element_located(sel))
|
||||
text = cell.text.strip()
|
||||
if re.match(r'\d{10,}', text):
|
||||
claim_number = text
|
||||
print(f"[CCA Claim step4] Encounter ID from last row: {claim_number}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Fallback: regex scan on full page text
|
||||
if not claim_number:
|
||||
claim_number = self.extract_claim_number()
|
||||
|
||||
safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")
|
||||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||||
safe_claim = ("_" + re.sub(r'[^\w\-]', '', str(claim_number))[:20]) if claim_number else ""
|
||||
pdf_filename = f"cca_claim_{safe_member}{safe_claim}_{timestamp}.pdf"
|
||||
|
||||
print(f"[CCA Claim step4] Capturing confirmation PDF — {self.driver.current_url}")
|
||||
|
||||
# Primary: CDP printToPDF
|
||||
try:
|
||||
pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {
|
||||
"printBackground": True,
|
||||
"paperWidth": 8.5,
|
||||
"paperHeight": 11,
|
||||
"marginTop": 0.4,
|
||||
"marginBottom": 0.4,
|
||||
"marginLeft": 0.4,
|
||||
"marginRight": 0.4,
|
||||
})
|
||||
pdf_base64 = pdf_data.get("data", "")
|
||||
if pdf_base64 and len(pdf_base64) > 500:
|
||||
print(f"[CCA Claim step4] PDF captured via CDP ({len(pdf_base64)} b64 chars)")
|
||||
return {
|
||||
"claimNumber": claim_number,
|
||||
"pdfBase64": pdf_base64,
|
||||
"pdfFilename": pdf_filename,
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step4] CDP PDF failed: {e}")
|
||||
|
||||
# Fallback: screenshot as PNG
|
||||
try:
|
||||
png_filename = pdf_filename.replace(".pdf", ".png")
|
||||
total_height = self.driver.execute_script("return document.body.scrollHeight")
|
||||
self.driver.set_window_size(1280, max(total_height, 900))
|
||||
time.sleep(0.5)
|
||||
png_base64 = self.driver.get_screenshot_as_base64()
|
||||
print(f"[CCA Claim step4] Screenshot fallback captured")
|
||||
return {
|
||||
"claimNumber": claim_number,
|
||||
"pdfBase64": png_base64,
|
||||
"pdfFilename": png_filename,
|
||||
}
|
||||
except Exception as e2:
|
||||
print(f"[CCA Claim step4] Screenshot fallback failed: {e2}")
|
||||
|
||||
return {"claimNumber": claim_number, "pdfBase64": "", "pdfFilename": ""}
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step4] Exception: {e}")
|
||||
return {"claimNumber": None, "pdfBase64": "", "pdfFilename": ""}
|
||||
|
||||
def _format_dob(self, dob_str):
|
||||
"""Normalize any common date format to MM/DD/YYYY."""
|
||||
if not dob_str:
|
||||
return dob_str
|
||||
s = str(dob_str).strip()
|
||||
# Already MM/DD/YYYY or M/D/YYYY
|
||||
if re.match(r'^\d{1,2}/\d{1,2}/\d{4}$', s):
|
||||
return s
|
||||
# YYYY-MM-DD or MM-DD-YYYY (detect by length of first segment)
|
||||
if "-" in s:
|
||||
parts = s.split("-")
|
||||
if len(parts) == 3:
|
||||
if len(parts[0]) == 4:
|
||||
# YYYY-MM-DD → MM/DD/YYYY
|
||||
return f"{parts[1]}/{parts[2]}/{parts[0]}"
|
||||
if len(parts[2]) == 4:
|
||||
# MM-DD-YYYY → MM/DD/YYYY
|
||||
return f"{parts[0]}/{parts[1]}/{parts[2]}"
|
||||
return s
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Fee schedule helpers #
|
||||
# ------------------------------------------------------------------ #
|
||||
def _load_cca_fee_schedule(self):
|
||||
base = os.path.dirname(os.path.abspath(__file__))
|
||||
json_path = os.path.join(
|
||||
base, "..", "Frontend", "src", "assets", "data", "procedureCodesCCA.json"
|
||||
)
|
||||
try:
|
||||
with open(json_path) as f:
|
||||
rows = json.load(f)
|
||||
fee_map = {}
|
||||
for row in rows:
|
||||
code = str(row.get("Procedure Code", "")).strip().upper()
|
||||
if code:
|
||||
fee_map[code] = row
|
||||
print(f"[CCA Claim] Loaded {len(fee_map)} CCA fee codes")
|
||||
return fee_map
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim] Could not load CCA fee schedule: {e}")
|
||||
return {}
|
||||
|
||||
def _get_patient_age(self):
|
||||
if not self.dateOfBirth:
|
||||
return None
|
||||
try:
|
||||
parts = self.dateOfBirth.split("-")
|
||||
dob = date(int(parts[0]), int(parts[1]), int(parts[2]))
|
||||
today = date.today()
|
||||
return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _get_fee(self, code, fee_map, age):
|
||||
row = fee_map.get(str(code).strip().upper(), {})
|
||||
if not row:
|
||||
return ""
|
||||
if "Price" in row:
|
||||
val = str(row["Price"])
|
||||
elif age is not None and age <= 21:
|
||||
val = str(row.get("PriceLTEQ21") or row.get("PriceGT21") or "")
|
||||
else:
|
||||
val = str(row.get("PriceGT21") or row.get("PriceLTEQ21") or "")
|
||||
return "" if val in ("NC", "IC", "None", "") else val
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# EJ Grid cell fill helper #
|
||||
# ------------------------------------------------------------------ #
|
||||
def _build_col_map(self):
|
||||
"""
|
||||
Map column names to 1-based td positions using ej-mappingname divs.
|
||||
Falls back to hardcoded positions based on the known PDF column order.
|
||||
"""
|
||||
# Hardcoded based on PDF column order:
|
||||
# rownum(1), Code(2), Desc(3), Tooth(4),
|
||||
# Surf1-5(5-9), OralCavity1-4(10-13), DiagPtr1-4(14-17),
|
||||
# EPSDT(18), Qty(19), Auth(20), ServiceDate(21), BilledAmt(22)
|
||||
col_map = {
|
||||
"CODE": 2,
|
||||
"TOOTH": 4,
|
||||
"SURF1": 5, "S1": 5,
|
||||
"SURF2": 6, "S2": 6,
|
||||
"SURF3": 7, "S3": 7,
|
||||
"SURF4": 8, "S4": 8,
|
||||
"SURF5": 9, "S5": 9,
|
||||
"QUANTITY": 19, "QTY": 19,
|
||||
"DOS": 21, "SERVICE DATE": 21, "SERVICEDATE": 21,
|
||||
"BILLED_AMOUNT": 22, "BILLED AMT": 22, "BILLED A": 22,
|
||||
}
|
||||
|
||||
# Try to confirm/override positions using ej-mappingname divs in header
|
||||
try:
|
||||
divs = self.driver.find_elements(
|
||||
By.XPATH,
|
||||
"//div[@id='Services']//div[@ej-mappingname] | "
|
||||
"//div[@id='ServicesGrid']//div[@ej-mappingname]"
|
||||
)
|
||||
print(f"[CCA Claim grid] Found {len(divs)} ej-mappingname header divs")
|
||||
for div in divs:
|
||||
mapping = div.get_attribute("ej-mappingname") or ""
|
||||
text = div.text.strip()
|
||||
print(f"[CCA Claim grid] Header: mapping={mapping!r} text={text!r}")
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim grid] Header scan error: {e}")
|
||||
|
||||
return col_map
|
||||
|
||||
def _dbl_click_col(self, row_num, col_idx):
|
||||
"""Double-click the cell at (row_num, col_idx) — both 1-based.
|
||||
row_num is 1-based (first data row = 1), col_idx is 1-based td position."""
|
||||
from selenium.webdriver.common.action_chains import ActionChains
|
||||
row_idx = row_num - 1 # EJ Grid aria-rowindex is 0-based
|
||||
for xpath in [
|
||||
# aria-rowindex is most reliable in EJ Grid
|
||||
f"//tr[@aria-rowindex='{row_idx}']/td[{col_idx}]",
|
||||
# Scoped to Services grid, position-based
|
||||
f"(//div[@id='Services']//tr[.//td[contains(@class,'e-rowcell')]])[{row_num}]/td[{col_idx}]",
|
||||
f"(//div[@id='Services']//tbody//tr)[{row_num}]/td[{col_idx}]",
|
||||
f"(//tr[contains(@class,'e-row')])[{row_num}]/td[{col_idx}]",
|
||||
]:
|
||||
try:
|
||||
cell = self.driver.find_element(By.XPATH, xpath)
|
||||
self.driver.execute_script(
|
||||
"arguments[0].scrollIntoView({block:'nearest',inline:'nearest'});", cell)
|
||||
time.sleep(0.2)
|
||||
ActionChains(self.driver).double_click(cell).perform()
|
||||
time.sleep(0.5)
|
||||
visible_inputs = self.driver.execute_script(
|
||||
"return Array.from(document.querySelectorAll('input[id]'))"
|
||||
".filter(function(i){return i.offsetParent!==null})"
|
||||
".map(function(i){return i.id})")
|
||||
print(f"[CCA Claim grid] Dbl-clicked row={row_num} col={col_idx}, "
|
||||
f"visible inputs: {visible_inputs[:8]}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim grid] dbl_click_col({row_num},{col_idx}) failed: {e}")
|
||||
continue
|
||||
return False
|
||||
|
||||
def _fill_active_input(self, input_id, value):
|
||||
"""Fill the input that appeared after double-clicking a cell."""
|
||||
try:
|
||||
field = WebDriverWait(self.driver, 5).until(
|
||||
EC.presence_of_element_located((By.ID, input_id))
|
||||
)
|
||||
self.driver.execute_script(
|
||||
"arguments[0].scrollIntoView({block:'nearest',inline:'nearest'});", field)
|
||||
field.click()
|
||||
field.send_keys(Keys.CONTROL + "a")
|
||||
field.send_keys(Keys.DELETE)
|
||||
if value:
|
||||
field.send_keys(str(value))
|
||||
time.sleep(0.2)
|
||||
print(f"[CCA Claim grid] Filled {input_id}={value!r}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim grid] Could not fill {input_id}: {e}")
|
||||
return False
|
||||
|
||||
def _fill_cell(self, input_id, value, tab_after=True):
|
||||
"""Legacy helper kept for compatibility."""
|
||||
return self._fill_active_input(input_id, value)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Step 3 — Fill Services grid, attach docs, submit #
|
||||
# ------------------------------------------------------------------ #
|
||||
def step3_fill_services_and_submit(self):
|
||||
"""
|
||||
1. Expand Services panel and add one grid row per service line.
|
||||
Per row: CODE → TOOTH → SURF1-5 → QTY(1) → DOS → BILLED_AMOUNT
|
||||
2. Expand Attached Documents, upload files if present.
|
||||
3. Click Submit Claim.
|
||||
"""
|
||||
try:
|
||||
fee_map = self._load_cca_fee_schedule()
|
||||
age = self._get_patient_age()
|
||||
formatted_dos = self._format_dob(self.serviceDate) if self.serviceDate else ""
|
||||
|
||||
active_lines = [
|
||||
ln for ln in self.serviceLines
|
||||
if str(ln.get("procedureCode") or "").strip()
|
||||
]
|
||||
print(f"[CCA Claim step3] {len(active_lines)} active service line(s)")
|
||||
|
||||
# Services panel is already open on the claim entry page
|
||||
# Build column position map from grid headers
|
||||
col_map = self._build_col_map()
|
||||
print(f"[CCA Claim step3] Column map: {col_map}")
|
||||
|
||||
# ---- Add each service line ----
|
||||
for idx, line in enumerate(active_lines):
|
||||
code = str(line.get("procedureCode") or "").strip()
|
||||
tooth = str(line.get("toothNumber") or "").strip()
|
||||
surface_raw = str(line.get("toothSurface") or "").strip()
|
||||
surface_chars = [c for c in surface_raw.upper() if c.isalpha()]
|
||||
|
||||
# Determine billed amount: prefer value already on the line
|
||||
total_billed = line.get("totalBilled") or line.get("fee") or line.get("billedAmount")
|
||||
if total_billed:
|
||||
billed_str = str(total_billed)
|
||||
else:
|
||||
billed_str = self._get_fee(code, fee_map, age)
|
||||
|
||||
print(f"[CCA Claim step3] Line {idx+1}: code={code}, tooth={tooth}, "
|
||||
f"surf={surface_raw!r}, billed={billed_str}")
|
||||
|
||||
# --- Activate inline edit for row (idx) ---
|
||||
row_idx = idx # 0-based for EJ Grid API
|
||||
row_num = idx + 1 # 1-based for XPath
|
||||
cell_clicked = False
|
||||
|
||||
# --- Single-click cell to activate (yellow), then fill input ---
|
||||
def click_cell_and_fill(col_expr, input_id, value):
|
||||
"""
|
||||
Click the cell at (row_num, col_expr) to activate it (turns yellow),
|
||||
then fill the pre-rendered input by ID.
|
||||
col_expr can be a number like 2 or an XPath expression like 'last()' or 'last()-1'.
|
||||
"""
|
||||
# Find the cell in the grid CONTENT rows (not headers)
|
||||
cell = None
|
||||
for xpath in [
|
||||
f"(//div[contains(@class,'e-gridcontent')]//tr)[{row_num}]/td[{col_expr}]",
|
||||
f"(//div[contains(@class,'e-content')]//tr)[{row_num}]/td[{col_expr}]",
|
||||
f"(//div[@id='Services']//div[contains(@class,'e-content')]//tr)[{row_num}]/td[{col_expr}]",
|
||||
f"(//tr[@aria-rowindex='{row_num-1}'])/td[{col_expr}]",
|
||||
]:
|
||||
try:
|
||||
cell = self.driver.find_element(By.XPATH, xpath)
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if cell:
|
||||
try:
|
||||
self.driver.execute_script(
|
||||
"arguments[0].scrollIntoView({block:'nearest',inline:'nearest'});",
|
||||
cell)
|
||||
cell.click()
|
||||
time.sleep(0.3)
|
||||
print(f"[CCA Claim grid] Clicked row={row_num} col={col_idx}")
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim grid] Cell click failed: {e}")
|
||||
else:
|
||||
print(f"[CCA Claim grid] Cell not found row={row_num} col={col_idx}")
|
||||
|
||||
# Fill the pre-rendered input
|
||||
try:
|
||||
field = WebDriverWait(self.driver, 4).until(
|
||||
EC.presence_of_element_located((By.ID, input_id)))
|
||||
self.driver.execute_script(
|
||||
"arguments[0].scrollIntoView({block:'nearest',inline:'nearest'});",
|
||||
field)
|
||||
field.click()
|
||||
field.send_keys(Keys.CONTROL + "a")
|
||||
field.send_keys(Keys.DELETE)
|
||||
if value:
|
||||
field.send_keys(str(value))
|
||||
time.sleep(0.2)
|
||||
print(f"[CCA Claim grid] Filled {input_id}={value!r}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim grid] Fill {input_id} failed: {e}")
|
||||
return False
|
||||
|
||||
# CODE (col 2)
|
||||
click_cell_and_fill(2, "ServicesCODE", code)
|
||||
|
||||
# TOOTH (col 4) — skip if empty
|
||||
if tooth:
|
||||
click_cell_and_fill(4, "ServicesTOOTH", tooth)
|
||||
|
||||
# Surfaces (col 5-9) — skip if empty
|
||||
if surface_chars:
|
||||
for si, char in enumerate(surface_chars[:5]):
|
||||
surf_ids = ["ServicesSURF1","ServicesSURF2","ServicesSURF3",
|
||||
"ServicesSURF4","ServicesSURF5"]
|
||||
click_cell_and_fill(5 + si, surf_ids[si], char)
|
||||
|
||||
# Billed Amount — last column
|
||||
# QTY and Service Date auto-fill after CODE is entered and this cell is clicked
|
||||
click_cell_and_fill("last()", "ServicesBILLED_AMOUNT", billed_str)
|
||||
|
||||
time.sleep(0.3)
|
||||
|
||||
# ---- Attached Documents ----
|
||||
if self.uploadFiles:
|
||||
print(f"[CCA Claim step3] Attaching {len(self.uploadFiles)} file(s)...")
|
||||
|
||||
# Expand Attached Documents panel if collapsed
|
||||
for sel in [
|
||||
(By.XPATH, "//h4[contains(@class,'panel-title') and contains(.,'Attached Documents')]"),
|
||||
(By.XPATH, "//*[contains(@class,'panel-title') and contains(.,'Attached Documents')]"),
|
||||
]:
|
||||
try:
|
||||
el = WebDriverWait(self.driver, 6).until(EC.element_to_be_clickable(sel))
|
||||
try:
|
||||
chevron = el.find_element(By.XPATH, ".//*[contains(@class,'fa-chevron-down')]")
|
||||
if chevron.is_displayed():
|
||||
el.click()
|
||||
time.sleep(0.8)
|
||||
print("[CCA Claim step3] Attached Documents panel expanded")
|
||||
except Exception:
|
||||
pass
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Write files preserving the original filename so the portal sees the correct name
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
temp_paths = []
|
||||
for f in self.uploadFiles:
|
||||
try:
|
||||
original_name = f.get("originalname", "attachment.pdf")
|
||||
# Sanitise filename — keep extension, replace unsafe chars
|
||||
safe_name = re.sub(r'[^\w.\-]', '_', original_name)
|
||||
tmp_path = os.path.join(tmp_dir, safe_name)
|
||||
with open(tmp_path, "wb") as fh:
|
||||
fh.write(base64.b64decode(f["bufferBase64"]))
|
||||
temp_paths.append(tmp_path)
|
||||
print(f"[CCA Claim step3] Wrote attachment: {tmp_path}")
|
||||
except Exception as fe:
|
||||
print(f"[CCA Claim step3] Failed to write attachment: {fe}")
|
||||
|
||||
if temp_paths:
|
||||
# Make the hidden file input interactable and send paths
|
||||
file_input_found = False
|
||||
for sel in [
|
||||
(By.XPATH, "//input[@type='file']"),
|
||||
(By.XPATH, "//button[@id='AttachDocumentButton']/preceding::input[@type='file'][1]"),
|
||||
(By.XPATH, "//button[@id='AttachDocumentButton']/following::input[@type='file'][1]"),
|
||||
]:
|
||||
try:
|
||||
file_input = self.driver.find_element(*sel)
|
||||
self.driver.execute_script(
|
||||
"arguments[0].style.display='block'; "
|
||||
"arguments[0].style.visibility='visible'; "
|
||||
"arguments[0].style.opacity='1';",
|
||||
file_input
|
||||
)
|
||||
file_input.send_keys("\n".join(temp_paths))
|
||||
file_input_found = True
|
||||
print(f"[CCA Claim step3] Files sent to input, waiting for upload...")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not file_input_found:
|
||||
try:
|
||||
btn = WebDriverWait(self.driver, 6).until(
|
||||
EC.element_to_be_clickable((By.ID, "AttachDocumentButton")))
|
||||
btn.click()
|
||||
print("[CCA Claim step3] Clicked AttachDocumentButton")
|
||||
except Exception:
|
||||
print("[CCA Claim step3] AttachDocumentButton not found")
|
||||
|
||||
# Wait for the attachment to appear in the Attached Documents section
|
||||
print("[CCA Claim step3] Waiting for attachment to upload...")
|
||||
try:
|
||||
# The panel shows file names once uploaded; wait for any filename to appear
|
||||
WebDriverWait(self.driver, 30).until(
|
||||
lambda d: any(
|
||||
re.sub(r'[^\w.\-]', '_', f.get("originalname", ""))[:10].lower()
|
||||
in d.find_element(By.TAG_NAME, "body").text.lower()
|
||||
for f in self.uploadFiles
|
||||
) or "Attached Documents (1)" in d.find_element(By.TAG_NAME, "body").text
|
||||
or "Attached Documents (2)" in d.find_element(By.TAG_NAME, "body").text
|
||||
)
|
||||
print("[CCA Claim step3] Attachment confirmed on page")
|
||||
except TimeoutException:
|
||||
print("[CCA Claim step3] Attachment upload wait timed out — proceeding")
|
||||
|
||||
# Clean up temp files
|
||||
try:
|
||||
import shutil
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ---- Submit Claim ----
|
||||
print("[CCA Claim step3] Clicking 'Submit Claim'...")
|
||||
submitted = False
|
||||
for sel in [
|
||||
(By.ID, "SaveClaimButton"),
|
||||
(By.XPATH, "//button[@id='SaveClaimButton']"),
|
||||
(By.XPATH, "//button[@ng-click and contains(@ng-click,'SubmitClaim')]"),
|
||||
(By.XPATH, "//button[contains(text(),'Submit Claim')]"),
|
||||
]:
|
||||
try:
|
||||
btn = WebDriverWait(self.driver, 8).until(EC.element_to_be_clickable(sel))
|
||||
btn.click()
|
||||
submitted = True
|
||||
print(f"[CCA Claim step3] Clicked Submit Claim via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not submitted:
|
||||
return "ERROR: Submit Claim button not found or not clickable"
|
||||
|
||||
# ---- Handle confirmation modal popup ----
|
||||
print("[CCA Claim step3] Waiting for confirmation modal...")
|
||||
time.sleep(1.5)
|
||||
modal_confirmed = False
|
||||
for sel in [
|
||||
# Most specific: Submit/Confirm/Yes button inside an active modal
|
||||
(By.XPATH, "//div[contains(@class,'modal') and contains(@class,'in')]//button[contains(text(),'Submit') or contains(text(),'Confirm') or contains(text(),'Yes')]"),
|
||||
(By.XPATH, "//div[@class='modal-footer']//button[contains(text(),'Submit') or contains(text(),'Confirm') or contains(text(),'Yes')]"),
|
||||
(By.XPATH, "//button[@ng-click and (contains(@ng-click,'confirm') or contains(@ng-click,'Confirm') or contains(@ng-click,'submit') or contains(@ng-click,'Submit'))]"),
|
||||
(By.XPATH, "//div[contains(@class,'modal')]//button[not(contains(@class,'cancel')) and not(contains(text(),'Cancel')) and not(contains(text(),'No'))]"),
|
||||
]:
|
||||
try:
|
||||
btn = WebDriverWait(self.driver, 5).until(EC.element_to_be_clickable(sel))
|
||||
btn.click()
|
||||
modal_confirmed = True
|
||||
print(f"[CCA Claim step3] Clicked modal confirm via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not modal_confirmed:
|
||||
print("[CCA Claim step3] No confirmation modal found — may have auto-submitted")
|
||||
|
||||
# Wait for final confirmation page
|
||||
print("[CCA Claim step3] Waiting for submission confirmation...")
|
||||
try:
|
||||
WebDriverWait(self.driver, 30).until(
|
||||
lambda d: any(x in d.find_element(By.TAG_NAME, "body").text.lower() for x in [
|
||||
"claim submitted", "claim number", "confirmation", "success",
|
||||
"has been submitted", "claim id", "assigned the number",
|
||||
])
|
||||
)
|
||||
print("[CCA Claim step3] Claim submission confirmed")
|
||||
except TimeoutException:
|
||||
print(f"[CCA Claim step3] Confirmation not detected — URL: {self.driver.current_url}")
|
||||
|
||||
# Navigate to Claims Dashboard to get Encounter ID and save as PDF
|
||||
print("[CCA Claim step3] Navigating to Claims Dashboard...")
|
||||
time.sleep(2)
|
||||
self.driver.get("https://pwp.sciondental.com/PWP/Dental/ClaimDashboard")
|
||||
try:
|
||||
WebDriverWait(self.driver, 20).until(
|
||||
lambda d: "Dashboard" in d.find_element(By.TAG_NAME, "body").text
|
||||
or "Encounter" in d.find_element(By.TAG_NAME, "body").text
|
||||
)
|
||||
print(f"[CCA Claim step3] Dashboard loaded — URL: {self.driver.current_url}")
|
||||
except TimeoutException:
|
||||
print("[CCA Claim step3] Dashboard load timed out")
|
||||
|
||||
time.sleep(2)
|
||||
return "SUCCESS"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[CCA Claim step3] Exception: {e}")
|
||||
return f"ERROR: step3 failed: {e}"
|
||||
@@ -131,10 +131,10 @@ class AutomationCCAEligibilityCheck:
|
||||
try:
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
lambda d: "sciondental.com" in d.current_url
|
||||
and d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
except TimeoutException:
|
||||
pass
|
||||
time.sleep(2)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[CCA login] After landing nav URL: {current_url}")
|
||||
@@ -147,14 +147,13 @@ class AutomationCCAEligibilityCheck:
|
||||
print("[CCA login] Session not valid, navigating to login page...")
|
||||
self.driver.get(url)
|
||||
|
||||
# Wait up to 15s for ANY input to appear, then snapshot the page
|
||||
# Wait for login inputs to appear
|
||||
try:
|
||||
WebDriverWait(self.driver, 15).until(
|
||||
EC.presence_of_element_located((By.XPATH, "//input"))
|
||||
)
|
||||
except TimeoutException:
|
||||
pass
|
||||
time.sleep(1)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[CCA login] After login nav URL: {current_url}")
|
||||
|
||||
Reference in New Issue
Block a user