feat: integrate DeltaIns, Tufts SCO, United SCO, and CCA eligibility checks

This commit is contained in:
ff
2026-04-16 15:02:50 -04:00
parent 289ea426d3
commit 7fa7f405e2
931 changed files with 307681 additions and 45 deletions

View File

@@ -12,6 +12,9 @@ import { runClaimStatusProcessor } from "./processors/claimStatusProcessor";
import { runClaimSubmitProcessor } from "./processors/claimSubmitProcessor";
import { runOcrProcessor } from "./processors/ocrProcessor";
import { runDdmaEligibilityProcessor } from "./processors/ddmaEligibilityProcessor";
import { runDeltaInsEligibilityProcessor } from "./processors/deltaInsEligibilityProcessor";
import { runUnitedSCOEligibilityProcessor } from "./processors/unitedSCOEligibilityProcessor";
import { runCCAEligibilityProcessor } from "./processors/ccaEligibilityProcessor";
import type { SeleniumJobData, OcrJobData } from "./queues";
// ── Queue instances ──────────────────────────────────────────────────────────
@@ -83,6 +86,48 @@ export function enqueueSeleniumJob(data: SeleniumJobData): string {
job.id
);
}
if (jobType === "deltains-eligibility-check") {
return runDeltaInsEligibilityProcessor(
{
enrichedPayload: data.enrichedPayload,
userId: data.userId,
insuranceId: data.insuranceId!,
formFirstName: data.formFirstName,
formLastName: data.formLastName,
formDob: data.formDob,
socketId: data.socketId,
},
job.id
);
}
if (jobType === "unitedsco-eligibility-check") {
return runUnitedSCOEligibilityProcessor(
{
enrichedPayload: data.enrichedPayload,
userId: data.userId,
insuranceId: data.insuranceId!,
formFirstName: data.formFirstName,
formLastName: data.formLastName,
formDob: data.formDob,
socketId: data.socketId,
},
job.id
);
}
if (jobType === "cca-eligibility-check") {
return runCCAEligibilityProcessor(
{
enrichedPayload: data.enrichedPayload,
userId: data.userId,
insuranceId: data.insuranceId!,
formFirstName: data.formFirstName,
formLastName: data.formLastName,
formDob: data.formDob,
socketId: data.socketId,
},
job.id
);
}
throw new Error(`Unknown selenium jobType: ${jobType}`);
});

View File

