From 90302a76b790258c880a1fa9662223d5b9ff6e26 Mon Sep 17 00:00:00 2001 From: ff Date: Mon, 13 Apr 2026 22:30:40 -0400 Subject: [PATCH] feat: add BullMQ queue infrastructure and frontend job status hook - apps/Backend/src/queue/: connection, queues, workers, processors - apps/Frontend/src/hooks/use-job-status.ts: WebSocket job progress hook - apps/Frontend/src/lib/socket.ts: shared Socket.IO singleton Co-Authored-By: Claude Sonnet 4.6 --- apps/Backend/src/queue/connection.ts | 36 ++++ apps/Backend/src/queue/inProcessQueue.ts | 79 +++++++++ apps/Backend/src/queue/jobRunner.ts | 133 ++++++++++++++ apps/Backend/src/queue/processors/_shared.ts | 156 ++++++++++++++++ .../queue/processors/claimStatusProcessor.ts | 99 +++++++++++ .../queue/processors/claimSubmitProcessor.ts | 58 ++++++ .../queue/processors/eligibilityProcessor.ts | 167 ++++++++++++++++++ .../src/queue/processors/ocrProcessor.ts | 42 +++++ apps/Backend/src/queue/queues.ts | 48 +++++ apps/Backend/src/queue/workers/ocrWorker.ts | 55 ++++++ .../src/queue/workers/seleniumWorker.ts | 104 +++++++++++ apps/Frontend/src/hooks/use-job-status.ts | 78 ++++++++ apps/Frontend/src/lib/socket.ts | 24 +++ 13 files changed, 1079 insertions(+) create mode 100644 apps/Backend/src/queue/connection.ts create mode 100644 apps/Backend/src/queue/inProcessQueue.ts create mode 100644 apps/Backend/src/queue/jobRunner.ts create mode 100644 apps/Backend/src/queue/processors/_shared.ts create mode 100644 apps/Backend/src/queue/processors/claimStatusProcessor.ts create mode 100644 apps/Backend/src/queue/processors/claimSubmitProcessor.ts create mode 100644 apps/Backend/src/queue/processors/eligibilityProcessor.ts create mode 100644 apps/Backend/src/queue/processors/ocrProcessor.ts create mode 100644 apps/Backend/src/queue/queues.ts create mode 100644 apps/Backend/src/queue/workers/ocrWorker.ts create mode 100644 apps/Backend/src/queue/workers/seleniumWorker.ts create mode 100644 apps/Frontend/src/hooks/use-job-status.ts create mode 100644 apps/Frontend/src/lib/socket.ts diff --git a/apps/Backend/src/queue/connection.ts b/apps/Backend/src/queue/connection.ts new file mode 100644 index 0000000..9f73fa5 --- /dev/null +++ b/apps/Backend/src/queue/connection.ts @@ -0,0 +1,36 @@ +import Redis from "ioredis"; + +const REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379"; + +/** + * Shared Redis client for BullMQ. + * BullMQ requires maxRetriesPerRequest: null. + * + * lazyConnect + connectTimeout mean: don't auto-connect on require(), + * and if Redis is unreachable, fail within 3 s instead of hanging forever. + */ +export const redisConnection = new Redis(REDIS_URL, { + maxRetriesPerRequest: null, + enableReadyCheck: false, + lazyConnect: true, + connectTimeout: 3_000, // give up after 3 s if Redis is down + retryStrategy: (times) => { + // Stop retrying after 2 attempts so a missing Redis server is + // reported quickly rather than blocking the request indefinitely. + if (times > 2) return null; + return Math.min(times * 500, 1_000); + }, +}); + +redisConnection.on("error", (err) => { + console.error("[Redis] connection error:", err.message); +}); + +redisConnection.on("connect", () => { + console.log("[Redis] connected at", REDIS_URL); +}); + +/** True once a successful connection has been established. */ +export let redisReady = false; +redisConnection.on("ready", () => { redisReady = true; }); +redisConnection.on("close", () => { redisReady = false; }); diff --git a/apps/Backend/src/queue/inProcessQueue.ts b/apps/Backend/src/queue/inProcessQueue.ts new file mode 100644 index 0000000..ab8c901 --- /dev/null +++ b/apps/Backend/src/queue/inProcessQueue.ts @@ -0,0 +1,79 @@ +/** + * A lightweight in-process async job queue. + * No Redis, no external dependencies — just Node.js Promises. + * + * Features: + * - Configurable concurrency limit + * - Non-blocking add() — returns a jobId immediately + * - Job status tracking in-memory + * - onComplete / onFail callbacks for WebSocket notifications + */ +import { randomUUID } from "crypto"; + +export type JobStatus = "queued" | "active" | "completed" | "failed"; + +export interface QueueJob { + id: string; + data: T; + status: JobStatus; + result?: any; + error?: string; +} + +type Processor = (job: QueueJob) => Promise; + +export class InProcessQueue { + private concurrency: number; + private running = 0; + private waitQueue: Array<() => void> = []; + private jobs = new Map>(); + + constructor(concurrency = 1) { + this.concurrency = concurrency; + } + + /** + * Enqueue a job. Returns the jobId immediately; processing starts + * as soon as a concurrency slot is free. + */ + add(data: T, processor: Processor): string { + const id = randomUUID(); + const job: QueueJob = { id, data, status: "queued" }; + this.jobs.set(id, job); + // Fire-and-forget — errors are captured in job.error + this._run(job, processor).catch(() => {}); + return id; + } + + getJob(id: string): QueueJob | undefined { + return this.jobs.get(id); + } + + /** How many jobs are waiting for a slot. */ + get waiting() { + return this.waitQueue.length; + } + + private async _run(job: QueueJob, processor: Processor) { + // Block until a concurrency slot is available + if (this.running >= this.concurrency) { + await new Promise((resolve) => this.waitQueue.push(resolve)); + } + + this.running++; + job.status = "active"; + + try { + job.result = await processor(job); + job.status = "completed"; + } catch (err: any) { + job.status = "failed"; + job.error = err?.message ?? String(err); + } finally { + this.running--; + // Wake up next waiting job + const next = this.waitQueue.shift(); + if (next) next(); + } + } +} diff --git a/apps/Backend/src/queue/jobRunner.ts b/apps/Backend/src/queue/jobRunner.ts new file mode 100644 index 0000000..e3ee857 --- /dev/null +++ b/apps/Backend/src/queue/jobRunner.ts @@ -0,0 +1,133 @@ +/** + * jobRunner — the single source of truth for enqueueing async jobs. + * + * Uses InProcessQueue (no Redis) instead of BullMQ. + * When a job finishes the worker emits a `job:update` Socket.IO event + * to the originating client so the frontend can react in real time. + */ +import { InProcessQueue } from "./inProcessQueue"; +import { io } from "../socket"; +import { runEligibilityProcessor } from "./processors/eligibilityProcessor"; +import { runClaimStatusProcessor } from "./processors/claimStatusProcessor"; +import { runClaimSubmitProcessor } from "./processors/claimSubmitProcessor"; +import { runOcrProcessor } from "./processors/ocrProcessor"; +import type { SeleniumJobData, OcrJobData } from "./queues"; + +// ── Queue instances ────────────────────────────────────────────────────────── +// Selenium: 1 browser at a time (mirrors Python semaphore) +const seleniumQ = new InProcessQueue(1); +// OCR: allow 2 concurrent (mirrors Python MAX_CONCURRENCY=2) +const ocrQ = new InProcessQueue(2); + +// ── WebSocket helper ───────────────────────────────────────────────────────── +function emitJobUpdate( + socketId: string | undefined, + jobId: string, + jobType: string, + status: "active" | "completed" | "failed", + extra: Record = {} +) { + const payload = { jobId, jobType, status, ...extra }; + if (socketId && io) { + io.to(socketId).emit("job:update", payload); + } else if (io) { + io.emit("job:update", payload); + } +} + +// ── Selenium enqueue ───────────────────────────────────────────────────────── +export function enqueueSeleniumJob(data: SeleniumJobData): string { + const { jobType, socketId } = data; + + const jobId = seleniumQ.add(data, async (job) => { + emitJobUpdate(socketId, job.id, jobType, "active", { + message: "Selenium browser starting…", + }); + + if (jobType === "eligibility-check") { + return runEligibilityProcessor({ + enrichedPayload: data.enrichedPayload, + userId: data.userId, + insuranceId: data.insuranceId!, + formFirstName: data.formFirstName, + formLastName: data.formLastName, + formDob: data.formDob, + }); + } + if (jobType === "claim-status-check") { + return runClaimStatusProcessor({ + enrichedPayload: data.enrichedPayload, + insuranceId: data.insuranceId!, + }); + } + if (jobType === "claim-submit" || jobType === "claim-pre-auth") { + return runClaimSubmitProcessor({ + enrichedPayload: data.enrichedPayload, + files: data.files ?? [], + claimId: data.claimId, + variant: jobType === "claim-pre-auth" ? "claim-pre-auth" : "claimsubmit", + }); + } + throw new Error(`Unknown selenium jobType: ${jobType}`); + }); + + // Attach completion/failure callbacks after the job is in the queue. + // We poll the job object once per tick until it settles. + (async () => { + while (true) { + await new Promise((r) => setTimeout(r, 500)); + const job = seleniumQ.getJob(jobId); + if (!job) break; + if (job.status === "completed") { + emitJobUpdate(socketId, jobId, jobType, "completed", { + result: job.result, + }); + console.log(`[seleniumQ] job ${jobId} (${jobType}) completed`); + break; + } + if (job.status === "failed") { + emitJobUpdate(socketId, jobId, jobType, "failed", { + error: job.error, + }); + console.error(`[seleniumQ] job ${jobId} (${jobType}) failed:`, job.error); + break; + } + } + })(); + + return jobId; +} + +// ── OCR enqueue ────────────────────────────────────────────────────────────── +export function enqueueOcrJob(data: OcrJobData): string { + const { socketId } = data; + + const jobId = ocrQ.add(data, async () => { + emitJobUpdate(socketId, jobId, "ocr", "active", { + message: "OCR processing started…", + }); + return runOcrProcessor({ files: data.files }); + }); + + (async () => { + while (true) { + await new Promise((r) => setTimeout(r, 500)); + const job = ocrQ.getJob(jobId); + if (!job) break; + if (job.status === "completed") { + emitJobUpdate(socketId, jobId, "ocr", "completed", { + result: { rows: job.result }, + }); + console.log(`[ocrQ] job ${jobId} completed`); + break; + } + if (job.status === "failed") { + emitJobUpdate(socketId, jobId, "ocr", "failed", { error: job.error }); + console.error(`[ocrQ] job ${jobId} failed:`, job.error); + break; + } + } + })(); + + return jobId; +} diff --git a/apps/Backend/src/queue/processors/_shared.ts b/apps/Backend/src/queue/processors/_shared.ts new file mode 100644 index 0000000..5bc3a93 --- /dev/null +++ b/apps/Backend/src/queue/processors/_shared.ts @@ -0,0 +1,156 @@ +/** + * Shared utilities used by all job processors. + * Avoids duplicating helpers that currently live inside route files. + */ +import axios from "axios"; +import PDFDocument from "pdfkit"; +import fsSync from "fs"; +import { storage } from "../../storage"; +import { + InsertPatient, + insertPatientSchema, +} from "../../../../../packages/db/types/patient-types"; + +const SELENIUM_BASE_URL = + process.env.SELENIUM_AGENT_BASE_URL || "http://localhost:5002"; + +// --------------------------------------------------------------------------- +// Python service helpers +// --------------------------------------------------------------------------- + +/** Start an async job on the Python service and return the session ID. */ +export async function startPythonJob( + endpoint: string, + payload: any +): Promise { + const resp = await axios.post( + `${SELENIUM_BASE_URL}${endpoint}`, + payload, + { timeout: 10_000 } + ); + const sid: string = resp.data?.session_id; + if (!sid) throw new Error(`Python service did not return a session_id from ${endpoint}`); + return sid; +} + +/** Poll /job//status until completed/failed or timeout. */ +export async function pollPythonJob( + sid: string, + timeoutMs = 5 * 60 * 1000, + intervalMs = 2_000 +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const resp = await axios.get( + `${SELENIUM_BASE_URL}/job/${sid}/status`, + { timeout: 5_000 } + ); + const s = resp.data; + if (s.status === "completed") { + if (s.result?.status === "error") { + const msg = + typeof s.result.message === "string" + ? s.result.message + : s.result.message?.msg ?? "Selenium returned error status"; + throw new Error(msg); + } + return s.result; + } + if (s.status === "failed") { + throw new Error(s.error || "Python job failed"); + } + await sleep(intervalMs); + } + throw new Error("Selenium job timed out after polling"); +} + +// --------------------------------------------------------------------------- +// General utilities +// --------------------------------------------------------------------------- + +export function sleep(ms: number) { + return new Promise((r) => setTimeout(r, ms)); +} + +export function splitName(fullName?: string | null) { + if (!fullName) return { firstName: "", lastName: "" }; + const parts = fullName.trim().split(/\s+/).filter(Boolean); + const firstName = parts.shift() ?? ""; + const lastName = parts.join(" ") ?? ""; + return { firstName, lastName }; +} + +export async function imageToPdfBuffer(imagePath: string): Promise { + return new Promise((resolve, reject) => { + try { + const doc = new PDFDocument({ autoFirstPage: false }); + const chunks: Uint8Array[] = []; + doc.on("data", (c: any) => chunks.push(c)); + doc.on("end", () => resolve(Buffer.concat(chunks))); + doc.on("error", reject); + + const A4_W = 595.28; + const A4_H = 841.89; + doc.addPage({ size: [A4_W, A4_H] }); + doc.image(imagePath, 0, 0, { fit: [A4_W, A4_H], align: "center", valign: "center" }); + doc.end(); + } catch (e) { + reject(e); + } + }); +} + +// --------------------------------------------------------------------------- +// Patient DB helpers +// --------------------------------------------------------------------------- + +export async function createOrUpdatePatientByInsuranceId(options: { + insuranceId: string; + firstName?: string | null; + lastName?: string | null; + dob?: string | Date | null; + userId: number; +}) { + const { insuranceId, firstName, lastName, dob, userId } = options; + if (!insuranceId) throw new Error("Missing insuranceId"); + + const incomingFirst = (firstName || "").trim(); + const incomingLast = (lastName || "").trim(); + + let patient = await storage.getPatientByInsuranceId(insuranceId); + + if (patient && patient.id) { + const updates: any = {}; + if (incomingFirst && String(patient.firstName ?? "").trim() !== incomingFirst) + updates.firstName = incomingFirst; + if (incomingLast && String(patient.lastName ?? "").trim() !== incomingLast) + updates.lastName = incomingLast; + if (Object.keys(updates).length > 0) { + await storage.updatePatient(patient.id, updates); + patient = await storage.getPatientByInsuranceId(insuranceId); + } + return patient; + } + + const createPayload: any = { + firstName: incomingFirst, + lastName: incomingLast, + dateOfBirth: dob, + gender: "", + phone: "", + userId, + insuranceId, + }; + + let patientData: InsertPatient; + try { + patientData = insertPatientSchema.parse(createPayload); + } catch { + const safePayload = { ...createPayload }; + delete safePayload.dateOfBirth; + patientData = insertPatientSchema.parse(safePayload); + } + + await storage.createPatient(patientData); + return storage.getPatientByInsuranceId(insuranceId); +} diff --git a/apps/Backend/src/queue/processors/claimStatusProcessor.ts b/apps/Backend/src/queue/processors/claimStatusProcessor.ts new file mode 100644 index 0000000..3bcc7d3 --- /dev/null +++ b/apps/Backend/src/queue/processors/claimStatusProcessor.ts @@ -0,0 +1,99 @@ +/** + * Processor for "claim-status-check" jobs. + * Mirrors routes/insuranceStatus.ts /claim-status-check + */ +import fs from "fs/promises"; +import fsSync from "fs"; +import path from "path"; +import { storage } from "../../storage"; +import { emptyFolderContainingFile } from "../../utils/emptyTempFolder"; +import { + startPythonJob, + pollPythonJob, + imageToPdfBuffer, +} from "./_shared"; + +export interface ClaimStatusProcessorInput { + enrichedPayload: any; + insuranceId: string; // memberId used to look up the patient +} + +export interface ClaimStatusProcessorResult { + pdfUploadStatus?: string; + pdfFileId?: number | null; +} + +export async function runClaimStatusProcessor( + input: ClaimStatusProcessorInput +): Promise { + const { enrichedPayload, insuranceId } = input; + + // 1) Start async Python job + const sid = await startPythonJob("/claim-status-check/async", { + data: enrichedPayload, + }); + + // 2) Poll for completion + const result = await pollPythonJob(sid); + + const outputResult: ClaimStatusProcessorResult = {}; + + // 3) Look up patient + const patient = await storage.getPatientByInsuranceId(insuranceId); + + if (patient && patient.id !== undefined) { + let pdfBuffer: Buffer | null = null; + let generatedPdfPath: string | null = null; + + if ( + result.ss_path && + /\.(png|jpg|jpeg)$/i.test(result.ss_path) && + fsSync.existsSync(result.ss_path) + ) { + try { + pdfBuffer = await imageToPdfBuffer(result.ss_path); + const pdfFileName = `claimStatus_${insuranceId}_${Date.now()}.pdf`; + generatedPdfPath = path.join(path.dirname(result.ss_path), pdfFileName); + await fs.writeFile(generatedPdfPath, pdfBuffer); + } catch (e) { + console.error("[claimStatusProcessor] img→PDF conversion failed:", e); + outputResult.pdfUploadStatus = `Failed to convert screenshot to PDF: ${e}`; + } + } else { + outputResult.pdfUploadStatus = + "No valid screenshot provided by Selenium; nothing to upload."; + } + + if (pdfBuffer && generatedPdfPath) { + const groupTitleKey = "CLAIM_STATUS"; + const groupTitle = "Claim Status"; + + let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey); + if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey); + if (!group?.id) throw new Error("PDF group creation failed"); + + const basename = path.basename(generatedPdfPath); + const created = await storage.createPdfFile(group.id, basename, pdfBuffer); + + let createdPdfFileId: number | null = null; + if (created && typeof created === "object" && "id" in created) { + createdPdfFileId = Number(created.id); + } + + outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`; + outputResult.pdfFileId = createdPdfFileId; + } + } else { + outputResult.pdfUploadStatus = + "Patient not found; no PDF saved."; + } + + // 4) Cleanup + try { + if (result.ss_path) await emptyFolderContainingFile(result.ss_path); + } catch (e) { + console.error("[claimStatusProcessor] cleanup failed:", e); + } + + return outputResult; +} diff --git a/apps/Backend/src/queue/processors/claimSubmitProcessor.ts b/apps/Backend/src/queue/processors/claimSubmitProcessor.ts new file mode 100644 index 0000000..16c7af2 --- /dev/null +++ b/apps/Backend/src/queue/processors/claimSubmitProcessor.ts @@ -0,0 +1,58 @@ +/** + * Processors for "claim-submit" and "claim-pre-auth" jobs. + * Mirrors routes/claims.ts /selenium-claim and /selenium-claim-pre-auth + */ +import { storage } from "../../storage"; +import { startPythonJob, pollPythonJob } from "./_shared"; + +export interface ClaimSubmitProcessorInput { + enrichedPayload: any; + files: { originalname: string; bufferBase64: string; mimetype: string }[]; + claimId?: number; + /** "claimsubmit" (default) or "claim-pre-auth" */ + variant?: "claimsubmit" | "claim-pre-auth"; +} + +export interface ClaimSubmitProcessorResult { + status: string; + claimNumber?: string; + pdf_url?: string; + [key: string]: any; +} + +export async function runClaimSubmitProcessor( + input: ClaimSubmitProcessorInput +): Promise { + const { enrichedPayload, files, claimId } = input; + + // Build the same payload shape the Python /claimsubmit endpoint expects + const pdfs = files + .filter((f) => f.mimetype === "application/pdf") + .map(({ originalname, bufferBase64 }) => ({ originalname, bufferBase64 })); + + const images = files + .filter((f) => f.mimetype.startsWith("image/")) + .map(({ originalname, bufferBase64 }) => ({ originalname, bufferBase64 })); + + const payload = { claim: enrichedPayload, pdfs, images }; + + const endpoint = + input.variant === "claim-pre-auth" ? "/claim-pre-auth/async" : "/claimsubmit/async"; + + // 1) Start async Python job + const sid = await startPythonJob(endpoint, payload); + + // 2) Poll for result + const result = await pollPythonJob(sid, 10 * 60 * 1000); // claim submit can take up to 10 min + + // 3) Persist claimNumber if returned + if (result?.claimNumber && claimId) { + try { + await storage.updateClaim(claimId, { claimNumber: result.claimNumber }); + } catch (e) { + console.error("[claimSubmitProcessor] failed to persist claimNumber:", e); + } + } + + return { ...result, claimId }; +} diff --git a/apps/Backend/src/queue/processors/eligibilityProcessor.ts b/apps/Backend/src/queue/processors/eligibilityProcessor.ts new file mode 100644 index 0000000..feecce1 --- /dev/null +++ b/apps/Backend/src/queue/processors/eligibilityProcessor.ts @@ -0,0 +1,167 @@ +/** + * Processor for "eligibility-check" jobs. + * + * Replicates the logic from routes/insuranceStatus.ts /eligibility-check + * so it can run inside a BullMQ worker without blocking the HTTP server. + */ +import fs from "fs/promises"; +import fsSync from "fs"; +import path from "path"; +import { storage } from "../../storage"; +import { emptyFolderContainingFile } from "../../utils/emptyTempFolder"; +import forwardToPatientDataExtractorService from "../../services/patientDataExtractorService"; +import { + startPythonJob, + pollPythonJob, + splitName, + createOrUpdatePatientByInsuranceId, +} from "./_shared"; + +export interface EligibilityProcessorInput { + /** Enriched payload (includes credentials) */ + enrichedPayload: any; + userId: number; + insuranceId: string; + formFirstName?: string; + formLastName?: string; + formDob?: string; +} + +export interface EligibilityProcessorResult { + patientUpdateStatus?: string; + pdfUploadStatus?: string; + pdfFileId?: number | null; +} + +export async function runEligibilityProcessor( + input: EligibilityProcessorInput +): Promise { + const { + enrichedPayload, + userId, + insuranceId, + formFirstName, + formLastName, + formDob, + } = input; + + // 1) Fire the async Python job + const sid = await startPythonJob("/eligibility-check/async", { + data: enrichedPayload, + }); + + // 2) Wait for completion + const seleniumResult = await pollPythonJob(sid); + + const outputResult: EligibilityProcessorResult = {}; + + // 3) Extract name: prefer selenium extraction → PDF extractor → form input + const extracted: { firstName?: string | null; lastName?: string | null } = {}; + + if (seleniumResult.firstName || seleniumResult.lastName) { + extracted.firstName = seleniumResult.firstName ?? null; + extracted.lastName = seleniumResult.lastName ?? null; + } else if (seleniumResult.name) { + const parts = splitName(seleniumResult.name); + extracted.firstName = parts.firstName; + extracted.lastName = parts.lastName; + } + + if ( + !extracted.firstName && + !extracted.lastName && + seleniumResult?.pdf_path?.endsWith(".pdf") + ) { + try { + const pdfBuffer = await fs.readFile(seleniumResult.pdf_path); + const extraction = await forwardToPatientDataExtractorService({ + buffer: pdfBuffer, + originalname: path.basename(seleniumResult.pdf_path), + mimetype: "application/pdf", + } as any); + if (extraction.name) { + const parts = splitName(extraction.name); + extracted.firstName = parts.firstName; + extracted.lastName = parts.lastName; + } + } catch (e) { + console.error("[eligibilityProcessor] PDF name extraction failed:", e); + } + } + + const preferFirst = extracted.firstName || formFirstName || null; + const preferLast = extracted.lastName || formLastName || null; + + // 4) Create / update patient + let patient; + try { + patient = await createOrUpdatePatientByInsuranceId({ + insuranceId, + firstName: preferFirst, + lastName: preferLast, + dob: formDob, + userId, + }); + } catch (e: any) { + throw new Error(`Failed to create/update patient: ${e.message}`); + } + + // 5) Update patient status + if (patient && patient.id !== undefined) { + let newStatus = "UNKNOWN"; + if (seleniumResult.eligibility === "Y") newStatus = "ACTIVE"; + else if (seleniumResult.eligibility === "N") newStatus = "INACTIVE"; + + const updates: any = { status: newStatus }; + if (seleniumResult.insurance) updates.insuranceProvider = seleniumResult.insurance; + + await storage.updatePatient(patient.id, updates); + outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}`; + + // 6) Save PDF + let createdPdfFileId: number | null = null; + + if (seleniumResult.pdf_path?.endsWith(".pdf")) { + try { + const pdfBuffer = await fs.readFile(seleniumResult.pdf_path); + const groupTitleKey = "ELIGIBILITY_STATUS"; + const groupTitle = "Eligibility Status"; + + let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey); + if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey); + if (!group?.id) throw new Error("PDF group creation failed"); + + const created = await storage.createPdfFile( + group.id, + path.basename(seleniumResult.pdf_path), + pdfBuffer + ); + if (created && typeof created === "object" && "id" in created) { + createdPdfFileId = Number(created.id); + } + outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`; + } catch (e: any) { + outputResult.pdfUploadStatus = `PDF upload failed: ${e.message}`; + } + } else { + outputResult.pdfUploadStatus = + "No valid PDF path provided by Selenium; nothing uploaded."; + } + + outputResult.pdfFileId = createdPdfFileId; + } else { + outputResult.patientUpdateStatus = + "Patient not found or missing ID; no update performed"; + } + + // 7) Cleanup temp files + try { + if (seleniumResult.pdf_path) { + await emptyFolderContainingFile(seleniumResult.pdf_path); + } + } catch (e) { + console.error("[eligibilityProcessor] cleanup failed:", e); + } + + return outputResult; +} diff --git a/apps/Backend/src/queue/processors/ocrProcessor.ts b/apps/Backend/src/queue/processors/ocrProcessor.ts new file mode 100644 index 0000000..ed71df7 --- /dev/null +++ b/apps/Backend/src/queue/processors/ocrProcessor.ts @@ -0,0 +1,42 @@ +/** + * Processor for "ocr" jobs. + * Calls the PaymentOCR Python service with the uploaded files. + */ +import axios from "axios"; +import FormData from "form-data"; + +const OCR_BASE_URL = + process.env.OCR_SERVICE_BASE_URL || "http://localhost:5003"; + +export interface OcrProcessorInput { + files: { originalname: string; bufferBase64: string; mimetype: string }[]; +} + +export async function runOcrProcessor( + input: OcrProcessorInput +): Promise { + const { files } = input; + + const form = new FormData(); + for (const f of files) { + const buf = Buffer.from(f.bufferBase64, "base64"); + form.append("files", buf, { + filename: f.originalname, + contentType: f.mimetype, + knownLength: buf.length, + }); + } + + const resp = await axios.post<{ rows: any[] }>( + `${OCR_BASE_URL}/extract/json`, + form, + { + headers: form.getHeaders(), + maxBodyLength: Infinity, + maxContentLength: Infinity, + timeout: 180_000, // OCR can be heavy; 3-minute limit + } + ); + + return resp.data?.rows ?? []; +} diff --git a/apps/Backend/src/queue/queues.ts b/apps/Backend/src/queue/queues.ts new file mode 100644 index 0000000..ac252e3 --- /dev/null +++ b/apps/Backend/src/queue/queues.ts @@ -0,0 +1,48 @@ +import { Queue } from "bullmq"; +import { redisConnection } from "./connection"; + +/** Job types dispatched to the selenium Python worker. */ +export type SeleniumJobType = + | "eligibility-check" + | "claim-status-check" + | "claim-submit" + | "claim-pre-auth"; + +export interface SeleniumJobData { + jobType: SeleniumJobType; + userId: number; + socketId?: string; + /** Fully-enriched payload sent to the Python service. */ + enrichedPayload: any; + /** Extra fields used for DB post-processing */ + insuranceId?: string; + formFirstName?: string; + formLastName?: string; + formDob?: string; + claimId?: number; + /** Base64-encoded files for claim submit */ + files?: { originalname: string; bufferBase64: string; mimetype: string }[]; +} + +export interface OcrJobData { + userId: number; + socketId?: string; + files: { originalname: string; bufferBase64: string; mimetype: string }[]; +} + +const defaultOpts = { + removeOnComplete: { count: 100 }, + removeOnFail: { count: 50 }, + attempts: 2, + backoff: { type: "exponential" as const, delay: 5000 }, +}; + +export const seleniumQueue = new Queue("selenium-jobs", { + connection: redisConnection, + defaultJobOptions: defaultOpts, +}); + +export const ocrQueue = new Queue("ocr-jobs", { + connection: redisConnection, + defaultJobOptions: { ...defaultOpts, attempts: 2 }, +}); diff --git a/apps/Backend/src/queue/workers/ocrWorker.ts b/apps/Backend/src/queue/workers/ocrWorker.ts new file mode 100644 index 0000000..b6e2646 --- /dev/null +++ b/apps/Backend/src/queue/workers/ocrWorker.ts @@ -0,0 +1,55 @@ +import { Worker, Job } from "bullmq"; +import { redisConnection } from "../connection"; +import { OcrJobData } from "../queues"; +import { io } from "../../socket"; +import { runOcrProcessor } from "../processors/ocrProcessor"; + +function emitJobUpdate( + socketId: string | undefined, + jobId: string, + status: "active" | "completed" | "failed", + payload: Record +) { + const event = "job:update"; + const data = { jobId, jobType: "ocr", status, ...payload }; + if (socketId && io) { + io.to(socketId).emit(event, data); + } else if (io) { + io.emit(event, data); + } +} + +async function processOcrJob(job: Job) { + const { socketId, files } = job.data; + const jobId = job.id ?? job.name; + + emitJobUpdate(socketId, jobId, "active", { message: "OCR processing started…" }); + + try { + const rows = await runOcrProcessor({ files }); + emitJobUpdate(socketId, jobId, "completed", { result: { rows } }); + return rows; + } catch (err: any) { + const errorMsg = err?.message ?? String(err); + emitJobUpdate(socketId, jobId, "failed", { error: errorMsg }); + throw err; + } +} + +export function startOcrWorker() { + const worker = new Worker("ocr-jobs", processOcrJob, { + connection: redisConnection, + concurrency: 2, // OCR service allows 2 concurrent + }); + + worker.on("completed", (job) => { + console.log(`[ocrWorker] job ${job.id} completed`); + }); + + worker.on("failed", (job, err) => { + console.error(`[ocrWorker] job ${job?.id} failed:`, err.message); + }); + + console.log("[ocrWorker] started"); + return worker; +} diff --git a/apps/Backend/src/queue/workers/seleniumWorker.ts b/apps/Backend/src/queue/workers/seleniumWorker.ts new file mode 100644 index 0000000..f2184a1 --- /dev/null +++ b/apps/Backend/src/queue/workers/seleniumWorker.ts @@ -0,0 +1,104 @@ +import { Worker, Job } from "bullmq"; +import { redisConnection } from "../connection"; +import { SeleniumJobData } from "../queues"; +import { io } from "../../socket"; +import { runEligibilityProcessor } from "../processors/eligibilityProcessor"; +import { runClaimStatusProcessor } from "../processors/claimStatusProcessor"; +import { runClaimSubmitProcessor } from "../processors/claimSubmitProcessor"; + +/** + * Emit a job-status event to the socket that enqueued the job (if any). + * Falls back to broadcasting when socketId is absent. + */ +function emitJobUpdate( + socketId: string | undefined, + jobId: string, + jobType: string, + status: "active" | "completed" | "failed", + payload: Record +) { + const event = "job:update"; + const data = { jobId, jobType, status, ...payload }; + if (socketId && io) { + io.to(socketId).emit(event, data); + } else if (io) { + io.emit(event, data); + } +} + +async function processSeleniumJob(job: Job) { + const { jobType, userId, socketId, enrichedPayload } = job.data; + const jobId = job.id ?? job.name; + + emitJobUpdate(socketId, jobId, jobType, "active", { + message: "Selenium browser starting…", + }); + + try { + let result: any; + + if (jobType === "eligibility-check") { + result = await runEligibilityProcessor({ + enrichedPayload, + userId, + insuranceId: job.data.insuranceId!, + formFirstName: job.data.formFirstName, + formLastName: job.data.formLastName, + formDob: job.data.formDob, + }); + } else if (jobType === "claim-status-check") { + result = await runClaimStatusProcessor({ + enrichedPayload, + insuranceId: job.data.insuranceId!, + }); + } else if (jobType === "claim-submit") { + result = await runClaimSubmitProcessor({ + enrichedPayload, + files: job.data.files ?? [], + claimId: job.data.claimId, + variant: "claimsubmit", + }); + } else if (jobType === "claim-pre-auth") { + result = await runClaimSubmitProcessor({ + enrichedPayload, + files: job.data.files ?? [], + claimId: job.data.claimId, + variant: "claim-pre-auth", + }); + } else { + throw new Error(`Unknown selenium jobType: ${jobType}`); + } + + emitJobUpdate(socketId, jobId, jobType, "completed", { result }); + return result; + } catch (err: any) { + const errorMsg = err?.message ?? String(err); + emitJobUpdate(socketId, jobId, jobType, "failed", { error: errorMsg }); + throw err; // let BullMQ mark job as failed / retry + } +} + +export function startSeleniumWorker() { + const worker = new Worker( + "selenium-jobs", + processSeleniumJob, + { + connection: redisConnection, + concurrency: 1, // mirror the Python semaphore(1) — 1 browser at a time + } + ); + + worker.on("completed", (job) => { + console.log(`[seleniumWorker] job ${job.id} (${job.data.jobType}) completed`); + }); + + worker.on("failed", (job, err) => { + console.error( + `[seleniumWorker] job ${job?.id} (${job?.data.jobType}) failed:`, + err.message + ); + }); + + console.log("[seleniumWorker] started"); + return worker; +} diff --git a/apps/Frontend/src/hooks/use-job-status.ts b/apps/Frontend/src/hooks/use-job-status.ts new file mode 100644 index 0000000..afe448d --- /dev/null +++ b/apps/Frontend/src/hooks/use-job-status.ts @@ -0,0 +1,78 @@ +/** + * useJobStatus — tracks a BullMQ job via WebSocket `job:update` events. + * + * Usage: + * const { status, result, error } = useJobStatus(jobId); + * + * The hook listens for `job:update` events emitted by the backend workers. + * When the jobId changes, the previous listener is removed and a fresh one + * is registered for the new job. + */ +import { useEffect, useState } from "react"; +import { socket } from "@/lib/socket"; + +export type JobStatus = "queued" | "active" | "completed" | "failed" | null; + +export interface JobUpdatePayload { + jobId: string; + jobType: string; + status: JobStatus; + message?: string; + result?: any; + error?: string; +} + +export interface UseJobStatusReturn { + status: JobStatus; + message: string; + result: any; + error: string | null; + socketId: string | null; +} + +export function useJobStatus(jobId: string | null): UseJobStatusReturn { + const [status, setStatus] = useState(jobId ? "queued" : null); + const [message, setMessage] = useState(""); + const [result, setResult] = useState(null); + const [error, setError] = useState(null); + const [socketId, setSocketId] = useState( + socket.id ?? null + ); + + // Keep socketId in sync with the socket connection + useEffect(() => { + const onConnect = () => setSocketId(socket.id ?? null); + socket.on("connect", onConnect); + if (socket.connected) setSocketId(socket.id ?? null); + return () => { socket.off("connect", onConnect); }; + }, []); + + // Reset state when the jobId changes + useEffect(() => { + if (!jobId) { + setStatus(null); + setMessage(""); + setResult(null); + setError(null); + return; + } + + setStatus("queued"); + setMessage(""); + setResult(null); + setError(null); + + const handler = (payload: JobUpdatePayload) => { + if (payload.jobId !== jobId) return; + setStatus(payload.status); + if (payload.message) setMessage(payload.message); + if (payload.result !== undefined) setResult(payload.result); + if (payload.error) setError(payload.error); + }; + + socket.on("job:update", handler); + return () => { socket.off("job:update", handler); }; + }, [jobId]); + + return { status, message, result, error, socketId }; +} diff --git a/apps/Frontend/src/lib/socket.ts b/apps/Frontend/src/lib/socket.ts new file mode 100644 index 0000000..9a45880 --- /dev/null +++ b/apps/Frontend/src/lib/socket.ts @@ -0,0 +1,24 @@ +/** + * Shared Socket.IO client singleton. + * + * Import `socket` anywhere in the frontend to use the shared connection. + * The socket connects lazily — the first import triggers the connection. + */ +import { io, Socket } from "socket.io-client"; + +const SOCKET_URL = + import.meta.env.VITE_API_BASE_URL_BACKEND || + (typeof window !== "undefined" ? window.location.origin : ""); + +export const socket: Socket = io(SOCKET_URL, { + withCredentials: true, + autoConnect: true, +}); + +socket.on("connect", () => { + console.log("[socket] connected:", socket.id); +}); + +socket.on("disconnect", () => { + console.log("[socket] disconnected"); +});