feat: Tufts SCO claim automation, Claim All button, fee schedule updates

- Add full Tufts SCO claim Selenium worker (steps 1-8): login with OTP
  support, member search, Create Claim, fill form, attach files, submit,
  extract claim number and save confirmation PDF
- Fix DentaQuest browser manager to preserve device trust token on startup
  (only clear cookies, not LocalStorage/IndexedDB) so OTP is only needed
  once for both eligibility and Tufts claim
- Fix Tufts SCO claim route credential lookup key (TUFTS_SCO not TuftsSCO)
- Add Tufts SCO and United/DentalHub entries to fee schedule update route
- Add "Claim All" button that auto-routes to the correct claim handler
  based on the Insurance Type dropdown value
- Add fee schedule JSON files for DDMA, Tufts SCO, and United/DentalHub

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Gitead
2026-05-25 20:22:26 -04:00
parent 1e581c193c
commit 3534ecb3c9
11 changed files with 5188 additions and 88 deletions

View File

@@ -0,0 +1,145 @@
/**
* Processor for "tuftssco-claim-submit" jobs.
* Opens a claim on the Tufts SCO (DentaQuest) provider portal via Selenium.
*
* Flow:
* 1. POST /tuftssco-claim to Python agent → get session_id
* 2. Emit selenium:tuftssco_claim_started to frontend
* 3. Poll until completed/error
* 4. Emit result
*/
import {
forwardToSeleniumTuftsSCOClaimAgent,
getSeleniumTuftsSCOClaimSessionStatus,
} from "../../services/seleniumTuftsSCOClaimClient";
import { io } from "../../socket";
import { storage } from "../../storage";
function log(tag: string, msg: string, ctx?: any) {
console.log(`${new Date().toISOString()} [${tag}] ${msg}`, ctx ?? "");
}
function emitToSocket(socketId: string | undefined, event: string, payload: any) {
if (!socketId || !io) return;
try {
const socket = io.sockets.sockets.get(socketId);
if (socket) socket.emit(event, payload);
} catch (_) {}
}
async function pollUntilDone(
sessionId: string,
socketId: string | undefined,
jobId: string,
pollTimeoutMs = 10 * 60 * 1000
): Promise<any> {
const maxAttempts = 1200;
const pollIntervalMs = 500;
const maxTransientErrors = 12;
let transientErrors = 0;
let lastOtpEmit = 0;
const deadline = Date.now() + pollTimeoutMs;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (Date.now() > deadline) {
throw new Error(`Tufts SCO claim polling timeout for session ${sessionId}`);
}
try {
const st = await getSeleniumTuftsSCOClaimSessionStatus(sessionId);
const status: string = st?.status ?? "unknown";
log("tuftssco-claim-processor", `poll attempt=${attempt}`, { sessionId, status });
transientErrors = 0;
if (status === "waiting_for_otp") {
if (Date.now() - lastOtpEmit > 5000) {
emitToSocket(socketId, "selenium:otp_required", {
session_id: sessionId,
jobId,
message: "OTP required. Please enter the OTP shown by the Tufts SCO portal.",
});
lastOtpEmit = Date.now();
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
continue;
}
if (status === "completed") return st.result;
if (status === "error" || status === "not_found") {
throw new Error(st?.message || `Tufts SCO claim session ended with status: ${status}`);
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
} catch (err: any) {
const isTerminal =
err?.response?.status === 404 ||
(typeof err?.message === "string" &&
(err.message.includes("not_found") || err.message.includes("polling timeout")));
if (isTerminal) throw err;
transientErrors++;
if (transientErrors > maxTransientErrors) {
throw new Error(`Too many transient errors polling Tufts SCO claim session ${sessionId}`);
}
const backoff = Math.min(30_000, 500 * Math.pow(2, transientErrors - 1));
await new Promise((r) => setTimeout(r, backoff));
}
}
throw new Error(`Tufts SCO claim polling exhausted all attempts for session ${sessionId}`);
}
export interface TuftsSCOClaimProcessorInput {
enrichedPayload: any;
userId: number;
claimId?: number;
socketId?: string;
}
export async function runTuftsSCOClaimProcessor(
input: TuftsSCOClaimProcessorInput,
jobId: string
): Promise<{ status: string; pdf_url?: string; claimNumber?: string }> {
const { enrichedPayload, userId, claimId, socketId } = input;
log("tuftssco-claim-processor", "starting Python agent session", { claimId });
const agentResp = await forwardToSeleniumTuftsSCOClaimAgent(enrichedPayload);
if (!agentResp?.session_id) {
throw new Error("Python agent did not return a session_id for Tufts SCO claim");
}
const sessionId = agentResp.session_id as string;
log("tuftssco-claim-processor", "got session_id", { sessionId });
emitToSocket(socketId, "selenium:tuftssco_claim_started", { session_id: sessionId, jobId });
const seleniumResult = await pollUntilDone(sessionId, socketId, jobId);
if (!seleniumResult || seleniumResult.status === "error") {
throw new Error(seleniumResult?.message ?? "Tufts SCO claim session returned an error");
}
const claimNumber: string | undefined = seleniumResult.claimNumber ?? undefined;
const pdf_url: string | undefined = seleniumResult.pdf_url ?? undefined;
if (claimId) {
try {
const updates: Record<string, any> = { status: "REVIEW" };
if (claimNumber) updates.claimNumber = claimNumber;
await storage.updateClaim(claimId, updates);
log("tuftssco-claim-processor", "claim record updated", { claimId, claimNumber });
} catch (e) {
log("tuftssco-claim-processor", "failed to update claim record (non-fatal)", { error: e });
}
}
emitToSocket(socketId, "selenium:tuftssco_claim_completed", {
jobId,
claimId,
claimNumber,
pdf_url,
message: claimNumber
? `Tufts SCO claim submitted — Claim #: ${claimNumber}`
: (seleniumResult?.message ?? "Tufts SCO claim submitted successfully"),
});
log("tuftssco-claim-processor", "done", { claimId, claimNumber });
return { status: "success", pdf_url, claimNumber };
}

View File

@@ -9,6 +9,10 @@ const SCHEDULE_FILES: Record<string, string> = {
MASSHEALTH: "procedureCodesMH.json", MASSHEALTH: "procedureCodesMH.json",
CCA: "procedureCodesCCA.json", CCA: "procedureCodesCCA.json",
DDMA: "procedureCodesDDMA.json", DDMA: "procedureCodesDDMA.json",
TUFTSSCO: "procedureCodesTuftsSCO.json",
TUFTS_SCO: "procedureCodesTuftsSCO.json",
UNITEDDH: "procedureCodesUnitedDH.json",
UNITED_SCO: "procedureCodesUnitedDH.json",
}; };
function getSchedulePath(siteKey: string): string | null { function getSchedulePath(siteKey: string): string | null {

View File

@@ -0,0 +1,94 @@
import { Router, Request, Response } from "express";
import { storage } from "../storage";
import { enqueueSeleniumJob } from "../queue/jobRunner";
import { forwardOtpToSeleniumTuftsSCOClaimAgent } from "../services/seleniumTuftsSCOClaimClient";
import { io } from "../socket";
const router = Router();
/**
* POST /tuftssco-claim
*
* Enqueues a Tufts SCO (DentaQuest) claim submission job.
*
* Body fields (JSON):
* data — claim payload (memberId, dateOfBirth, serviceDate, serviceLines, patientName, etc.)
* socketId — socket.io client id
* claimId — existing claim DB id (optional)
*
* Response: { status: "queued", jobId: "…" }
*/
router.post("/tuftssco-claim", async (req: Request, res: Response): Promise<any> => {
if (!req.user?.id) {
return res.status(401).json({ error: "Unauthorized: user info missing" });
}
try {
const claimData =
typeof req.body.data === "string"
? JSON.parse(req.body.data)
: req.body.data ?? req.body ?? {};
// Fetch Tufts SCO (DentaQuest) credentials
const credentials = await storage.getInsuranceCredentialByUserAndSiteKey(
req.user.id,
"TUFTS_SCO"
);
if (!credentials) {
return res.status(404).json({
error: "No Tufts SCO credentials found. Please add them on the Settings page.",
});
}
const enrichedPayload = {
claim: {
...claimData,
dentaquestUsername: credentials.username,
dentaquestPassword: credentials.password,
},
};
const socketId: string | undefined = req.body.socketId;
const claimId: number | undefined = claimData.claimId
? Number(claimData.claimId)
: undefined;
const jobId = enqueueSeleniumJob({
jobType: "tuftssco-claim-submit",
userId: req.user.id,
socketId,
enrichedPayload,
claimId,
});
return res.json({ status: "queued", jobId });
} catch (err: any) {
console.error("[tuftssco-claim route] error:", err);
return res.status(500).json({
error: err.message || "Failed to enqueue Tufts SCO claim job",
});
}
});
/**
* POST /claims/tuftssco-claim/selenium/submit-otp
* Body: { session_id, otp, socketId? }
*/
router.post("/tuftssco-claim/selenium/submit-otp", async (req: Request, res: Response): Promise<any> => {
const { session_id: sessionId, otp, socketId } = req.body;
if (!sessionId || !otp) {
return res.status(400).json({ error: "session_id and otp are required" });
}
try {
const r = await forwardOtpToSeleniumTuftsSCOClaimAgent(sessionId, otp);
if (socketId && io) {
io.to(socketId).emit("selenium:otp_submitted", { session_id: sessionId });
}
return res.json(r);
} catch (err: any) {
console.error("[tuftssco-claim] submit-otp failed:", err?.message);
return res.status(500).json({ error: err?.message || "Failed to forward OTP" });
}
});
export default router;

View File

@@ -0,0 +1,35 @@
import axios from "axios";
const SELENIUM_BASE = process.env.SELENIUM_SERVICE_URL ?? "http://localhost:5002";
/**
* POST /tuftssco-claim
* Returns { status: "started", session_id: "<uuid>" }
*/
export async function forwardToSeleniumTuftsSCOClaimAgent(
data: Record<string, any>
): Promise<{ status: string; session_id: string }> {
const resp = await axios.post(`${SELENIUM_BASE}/tuftssco-claim`, data);
return resp.data;
}
/**
* GET /session/{sid}/status
*/
export async function getSeleniumTuftsSCOClaimSessionStatus(
sessionId: string
): Promise<Record<string, any>> {
const resp = await axios.get(`${SELENIUM_BASE}/session/${sessionId}/status`);
return resp.data;
}
/**
* POST /submit-otp
*/
export async function forwardOtpToSeleniumTuftsSCOClaimAgent(
sessionId: string,
otp: string
): Promise<Record<string, any>> {
const resp = await axios.post(`${SELENIUM_BASE}/submit-otp`, { session_id: sessionId, otp });
return resp.data;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1202,16 +1202,26 @@ export function ClaimForm({
? npiProviders.find((p) => p.npiNumber === npiProvider.npiNumber)?.id ?? null ? npiProviders.find((p) => p.npiNumber === npiProvider.npiNumber)?.id ?? null
: null; : null;
const createdClaim = await onSubmit({ let createdClaim: any;
...formToCreateClaim, try {
serviceLines: filteredServiceLines, createdClaim = await onSubmit({
staffId: appointmentStaffId ?? Number(staff?.id), ...formToCreateClaim,
patientId, serviceLines: filteredServiceLines,
insuranceProvider: "Tufts SCO", staffId: appointmentStaffId ?? Number(staff?.id),
appointmentId: appointmentIdToUse!, patientId,
claimFiles: claimFilesMeta, insuranceProvider: "Tufts SCO",
...(selectedNpiProviderId ? { npiProviderId: selectedNpiProviderId } : {}), appointmentId: appointmentIdToUse!,
}); claimFiles: claimFilesMeta,
...(selectedNpiProviderId ? { npiProviderId: selectedNpiProviderId } : {}),
});
} catch (err: any) {
toast({
title: "Failed to save claim",
description: err?.message || "An error occurred saving the claim.",
variant: "destructive",
});
return;
}
onHandleForTuftsSCOSeleniumClaim({ onHandleForTuftsSCOSeleniumClaim({
...form, ...form,
@@ -1228,6 +1238,27 @@ export function ClaimForm({
onClose(); onClose();
}; };
const handleClaimAll = () => {
const siteKey = (form.insuranceSiteKey || deriveInsuranceSiteKey(form.insuranceProvider || "")).toUpperCase();
if (siteKey === "MH" || siteKey === "MASSHEALTH") {
runWithPriceCheck(() => handleMHSubmit());
} else if (siteKey === "CCA") {
runWithPriceCheck(handleCCAClaim);
} else if (siteKey === "DDMA") {
runWithPriceCheck(handleDDMAClaim);
} else if (siteKey === "TUFTSSCO" || siteKey === "TUFTS_SCO") {
runWithPriceCheck(handleTuftsSCOClaim);
} else if (siteKey === "UNITEDSCO" || siteKey === "UNITEDDH" || siteKey === "UNITED_SCO") {
runWithPriceCheck(handleUnitedDHClaim);
} else {
toast({
title: "No automated claim for this insurance",
description: `Insurance type "${form.insuranceSiteKey || "unknown"}" does not have an automated claim. Please use one of the buttons below.`,
variant: "destructive",
});
}
};
const handleCCAPreAuth = async () => { const handleCCAPreAuth = async () => {
const missingFields: string[] = []; const missingFields: string[] = [];
if (!form.memberId?.trim()) missingFields.push("Member ID"); if (!form.memberId?.trim()) missingFields.push("Member ID");
@@ -2146,6 +2177,15 @@ export function ClaimForm({
</Button> </Button>
</div> </div>
) : ( ) : (
<>
<div className="flex justify-center mb-3">
<Button
className="w-36 bg-slate-800 hover:bg-slate-900 text-white font-semibold"
onClick={handleClaimAll}
>
Claim All
</Button>
</div>
<div className="flex flex-wrap gap-2 justify-center"> <div className="flex flex-wrap gap-2 justify-center">
<Button <Button
className="w-32 bg-blue-600 hover:bg-blue-700 text-white" className="w-32 bg-blue-600 hover:bg-blue-700 text-white"
@@ -2184,6 +2224,7 @@ export function ClaimForm({
Claim Saved Claim Saved
</Button> </Button>
</div> </div>
</>
)} )}
</div> </div>
</div> </div>

View File

@@ -43,99 +43,43 @@ class DentaQuestBrowserManager:
def clear_session_on_startup(self): def clear_session_on_startup(self):
""" """
Clear session cookies from Chrome profile on startup. Clear only login cookies on startup to force credential re-entry after restart.
This forces a fresh login after PC restart. NEVER clears Local Storage or IndexedDB — those hold the DentaQuest device trust
Preserves device trust tokens (LocalStorage, IndexedDB) to avoid OTPs. token that allows the portal to skip OTP for recognised devices.
""" """
print("[DentaQuest BrowserManager] Clearing session on startup...") print("[DentaQuest BrowserManager] Clearing login cookies on startup (preserving device trust)...")
try: try:
# Clear the credentials tracking file # Reset credentials tracking so the next login re-saves the hash
if os.path.exists(self._credentials_file): if os.path.exists(self._credentials_file):
os.remove(self._credentials_file) os.remove(self._credentials_file)
print("[DentaQuest BrowserManager] Cleared credentials tracking file") print("[DentaQuest BrowserManager] Cleared credentials tracking file")
# Clear session-related files from Chrome profile # Only remove cookie / login-data files — these expire the session so the
# These are the files that store login session cookies # user must re-enter credentials, but the device trust token is untouched.
session_files = [ session_files = [
"Cookies", "Cookies",
"Cookies-journal", "Cookies-journal",
"Login Data", "Login Data",
"Login Data-journal", "Login Data-journal",
"Web Data",
"Web Data-journal",
] ]
for filename in session_files: for filename in session_files:
filepath = os.path.join(self.profile_dir, "Default", filename) for base in [os.path.join(self.profile_dir, "Default"), self.profile_dir]:
if os.path.exists(filepath): filepath = os.path.join(base, filename)
try: if os.path.exists(filepath):
os.remove(filepath) try:
print(f"[DentaQuest BrowserManager] Removed {filename}") os.remove(filepath)
except Exception as e: print(f"[DentaQuest BrowserManager] Removed {filename}")
print(f"[DentaQuest BrowserManager] Could not remove {filename}: {e}") except Exception as e:
print(f"[DentaQuest BrowserManager] Could not remove {filename}: {e}")
# Also try root level (some Chrome versions)
for filename in session_files:
filepath = os.path.join(self.profile_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DentaQuest BrowserManager] Removed root {filename}")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not remove root {filename}: {e}")
# Clear Session Storage (contains login state)
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
if os.path.exists(session_storage_dir):
try:
shutil.rmtree(session_storage_dir)
print("[DentaQuest BrowserManager] Cleared Session Storage")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear Session Storage: {e}")
# Clear Local Storage (may contain auth tokens)
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
if os.path.exists(local_storage_dir):
try:
shutil.rmtree(local_storage_dir)
print("[DentaQuest BrowserManager] Cleared Local Storage")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear Local Storage: {e}")
# Clear IndexedDB (may contain auth tokens)
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
if os.path.exists(indexeddb_dir):
try:
shutil.rmtree(indexeddb_dir)
print("[DentaQuest BrowserManager] Cleared IndexedDB")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear IndexedDB: {e}")
# Clear browser caches (prevents Chrome crash from corrupted cache) # Local Storage, IndexedDB, and Session Storage are intentionally
cache_dirs = [ # NOT cleared — they contain the DentaQuest device trust token that
os.path.join(self.profile_dir, "Default", "Cache"), # prevents OTP from being required on every login.
os.path.join(self.profile_dir, "Default", "Code Cache"),
os.path.join(self.profile_dir, "Default", "GPUCache"),
os.path.join(self.profile_dir, "Default", "Service Worker"),
os.path.join(self.profile_dir, "Cache"),
os.path.join(self.profile_dir, "Code Cache"),
os.path.join(self.profile_dir, "GPUCache"),
os.path.join(self.profile_dir, "ShaderCache"),
]
for cache_dir in cache_dirs:
if os.path.exists(cache_dir):
try:
shutil.rmtree(cache_dir)
print(f"[DentaQuest BrowserManager] Cleared {os.path.basename(cache_dir)}")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear {os.path.basename(cache_dir)}: {e}")
# Set flag to clear session via JavaScript after browser opens
self._needs_session_clear = True self._needs_session_clear = True
print("[DentaQuest BrowserManager] Startup clear done — device trust preserved, OTP not required")
print("[DentaQuest BrowserManager] Session cleared - will require fresh login")
except Exception as e: except Exception as e:
print(f"[DentaQuest BrowserManager] Error clearing session: {e}") print(f"[DentaQuest BrowserManager] Error clearing session: {e}")

View File

@@ -0,0 +1,344 @@
import os
import time
import asyncio
from typing import Dict, Any
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium_TuftsSCO_claimSubmitWorker import AutomationTuftsSCOClaimSubmit
sessions: Dict[str, Dict[str, Any]] = {}
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120"))
def make_session_entry() -> str:
import uuid
sid = str(uuid.uuid4())
sessions[sid] = {
"status": "created",
"created_at": time.time(),
"last_activity": time.time(),
"bot": None,
"driver": None,
"otp_event": asyncio.Event(),
"otp_value": None,
"result": None,
"message": None,
}
return sid
async def cleanup_session(sid: str, message: str | None = None):
s = sessions.get(sid)
if not s:
return
try:
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
ev = s.get("otp_event")
if ev and not ev.is_set():
ev.set()
finally:
sessions.pop(sid, None)
print(f"[helpers_tuftssco_claim] cleaned session {sid}")
async def _remove_session_later(sid: str, delay: int = 20):
await asyncio.sleep(delay)
await cleanup_session(sid)
def _minimize_browser(bot):
try:
if bot and bot.driver:
try:
bot.driver.get("about:blank")
except Exception:
pass
try:
bot.driver.minimize_window()
print("[TuftsSCO Claim] Browser minimized after error")
return
except Exception:
pass
try:
bot.driver.set_window_position(-10000, -10000)
print("[TuftsSCO Claim] Browser moved off-screen after error")
except Exception:
pass
except Exception as e:
print(f"[TuftsSCO Claim] Could not hide browser: {e}")
async def start_tuftssco_claim_run(sid: str, data: dict, url: str):
"""
Run the Tufts SCO (DentaQuest) claim workflow.
Login/OTP handling mirrors helpers_ddma_claim.py.
Claim steps call selenium_TuftsSCO_claimSubmitWorker.
"""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
s["status"] = "running"
s["last_activity"] = time.time()
bot = None
try:
bot = AutomationTuftsSCOClaimSubmit(data)
bot.config_driver()
s["bot"] = bot
s["driver"] = bot.driver
s["last_activity"] = time.time()
try:
bot.driver.maximize_window()
bot.driver.get(url)
await asyncio.sleep(1)
except Exception as e:
s["status"] = "error"
s["message"] = f"Navigation failed: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
# --- Login ---
try:
login_result = bot.login(url)
except WebDriverException as wde:
s["status"] = "error"
s["message"] = f"Selenium driver error during login: {wde}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
except Exception as e:
s["status"] = "error"
s["message"] = f"Unexpected error during login: {e}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
# ── Already logged in ────────────────────────────────────────────────
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
print("[TuftsSCO Claim] Session persisted — skipping OTP")
s["status"] = "running"
s["message"] = "Session persisted"
# ── OTP required ─────────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
s["status"] = "waiting_for_otp"
s["message"] = "OTP required for login — please enter OTP"
s["last_activity"] = time.time()
driver = s["driver"]
max_polls = SESSION_OTP_TIMEOUT
login_success = False
print(f"[TuftsSCO Claim OTP] Polling for OTP completion (up to {SESSION_OTP_TIMEOUT}s)...")
for poll in range(max_polls):
await asyncio.sleep(1)
s["last_activity"] = time.time()
try:
# a) App submitted OTP via /submit-otp endpoint
otp_value = s.get("otp_value")
if otp_value:
print(f"[TuftsSCO Claim OTP poll {poll+1}] OTP received from app, typing...")
try:
otp_input = driver.find_element(By.XPATH,
"//input[contains(@name,'otp') or contains(@name,'code') or @type='tel' or "
"contains(@aria-label,'Verification') or contains(@placeholder,'code') or "
"contains(@placeholder,'Code')]"
)
otp_input.clear()
otp_input.send_keys(otp_value)
try:
verify_btn = driver.find_element(By.XPATH,
"//button[@type='button' and @aria-label='Verify']")
verify_btn.click()
except Exception:
try:
verify_btn = driver.find_element(By.XPATH,
"//button[contains(text(),'Verify') or contains(text(),'Submit') or @type='submit']")
verify_btn.click()
except Exception:
otp_input.send_keys("\n")
print("[TuftsSCO Claim OTP] OTP submitted")
s["otp_value"] = None
await asyncio.sleep(3)
except Exception as type_err:
print(f"[TuftsSCO Claim OTP] Failed to type OTP: {type_err}")
# b) Check URL — if past OTP page, login succeeded
current_url = driver.current_url.lower()
print(f"[TuftsSCO Claim OTP poll {poll+1}/{max_polls}] URL: {current_url[:70]}...")
if "member" in current_url or "dashboard" in current_url or "eligibility" in current_url:
try:
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.XPATH,
'//input[@placeholder="Search by member ID"]'))
)
print("[TuftsSCO Claim OTP] Member search found — login successful!")
login_success = True
break
except TimeoutException:
print("[TuftsSCO Claim OTP] On member page but element not found, continuing...")
# Check if OTP input still visible
try:
driver.find_element(By.XPATH,
"//input[contains(@name,'otp') or contains(@name,'code') or @type='tel' or "
"contains(@aria-label,'Verification') or contains(@placeholder,'code')]"
)
print(f"[TuftsSCO Claim OTP poll {poll+1}] OTP input still visible — waiting...")
except Exception:
if "login" in current_url or "onboarding" in current_url:
print("[TuftsSCO Claim OTP] OTP input gone, navigating to members page...")
try:
driver.get("https://providers.dentaquest.com/members")
await asyncio.sleep(2)
except Exception:
pass
except Exception as poll_err:
print(f"[TuftsSCO Claim OTP poll {poll+1}] Error: {poll_err}")
if not login_success:
try:
print("[TuftsSCO Claim OTP] Final attempt — navigating to members page...")
driver.get("https://providers.dentaquest.com/members")
await asyncio.sleep(3)
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH,
'//input[@placeholder="Search by member ID"]'))
)
print("[TuftsSCO Claim OTP] Member search found — login successful!")
login_success = True
except TimeoutException:
s["status"] = "error"
s["message"] = "OTP timeout — login not completed"
await cleanup_session(sid)
return {"status": "error", "message": "OTP not completed in time"}
except Exception as final_err:
s["status"] = "error"
s["message"] = f"OTP verification failed: {final_err}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
if login_success:
s["status"] = "running"
s["message"] = "Login successful after OTP"
print("[TuftsSCO Claim OTP] Proceeding to claim steps...")
# ── Login succeeded without OTP ───────────────────────────────────────
elif isinstance(login_result, str) and login_result == "SUCCESS":
print("[TuftsSCO Claim] Login succeeded without OTP")
s["status"] = "running"
s["message"] = "Login succeeded"
# ── Login error ───────────────────────────────────────────────────────
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = login_result
await cleanup_session(sid)
return {"status": "error", "message": login_result}
# --- Claim steps ---
for step_name, step_fn in [
("step1_search_patient", bot.step1_search_patient),
("step2_open_member_page", bot.step2_open_member_page),
("step3_click_create_claim", bot.step3_click_create_claim),
("step4_fill_claim_form", bot.step4_fill_claim_form),
("step5_attach_files", bot.step5_attach_files),
("step6_click_next", bot.step6_click_next),
("step7_submit_claim", bot.step7_submit_claim),
]:
result = step_fn()
print(f"[TuftsSCO Claim] {step_name} result: {result}")
if isinstance(result, str) and result.startswith("ERROR"):
s["status"] = "error"
s["message"] = result
_minimize_browser(bot)
asyncio.create_task(_remove_session_later(sid, 30))
return {"status": "error", "message": result}
# --- Step 8: PDF + claim number ---
step8_result = bot.step8_save_confirmation_pdf()
print(f"[TuftsSCO Claim] step8 result: {step8_result}")
if isinstance(step8_result, str) and step8_result.startswith("ERROR"):
print(f"[TuftsSCO Claim] step8 warning (non-fatal): {step8_result}")
step8_result = {}
pdf_path = step8_result.get("pdf_path") if isinstance(step8_result, dict) else None
pdf_url = None
if pdf_path:
import os as _os
filename = _os.path.basename(pdf_path)
port = _os.getenv("PORT", "5002")
url_host = _os.getenv("HOST", "localhost")
pdf_url = f"http://{url_host}:{port}/downloads/{filename}"
print(f"[TuftsSCO Claim] pdf_url: {pdf_url}")
claim_number = step8_result.get("claimNumber") if isinstance(step8_result, dict) else None
result = {
"status": "success",
"message": "Tufts SCO claim submitted successfully",
"claimNumber": claim_number,
"pdf_url": pdf_url,
}
s["status"] = "completed"
s["result"] = result
s["message"] = "completed"
# Close browser window (session preserved in profile)
try:
from dentaquest_browser_manager import get_browser_manager as _gbm
_gbm().quit_driver()
print("[TuftsSCO Claim] Browser closed - session preserved in profile")
except Exception as close_err:
print(f"[TuftsSCO Claim] Could not close browser (non-fatal): {close_err}")
asyncio.create_task(_remove_session_later(sid, 60))
return result
except Exception as e:
if s:
s["status"] = "error"
s["message"] = f"worker exception: {e}"
await cleanup_session(sid)
return {"status": "error", "message": f"worker exception: {e}"}
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
if s.get("status") != "waiting_for_otp":
return {"status": "error", "message": f"session not waiting for otp (state={s.get('status')})"}
s["otp_value"] = otp
s["last_activity"] = time.time()
try:
s["otp_event"].set()
except Exception:
pass
return {"status": "ok", "message": "otp accepted"}
def get_session_status(sid: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "not_found"}
return {
"session_id": sid,
"status": s.get("status"),
"message": s.get("message"),
"created_at": s.get("created_at"),
"last_activity": s.get("last_activity"),
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
}

View File

@@ -0,0 +1,905 @@
"""
Tufts SCO (DentaQuest) Claim Submission Worker.
Portal: providers.dentaquest.com
Step 1 & 2 mirror the DentaQuest eligibility worker (same portal, same selectors).
Step 38 mirror the DDMA claim worker (identical claim-form UI on both portals).
"""
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import os
import base64
from dentaquest_browser_manager import get_browser_manager
MEMBERS_URL = "https://providers.dentaquest.com/members"
_SERVICE_DIR = os.path.dirname(os.path.abspath(__file__))
_BACKEND_CWD = os.path.normpath(os.path.join(_SERVICE_DIR, "..", "Backend"))
class AutomationTuftsSCOClaimSubmit:
def __init__(self, data):
self.headless = False
self.driver = None
claim = data.get("claim", {}) if isinstance(data, dict) else {}
self.memberId = claim.get("memberId", "")
self.dateOfBirth = claim.get("dateOfBirth", "")
self.firstName = claim.get("firstName", "")
self.lastName = claim.get("lastName", "")
self.serviceDate = claim.get("serviceDate", "")
self.serviceLines = claim.get("serviceLines", [])
self.claimFiles = claim.get("claimFiles", [])
self.patientName = claim.get("patientName", "")
self.remarks = claim.get("remarks", "")
self.dentaquest_username = claim.get("dentaquestUsername", "")
self.dentaquest_password = claim.get("dentaquestPassword", "")
self.download_dir = get_browser_manager().download_dir
os.makedirs(self.download_dir, exist_ok=True)
def config_driver(self):
self.driver = get_browser_manager().get_driver(self.headless)
def _force_logout(self):
try:
print("[TuftsSCO Claim login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
try:
self.driver.get("https://providers.dentaquest.com/")
time.sleep(2)
for selector in [
"//button[contains(text(),'Log out') or contains(text(),'Logout') or contains(text(),'Sign out')]",
"//a[contains(text(),'Log out') or contains(text(),'Logout')]",
"//button[@aria-label='Log out' or @aria-label='Logout']",
"//*[contains(@class,'logout') or contains(@class,'signout')]",
]:
try:
btn = WebDriverWait(self.driver, 3).until(EC.element_to_be_clickable((By.XPATH, selector)))
btn.click()
print("[TuftsSCO Claim login] Clicked logout button")
time.sleep(2)
break
except TimeoutException:
continue
except Exception as e:
print(f"[TuftsSCO Claim login] Could not click logout button: {e}")
try:
self.driver.delete_all_cookies()
print("[TuftsSCO Claim login] Cleared all cookies")
except Exception as e:
print(f"[TuftsSCO Claim login] Error clearing cookies: {e}")
browser_manager.clear_credentials_hash()
return True
except Exception as e:
print(f"[TuftsSCO Claim login] Error during forced logout: {e}")
return False
def _is_maintenance_page(self) -> bool:
try:
body = self.driver.find_element(By.TAG_NAME, "body").text.lower()
markers = ["temporarily unable to service", "maintenance downtime", "capacity problems"]
return any(m in body for m in markers)
except Exception:
return False
def login(self, url):
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
if self.dentaquest_username and browser_manager.credentials_changed(self.dentaquest_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
try:
current_url = self.driver.current_url
print(f"[TuftsSCO Claim login] Current URL: {current_url}")
if "dashboard" in current_url.lower() or "member" in current_url.lower():
try:
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[TuftsSCO Claim login] Already logged in")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
except Exception as e:
print(f"[TuftsSCO Claim login] Error checking current state: {e}")
self.driver.get(url)
time.sleep(3)
if self._is_maintenance_page():
return "ERROR: DentaQuest portal is in maintenance mode"
current_url = self.driver.current_url.lower()
print(f"[TuftsSCO Claim login] After navigation URL: {current_url}")
if "dashboard" in current_url or "member" in current_url:
try:
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[TuftsSCO Claim login] Already on dashboard")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
# Dismiss "Authentication flow continued in another tab" modal if present
try:
ok_button = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.XPATH,
"//button[normalize-space(text())='Ok' or normalize-space(text())='OK' "
"or normalize-space(text())='Continue']"
))
)
ok_button.click()
print("[TuftsSCO Claim login] Dismissed modal")
time.sleep(3)
try:
WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[TuftsSCO Claim login] Already authenticated after modal dismiss")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
except TimeoutException:
pass
# Check for OTP input on page
try:
WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='tel' or contains(@placeholder,'code') or "
"contains(@aria-label,'Verification') or contains(@name,'otp')]"))
)
print("[TuftsSCO Claim login] OTP input found on arrival")
return "OTP_REQUIRED"
except TimeoutException:
pass
# Fill login credentials
try:
username_field = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//input[@type='email' or @name='username' or @id='username' or "
"@name='Email' or @placeholder='Email' or @placeholder='Username' or @type='text']"))
)
username_field.clear()
username_field.send_keys(self.dentaquest_username)
print(f"[TuftsSCO Claim login] Entered username")
password_field = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='password']"))
)
password_field.clear()
password_field.send_keys(self.dentaquest_password)
print("[TuftsSCO Claim login] Entered password")
signin_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@type='submit'] | //input[@type='submit'] | "
"//button[contains(text(),'Sign') or contains(text(),'Log')]"))
)
signin_btn.click()
print("[TuftsSCO Claim login] Clicked Sign in")
if self.dentaquest_username:
browser_manager.save_credentials_hash(self.dentaquest_username)
# Wait for OTP input to appear
try:
WebDriverWait(self.driver, 30).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='tel' or contains(@placeholder,'code') or contains(@placeholder,'Code') or "
"contains(@aria-label,'Verification') or contains(@aria-label,'verification') or "
"contains(@name,'otp') or contains(@name,'code')]"
))
)
print("[TuftsSCO Claim login] OTP required after sign-in")
return "OTP_REQUIRED"
except TimeoutException:
pass
current_url = self.driver.current_url.lower()
if "dashboard" in current_url or "member" in current_url:
print("[TuftsSCO Claim login] Login succeeded without OTP")
return "SUCCESS"
print(f"[TuftsSCO Claim login] Unexpected state — URL: {current_url}")
return "SUCCESS"
except Exception as e:
return f"ERROR: Login failed - {e}"
except Exception as e:
return f"ERROR: Login exception - {e}"
# ── Step 1: Search patient (mirrors DentaQuest eligibility step1) ──────────
def step1_search_patient(self):
"""Navigate to member search and find the patient by Member ID + DOB."""
wait = WebDriverWait(self.driver, 30)
def replace_with_sendkeys(el, value):
el.click()
time.sleep(0.05)
el.send_keys(Keys.CONTROL, "a")
el.send_keys(Keys.BACKSPACE)
el.send_keys(value)
try:
print(f"[TuftsSCO Claim step1] Current URL: {self.driver.current_url}")
print(f"[TuftsSCO Claim step1] Searching memberId={self.memberId} dob={self.dateOfBirth}")
if self._is_maintenance_page():
return "ERROR: DentaQuest portal is in maintenance mode"
time.sleep(2)
# Parse DOB
try:
dob_parts = self.dateOfBirth.split("-")
dob_year = dob_parts[0]
dob_month = dob_parts[1].zfill(2)
dob_day = dob_parts[2].zfill(2)
print(f"[TuftsSCO Claim step1] Parsed DOB: {dob_month}/{dob_day}/{dob_year}")
except Exception as e:
return f"ERROR: step1 DOB parse failed: {e}"
# 1. Select Location from dropdown (required field on DentaQuest portal)
try:
trigger = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.XPATH,
'//button[@data-testid="member-search_location_select-btn"]'))
)
trigger.click()
print("[TuftsSCO Claim step1] Clicked location dropdown")
time.sleep(0.5)
first_option = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.XPATH, "(//li[@role='option'])[1]"))
)
opt_text = first_option.get_attribute("aria-label") or first_option.text.strip()
first_option.click()
print(f"[TuftsSCO Claim step1] Selected location: {opt_text[:60]}")
time.sleep(0.3)
except TimeoutException:
print("[TuftsSCO Claim step1] Warning: Location dropdown not found (continuing)")
except Exception as e:
print(f"[TuftsSCO Claim step1] Warning: Location select failed: {e}")
# 2. Fill DOB
try:
dob_container = wait.until(EC.presence_of_element_located(
(By.XPATH, "//div[@data-testid='member-search_date-of-birth']")
))
month_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_elem = dob_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
replace_with_sendkeys(month_elem, dob_month)
time.sleep(0.1)
replace_with_sendkeys(day_elem, dob_day)
time.sleep(0.1)
replace_with_sendkeys(year_elem, dob_year)
print(f"[TuftsSCO Claim step1] Filled DOB: {dob_month}/{dob_day}/{dob_year}")
except Exception as e:
print(f"[TuftsSCO Claim step1] Warning: Could not fill DOB: {e}")
time.sleep(0.3)
# 3. Fill Member ID
if self.memberId:
try:
member_id_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="Search by member ID"]')
))
member_id_input.clear()
member_id_input.send_keys(self.memberId)
print(f"[TuftsSCO Claim step1] Entered Member ID: {self.memberId}")
time.sleep(0.2)
except Exception as e:
print(f"[TuftsSCO Claim step1] Warning: Could not fill Member ID: {e}")
time.sleep(0.3)
# 4. Click Search
try:
search_btn = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//button[@data-testid="member-search_search-button"]')
))
search_btn.click()
print("[TuftsSCO Claim step1] Clicked Search button")
except TimeoutException:
try:
search_btn = self.driver.find_element(By.XPATH, '//button[contains(text(),"Search")]')
search_btn.click()
print("[TuftsSCO Claim step1] Clicked Search button (fallback)")
except Exception:
ActionChains(self.driver).send_keys(Keys.RETURN).perform()
print("[TuftsSCO Claim step1] Pressed Enter to search")
# Wait for results or no-results
WebDriverWait(self.driver, 15).until(
EC.any_of(
EC.presence_of_element_located((By.XPATH, "//tbody//tr")),
EC.presence_of_element_located((By.XPATH,
'//*[contains(@data-testid,"no-results") or contains(text(),"No results") '
'or contains(text(),"No member found") or contains(text(),"Nothing was found")]'
)),
)
)
time.sleep(4)
# Check for no-results
try:
no_results = self.driver.find_element(By.XPATH,
'//*[contains(@data-testid,"no-results") or contains(text(),"No results") '
'or contains(text(),"No member found")]'
)
if no_results and no_results.is_displayed():
return "ERROR: No patient found with given search criteria"
except Exception:
pass
print("[TuftsSCO Claim step1] Search completed")
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step1] Exception: {e}")
return f"ERROR: step1 failed: {e}"
# ── Step 2: Open member information page ───────────────────────────────────
def step2_open_member_page(self):
"""Click patient name link → Member Information page, wait for Create claim button."""
try:
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//tbody//tr"))
)
time.sleep(2)
except TimeoutException:
print("[TuftsSCO Claim step2] Warning: Results table not found within timeout")
# Find member-details URL from first row
detail_url = None
for selector in [
"(//table//tbody//tr)[1]//td[1]//a",
"(//tbody//tr)[1]//a[contains(@href,'member-details')]",
"(//tbody//tr)[1]//a[contains(@href,'member')]",
"//a[contains(@href,'member-details')]",
]:
try:
link_el = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, selector))
)
href = link_el.get_attribute("href")
if href and "member" in href:
detail_url = href
print(f"[TuftsSCO Claim step2] Found detail URL: {href}")
break
except Exception:
continue
if not detail_url:
return "ERROR: step2 failed: could not find member link"
self.driver.get(detail_url)
print(f"[TuftsSCO Claim step2] Navigating to: {detail_url}")
try:
WebDriverWait(self.driver, 15).until(
lambda d: "member" in d.current_url
)
print(f"[TuftsSCO Claim step2] Member Information page loaded: {self.driver.current_url}")
except TimeoutException:
print(f"[TuftsSCO Claim step2] Warning — URL: {self.driver.current_url}")
try:
WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.XPATH, "//button[@aria-label='Create claim']"))
)
print("[TuftsSCO Claim step2] 'Create claim' button found")
except TimeoutException:
print("[TuftsSCO Claim step2] Warning: 'Create claim' button not found within timeout")
time.sleep(2)
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step2] Exception: {e}")
return f"ERROR: step2 failed: {e}"
# ── Step 3: Click "Create claim" button ────────────────────────────────────
def step3_click_create_claim(self):
"""Click the 'Create claim' button on the Member Information page."""
try:
print(f"[TuftsSCO Claim step3] Current URL: {self.driver.current_url}")
handles_before = set(self.driver.window_handles)
self.driver.execute_script("window.scrollTo(0, 0);")
time.sleep(0.5)
all_btns = self.driver.find_elements(By.XPATH, "//button")
print(f"[TuftsSCO Claim step3] Buttons on page: {[b.get_attribute('aria-label') or b.text for b in all_btns]}")
btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@aria-label='Create claim' and @data-react-aria-pressable='true']"
))
)
print(f"[TuftsSCO Claim step3] Found 'Create claim': displayed={btn.is_displayed()}, enabled={btn.is_enabled()}")
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
time.sleep(0.5)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", btn)
print("[TuftsSCO Claim step3] Dispatched pointer+click events on 'Create claim'")
time.sleep(2)
# Switch to new tab if one opened
handles_after = set(self.driver.window_handles)
new_handles = handles_after - handles_before
if new_handles:
self.driver.switch_to.window(new_handles.pop())
print("[TuftsSCO Claim step3] Switched to new tab")
print(f"[TuftsSCO Claim step3] Post-click URL: {self.driver.current_url}")
try:
WebDriverWait(self.driver, 20).until(
EC.any_of(
EC.presence_of_element_located((By.XPATH, "//input[contains(@id,'procedureCode')]")),
EC.presence_of_element_located((By.XPATH, "//span[@data-type='month' and @contenteditable='true']")),
EC.presence_of_element_located((By.XPATH,
"//*[contains(text(),'date of service') or contains(text(),'Date of service') "
"or contains(text(),'Procedure code')]"
)),
)
)
print("[TuftsSCO Claim step3] Claim form loaded")
except TimeoutException:
page_text = self.driver.execute_script("return document.body.innerText;")[:400]
print(f"[TuftsSCO Claim step3] Claim form not detected — page: {page_text}")
time.sleep(1)
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step3] Exception: {e}")
return f"ERROR: step3 failed: {e}"
# ── Step 4: Fill service date and procedure lines ──────────────────────────
def _parse_service_date(self):
s = str(self.serviceDate or "").strip()
if not s:
return None, None, None
if "-" in s:
parts = s.split("-")
if len(parts) == 3 and len(parts[0]) == 4:
return parts[1].zfill(2), parts[2].zfill(2), parts[0]
if len(parts) == 3 and len(parts[2]) == 4:
return parts[0].zfill(2), parts[1].zfill(2), parts[2]
if "/" in s:
parts = s.split("/")
if len(parts) == 3:
return parts[0].zfill(2), parts[1].zfill(2), parts[2]
return None, None, None
def _fill_spinbutton(self, label_fragment, value):
for sel in [
f"//span[@contenteditable='true' and contains(@aria-label,'{label_fragment}')]",
f"//span[@data-type='{label_fragment}' and @contenteditable='true']",
]:
try:
elem = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, sel))
)
elem.click()
elem.send_keys(Keys.CONTROL, "a")
elem.send_keys(Keys.BACKSPACE)
elem.send_keys(value)
time.sleep(0.1)
print(f"[TuftsSCO Claim step4] Filled spinbutton '{label_fragment}' = {value!r}")
return True
except Exception:
continue
print(f"[TuftsSCO Claim step4] Warning: spinbutton '{label_fragment}' not found")
return False
def _fill_combobox(self, inp, value, label="field"):
try:
inp.click()
inp.send_keys(Keys.CONTROL + "a")
inp.send_keys(Keys.DELETE)
inp.send_keys(str(value))
time.sleep(0.5)
listbox_id = inp.get_attribute("aria-controls") or ""
try:
if listbox_id:
option = WebDriverWait(self.driver, 4).until(
EC.element_to_be_clickable((By.XPATH,
f"//*[@id='{listbox_id}']//*[@role='option'][1]"
))
)
else:
option = WebDriverWait(self.driver, 4).until(
EC.element_to_be_clickable((By.XPATH,
f"//*[@role='listbox']//*[@role='option' and contains(normalize-space(.),'{value}')]"
f" | //*[@role='listbox']//*[@role='option'][1]"
))
)
option.click()
print(f"[TuftsSCO Claim step4] {label}: selected '{value}'")
except TimeoutException:
inp.send_keys(Keys.TAB)
print(f"[TuftsSCO Claim step4] {label}: typed '{value}' (no dropdown)")
except Exception as e:
print(f"[TuftsSCO Claim step4] Warning: could not fill {label}: {e}")
def _fill_text_input(self, inp, value, label="field"):
try:
inp.click()
inp.send_keys(Keys.CONTROL + "a")
inp.send_keys(Keys.DELETE)
inp.send_keys(str(value))
time.sleep(0.1)
print(f"[TuftsSCO Claim step4] {label}: typed '{value}'")
except Exception as e:
print(f"[TuftsSCO Claim step4] Warning: could not fill {label}: {e}")
def step4_fill_claim_form(self):
"""Fill service date then all procedure line fields."""
try:
month, day, year = self._parse_service_date()
# Service date (once, at the top of the form)
if month and day and year:
print(f"[TuftsSCO Claim step4] Filling service date: {month}/{day}/{year}")
try:
dos_container = WebDriverWait(self.driver, 8).until(
EC.presence_of_element_located((By.XPATH,
"//*[@data-testid and contains(@data-testid,'date-of-service')] | "
"//*[contains(@aria-label,'Select date of service')]/ancestor::div[1]"
))
)
month_el = dos_container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_el = dos_container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_el = dos_container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
for elem, val in [(month_el, month), (day_el, day), (year_el, year)]:
elem.click()
elem.send_keys(Keys.CONTROL, "a")
elem.send_keys(Keys.BACKSPACE)
elem.send_keys(val)
time.sleep(0.05)
print("[TuftsSCO Claim step4] Service date filled")
except Exception:
self._fill_spinbutton("month", month)
self._fill_spinbutton("day", day)
self._fill_spinbutton("year", year)
else:
print(f"[TuftsSCO Claim step4] No valid service date: {self.serviceDate!r}")
time.sleep(0.3)
active_lines = [ln for ln in self.serviceLines if str(ln.get("procedureCode") or "").strip()]
print(f"[TuftsSCO Claim step4] {len(active_lines)} service line(s)")
for idx, line in enumerate(active_lines):
code = str(line.get("procedureCode") or "").strip().upper()
tooth = str(line.get("toothNumber") or line.get("tooth") or "").strip()
arch = str(line.get("arch") or "").strip()
quad = str(line.get("quad") or line.get("quadrant") or "").strip()
surface = str(line.get("toothSurface") or line.get("surface") or "").strip().upper()
billed = str(line.get("totalBilled") or line.get("billedAmount") or line.get("fee") or "").strip()
billed = billed.replace("$", "").strip()
print(f"[TuftsSCO Claim step4] Line {idx}: code={code} tooth={tooth!r} arch={arch!r} "
f"quad={quad!r} surface={surface!r} billed={billed!r}")
# Click "Add a procedure" for lines after the first
if idx > 0:
try:
add_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Add a procedure')]] | "
"//button[contains(normalize-space(text()),'Add a procedure')] | "
"//*[contains(text(),'Add a procedure') and @role='button']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", add_btn)
add_btn.click()
print(f"[TuftsSCO Claim step4] Clicked 'Add a procedure' for line {idx}")
time.sleep(1)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not click 'Add a procedure': {e}")
# Procedure code
if code:
try:
proc_inputs = self.driver.find_elements(By.XPATH,
"//input[contains(@id,'procedureCode') and contains(@id,'-input')]"
)
proc_inp = proc_inputs[idx] if idx < len(proc_inputs) else proc_inputs[-1]
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", proc_inp)
self._fill_combobox(proc_inp, code, f"procedureCode[{idx}]")
time.sleep(0.5)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill procedure code: {e}")
# Tooth
if tooth:
try:
tooth_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Tooth']")
if idx < len(tooth_inputs):
self._fill_combobox(tooth_inputs[idx], tooth, f"tooth[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill tooth: {e}")
# Arch
if arch:
try:
arch_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Arch']")
if idx < len(arch_inputs):
self._fill_combobox(arch_inputs[idx], arch, f"arch[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill arch: {e}")
# Quad
if quad:
try:
quad_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Quad']")
if idx < len(quad_inputs):
self._fill_combobox(quad_inputs[idx], quad, f"quad[{idx}]")
time.sleep(0.3)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill quad: {e}")
# Surface (free-text — type directly, dismiss listbox with Escape)
if surface:
try:
surface_inputs = self.driver.find_elements(By.XPATH, "//input[@aria-label='Surface']")
if idx < len(surface_inputs):
surf_inp = surface_inputs[idx]
surf_inp.click()
surf_inp.send_keys(Keys.CONTROL + "a")
surf_inp.send_keys(Keys.DELETE)
surf_inp.send_keys(surface)
time.sleep(0.3)
surf_inp.send_keys(Keys.ESCAPE)
print(f"[TuftsSCO Claim step4] surface[{idx}]: typed '{surface}'")
time.sleep(0.2)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill surface: {e}")
# Billed amount
if billed:
try:
billed_inputs = self.driver.find_elements(By.XPATH,
"//input[@aria-label='Enter billed amount']"
)
if idx < len(billed_inputs):
self._fill_text_input(billed_inputs[idx], billed, f"billedAmount[{idx}]")
time.sleep(0.2)
except Exception as e:
print(f"[TuftsSCO Claim step4] Could not fill billed amount: {e}")
print("[TuftsSCO Claim step4] Done")
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step4] Exception: {e}")
return f"ERROR: step4 failed: {e}"
# ── Step 5: Attach files ────────────────────────────────────────────────────
def step5_attach_files(self):
"""For each claimFile with a filePath, click 'Add a file' and upload it."""
if not self.claimFiles:
print("[TuftsSCO Claim step5] No files to attach")
return "SUCCESS"
attached = 0
for cf in self.claimFiles:
relative_path = cf.get("filePath") or ""
if not relative_path:
print(f"[TuftsSCO Claim step5] Skipping file with no filePath: {cf}")
continue
abs_path = os.path.normpath(os.path.join(_BACKEND_CWD, relative_path.lstrip("/")))
if not os.path.isfile(abs_path):
print(f"[TuftsSCO Claim step5] File not found on disk: {abs_path}")
continue
print(f"[TuftsSCO Claim step5] Attaching: {abs_path}")
try:
add_file_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Add a file')]] | "
"//button[contains(normalize-space(text()),'Add a file')] | "
"//*[contains(text(),'Add a file') and (@role='button' or self::label)]"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", add_file_btn)
ActionChains(self.driver).move_to_element(add_file_btn).click().perform()
time.sleep(1)
file_input = WebDriverWait(self.driver, 8).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='file']"))
)
self.driver.execute_script("arguments[0].style.display='block';", file_input)
file_input.send_keys(abs_path)
time.sleep(1.5)
print(f"[TuftsSCO Claim step5] Attached: {os.path.basename(abs_path)}")
attached += 1
except Exception as e:
print(f"[TuftsSCO Claim step5] Could not attach {abs_path}: {e}")
print(f"[TuftsSCO Claim step5] Attached {attached}/{len(self.claimFiles)} file(s)")
return "SUCCESS"
# ── Step 6: Click "Next step" ──────────────────────────────────────────────
def step6_click_next(self):
"""Click the 'Next step' button (React Aria — dispatches pointer events directly)."""
try:
print(f"[TuftsSCO Claim step6] Current URL: {self.driver.current_url}")
btn = WebDriverWait(self.driver, 15).until(
EC.element_to_be_clickable((By.XPATH,
"//button[@data-testid='next-step-btn'] | "
"//button[@aria-label='Next step']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
time.sleep(0.5)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", btn)
print("[TuftsSCO Claim step6] Clicked 'Next step'")
time.sleep(2)
print(f"[TuftsSCO Claim step6] URL after Next: {self.driver.current_url}")
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step6] Exception: {e}")
return f"ERROR: step6 failed: {e}"
# ── Step 7: Acknowledge + submit ────────────────────────────────────────────
def step7_submit_claim(self):
"""On the claims summary page, tick the acknowledgement checkbox then click Submit claim."""
try:
print(f"[TuftsSCO Claim step7] Current URL: {self.driver.current_url}")
checkbox = WebDriverWait(self.driver, 15).until(
EC.presence_of_element_located((By.XPATH,
"//input[@type='checkbox'] | "
"//*[@role='checkbox'] | "
"//label[contains(.,'submitting this claim')]//input | "
"//*[contains(@aria-label,'submitting this claim')]"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", checkbox)
time.sleep(0.3)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", checkbox)
print("[TuftsSCO Claim step7] Checked acknowledgement checkbox")
time.sleep(0.5)
submit_btn = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH,
"//button[.//span[contains(text(),'Submit claim')]] | "
"//button[contains(normalize-space(text()),'Submit claim')] | "
"//button[@aria-label='Submit claim']"
))
)
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", submit_btn)
time.sleep(0.3)
self.driver.execute_script("""
var el = arguments[0];
el.dispatchEvent(new PointerEvent('pointerover', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerdown', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new PointerEvent('pointerup', {bubbles:true, cancelable:true, composed:true}));
el.dispatchEvent(new MouseEvent('click', {bubbles:true, cancelable:true, composed:true}));
""", submit_btn)
print("[TuftsSCO Claim step7] Clicked 'Submit claim'")
time.sleep(2)
print(f"[TuftsSCO Claim step7] URL after submit: {self.driver.current_url}")
return "SUCCESS"
except Exception as e:
print(f"[TuftsSCO Claim step7] Exception: {e}")
return f"ERROR: step7 failed: {e}"
# ── Step 8: Extract claim number + save confirmation PDF ───────────────────
def step8_save_confirmation_pdf(self):
"""Wait for the thank-you page, extract the claim number, save page as PDF."""
import re
try:
WebDriverWait(self.driver, 30).until(
lambda d: "thank" in d.page_source.lower() or "submitted claim" in d.page_source.lower()
)
time.sleep(2)
print(f"[TuftsSCO Claim step8] Confirmation page URL: {self.driver.current_url}")
claim_number = None
try:
body_text = self.driver.find_element(By.TAG_NAME, "body").text
match = re.search(r'submitted claim\s+(\d{10,})', body_text, re.IGNORECASE)
if match:
claim_number = match.group(1)
print(f"[TuftsSCO Claim step8] Extracted claim number: {claim_number}")
else:
match = re.search(r'\b(\d{12,})\b', body_text)
if match:
claim_number = match.group(1)
print(f"[TuftsSCO Claim step8] Extracted claim number (fallback): {claim_number}")
except Exception as e:
print(f"[TuftsSCO Claim step8] Could not extract claim number: {e}")
shared_downloads = os.path.join(_SERVICE_DIR, "downloads")
os.makedirs(shared_downloads, exist_ok=True)
safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.")
safe_claim = ("_" + claim_number[:20]) if claim_number else ""
timestamp = time.strftime("%Y%m%d_%H%M%S")
pdf_filename = f"tuftssco_claim_confirmation_{safe_member}{safe_claim}_{timestamp}.pdf"
pdf_path = os.path.join(shared_downloads, pdf_filename)
try:
pdf_data = self.driver.execute_cdp_cmd("Page.printToPDF", {
"printBackground": True,
"paperWidth": 8.5,
"paperHeight": 11,
"marginTop": 0.4,
"marginBottom": 0.4,
"marginLeft": 0.4,
"marginRight": 0.4,
})
pdf_bytes = base64.b64decode(pdf_data["data"])
with open(pdf_path, "wb") as f:
f.write(pdf_bytes)
print(f"[TuftsSCO Claim step8] PDF saved: {pdf_path}")
except Exception as e:
print(f"[TuftsSCO Claim step8] PDF capture failed: {e}")
return f"ERROR: step8 PDF failed: {e}"
return {
"status": "success",
"pdf_path": pdf_path,
"claimNumber": claim_number,
}
except Exception as e:
print(f"[TuftsSCO Claim step8] Exception: {e}")
return f"ERROR: step8 failed: {e}"