@@ -0,0 +1,309 @@
/**
* Processor for "cca-eligibility-check" jobs.
*
* CCA (Commonwealth Care Alliance) uses ScionDental portal.
* No OTP required — simple username/password persistent session.
*
* Flow:
* 1. Start a session on the Python agent (POST /cca-eligibility)
* 2. Emit selenium:cca_session_started → frontend stores session_id
* 3. Poll agent status until completed/error (no OTP handling needed)
* 4. On completion: decode pdfBase64, save PDF, create/update patient, update status
* 5. Return { pdfFileId, pdfFilename, patientUpdateStatus, pdfUploadStatus }
*
* CCA result returns pdfBase64 (base64-encoded PDF), same as DeltaIns.
*/
import { storage } from "../../storage";
import {
forwardToSeleniumCCAEligibilityAgent,
getSeleniumCCASessionStatus,
} from "../../services/seleniumCCAEligibilityClient";
import { splitName, createOrUpdatePatientByInsuranceId } from "./_shared";
import { io } from "../../socket";
// ─── Helpers ────────────────────────────────────────────────────────────────
function now() {
return new Date().toISOString();
}
function log(tag: string, msg: string, ctx?: any) {
console.log(`${now()} [${tag}] ${msg}`, ctx ?? "");
}
function emitToSocket(socketId: string | undefined, event: string, payload: any) {
if (!socketId || !io) return;
try {
const socket = io.sockets.sockets.get(socketId);
if (socket) {
socket.emit(event, payload);
log("cca-processor", `emitted ${event}`, { socketId });
}
} catch (err: any) {
log("cca-processor", `emit failed for ${event}`, { err: err?.message });
}
}
// ─── Types ───────────────────────────────────────────────────────────────────
export interface CCAEligibilityProcessorInput {
enrichedPayload: any;
userId: number;
insuranceId: string;
formFirstName?: string;
formLastName?: string;
formDob?: string;
socketId?: string;
}
export interface CCAEligibilityProcessorResult {
patientUpdateStatus?: string;
pdfUploadStatus?: string;
pdfFileId?: number | null;
pdfFilename?: string | null;
}
// ─── Core DB processing ───────────────────────────────────────────────────────
async function processCCAResult(
userId: number,
insuranceId: string,
formFirstName: string | undefined,
formLastName: string | undefined,
formDob: string | undefined,
seleniumResult: any
): Promise<CCAEligibilityProcessorResult> {
const output: CCAEligibilityProcessorResult = {};
let createdPdfFileId: number | null = null;
try {
// 1) Resolve patient name
const rawName =
typeof seleniumResult?.patientName === "string"
? seleniumResult.patientName.trim()
: null;
const { firstName, lastName } = rawName
? splitName(rawName)
: { firstName: formFirstName ?? "", lastName: formLastName ?? "" };
// 2) Create / update patient
await createOrUpdatePatientByInsuranceId({
insuranceId,
firstName,
lastName,
dob: formDob,
userId,
});
// 3) Fetch patient
const patient = await storage.getPatientByInsuranceId(insuranceId);
if (!patient?.id) {
output.patientUpdateStatus = "Patient not found; no update performed";
return output;
}
// 4) Determine eligibility status
// Python returns "Eligible" / "Not Eligible" / "Unknown"
const eligRaw: string = seleniumResult?.eligibility ?? "";
const eligLower = eligRaw.toLowerCase();
const newStatus =
eligLower === "eligible" || eligLower === "active" || eligLower === "y"
? "ACTIVE"
: "INACTIVE";
// Use insurerName from result if available, fall back to default
const insuranceProvider =
typeof seleniumResult?.insurerName === "string" && seleniumResult.insurerName.trim()
? seleniumResult.insurerName.trim()
: "Commonwealth Care Alliance";
await storage.updatePatient(patient.id, {
status: newStatus,
insuranceProvider,
});
output.patientUpdateStatus = `Patient status updated to ${newStatus}`;
// 5) Decode pdfBase64 → Buffer
const pdfBase64: string = seleniumResult?.pdfBase64 ?? "";
let pdfBuffer: Buffer | null = null;
let pdfFilename: string | null = null;
if (pdfBase64) {
try {
pdfBuffer = Buffer.from(pdfBase64, "base64");
pdfFilename = `cca_eligibility_${insuranceId}_${Date.now()}.pdf`;
log("cca-processor", "decoded pdfBase64", { bytes: pdfBuffer.length });
} catch (e: any) {
output.pdfUploadStatus = `Failed to decode PDF base64: ${e.message}`;
}
} else {
output.pdfUploadStatus = "No PDF data returned from Selenium.";
}
// 6) Save PDF to patient document group
if (pdfBuffer && pdfFilename) {
const groupTitleKey = "ELIGIBILITY_STATUS";
const groupTitle = "Eligibility Status";
let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey);
if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey);
if (!group?.id) throw new Error("PDF group creation failed");
const created = await storage.createPdfFile(group.id, pdfFilename, pdfBuffer);
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
output.pdfUploadStatus = `PDF saved to group: ${group.title}`;
output.pdfFilename = pdfFilename;
}
output.pdfFileId = createdPdfFileId;
return output;
} catch (err: any) {
return {
...output,
pdfUploadStatus:
output.pdfUploadStatus ?? `Processing failed: ${err?.message ?? String(err)}`,
pdfFileId: createdPdfFileId,
};
}
}
// ─── Polling loop ────────────────────────────────────────────────────────────
async function pollUntilDone(
sessionId: string,
pollTimeoutMs = 5 * 60 * 1000
): Promise<any> {
const maxAttempts = 600;
const pollIntervalMs = 500;
const maxTransientErrors = 12;
const noProgressLimit = 120;
let transientErrors = 0;
let consecutiveNoProgress = 0;
let lastStatus: string | null = null;
const deadline = Date.now() + pollTimeoutMs;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (Date.now() > deadline) {
throw new Error(
`CCA polling timeout (${Math.round(pollTimeoutMs / 1000)}s) for session ${sessionId}`
);
}
try {
const st = await getSeleniumCCASessionStatus(sessionId);
const status: string = st?.status ?? "unknown";
log("cca-processor", `poll attempt=${attempt}`, { sessionId, status });
transientErrors = 0;
const isTerminal =
status === "completed" || status === "error" || status === "not_found";
if (status === lastStatus && !isTerminal) {
consecutiveNoProgress++;
} else {
consecutiveNoProgress = 0;
}
lastStatus = status;
if (consecutiveNoProgress >= noProgressLimit) {
throw new Error(
`No progress from Python agent (status="${status}") after ${consecutiveNoProgress} polls`
);
}
if (status === "completed") {
log("cca-processor", "session completed", { sessionId });
return st.result;
}
if (status === "error" || status === "not_found") {
throw new Error(st?.message || `CCA session ended with status: ${status}`);
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
} catch (err: any) {
const isTerminal =
err?.response?.status === 404 ||
(typeof err?.message === "string" &&
(err.message.includes("not_found") ||
err.message.includes("polling timeout") ||
err.message.includes("No progress")));
if (isTerminal) throw err;
transientErrors++;
if (transientErrors > maxTransientErrors) {
throw new Error(
`Too many transient network errors polling CCA session ${sessionId}`
);
}
const backoff = Math.min(30_000, 500 * Math.pow(2, transientErrors - 1));
log("cca-processor", `transient error #${transientErrors}, backoff ${backoff}ms`, {
err: err?.message,
});
await new Promise((r) => setTimeout(r, backoff));
}
}
throw new Error(`CCA polling exhausted all attempts for session ${sessionId}`);
}
// ─── Main processor entry point ───────────────────────────────────────────────
export async function runCCAEligibilityProcessor(
input: CCAEligibilityProcessorInput,
jobId: string
): Promise<CCAEligibilityProcessorResult> {
const {
enrichedPayload,
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
socketId,
} = input;
// 1) Tell Python agent to start a CCA session
log("cca-processor", "starting Python agent session", { insuranceId });
const agentResp = await forwardToSeleniumCCAEligibilityAgent(enrichedPayload);
if (!agentResp?.session_id) {
throw new Error("Python agent did not return a session_id for CCA eligibility");
}
const sessionId = agentResp.session_id as string;
log("cca-processor", "got session_id", { sessionId });
// 2) Emit session started so frontend can track progress
emitToSocket(socketId, "selenium:cca_session_started", {
session_id: sessionId,
jobId,
});
// 3) Poll until done (no OTP required for CCA)
const seleniumResult = await pollUntilDone(sessionId);
if (!seleniumResult || seleniumResult.status === "error") {
throw new Error(seleniumResult?.message ?? "CCA session returned an error result");
}
// 4) Process DB writes and PDF upload
log("cca-processor", "processing DB result", { insuranceId });
const result = await processCCAResult(
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
seleniumResult
);
log("cca-processor", "done", { result });
return result;
}

