From 289ea426d309afa371d7773dd7e4ceda4c9037a7 Mon Sep 17 00:00:00 2001 From: ff Date: Thu, 16 Apr 2026 09:21:47 -0400 Subject: [PATCH] feat: integrate DDMA eligibility into BullMQ queue with persistent session - Route DDMA eligibility through InProcessQueue (concurrency=1) so it queues behind other selenium jobs instead of running concurrently - New ddmaEligibilityProcessor: starts Python session, polls for OTP/ completion via socket events, saves PDF and updates patient DB - Frontend ddma-buton-modal now uses shared app socket + job:update pattern (drops private socket connection) - SeleniumService: upgrade ddma_browser_manager with credential hash tracking, anti-detection options, and startup session clearing; upgrade DDMA worker with firstName/lastName support, PDF via printToPDF, force-logout on credential change; upgrade helpers with dual OTP strategy (app API + browser polling); add /clear-ddma-session endpoint; reduce fixed sleeps with smart WebDriverWait Co-Authored-By: Claude Sonnet 4.6 --- apps/Backend/src/queue/jobRunner.ts | 15 + .../processors/ddmaEligibilityProcessor.ts | 359 ++++++++++ apps/Backend/src/queue/queues.ts | 3 +- .../Backend/src/routes/insuranceStatusDDMA.ts | 645 ++---------------- .../insurance-status/ddma-buton-modal.tsx | 566 ++++++--------- apps/SeleniumService/agent.py | 30 +- apps/SeleniumService/ddma_browser_manager.py | 236 ++++++- .../helpers_ddma_eligibility.py | 219 +++--- .../selenium_DDMA_eligibilityCheckWorker.py | 595 +++++++++++----- 9 files changed, 1429 insertions(+), 1239 deletions(-) create mode 100644 apps/Backend/src/queue/processors/ddmaEligibilityProcessor.ts diff --git a/apps/Backend/src/queue/jobRunner.ts b/apps/Backend/src/queue/jobRunner.ts index e3ee857..a2c0ed6 100644 --- a/apps/Backend/src/queue/jobRunner.ts +++ b/apps/Backend/src/queue/jobRunner.ts @@ -11,6 +11,7 @@ import { runEligibilityProcessor } from "./processors/eligibilityProcessor"; import { runClaimStatusProcessor } from "./processors/claimStatusProcessor"; import { runClaimSubmitProcessor } from "./processors/claimSubmitProcessor"; import { runOcrProcessor } from "./processors/ocrProcessor"; +import { runDdmaEligibilityProcessor } from "./processors/ddmaEligibilityProcessor"; import type { SeleniumJobData, OcrJobData } from "./queues"; // ── Queue instances ────────────────────────────────────────────────────────── @@ -68,6 +69,20 @@ export function enqueueSeleniumJob(data: SeleniumJobData): string { variant: jobType === "claim-pre-auth" ? "claim-pre-auth" : "claimsubmit", }); } + if (jobType === "ddma-eligibility-check") { + return runDdmaEligibilityProcessor( + { + enrichedPayload: data.enrichedPayload, + userId: data.userId, + insuranceId: data.insuranceId!, + formFirstName: data.formFirstName, + formLastName: data.formLastName, + formDob: data.formDob, + socketId: data.socketId, + }, + job.id + ); + } throw new Error(`Unknown selenium jobType: ${jobType}`); }); diff --git a/apps/Backend/src/queue/processors/ddmaEligibilityProcessor.ts b/apps/Backend/src/queue/processors/ddmaEligibilityProcessor.ts new file mode 100644 index 0000000..1b0c75c --- /dev/null +++ b/apps/Backend/src/queue/processors/ddmaEligibilityProcessor.ts @@ -0,0 +1,359 @@ +/** + * Processor for "ddma-eligibility-check" jobs. + * + * Integrates the full DDMA persistent-session flow into the InProcessQueue: + * 1. Start a session on the Python agent (POST /ddma-eligibility) + * 2. Emit selenium:ddma_session_started → frontend stores session_id for OTP + * 3. Poll agent status, emitting selenium:otp_required when OTP is needed + * 4. On completion: save PDF, create/update patient, update eligibility status + * 5. Return { pdfFileId, pdfFilename, patientUpdateStatus, pdfUploadStatus } + * + * The OTP submission endpoint (/api/insurance-status-ddma/selenium/submit-otp) + * continues to forward OTPs directly to the Python agent — it does NOT go + * through the queue. + */ +import fs from "fs/promises"; +import fsSync from "fs"; +import path from "path"; +import { storage } from "../../storage"; +import { emptyFolderContainingFile } from "../../utils/emptyTempFolder"; +import { + forwardToSeleniumDdmaEligibilityAgent, + getSeleniumDdmaSessionStatus, +} from "../../services/seleniumDdmaInsuranceEligibilityClient"; +import { + splitName, + createOrUpdatePatientByInsuranceId, + imageToPdfBuffer, +} from "./_shared"; +import { io } from "../../socket"; + +// ─── Helpers ──────────────────────────────────────────────────────────────── + +function now() { + return new Date().toISOString(); +} + +function log(tag: string, msg: string, ctx?: any) { + console.log(`${now()} [${tag}] ${msg}`, ctx ?? ""); +} + +function emitToSocket(socketId: string | undefined, event: string, payload: any) { + if (!socketId || !io) return; + try { + const socket = io.sockets.sockets.get(socketId); + if (socket) { + socket.emit(event, payload); + log("ddma-processor", `emitted ${event}`, { socketId }); + } + } catch (err: any) { + log("ddma-processor", `emit failed for ${event}`, { err: err?.message }); + } +} + +// ─── Types ─────────────────────────────────────────────────────────────────── + +export interface DdmaEligibilityProcessorInput { + enrichedPayload: any; + userId: number; + insuranceId: string; + formFirstName?: string; + formLastName?: string; + formDob?: string; + socketId?: string; +} + +export interface DdmaEligibilityProcessorResult { + patientUpdateStatus?: string; + pdfUploadStatus?: string; + pdfFileId?: number | null; + pdfFilename?: string | null; +} + +// ─── Core DB processing (mirrors handleDdmaCompletedJob in the route) ───────── + +async function processDdmaResult( + userId: number, + insuranceId: string, + formFirstName: string | undefined, + formLastName: string | undefined, + formDob: string | undefined, + seleniumResult: any +): Promise { + const output: DdmaEligibilityProcessorResult = {}; + let createdPdfFileId: number | null = null; + + try { + // 1) Resolve patient name (prefer selenium extraction → form data) + const rawName = + typeof seleniumResult?.patientName === "string" + ? seleniumResult.patientName.trim() + : null; + + const { firstName, lastName } = rawName + ? splitName(rawName) + : { firstName: formFirstName ?? "", lastName: formLastName ?? "" }; + + // 2) Create / update patient + await createOrUpdatePatientByInsuranceId({ + insuranceId, + firstName, + lastName, + dob: formDob, + userId, + }); + + // 3) Fetch patient (needed for ID) + const patient = await storage.getPatientByInsuranceId(insuranceId); + if (!patient?.id) { + output.patientUpdateStatus = "Patient not found; no update performed"; + return output; + } + + // 4) Determine and update eligibility status + insurance provider name + const eligStatus = (seleniumResult?.eligibility ?? "").toLowerCase(); + const newStatus = + eligStatus === "active" || eligStatus === "y" ? "ACTIVE" : "INACTIVE"; + await storage.updatePatient(patient.id, { + status: newStatus, + insuranceProvider: "Delta Dental MA", + }); + output.patientUpdateStatus = `Patient status updated to ${newStatus}`; + + // 5) Resolve PDF buffer + // New DDMA worker returns a real PDF via pdf_path / ss_path (.pdf). + // Old worker returned a screenshot (.png) via ss_path. + let pdfBuffer: Buffer | null = null; + let pdfFilename: string | null = null; + + const pdfPath: string | null = + seleniumResult?.pdf_path ?? seleniumResult?.ss_path ?? null; + + if (pdfPath && fsSync.existsSync(pdfPath)) { + if (pdfPath.endsWith(".pdf")) { + // Already a PDF — read directly + try { + pdfBuffer = await fs.readFile(pdfPath); + pdfFilename = path.basename(pdfPath); + log("ddma-processor", "read PDF directly", { pdfPath }); + } catch (e: any) { + output.pdfUploadStatus = `Failed to read PDF: ${e.message}`; + } + } else if ( + pdfPath.endsWith(".png") || + pdfPath.endsWith(".jpg") || + pdfPath.endsWith(".jpeg") + ) { + // Legacy screenshot → convert to PDF + try { + pdfBuffer = await imageToPdfBuffer(pdfPath); + pdfFilename = `ddma_eligibility_${insuranceId}_${Date.now()}.pdf`; + log("ddma-processor", "converted screenshot to PDF", { pdfPath }); + } catch (e: any) { + output.pdfUploadStatus = `Failed to convert screenshot to PDF: ${e.message}`; + } + } + } else { + output.pdfUploadStatus = "No valid file path from Selenium; nothing uploaded."; + } + + // 6) Save PDF to patient document group + if (pdfBuffer && pdfFilename) { + const groupTitleKey = "ELIGIBILITY_STATUS"; + const groupTitle = "Eligibility Status"; + + let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey); + if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey); + if (!group?.id) throw new Error("PDF group creation failed"); + + const created = await storage.createPdfFile(group.id, pdfFilename, pdfBuffer); + if (created && typeof created === "object" && "id" in created) { + createdPdfFileId = Number(created.id); + } + output.pdfUploadStatus = `PDF saved to group: ${group.title}`; + output.pdfFilename = pdfFilename; + } + + output.pdfFileId = createdPdfFileId; + return output; + } catch (err: any) { + return { + ...output, + pdfUploadStatus: + output.pdfUploadStatus ?? `Processing failed: ${err?.message ?? String(err)}`, + pdfFileId: createdPdfFileId, + }; + } finally { + // Always clean up temp files + const cleanupPath = + seleniumResult?.pdf_path ?? seleniumResult?.ss_path ?? null; + if (cleanupPath) { + try { + await emptyFolderContainingFile(cleanupPath); + } catch (e) { + log("ddma-processor", "cleanup failed", { cleanupPath }); + } + } + } +} + +// ─── Polling loop ──────────────────────────────────────────────────────────── + +async function pollUntilDone( + sessionId: string, + socketId: string | undefined, + jobId: string, + pollTimeoutMs = 5 * 60 * 1000 // 5 min total (includes OTP wait) +): Promise { + const maxAttempts = 600; // 600 × 500ms = 5 min + const pollIntervalMs = 500; + const maxTransientErrors = 12; + const noProgressLimit = 120; // 60 s of same status → abort + + let transientErrors = 0; + let consecutiveNoProgress = 0; + let lastStatus: string | null = null; + const deadline = Date.now() + pollTimeoutMs; + + for (let attempt = 0; attempt < maxAttempts; attempt++) { + if (Date.now() > deadline) { + throw new Error( + `DDMA polling timeout (${Math.round(pollTimeoutMs / 1000)}s) for session ${sessionId}` + ); + } + + try { + const st = await getSeleniumDdmaSessionStatus(sessionId); + const status: string = st?.status ?? "unknown"; + + log("ddma-processor", `poll attempt=${attempt}`, { sessionId, status }); + + transientErrors = 0; // reset on success + + // Track no-progress + const isTerminal = + status === "completed" || status === "error" || status === "not_found"; + if (status === lastStatus && !isTerminal) { + consecutiveNoProgress++; + } else { + consecutiveNoProgress = 0; + } + lastStatus = status; + + if (consecutiveNoProgress >= noProgressLimit) { + throw new Error( + `No progress from Python agent (status="${status}") after ${consecutiveNoProgress} polls` + ); + } + + // OTP required — notify frontend and keep polling + if (status === "waiting_for_otp") { + emitToSocket(socketId, "selenium:otp_required", { + session_id: sessionId, + jobId, + message: "OTP required. Please enter the OTP shown by the DDMA portal.", + }); + await new Promise((r) => setTimeout(r, pollIntervalMs)); + continue; + } + + if (status === "completed") { + log("ddma-processor", "session completed", { sessionId }); + return st.result; + } + + if (status === "error" || status === "not_found") { + throw new Error( + st?.message || `DDMA session ended with status: ${status}` + ); + } + + // Still running / otp_submitted / created — keep polling + await new Promise((r) => setTimeout(r, pollIntervalMs)); + } catch (err: any) { + // Propagate terminal errors immediately + const isTerminal = + err?.response?.status === 404 || + (typeof err?.message === "string" && + (err.message.includes("not_found") || + err.message.includes("polling timeout") || + err.message.includes("No progress"))); + + if (isTerminal) throw err; + + // Transient network errors — back off + transientErrors++; + if (transientErrors > maxTransientErrors) { + throw new Error( + `Too many transient network errors polling DDMA session ${sessionId}` + ); + } + const backoff = Math.min(30_000, 500 * Math.pow(2, transientErrors - 1)); + log("ddma-processor", `transient error #${transientErrors}, backoff ${backoff}ms`, { + err: err?.message, + }); + await new Promise((r) => setTimeout(r, backoff)); + } + } + + throw new Error(`DDMA polling exhausted all attempts for session ${sessionId}`); +} + +// ─── Main processor entry point ─────────────────────────────────────────────── + +export async function runDdmaEligibilityProcessor( + input: DdmaEligibilityProcessorInput, + jobId: string +): Promise { + const { + enrichedPayload, + userId, + insuranceId, + formFirstName, + formLastName, + formDob, + socketId, + } = input; + + // 1) Tell Python agent to start a DDMA session + log("ddma-processor", "starting Python agent session", { insuranceId }); + const agentResp = await forwardToSeleniumDdmaEligibilityAgent(enrichedPayload); + + if (!agentResp?.session_id) { + throw new Error( + "Python agent did not return a session_id for DDMA eligibility" + ); + } + + const sessionId = agentResp.session_id as string; + log("ddma-processor", "got session_id", { sessionId }); + + // 2) Emit session started so frontend can store session_id for OTP submission + emitToSocket(socketId, "selenium:ddma_session_started", { + session_id: sessionId, + jobId, + }); + + // 3) Poll until done (handles OTP events internally) + const seleniumResult = await pollUntilDone(sessionId, socketId, jobId); + + if (!seleniumResult || seleniumResult.status === "error") { + throw new Error( + seleniumResult?.message ?? "DDMA session returned an error result" + ); + } + + // 4) Process DB writes and PDF upload + log("ddma-processor", "processing DB result", { insuranceId }); + const result = await processDdmaResult( + userId, + insuranceId, + formFirstName, + formLastName, + formDob, + seleniumResult + ); + + log("ddma-processor", "done", { result }); + return result; +} diff --git a/apps/Backend/src/queue/queues.ts b/apps/Backend/src/queue/queues.ts index ac252e3..8177d5b 100644 --- a/apps/Backend/src/queue/queues.ts +++ b/apps/Backend/src/queue/queues.ts @@ -6,7 +6,8 @@ export type SeleniumJobType = | "eligibility-check" | "claim-status-check" | "claim-submit" - | "claim-pre-auth"; + | "claim-pre-auth" + | "ddma-eligibility-check"; export interface SeleniumJobData { jobType: SeleniumJobType; diff --git a/apps/Backend/src/routes/insuranceStatusDDMA.ts b/apps/Backend/src/routes/insuranceStatusDDMA.ts index a5571ea..1b8afe3 100755 --- a/apps/Backend/src/routes/insuranceStatusDDMA.ts +++ b/apps/Backend/src/routes/insuranceStatusDDMA.ts @@ -1,571 +1,41 @@ import { Router, Request, Response } from "express"; import { storage } from "../storage"; -import { - forwardToSeleniumDdmaEligibilityAgent, - forwardOtpToSeleniumDdmaAgent, - getSeleniumDdmaSessionStatus, -} from "../services/seleniumDdmaInsuranceEligibilityClient"; -import fs from "fs/promises"; -import fsSync from "fs"; -import path from "path"; -import PDFDocument from "pdfkit"; -import { emptyFolderContainingFile } from "../utils/emptyTempFolder"; -import { - InsertPatient, - insertPatientSchema, -} from "../../../../packages/db/types/patient-types"; +import { forwardOtpToSeleniumDdmaAgent } from "../services/seleniumDdmaInsuranceEligibilityClient"; import { io } from "../socket"; +import { enqueueSeleniumJob } from "../queue/jobRunner"; const router = Router(); -/** Job context stored in memory by sessionId */ -interface DdmaJobContext { - userId: number; - insuranceEligibilityData: any; // parsed, enriched (includes username/password) - socketId?: string; -} - -const ddmaJobs: Record = {}; - -/** Utility: naive name splitter */ -function splitName(fullName?: string | null) { - if (!fullName) return { firstName: "", lastName: "" }; - const parts = fullName.trim().split(/\s+/).filter(Boolean); - const firstName = parts.shift() ?? ""; - const lastName = parts.join(" ") ?? ""; - return { firstName, lastName }; -} - -async function imageToPdfBuffer(imagePath: string): Promise { - return new Promise((resolve, reject) => { - try { - const doc = new PDFDocument({ autoFirstPage: false }); - const chunks: Uint8Array[] = []; - - doc.on("data", (chunk: any) => chunks.push(chunk)); - doc.on("end", () => resolve(Buffer.concat(chunks))); - doc.on("error", (err: any) => reject(err)); - - const A4_WIDTH = 595.28; // points - const A4_HEIGHT = 841.89; // points - - doc.addPage({ size: [A4_WIDTH, A4_HEIGHT] }); - - doc.image(imagePath, 0, 0, { - fit: [A4_WIDTH, A4_HEIGHT], - align: "center", - valign: "center", - }); - - doc.end(); - } catch (err) { - reject(err); - } - }); -} - -/** - * Ensure patient exists for given insuranceId. - */ -async function createOrUpdatePatientByInsuranceId(options: { - insuranceId: string; - firstName?: string | null; - lastName?: string | null; - dob?: string | Date | null; - userId: number; -}) { - const { insuranceId, firstName, lastName, dob, userId } = options; - if (!insuranceId) throw new Error("Missing insuranceId"); - - const incomingFirst = (firstName || "").trim(); - const incomingLast = (lastName || "").trim(); - - let patient = await storage.getPatientByInsuranceId(insuranceId); - - if (patient && patient.id) { - const updates: any = {}; - if ( - incomingFirst && - String(patient.firstName ?? "").trim() !== incomingFirst - ) { - updates.firstName = incomingFirst; - } - if ( - incomingLast && - String(patient.lastName ?? "").trim() !== incomingLast - ) { - updates.lastName = incomingLast; - } - if (Object.keys(updates).length > 0) { - await storage.updatePatient(patient.id, updates); - } - return; - } else { - const createPayload: any = { - firstName: incomingFirst, - lastName: incomingLast, - dateOfBirth: dob, - gender: "", - phone: "", - userId, - insuranceId, - }; - let patientData: InsertPatient; - try { - patientData = insertPatientSchema.parse(createPayload); - } catch (err) { - const safePayload = { ...createPayload }; - delete (safePayload as any).dateOfBirth; - patientData = insertPatientSchema.parse(safePayload); - } - await storage.createPatient(patientData); - } -} - -/** - * When Selenium finishes for a given sessionId, run your patient + PDF pipeline, - * and return the final API response shape. - */ -async function handleDdmaCompletedJob( - sessionId: string, - job: DdmaJobContext, - seleniumResult: any -) { - let createdPdfFileId: number | null = null; - const outputResult: any = {}; - - // We'll wrap the processing in try/catch/finally so cleanup always runs - try { - // 1) ensuring memberid. - const insuranceEligibilityData = job.insuranceEligibilityData; - const insuranceId = String(insuranceEligibilityData.memberId ?? "").trim(); - if (!insuranceId) { - throw new Error("Missing memberId for ddma job"); - } - - // 2) Create or update patient (with name from selenium result if available) - const patientNameFromResult = - typeof seleniumResult?.patientName === "string" - ? seleniumResult.patientName.trim() - : null; - - const { firstName, lastName } = splitName(patientNameFromResult); - - await createOrUpdatePatientByInsuranceId({ - insuranceId, - firstName, - lastName, - dob: insuranceEligibilityData.dateOfBirth, - userId: job.userId, - }); - - // 3) Update patient status + PDF upload - const patient = await storage.getPatientByInsuranceId( - insuranceEligibilityData.memberId - ); - if (!patient?.id) { - outputResult.patientUpdateStatus = - "Patient not found; no update performed"; - return { - patientUpdateStatus: outputResult.patientUpdateStatus, - pdfUploadStatus: "none", - pdfFileId: null, - }; - } - - // update patient status. - const newStatus = - seleniumResult.eligibility === "active" ? "ACTIVE" : "INACTIVE"; - await storage.updatePatient(patient.id, { status: newStatus }); - outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}`; - - // convert screenshot -> pdf if available - let pdfBuffer: Buffer | null = null; - let generatedPdfPath: string | null = null; - - if ( - seleniumResult && - seleniumResult.ss_path && - typeof seleniumResult.ss_path === "string" && - (seleniumResult.ss_path.endsWith(".png") || - seleniumResult.ss_path.endsWith(".jpg") || - seleniumResult.ss_path.endsWith(".jpeg")) - ) { - try { - if (!fsSync.existsSync(seleniumResult.ss_path)) { - throw new Error( - `Screenshot file not found: ${seleniumResult.ss_path}` - ); - } - - pdfBuffer = await imageToPdfBuffer(seleniumResult.ss_path); - - const pdfFileName = `ddma_eligibility_${insuranceEligibilityData.memberId}_${Date.now()}.pdf`; - generatedPdfPath = path.join( - path.dirname(seleniumResult.ss_path), - pdfFileName - ); - await fs.writeFile(generatedPdfPath, pdfBuffer); - - // ensure cleanup uses this - seleniumResult.pdf_path = generatedPdfPath; - } catch (err: any) { - console.error("Failed to convert screenshot to PDF:", err); - outputResult.pdfUploadStatus = `Failed to convert screenshot to PDF: ${String(err)}`; - } - } else { - outputResult.pdfUploadStatus = - "No valid screenshot (ss_path) provided by Selenium; nothing to upload."; - } - - if (pdfBuffer && generatedPdfPath) { - const groupTitle = "Eligibility Status"; - const groupTitleKey = "ELIGIBILITY_STATUS"; - - let group = await storage.findPdfGroupByPatientTitleKey( - patient.id, - groupTitleKey - ); - if (!group) { - group = await storage.createPdfGroup( - patient.id, - groupTitle, - groupTitleKey - ); - } - if (!group?.id) { - throw new Error("PDF group creation failed: missing group ID"); - } - - const created = await storage.createPdfFile( - group.id, - path.basename(generatedPdfPath), - pdfBuffer - ); - if (created && typeof created === "object" && "id" in created) { - createdPdfFileId = Number(created.id); - } - outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`; - } else { - outputResult.pdfUploadStatus = - "No valid PDF path provided by Selenium, Couldn't upload pdf to server."; - } - - return { - patientUpdateStatus: outputResult.patientUpdateStatus, - pdfUploadStatus: outputResult.pdfUploadStatus, - pdfFileId: createdPdfFileId, - }; - } catch (err: any) { - return { - patientUpdateStatus: outputResult.patientUpdateStatus, - pdfUploadStatus: - outputResult.pdfUploadStatus ?? - `Failed to process DDMA job: ${err?.message ?? String(err)}`, - pdfFileId: createdPdfFileId, - error: err?.message ?? String(err), - }; - } finally { - // ALWAYS attempt cleanup of temp files - try { - if (seleniumResult && seleniumResult.pdf_path) { - await emptyFolderContainingFile(seleniumResult.pdf_path); - } else if (seleniumResult && seleniumResult.ss_path) { - await emptyFolderContainingFile(seleniumResult.ss_path); - } else { - console.log( - `[ddma-eligibility] no pdf_path or ss_path available to cleanup` - ); - } - } catch (cleanupErr) { - console.error( - `[ddma-eligibility cleanup failed for ${seleniumResult?.pdf_path ?? seleniumResult?.ss_path}]`, - cleanupErr - ); - } - } -} - -// --- top of file, alongside ddmaJobs --- -let currentFinalSessionId: string | null = null; -let currentFinalResult: any = null; - -function now() { - return new Date().toISOString(); -} function log(tag: string, msg: string, ctx?: any) { - console.log(`${now()} [${tag}] ${msg}`, ctx ?? ""); + console.log(`${new Date().toISOString()} [${tag}] ${msg}`, ctx ?? ""); } function emitSafe(socketId: string | undefined, event: string, payload: any) { - if (!socketId) { - log("socket", "no socketId for emit", { event }); - return; - } + if (!socketId || !io) return; try { - const socket = io?.sockets.sockets.get(socketId); - if (!socket) { - log("socket", "socket not found (maybe disconnected)", { - socketId, - event, - }); - return; - } - socket.emit(event, payload); - log("socket", "emitted", { socketId, event }); + const socket = io.sockets.sockets.get(socketId); + if (socket) socket.emit(event, payload); } catch (err: any) { log("socket", "emit failed", { socketId, event, err: err?.message }); } } -/** - * Polls Python agent for session status and emits socket events: - * - 'selenium:otp_required' when waiting_for_otp - * - 'selenium:session_update' when completed/error - * - rabsolute timeout + transient error handling. - * - pollTimeoutMs default = 2 minutes (adjust where invoked) - */ -async function pollAgentSessionAndProcess( - sessionId: string, - socketId?: string, - pollTimeoutMs = 2 * 60 * 1000 -) { - const maxAttempts = 300; - const baseDelayMs = 1000; - const maxTransientErrors = 12; - - // NEW: give up if same non-terminal status repeats this many times - const noProgressLimit = 100; - - const job = ddmaJobs[sessionId]; - let transientErrorCount = 0; - let consecutiveNoProgress = 0; - let lastStatus: string | null = null; - const deadline = Date.now() + pollTimeoutMs; - - for (let attempt = 0; attempt < maxAttempts; attempt++) { - // absolute deadline check - if (Date.now() > deadline) { - emitSafe(socketId, "selenium:session_update", { - session_id: sessionId, - status: "error", - message: `Polling timeout reached (${Math.round(pollTimeoutMs / 1000)}s).`, - }); - delete ddmaJobs[sessionId]; - return; - } - - log( - "poller", - `attempt=${attempt} session=${sessionId} transientErrCount=${transientErrorCount}` - ); - - try { - const st = await getSeleniumDdmaSessionStatus(sessionId); - const status = st?.status ?? null; - log("poller", "got status", { - sessionId, - status, - message: st?.message, - resultKeys: st?.result ? Object.keys(st.result) : null, - }); - - // reset transient errors on success - transientErrorCount = 0; - - // if status unchanged and non-terminal, increment no-progress counter - const isTerminalLike = - status === "completed" || status === "error" || status === "not_found"; - if (status === lastStatus && !isTerminalLike) { - consecutiveNoProgress++; - } else { - consecutiveNoProgress = 0; - } - lastStatus = status; - - // if no progress for too many consecutive polls -> abort - if (consecutiveNoProgress >= noProgressLimit) { - emitSafe(socketId, "selenium:session_update", { - session_id: sessionId, - status: "error", - message: `No progress from selenium agent (status="${status}") after ${consecutiveNoProgress} polls; aborting.`, - }); - emitSafe(socketId, "selenium:session_error", { - session_id: sessionId, - status: "error", - message: "No progress from selenium agent", - }); - delete ddmaJobs[sessionId]; - return; - } - - // always emit debug to client if socket exists - emitSafe(socketId, "selenium:debug", { - session_id: sessionId, - attempt, - status, - serverTime: new Date().toISOString(), - }); - - // If agent is waiting for OTP, inform client but keep polling (do not return) - if (status === "waiting_for_otp") { - emitSafe(socketId, "selenium:otp_required", { - session_id: sessionId, - message: "OTP required. Please enter the OTP.", - }); - // do not return — keep polling (allows same poller to pick up completion) - await new Promise((r) => setTimeout(r, baseDelayMs)); - continue; - } - - // Completed path - if (status === "completed") { - log("poller", "agent completed; processing result", { - sessionId, - resultKeys: st.result ? Object.keys(st.result) : null, - }); - - // Persist raw result so frontend can fetch if socket disconnects - currentFinalSessionId = sessionId; - currentFinalResult = { - rawSelenium: st.result, - processedAt: null, - final: null, - }; - - let finalResult: any = null; - if (job && st.result) { - try { - finalResult = await handleDdmaCompletedJob( - sessionId, - job, - st.result - ); - currentFinalResult.final = finalResult; - currentFinalResult.processedAt = Date.now(); - } catch (err: any) { - currentFinalResult.final = { - error: "processing_failed", - detail: err?.message ?? String(err), - }; - currentFinalResult.processedAt = Date.now(); - log("poller", "handleDdmaCompletedJob failed", { - sessionId, - err: err?.message ?? err, - }); - } - } else { - currentFinalResult[sessionId].final = { - error: "no_job_or_no_result", - }; - currentFinalResult[sessionId].processedAt = Date.now(); - } - - // Emit final update (if socket present) - emitSafe(socketId, "selenium:session_update", { - session_id: sessionId, - status: "completed", - rawSelenium: st.result, - final: currentFinalResult.final, - }); - - // cleanup job context - delete ddmaJobs[sessionId]; - return; - } - - // Terminal error / not_found - if (status === "error" || status === "not_found") { - const emitPayload = { - session_id: sessionId, - status, - message: st?.message || "Selenium session error", - }; - emitSafe(socketId, "selenium:session_update", emitPayload); - emitSafe(socketId, "selenium:session_error", emitPayload); - delete ddmaJobs[sessionId]; - return; - } - } catch (err: any) { - const axiosStatus = - err?.response?.status ?? (err?.status ? Number(err.status) : undefined); - const errCode = err?.code ?? err?.errno; - const errMsg = err?.message ?? String(err); - const errData = err?.response?.data ?? null; - - // If agent explicitly returned 404 -> terminal (session gone) - if ( - axiosStatus === 404 || - (typeof errMsg === "string" && errMsg.includes("not_found")) - ) { - console.warn( - `${new Date().toISOString()} [poller] terminal 404/not_found for ${sessionId}: data=${JSON.stringify(errData)}` - ); - - // Emit not_found to client - const emitPayload = { - session_id: sessionId, - status: "not_found", - message: - errData?.detail || "Selenium session not found (agent cleaned up).", - }; - emitSafe(socketId, "selenium:session_update", emitPayload); - emitSafe(socketId, "selenium:session_error", emitPayload); - - // Remove job context and stop polling - delete ddmaJobs[sessionId]; - return; - } - - // Detailed transient error logging - transientErrorCount++; - if (transientErrorCount > maxTransientErrors) { - const emitPayload = { - session_id: sessionId, - status: "error", - message: - "Repeated network errors while polling selenium agent; giving up.", - }; - emitSafe(socketId, "selenium:session_update", emitPayload); - emitSafe(socketId, "selenium:session_error", emitPayload); - delete ddmaJobs[sessionId]; - return; - } - - const backoffMs = Math.min( - 30_000, - baseDelayMs * Math.pow(2, transientErrorCount - 1) - ); - console.warn( - `${new Date().toISOString()} [poller] transient error (#${transientErrorCount}) for ${sessionId}: code=${errCode} status=${axiosStatus} msg=${errMsg} data=${JSON.stringify(errData)}` - ); - console.warn( - `${new Date().toISOString()} [poller] backing off ${backoffMs}ms before next attempt` - ); - - await new Promise((r) => setTimeout(r, backoffMs)); - continue; - } - - // normal poll interval - await new Promise((r) => setTimeout(r, baseDelayMs)); - } - - // overall timeout fallback - emitSafe(socketId, "selenium:session_update", { - session_id: sessionId, - status: "error", - message: "Polling timeout while waiting for selenium session", - }); - delete ddmaJobs[sessionId]; -} - /** * POST /ddma-eligibility - * Starts DDMA eligibility Selenium job. - * Expects: - * - req.body.data: stringified JSON like your existing /eligibility-check - * - req.body.socketId: socket.io client id + * + * Enqueues a DDMA eligibility check in the shared InProcessQueue + * (concurrency=1, mirrors the Python semaphore). + * + * Body: + * data — patient + search fields (memberId, dateOfBirth, …) + * socketId — socket.io client id for real-time updates + * + * Response: { status: "queued", jobId: "…" } + * + * Real-time events emitted to socketId during job execution: + * job:update { jobId, jobType, status: "active"|"completed"|"failed", … } + * selenium:ddma_session_started { session_id, jobId } + * selenium:otp_required { session_id, jobId, message } */ router.post( "/ddma-eligibility", @@ -575,8 +45,7 @@ router.post( .status(400) .json({ error: "Missing Insurance Eligibility data for selenium" }); } - - if (!req.user || !req.user.id) { + if (!req.user?.id) { return res.status(401).json({ error: "Unauthorized: user info missing" }); } @@ -586,6 +55,7 @@ router.post( ? JSON.parse(req.body.data) : req.body.data; + // Fetch credentials from DB const credentials = await storage.getInsuranceCredentialByUserAndSiteKey( req.user.id, rawData.insuranceSiteKey @@ -593,7 +63,7 @@ router.post( if (!credentials) { return res.status(404).json({ error: - "No insurance credentials found for this provider, Kindly Update this at Settings Page.", + "No insurance credentials found for this provider. Please update them at the Settings page.", }); } @@ -605,40 +75,25 @@ router.post( const socketId: string | undefined = req.body.socketId; - const agentResp = - await forwardToSeleniumDdmaEligibilityAgent(enrichedData); - - if ( - !agentResp || - agentResp.status !== "started" || - !agentResp.session_id - ) { - return res.status(502).json({ - error: "Selenium agent did not return a started session", - detail: agentResp, - }); - } - - const sessionId = agentResp.session_id as string; - - // Save job context - ddmaJobs[sessionId] = { + // Enqueue — this enforces the same concurrency=1 as all other selenium jobs + const jobId = enqueueSeleniumJob({ + jobType: "ddma-eligibility-check", userId: req.user.id, - insuranceEligibilityData: enrichedData, socketId, - }; + enrichedPayload: enrichedData, + insuranceId: String(rawData.memberId ?? "").trim(), + formFirstName: rawData.firstName, + formLastName: rawData.lastName, + formDob: rawData.dateOfBirth, + }); - // start polling in background to notify client via socket and process job - pollAgentSessionAndProcess(sessionId, socketId).catch((e) => - console.warn("pollAgentSessionAndProcess failed", e) - ); + log("ddma-route", "job enqueued", { jobId, insuranceId: rawData.memberId }); - // reply immediately with started status - return res.json({ status: "started", session_id: sessionId }); + return res.json({ status: "queued", jobId }); } catch (err: any) { - console.error(err); + console.error("[ddma-route] enqueue failed:", err); return res.status(500).json({ - error: err.message || "Failed to start ddma selenium agent", + error: err.message || "Failed to enqueue DDMA selenium job", }); } } @@ -646,8 +101,13 @@ router.post( /** * POST /selenium/submit-otp + * + * Forwards the OTP entered by the user directly to the Python agent. + * This is a side-channel — it does NOT go through the queue. + * The polling loop inside ddmaEligibilityProcessor picks up the completed + * state after OTP is submitted. + * * Body: { session_id, otp, socketId? } - * Forwards OTP to Python agent and optionally notifies client socket. */ router.post( "/selenium/submit-otp", @@ -660,7 +120,6 @@ router.post( try { const r = await forwardOtpToSeleniumDdmaAgent(sessionId, otp); - // emit OTP accepted (if socket present) emitSafe(socketId, "selenium:otp_submitted", { session_id: sessionId, result: r, @@ -669,31 +128,15 @@ router.post( return res.json(r); } catch (err: any) { console.error( - "Failed to forward OTP:", + "[ddma-route] submit-otp failed:", err?.response?.data || err?.message || err ); return res.status(500).json({ - error: "Failed to forward otp to selenium agent", + error: "Failed to forward OTP to selenium agent", detail: err?.message || err, }); } } ); -// GET /selenium/session/:sid/final -router.get( - "/selenium/session/:sid/final", - async (req: Request, res: Response) => { - const sid = req.params.sid; - if (!sid) return res.status(400).json({ error: "session id required" }); - - // Only the current in-memory result is available - if (currentFinalSessionId !== sid || !currentFinalResult) { - return res.status(404).json({ error: "final result not found" }); - } - - return res.json(currentFinalResult); - } -); - export default router; diff --git a/apps/Frontend/src/components/insurance-status/ddma-buton-modal.tsx b/apps/Frontend/src/components/insurance-status/ddma-buton-modal.tsx index 9d39337..e986ebf 100755 --- a/apps/Frontend/src/components/insurance-status/ddma-buton-modal.tsx +++ b/apps/Frontend/src/components/insurance-status/ddma-buton-modal.tsx @@ -1,5 +1,4 @@ import { useEffect, useRef, useState } from "react"; -import { io as ioClient, Socket } from "socket.io-client"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; @@ -9,13 +8,11 @@ import { apiRequest, queryClient } from "@/lib/queryClient"; import { useAppDispatch } from "@/redux/hooks"; import { setTaskStatus } from "@/redux/slices/seleniumTaskSlice"; import { formatLocalDate } from "@/utils/dateUtils"; +import { socket } from "@/lib/socket"; import { QK_PATIENTS_BASE } from "@/components/patients/patient-table"; -const SOCKET_URL = - import.meta.env.VITE_API_BASE_URL_BACKEND || - (typeof window !== "undefined" ? window.location.origin : ""); +// ─── OTP Modal ──────────────────────────────────────────────────────────────── -// ---------- OTP Modal component ---------- interface DdmaOtpModalProps { open: boolean; onClose: () => void; @@ -23,12 +20,7 @@ interface DdmaOtpModalProps { isSubmitting: boolean; } -function DdmaOtpModal({ - open, - onClose, - onSubmit, - isSubmitting, -}: DdmaOtpModalProps) { +function DdmaOtpModal({ open, onClose, onSubmit, isSubmitting }: DdmaOtpModalProps) { const [otp, setOtp] = useState(""); useEffect(() => { @@ -48,17 +40,13 @@ function DdmaOtpModal({

Enter OTP

-

- We need the one-time password (OTP) sent by the Delta Dental MA portal - to complete this eligibility check. + We need the one-time password (OTP) sent by the Delta Dental MA portal to complete this + eligibility check.

@@ -72,12 +60,7 @@ function DdmaOtpModal({ />
-