View File

@@ -0,0 +1,310 @@
/**
* Processor for "deltains-eligibility-check" jobs.
*
* Mirrors the DDMA persistent-session flow but for Delta Dental Ins (Okta-based):
* 1. Start a session on the Python agent (POST /deltains-eligibility)
* 2. Emit selenium:deltains_session_started → frontend stores session_id for OTP
* 3. Poll agent status, emitting selenium:otp_required when OTP is needed
* 4. On completion: decode pdfBase64, save PDF, create/update patient, update status
* 5. Return { pdfFileId, pdfFilename, patientUpdateStatus, pdfUploadStatus }
*
* DeltaIns result returns pdfBase64 (base64-encoded PDF) instead of a file path.
*/
import { storage } from "../../storage";
import {
forwardToSeleniumDeltaInsEligibilityAgent,
getSeleniumDeltaInsSessionStatus,
} from "../../services/seleniumDeltaInsEligibilityClient";
import { splitName, createOrUpdatePatientByInsuranceId } from "./_shared";
import { io } from "../../socket";
// ─── Helpers ────────────────────────────────────────────────────────────────
function now() {
return new Date().toISOString();
}
function log(tag: string, msg: string, ctx?: any) {
console.log(`${now()} [${tag}] ${msg}`, ctx ?? "");
}
function emitToSocket(socketId: string | undefined, event: string, payload: any) {
if (!socketId || !io) return;
try {
const socket = io.sockets.sockets.get(socketId);
if (socket) {
socket.emit(event, payload);
log("deltains-processor", `emitted ${event}`, { socketId });
}
} catch (err: any) {
log("deltains-processor", `emit failed for ${event}`, { err: err?.message });
}
}
// ─── Types ───────────────────────────────────────────────────────────────────
export interface DeltaInsEligibilityProcessorInput {
enrichedPayload: any;
userId: number;
insuranceId: string;
formFirstName?: string;
formLastName?: string;
formDob?: string;
socketId?: string;
}
export interface DeltaInsEligibilityProcessorResult {
patientUpdateStatus?: string;
pdfUploadStatus?: string;
pdfFileId?: number | null;
pdfFilename?: string | null;
}
// ─── Core DB processing ───────────────────────────────────────────────────────
async function processDeltaInsResult(
userId: number,
insuranceId: string,
formFirstName: string | undefined,
formLastName: string | undefined,
formDob: string | undefined,
seleniumResult: any
): Promise<DeltaInsEligibilityProcessorResult> {
const output: DeltaInsEligibilityProcessorResult = {};
let createdPdfFileId: number | null = null;
try {
// 1) Resolve patient name
const rawName =
typeof seleniumResult?.patientName === "string"
? seleniumResult.patientName.trim()
: null;
const { firstName, lastName } = rawName
? splitName(rawName)
: { firstName: formFirstName ?? "", lastName: formLastName ?? "" };
// 2) Create / update patient
await createOrUpdatePatientByInsuranceId({
insuranceId,
firstName,
lastName,
dob: formDob,
userId,
});
// 3) Fetch patient
const patient = await storage.getPatientByInsuranceId(insuranceId);
if (!patient?.id) {
output.patientUpdateStatus = "Patient not found; no update performed";
return output;
}
// 4) Determine eligibility status
const eligStatus = (seleniumResult?.eligibility ?? "").toLowerCase();
const newStatus =
eligStatus === "eligible" || eligStatus === "active" || eligStatus === "y"
? "ACTIVE"
: "INACTIVE";
await storage.updatePatient(patient.id, {
status: newStatus,
insuranceProvider: "Delta Dental Ins",
});
output.patientUpdateStatus = `Patient status updated to ${newStatus}`;
// 5) Decode pdfBase64 → Buffer
const pdfBase64: string = seleniumResult?.pdfBase64 ?? "";
let pdfBuffer: Buffer | null = null;
let pdfFilename: string | null = null;
if (pdfBase64) {
try {
pdfBuffer = Buffer.from(pdfBase64, "base64");
pdfFilename = `deltains_eligibility_${insuranceId}_${Date.now()}.pdf`;
log("deltains-processor", "decoded pdfBase64", { bytes: pdfBuffer.length });
} catch (e: any) {
output.pdfUploadStatus = `Failed to decode PDF base64: ${e.message}`;
}
} else {
output.pdfUploadStatus = "No PDF data returned from Selenium.";
}
// 6) Save PDF to patient document group
if (pdfBuffer && pdfFilename) {
const groupTitleKey = "ELIGIBILITY_STATUS";
const groupTitle = "Eligibility Status";
let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey);
if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey);
if (!group?.id) throw new Error("PDF group creation failed");
const created = await storage.createPdfFile(group.id, pdfFilename, pdfBuffer);
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
output.pdfUploadStatus = `PDF saved to group: ${group.title}`;
output.pdfFilename = pdfFilename;
}
output.pdfFileId = createdPdfFileId;
return output;
} catch (err: any) {
return {
...output,
pdfUploadStatus:
output.pdfUploadStatus ?? `Processing failed: ${err?.message ?? String(err)}`,
pdfFileId: createdPdfFileId,
};
}
}
// ─── Polling loop ────────────────────────────────────────────────────────────
async function pollUntilDone(
sessionId: string,
socketId: string | undefined,
jobId: string,
pollTimeoutMs = 5 * 60 * 1000
): Promise<any> {
const maxAttempts = 600;
const pollIntervalMs = 500;
const maxTransientErrors = 12;
const noProgressLimit = 120;
let transientErrors = 0;
let consecutiveNoProgress = 0;
let lastStatus: string | null = null;
const deadline = Date.now() + pollTimeoutMs;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (Date.now() > deadline) {
throw new Error(
`DeltaIns polling timeout (${Math.round(pollTimeoutMs / 1000)}s) for session ${sessionId}`
);
}
try {
const st = await getSeleniumDeltaInsSessionStatus(sessionId);
const status: string = st?.status ?? "unknown";
log("deltains-processor", `poll attempt=${attempt}`, { sessionId, status });
transientErrors = 0;
const isTerminal =
status === "completed" || status === "error" || status === "not_found";
if (status === lastStatus && !isTerminal) {
consecutiveNoProgress++;
} else {
consecutiveNoProgress = 0;
}
lastStatus = status;
if (consecutiveNoProgress >= noProgressLimit) {
throw new Error(
`No progress from Python agent (status="${status}") after ${consecutiveNoProgress} polls`
);
}
if (status === "waiting_for_otp") {
emitToSocket(socketId, "selenium:otp_required", {
session_id: sessionId,
jobId,
message: "OTP required. Please enter the code sent to your email.",
});
await new Promise((r) => setTimeout(r, pollIntervalMs));
continue;
}
if (status === "completed") {
log("deltains-processor", "session completed", { sessionId });
return st.result;
}
if (status === "error" || status === "not_found") {
throw new Error(st?.message || `DeltaIns session ended with status: ${status}`);
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
} catch (err: any) {
const isTerminal =
err?.response?.status === 404 ||
(typeof err?.message === "string" &&
(err.message.includes("not_found") ||
err.message.includes("polling timeout") ||
err.message.includes("No progress")));
if (isTerminal) throw err;
transientErrors++;
if (transientErrors > maxTransientErrors) {
throw new Error(
`Too many transient network errors polling DeltaIns session ${sessionId}`
);
}
const backoff = Math.min(30_000, 500 * Math.pow(2, transientErrors - 1));
log("deltains-processor", `transient error #${transientErrors}, backoff ${backoff}ms`, {
err: err?.message,
});
await new Promise((r) => setTimeout(r, backoff));
}
}
throw new Error(`DeltaIns polling exhausted all attempts for session ${sessionId}`);
}
// ─── Main processor entry point ───────────────────────────────────────────────
export async function runDeltaInsEligibilityProcessor(
input: DeltaInsEligibilityProcessorInput,
jobId: string
): Promise<DeltaInsEligibilityProcessorResult> {
const {
enrichedPayload,
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
socketId,
} = input;
// 1) Tell Python agent to start a DeltaIns session
log("deltains-processor", "starting Python agent session", { insuranceId });
const agentResp = await forwardToSeleniumDeltaInsEligibilityAgent(enrichedPayload);
if (!agentResp?.session_id) {
throw new Error("Python agent did not return a session_id for DeltaIns eligibility");
}
const sessionId = agentResp.session_id as string;
log("deltains-processor", "got session_id", { sessionId });
// 2) Emit session started so frontend can store session_id for OTP submission
emitToSocket(socketId, "selenium:deltains_session_started", {
session_id: sessionId,
jobId,
});
// 3) Poll until done (handles OTP events internally)
const seleniumResult = await pollUntilDone(sessionId, socketId, jobId);
if (!seleniumResult || seleniumResult.status === "error") {
throw new Error(seleniumResult?.message ?? "DeltaIns session returned an error result");
}
// 4) Process DB writes and PDF upload
log("deltains-processor", "processing DB result", { insuranceId });
const result = await processDeltaInsResult(
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
seleniumResult
);
log("deltains-processor", "done", { result });
return result;
}

View File

@@ -0,0 +1,330 @@
/**
* Processor for "unitedsco-eligibility-check" jobs (Tufts SCO / UnitedHealthcare MA).
*
* Same persistent-session flow as DDMA:
* 1. Start a session on the Python agent (POST /unitedsco-eligibility)
* 2. Emit selenium:unitedsco_session_started → frontend stores session_id for OTP
* 3. Poll agent status, emitting selenium:otp_required when OTP is needed
* 4. On completion: save PDF (file path), create/update patient, update status
* 5. Return { pdfFileId, pdfFilename, patientUpdateStatus, pdfUploadStatus }
*/
import fs from "fs/promises";
import fsSync from "fs";
import path from "path";
import { storage } from "../../storage";
import { emptyFolderContainingFile } from "../../utils/emptyTempFolder";
import {
forwardToSeleniumUnitedSCOEligibilityAgent,
getSeleniumUnitedSCOSessionStatus,
} from "../../services/seleniumUnitedSCOEligibilityClient";
import { splitName, createOrUpdatePatientByInsuranceId, imageToPdfBuffer } from "./_shared";
import { io } from "../../socket";
// ─── Helpers ────────────────────────────────────────────────────────────────
function now() {
return new Date().toISOString();
}
function log(tag: string, msg: string, ctx?: any) {
console.log(`${now()} [${tag}] ${msg}`, ctx ?? "");
}
function emitToSocket(socketId: string | undefined, event: string, payload: any) {
if (!socketId || !io) return;
try {
const socket = io.sockets.sockets.get(socketId);
if (socket) {
socket.emit(event, payload);
log("unitedsco-processor", `emitted ${event}`, { socketId });
}
} catch (err: any) {
log("unitedsco-processor", `emit failed for ${event}`, { err: err?.message });
}
}
// ─── Types ───────────────────────────────────────────────────────────────────
export interface UnitedSCOEligibilityProcessorInput {
enrichedPayload: any;
userId: number;
insuranceId: string;
formFirstName?: string;
formLastName?: string;
formDob?: string;
socketId?: string;
}
export interface UnitedSCOEligibilityProcessorResult {
patientUpdateStatus?: string;
pdfUploadStatus?: string;
pdfFileId?: number | null;
pdfFilename?: string | null;
}
// ─── Core DB processing ───────────────────────────────────────────────────────
async function processUnitedSCOResult(
userId: number,
insuranceId: string,
formFirstName: string | undefined,
formLastName: string | undefined,
formDob: string | undefined,
seleniumResult: any
): Promise<UnitedSCOEligibilityProcessorResult> {
const output: UnitedSCOEligibilityProcessorResult = {};
let createdPdfFileId: number | null = null;
try {
// 1) Resolve patient name
const rawName =
typeof seleniumResult?.patientName === "string"
? seleniumResult.patientName.trim()
: null;
const { firstName, lastName } = rawName
? splitName(rawName)
: { firstName: formFirstName ?? "", lastName: formLastName ?? "" };
// 2) Create / update patient
await createOrUpdatePatientByInsuranceId({
insuranceId,
firstName,
lastName,
dob: formDob,
userId,
});
// 3) Fetch patient
const patient = await storage.getPatientByInsuranceId(insuranceId);
if (!patient?.id) {
output.patientUpdateStatus = "Patient not found; no update performed";
return output;
}
// 4) Determine eligibility status
const eligStatus = (seleniumResult?.eligibility ?? "").toLowerCase();
const newStatus = eligStatus === "active" || eligStatus === "y" ? "ACTIVE" : "INACTIVE";
await storage.updatePatient(patient.id, {
status: newStatus,
insuranceProvider: "United Healthcare SCO",
});
output.patientUpdateStatus = `Patient status updated to ${newStatus}`;
// 5) Resolve PDF buffer from file path (same as DDMA)
let pdfBuffer: Buffer | null = null;
let pdfFilename: string | null = null;
const pdfPath: string | null =
seleniumResult?.pdf_path ?? seleniumResult?.ss_path ?? null;
if (pdfPath && fsSync.existsSync(pdfPath)) {
if (pdfPath.endsWith(".pdf")) {
try {
pdfBuffer = await fs.readFile(pdfPath);
pdfFilename = path.basename(pdfPath);
log("unitedsco-processor", "read PDF directly", { pdfPath });
} catch (e: any) {
output.pdfUploadStatus = `Failed to read PDF: ${e.message}`;
}
} else if (
pdfPath.endsWith(".png") ||
pdfPath.endsWith(".jpg") ||
pdfPath.endsWith(".jpeg")
) {
try {
pdfBuffer = await imageToPdfBuffer(pdfPath);
pdfFilename = `unitedsco_eligibility_${insuranceId}_${Date.now()}.pdf`;
log("unitedsco-processor", "converted screenshot to PDF", { pdfPath });
} catch (e: any) {
output.pdfUploadStatus = `Failed to convert screenshot to PDF: ${e.message}`;
}
}
} else {
output.pdfUploadStatus = "No valid file path from Selenium; nothing uploaded.";
}
// 6) Save PDF to patient document group
if (pdfBuffer && pdfFilename) {
const groupTitleKey = "ELIGIBILITY_STATUS";
const groupTitle = "Eligibility Status";
let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey);
if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey);
if (!group?.id) throw new Error("PDF group creation failed");
const created = await storage.createPdfFile(group.id, pdfFilename, pdfBuffer);
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
output.pdfUploadStatus = `PDF saved to group: ${group.title}`;
output.pdfFilename = pdfFilename;
}
output.pdfFileId = createdPdfFileId;
return output;
} catch (err: any) {
return {
...output,
pdfUploadStatus:
output.pdfUploadStatus ?? `Processing failed: ${err?.message ?? String(err)}`,
pdfFileId: createdPdfFileId,
};
} finally {
const cleanupPath = seleniumResult?.pdf_path ?? seleniumResult?.ss_path ?? null;
if (cleanupPath) {
try {
await emptyFolderContainingFile(cleanupPath);
} catch (e) {
log("unitedsco-processor", "cleanup failed", { cleanupPath });
}
}
}
}
// ─── Polling loop ────────────────────────────────────────────────────────────
async function pollUntilDone(
sessionId: string,
socketId: string | undefined,
jobId: string,
pollTimeoutMs = 5 * 60 * 1000
): Promise<any> {
const maxAttempts = 600;
const pollIntervalMs = 500;
const maxTransientErrors = 12;
const noProgressLimit = 120;
let transientErrors = 0;
let consecutiveNoProgress = 0;
let lastStatus: string | null = null;
const deadline = Date.now() + pollTimeoutMs;
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (Date.now() > deadline) {
throw new Error(
`UnitedSCO polling timeout (${Math.round(pollTimeoutMs / 1000)}s) for session ${sessionId}`
);
}
try {
const st = await getSeleniumUnitedSCOSessionStatus(sessionId);
const status: string = st?.status ?? "unknown";
log("unitedsco-processor", `poll attempt=${attempt}`, { sessionId, status });
transientErrors = 0;
const isTerminal =
status === "completed" || status === "error" || status === "not_found";
if (status === lastStatus && !isTerminal) {
consecutiveNoProgress++;
} else {
consecutiveNoProgress = 0;
}
lastStatus = status;
if (consecutiveNoProgress >= noProgressLimit) {
throw new Error(
`No progress from Python agent (status="${status}") after ${consecutiveNoProgress} polls`
);
}
if (status === "waiting_for_otp") {
emitToSocket(socketId, "selenium:otp_required", {
session_id: sessionId,
jobId,
message: "OTP required. Please enter the verification code.",
});
await new Promise((r) => setTimeout(r, pollIntervalMs));
continue;
}
if (status === "completed") {
log("unitedsco-processor", "session completed", { sessionId });
return st.result;
}
if (status === "error" || status === "not_found") {
throw new Error(st?.message || `UnitedSCO session ended with status: ${status}`);
}
await new Promise((r) => setTimeout(r, pollIntervalMs));
} catch (err: any) {
const isTerminal =
err?.response?.status === 404 ||
(typeof err?.message === "string" &&
(err.message.includes("not_found") ||
err.message.includes("polling timeout") ||
err.message.includes("No progress")));
if (isTerminal) throw err;
transientErrors++;
if (transientErrors > maxTransientErrors) {
throw new Error(
`Too many transient network errors polling UnitedSCO session ${sessionId}`
);
}
const backoff = Math.min(30_000, 500 * Math.pow(2, transientErrors - 1));
log("unitedsco-processor", `transient error #${transientErrors}, backoff ${backoff}ms`, {
err: err?.message,
});
await new Promise((r) => setTimeout(r, backoff));
}
}
throw new Error(`UnitedSCO polling exhausted all attempts for session ${sessionId}`);
}
// ─── Main processor entry point ───────────────────────────────────────────────
export async function runUnitedSCOEligibilityProcessor(
input: UnitedSCOEligibilityProcessorInput,
jobId: string
): Promise<UnitedSCOEligibilityProcessorResult> {
const {
enrichedPayload,
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
socketId,
} = input;
log("unitedsco-processor", "starting Python agent session", { insuranceId });
const agentResp = await forwardToSeleniumUnitedSCOEligibilityAgent(enrichedPayload);
if (!agentResp?.session_id) {
throw new Error("Python agent did not return a session_id for UnitedSCO eligibility");
}
const sessionId = agentResp.session_id as string;
log("unitedsco-processor", "got session_id", { sessionId });
emitToSocket(socketId, "selenium:unitedsco_session_started", {
session_id: sessionId,
jobId,
});
const seleniumResult = await pollUntilDone(sessionId, socketId, jobId);
if (!seleniumResult || seleniumResult.status === "error") {
throw new Error(seleniumResult?.message ?? "UnitedSCO session returned an error result");
}
log("unitedsco-processor", "processing DB result", { insuranceId });
const result = await processUnitedSCOResult(
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
seleniumResult
);
log("unitedsco-processor", "done", { result });
return result;
}

View File

@@ -7,7 +7,10 @@ export type SeleniumJobType =
| "claim-status-check"
| "claim-submit"
| "claim-pre-auth"
| "ddma-eligibility-check";
| "ddma-eligibility-check"
| "deltains-eligibility-check"
| "unitedsco-eligibility-check"
| "cca-eligibility-check";
export interface SeleniumJobData {
jobType: SeleniumJobType;