feat: add BullMQ queue infrastructure and frontend job status hook

- apps/Backend/src/queue/: connection, queues, workers, processors
- apps/Frontend/src/hooks/use-job-status.ts: WebSocket job progress hook
- apps/Frontend/src/lib/socket.ts: shared Socket.IO singleton

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ff
2026-04-13 22:30:40 -04:00
parent e10126f772
commit 90302a76b7
13 changed files with 1079 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
import Redis from "ioredis";
const REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";
/**
* Shared Redis client for BullMQ.
* BullMQ requires maxRetriesPerRequest: null.
*
* lazyConnect + connectTimeout mean: don't auto-connect on require(),
* and if Redis is unreachable, fail within 3 s instead of hanging forever.
*/
export const redisConnection = new Redis(REDIS_URL, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
lazyConnect: true,
connectTimeout: 3_000, // give up after 3 s if Redis is down
retryStrategy: (times) => {
// Stop retrying after 2 attempts so a missing Redis server is
// reported quickly rather than blocking the request indefinitely.
if (times > 2) return null;
return Math.min(times * 500, 1_000);
},
});
redisConnection.on("error", (err) => {
console.error("[Redis] connection error:", err.message);
});
redisConnection.on("connect", () => {
console.log("[Redis] connected at", REDIS_URL);
});
/** True once a successful connection has been established. */
export let redisReady = false;
redisConnection.on("ready", () => { redisReady = true; });
redisConnection.on("close", () => { redisReady = false; });

View File

@@ -0,0 +1,79 @@
/**
* A lightweight in-process async job queue.
* No Redis, no external dependencies — just Node.js Promises.
*
* Features:
* - Configurable concurrency limit
* - Non-blocking add() — returns a jobId immediately
* - Job status tracking in-memory
* - onComplete / onFail callbacks for WebSocket notifications
*/
import { randomUUID } from "crypto";
export type JobStatus = "queued" | "active" | "completed" | "failed";
export interface QueueJob<T = any> {
id: string;
data: T;
status: JobStatus;
result?: any;
error?: string;
}
type Processor<T> = (job: QueueJob<T>) => Promise<any>;
export class InProcessQueue<T = any> {
private concurrency: number;
private running = 0;
private waitQueue: Array<() => void> = [];
private jobs = new Map<string, QueueJob<T>>();
constructor(concurrency = 1) {
this.concurrency = concurrency;
}
/**
* Enqueue a job. Returns the jobId immediately; processing starts
* as soon as a concurrency slot is free.
*/
add(data: T, processor: Processor<T>): string {
const id = randomUUID();
const job: QueueJob<T> = { id, data, status: "queued" };
this.jobs.set(id, job);
// Fire-and-forget — errors are captured in job.error
this._run(job, processor).catch(() => {});
return id;
}
getJob(id: string): QueueJob<T> | undefined {
return this.jobs.get(id);
}
/** How many jobs are waiting for a slot. */
get waiting() {
return this.waitQueue.length;
}
private async _run(job: QueueJob<T>, processor: Processor<T>) {
// Block until a concurrency slot is available
if (this.running >= this.concurrency) {
await new Promise<void>((resolve) => this.waitQueue.push(resolve));
}
this.running++;
job.status = "active";
try {
job.result = await processor(job);
job.status = "completed";
} catch (err: any) {
job.status = "failed";
job.error = err?.message ?? String(err);
} finally {
this.running--;
// Wake up next waiting job
const next = this.waitQueue.shift();
if (next) next();
}
}
}

View File

@@ -0,0 +1,133 @@
/**
* jobRunner — the single source of truth for enqueueing async jobs.
*
* Uses InProcessQueue (no Redis) instead of BullMQ.
* When a job finishes the worker emits a `job:update` Socket.IO event
* to the originating client so the frontend can react in real time.
*/
import { InProcessQueue } from "./inProcessQueue";
import { io } from "../socket";
import { runEligibilityProcessor } from "./processors/eligibilityProcessor";
import { runClaimStatusProcessor } from "./processors/claimStatusProcessor";
import { runClaimSubmitProcessor } from "./processors/claimSubmitProcessor";
import { runOcrProcessor } from "./processors/ocrProcessor";
import type { SeleniumJobData, OcrJobData } from "./queues";
// ── Queue instances ──────────────────────────────────────────────────────────
// Selenium: 1 browser at a time (mirrors Python semaphore)
const seleniumQ = new InProcessQueue<SeleniumJobData>(1);
// OCR: allow 2 concurrent (mirrors Python MAX_CONCURRENCY=2)
const ocrQ = new InProcessQueue<OcrJobData>(2);
// ── WebSocket helper ─────────────────────────────────────────────────────────
function emitJobUpdate(
socketId: string | undefined,
jobId: string,
jobType: string,
status: "active" | "completed" | "failed",
extra: Record<string, any> = {}
) {
const payload = { jobId, jobType, status, ...extra };
if (socketId && io) {
io.to(socketId).emit("job:update", payload);
} else if (io) {
io.emit("job:update", payload);
}
}
// ── Selenium enqueue ─────────────────────────────────────────────────────────
export function enqueueSeleniumJob(data: SeleniumJobData): string {
const { jobType, socketId } = data;
const jobId = seleniumQ.add(data, async (job) => {
emitJobUpdate(socketId, job.id, jobType, "active", {
message: "Selenium browser starting…",
});
if (jobType === "eligibility-check") {
return runEligibilityProcessor({
enrichedPayload: data.enrichedPayload,
userId: data.userId,
insuranceId: data.insuranceId!,
formFirstName: data.formFirstName,
formLastName: data.formLastName,
formDob: data.formDob,
});
}
if (jobType === "claim-status-check") {
return runClaimStatusProcessor({
enrichedPayload: data.enrichedPayload,
insuranceId: data.insuranceId!,
});
}
if (jobType === "claim-submit" || jobType === "claim-pre-auth") {
return runClaimSubmitProcessor({
enrichedPayload: data.enrichedPayload,
files: data.files ?? [],
claimId: data.claimId,
variant: jobType === "claim-pre-auth" ? "claim-pre-auth" : "claimsubmit",
});
}
throw new Error(`Unknown selenium jobType: ${jobType}`);
});
// Attach completion/failure callbacks after the job is in the queue.
// We poll the job object once per tick until it settles.
(async () => {
while (true) {
await new Promise((r) => setTimeout(r, 500));
const job = seleniumQ.getJob(jobId);
if (!job) break;
if (job.status === "completed") {
emitJobUpdate(socketId, jobId, jobType, "completed", {
result: job.result,
});
console.log(`[seleniumQ] job ${jobId} (${jobType}) completed`);
break;
}
if (job.status === "failed") {
emitJobUpdate(socketId, jobId, jobType, "failed", {
error: job.error,
});
console.error(`[seleniumQ] job ${jobId} (${jobType}) failed:`, job.error);
break;
}
}
})();
return jobId;
}
// ── OCR enqueue ──────────────────────────────────────────────────────────────
export function enqueueOcrJob(data: OcrJobData): string {
const { socketId } = data;
const jobId = ocrQ.add(data, async () => {
emitJobUpdate(socketId, jobId, "ocr", "active", {
message: "OCR processing started…",
});
return runOcrProcessor({ files: data.files });
});
(async () => {
while (true) {
await new Promise((r) => setTimeout(r, 500));
const job = ocrQ.getJob(jobId);
if (!job) break;
if (job.status === "completed") {
emitJobUpdate(socketId, jobId, "ocr", "completed", {
result: { rows: job.result },
});
console.log(`[ocrQ] job ${jobId} completed`);
break;
}
if (job.status === "failed") {
emitJobUpdate(socketId, jobId, "ocr", "failed", { error: job.error });
console.error(`[ocrQ] job ${jobId} failed:`, job.error);
break;
}
}
})();
return jobId;
}

View File

@@ -0,0 +1,156 @@
/**
* Shared utilities used by all job processors.
* Avoids duplicating helpers that currently live inside route files.
*/
import axios from "axios";
import PDFDocument from "pdfkit";
import fsSync from "fs";
import { storage } from "../../storage";
import {
InsertPatient,
insertPatientSchema,
} from "../../../../../packages/db/types/patient-types";
const SELENIUM_BASE_URL =
process.env.SELENIUM_AGENT_BASE_URL || "http://localhost:5002";
// ---------------------------------------------------------------------------
// Python service helpers
// ---------------------------------------------------------------------------
/** Start an async job on the Python service and return the session ID. */
export async function startPythonJob(
endpoint: string,
payload: any
): Promise<string> {
const resp = await axios.post(
`${SELENIUM_BASE_URL}${endpoint}`,
payload,
{ timeout: 10_000 }
);
const sid: string = resp.data?.session_id;
if (!sid) throw new Error(`Python service did not return a session_id from ${endpoint}`);
return sid;
}
/** Poll /job/<sid>/status until completed/failed or timeout. */
export async function pollPythonJob(
sid: string,
timeoutMs = 5 * 60 * 1000,
intervalMs = 2_000
): Promise<any> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const resp = await axios.get(
`${SELENIUM_BASE_URL}/job/${sid}/status`,
{ timeout: 5_000 }
);
const s = resp.data;
if (s.status === "completed") {
if (s.result?.status === "error") {
const msg =
typeof s.result.message === "string"
? s.result.message
: s.result.message?.msg ?? "Selenium returned error status";
throw new Error(msg);
}
return s.result;
}
if (s.status === "failed") {
throw new Error(s.error || "Python job failed");
}
await sleep(intervalMs);
}
throw new Error("Selenium job timed out after polling");
}
// ---------------------------------------------------------------------------
// General utilities
// ---------------------------------------------------------------------------
export function sleep(ms: number) {
return new Promise<void>((r) => setTimeout(r, ms));
}
export function splitName(fullName?: string | null) {
if (!fullName) return { firstName: "", lastName: "" };
const parts = fullName.trim().split(/\s+/).filter(Boolean);
const firstName = parts.shift() ?? "";
const lastName = parts.join(" ") ?? "";
return { firstName, lastName };
}
export async function imageToPdfBuffer(imagePath: string): Promise<Buffer> {
return new Promise<Buffer>((resolve, reject) => {
try {
const doc = new PDFDocument({ autoFirstPage: false });
const chunks: Uint8Array[] = [];
doc.on("data", (c: any) => chunks.push(c));
doc.on("end", () => resolve(Buffer.concat(chunks)));
doc.on("error", reject);
const A4_W = 595.28;
const A4_H = 841.89;
doc.addPage({ size: [A4_W, A4_H] });
doc.image(imagePath, 0, 0, { fit: [A4_W, A4_H], align: "center", valign: "center" });
doc.end();
} catch (e) {
reject(e);
}
});
}
// ---------------------------------------------------------------------------
// Patient DB helpers
// ---------------------------------------------------------------------------
export async function createOrUpdatePatientByInsuranceId(options: {
insuranceId: string;
firstName?: string | null;
lastName?: string | null;
dob?: string | Date | null;
userId: number;
}) {
const { insuranceId, firstName, lastName, dob, userId } = options;
if (!insuranceId) throw new Error("Missing insuranceId");
const incomingFirst = (firstName || "").trim();
const incomingLast = (lastName || "").trim();
let patient = await storage.getPatientByInsuranceId(insuranceId);
if (patient && patient.id) {
const updates: any = {};
if (incomingFirst && String(patient.firstName ?? "").trim() !== incomingFirst)
updates.firstName = incomingFirst;
if (incomingLast && String(patient.lastName ?? "").trim() !== incomingLast)
updates.lastName = incomingLast;
if (Object.keys(updates).length > 0) {
await storage.updatePatient(patient.id, updates);
patient = await storage.getPatientByInsuranceId(insuranceId);
}
return patient;
}
const createPayload: any = {
firstName: incomingFirst,
lastName: incomingLast,
dateOfBirth: dob,
gender: "",
phone: "",
userId,
insuranceId,
};
let patientData: InsertPatient;
try {
patientData = insertPatientSchema.parse(createPayload);
} catch {
const safePayload = { ...createPayload };
delete safePayload.dateOfBirth;
patientData = insertPatientSchema.parse(safePayload);
}
await storage.createPatient(patientData);
return storage.getPatientByInsuranceId(insuranceId);
}

View File

@@ -0,0 +1,99 @@
/**
* Processor for "claim-status-check" jobs.
* Mirrors routes/insuranceStatus.ts /claim-status-check
*/
import fs from "fs/promises";
import fsSync from "fs";
import path from "path";
import { storage } from "../../storage";
import { emptyFolderContainingFile } from "../../utils/emptyTempFolder";
import {
startPythonJob,
pollPythonJob,
imageToPdfBuffer,
} from "./_shared";
export interface ClaimStatusProcessorInput {
enrichedPayload: any;
insuranceId: string; // memberId used to look up the patient
}
export interface ClaimStatusProcessorResult {
pdfUploadStatus?: string;
pdfFileId?: number | null;
}
export async function runClaimStatusProcessor(
input: ClaimStatusProcessorInput
): Promise<ClaimStatusProcessorResult> {
const { enrichedPayload, insuranceId } = input;
// 1) Start async Python job
const sid = await startPythonJob("/claim-status-check/async", {
data: enrichedPayload,
});
// 2) Poll for completion
const result = await pollPythonJob(sid);
const outputResult: ClaimStatusProcessorResult = {};
// 3) Look up patient
const patient = await storage.getPatientByInsuranceId(insuranceId);
if (patient && patient.id !== undefined) {
let pdfBuffer: Buffer | null = null;
let generatedPdfPath: string | null = null;
if (
result.ss_path &&
/\.(png|jpg|jpeg)$/i.test(result.ss_path) &&
fsSync.existsSync(result.ss_path)
) {
try {
pdfBuffer = await imageToPdfBuffer(result.ss_path);
const pdfFileName = `claimStatus_${insuranceId}_${Date.now()}.pdf`;
generatedPdfPath = path.join(path.dirname(result.ss_path), pdfFileName);
await fs.writeFile(generatedPdfPath, pdfBuffer);
} catch (e) {
console.error("[claimStatusProcessor] img→PDF conversion failed:", e);
outputResult.pdfUploadStatus = `Failed to convert screenshot to PDF: ${e}`;
}
} else {
outputResult.pdfUploadStatus =
"No valid screenshot provided by Selenium; nothing to upload.";
}
if (pdfBuffer && generatedPdfPath) {
const groupTitleKey = "CLAIM_STATUS";
const groupTitle = "Claim Status";
let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey);
if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey);
if (!group?.id) throw new Error("PDF group creation failed");
const basename = path.basename(generatedPdfPath);
const created = await storage.createPdfFile(group.id, basename, pdfBuffer);
let createdPdfFileId: number | null = null;
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`;
outputResult.pdfFileId = createdPdfFileId;
}
} else {
outputResult.pdfUploadStatus =
"Patient not found; no PDF saved.";
}
// 4) Cleanup
try {
if (result.ss_path) await emptyFolderContainingFile(result.ss_path);
} catch (e) {
console.error("[claimStatusProcessor] cleanup failed:", e);
}
return outputResult;
}

View File

@@ -0,0 +1,58 @@
/**
* Processors for "claim-submit" and "claim-pre-auth" jobs.
* Mirrors routes/claims.ts /selenium-claim and /selenium-claim-pre-auth
*/
import { storage } from "../../storage";
import { startPythonJob, pollPythonJob } from "./_shared";
export interface ClaimSubmitProcessorInput {
enrichedPayload: any;
files: { originalname: string; bufferBase64: string; mimetype: string }[];
claimId?: number;
/** "claimsubmit" (default) or "claim-pre-auth" */
variant?: "claimsubmit" | "claim-pre-auth";
}
export interface ClaimSubmitProcessorResult {
status: string;
claimNumber?: string;
pdf_url?: string;
[key: string]: any;
}
export async function runClaimSubmitProcessor(
input: ClaimSubmitProcessorInput
): Promise<ClaimSubmitProcessorResult> {
const { enrichedPayload, files, claimId } = input;
// Build the same payload shape the Python /claimsubmit endpoint expects
const pdfs = files
.filter((f) => f.mimetype === "application/pdf")
.map(({ originalname, bufferBase64 }) => ({ originalname, bufferBase64 }));
const images = files
.filter((f) => f.mimetype.startsWith("image/"))
.map(({ originalname, bufferBase64 }) => ({ originalname, bufferBase64 }));
const payload = { claim: enrichedPayload, pdfs, images };
const endpoint =
input.variant === "claim-pre-auth" ? "/claim-pre-auth/async" : "/claimsubmit/async";
// 1) Start async Python job
const sid = await startPythonJob(endpoint, payload);
// 2) Poll for result
const result = await pollPythonJob(sid, 10 * 60 * 1000); // claim submit can take up to 10 min
// 3) Persist claimNumber if returned
if (result?.claimNumber && claimId) {
try {
await storage.updateClaim(claimId, { claimNumber: result.claimNumber });
} catch (e) {
console.error("[claimSubmitProcessor] failed to persist claimNumber:", e);
}
}
return { ...result, claimId };
}

View File

@@ -0,0 +1,167 @@
/**
* Processor for "eligibility-check" jobs.
*
* Replicates the logic from routes/insuranceStatus.ts /eligibility-check
* so it can run inside a BullMQ worker without blocking the HTTP server.
*/
import fs from "fs/promises";
import fsSync from "fs";
import path from "path";
import { storage } from "../../storage";
import { emptyFolderContainingFile } from "../../utils/emptyTempFolder";
import forwardToPatientDataExtractorService from "../../services/patientDataExtractorService";
import {
startPythonJob,
pollPythonJob,
splitName,
createOrUpdatePatientByInsuranceId,
} from "./_shared";
export interface EligibilityProcessorInput {
/** Enriched payload (includes credentials) */
enrichedPayload: any;
userId: number;
insuranceId: string;
formFirstName?: string;
formLastName?: string;
formDob?: string;
}
export interface EligibilityProcessorResult {
patientUpdateStatus?: string;
pdfUploadStatus?: string;
pdfFileId?: number | null;
}
export async function runEligibilityProcessor(
input: EligibilityProcessorInput
): Promise<EligibilityProcessorResult> {
const {
enrichedPayload,
userId,
insuranceId,
formFirstName,
formLastName,
formDob,
} = input;
// 1) Fire the async Python job
const sid = await startPythonJob("/eligibility-check/async", {
data: enrichedPayload,
});
// 2) Wait for completion
const seleniumResult = await pollPythonJob(sid);
const outputResult: EligibilityProcessorResult = {};
// 3) Extract name: prefer selenium extraction → PDF extractor → form input
const extracted: { firstName?: string | null; lastName?: string | null } = {};
if (seleniumResult.firstName || seleniumResult.lastName) {
extracted.firstName = seleniumResult.firstName ?? null;
extracted.lastName = seleniumResult.lastName ?? null;
} else if (seleniumResult.name) {
const parts = splitName(seleniumResult.name);
extracted.firstName = parts.firstName;
extracted.lastName = parts.lastName;
}
if (
!extracted.firstName &&
!extracted.lastName &&
seleniumResult?.pdf_path?.endsWith(".pdf")
) {
try {
const pdfBuffer = await fs.readFile(seleniumResult.pdf_path);
const extraction = await forwardToPatientDataExtractorService({
buffer: pdfBuffer,
originalname: path.basename(seleniumResult.pdf_path),
mimetype: "application/pdf",
} as any);
if (extraction.name) {
const parts = splitName(extraction.name);
extracted.firstName = parts.firstName;
extracted.lastName = parts.lastName;
}
} catch (e) {
console.error("[eligibilityProcessor] PDF name extraction failed:", e);
}
}
const preferFirst = extracted.firstName || formFirstName || null;
const preferLast = extracted.lastName || formLastName || null;
// 4) Create / update patient
let patient;
try {
patient = await createOrUpdatePatientByInsuranceId({
insuranceId,
firstName: preferFirst,
lastName: preferLast,
dob: formDob,
userId,
});
} catch (e: any) {
throw new Error(`Failed to create/update patient: ${e.message}`);
}
// 5) Update patient status
if (patient && patient.id !== undefined) {
let newStatus = "UNKNOWN";
if (seleniumResult.eligibility === "Y") newStatus = "ACTIVE";
else if (seleniumResult.eligibility === "N") newStatus = "INACTIVE";
const updates: any = { status: newStatus };
if (seleniumResult.insurance) updates.insuranceProvider = seleniumResult.insurance;
await storage.updatePatient(patient.id, updates);
outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}`;
// 6) Save PDF
let createdPdfFileId: number | null = null;
if (seleniumResult.pdf_path?.endsWith(".pdf")) {
try {
const pdfBuffer = await fs.readFile(seleniumResult.pdf_path);
const groupTitleKey = "ELIGIBILITY_STATUS";
const groupTitle = "Eligibility Status";
let group = await storage.findPdfGroupByPatientTitleKey(patient.id, groupTitleKey);
if (!group) group = await storage.createPdfGroup(patient.id, groupTitle, groupTitleKey);
if (!group?.id) throw new Error("PDF group creation failed");
const created = await storage.createPdfFile(
group.id,
path.basename(seleniumResult.pdf_path),
pdfBuffer
);
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`;
} catch (e: any) {
outputResult.pdfUploadStatus = `PDF upload failed: ${e.message}`;
}
} else {
outputResult.pdfUploadStatus =
"No valid PDF path provided by Selenium; nothing uploaded.";
}
outputResult.pdfFileId = createdPdfFileId;
} else {
outputResult.patientUpdateStatus =
"Patient not found or missing ID; no update performed";
}
// 7) Cleanup temp files
try {
if (seleniumResult.pdf_path) {
await emptyFolderContainingFile(seleniumResult.pdf_path);
}
} catch (e) {
console.error("[eligibilityProcessor] cleanup failed:", e);
}
return outputResult;
}

View File

@@ -0,0 +1,42 @@
/**
* Processor for "ocr" jobs.
* Calls the PaymentOCR Python service with the uploaded files.
*/
import axios from "axios";
import FormData from "form-data";
const OCR_BASE_URL =
process.env.OCR_SERVICE_BASE_URL || "http://localhost:5003";
export interface OcrProcessorInput {
files: { originalname: string; bufferBase64: string; mimetype: string }[];
}
export async function runOcrProcessor(
input: OcrProcessorInput
): Promise<any[]> {
const { files } = input;
const form = new FormData();
for (const f of files) {
const buf = Buffer.from(f.bufferBase64, "base64");
form.append("files", buf, {
filename: f.originalname,
contentType: f.mimetype,
knownLength: buf.length,
});
}
const resp = await axios.post<{ rows: any[] }>(
`${OCR_BASE_URL}/extract/json`,
form,
{
headers: form.getHeaders(),
maxBodyLength: Infinity,
maxContentLength: Infinity,
timeout: 180_000, // OCR can be heavy; 3-minute limit
}
);
return resp.data?.rows ?? [];
}

View File

@@ -0,0 +1,48 @@
import { Queue } from "bullmq";
import { redisConnection } from "./connection";
/** Job types dispatched to the selenium Python worker. */
export type SeleniumJobType =
| "eligibility-check"
| "claim-status-check"
| "claim-submit"
| "claim-pre-auth";
export interface SeleniumJobData {
jobType: SeleniumJobType;
userId: number;
socketId?: string;
/** Fully-enriched payload sent to the Python service. */
enrichedPayload: any;
/** Extra fields used for DB post-processing */
insuranceId?: string;
formFirstName?: string;
formLastName?: string;
formDob?: string;
claimId?: number;
/** Base64-encoded files for claim submit */
files?: { originalname: string; bufferBase64: string; mimetype: string }[];
}
export interface OcrJobData {
userId: number;
socketId?: string;
files: { originalname: string; bufferBase64: string; mimetype: string }[];
}
const defaultOpts = {
removeOnComplete: { count: 100 },
removeOnFail: { count: 50 },
attempts: 2,
backoff: { type: "exponential" as const, delay: 5000 },
};
export const seleniumQueue = new Queue<SeleniumJobData>("selenium-jobs", {
connection: redisConnection,
defaultJobOptions: defaultOpts,
});
export const ocrQueue = new Queue<OcrJobData>("ocr-jobs", {
connection: redisConnection,
defaultJobOptions: { ...defaultOpts, attempts: 2 },
});

View File

@@ -0,0 +1,55 @@
import { Worker, Job } from "bullmq";
import { redisConnection } from "../connection";
import { OcrJobData } from "../queues";
import { io } from "../../socket";
import { runOcrProcessor } from "../processors/ocrProcessor";
function emitJobUpdate(
socketId: string | undefined,
jobId: string,
status: "active" | "completed" | "failed",
payload: Record<string, any>
) {
const event = "job:update";
const data = { jobId, jobType: "ocr", status, ...payload };
if (socketId && io) {
io.to(socketId).emit(event, data);
} else if (io) {
io.emit(event, data);
}
}
async function processOcrJob(job: Job<OcrJobData>) {
const { socketId, files } = job.data;
const jobId = job.id ?? job.name;
emitJobUpdate(socketId, jobId, "active", { message: "OCR processing started…" });
try {
const rows = await runOcrProcessor({ files });
emitJobUpdate(socketId, jobId, "completed", { result: { rows } });
return rows;
} catch (err: any) {
const errorMsg = err?.message ?? String(err);
emitJobUpdate(socketId, jobId, "failed", { error: errorMsg });
throw err;
}
}
export function startOcrWorker() {
const worker = new Worker<OcrJobData>("ocr-jobs", processOcrJob, {
connection: redisConnection,
concurrency: 2, // OCR service allows 2 concurrent
});
worker.on("completed", (job) => {
console.log(`[ocrWorker] job ${job.id} completed`);
});
worker.on("failed", (job, err) => {
console.error(`[ocrWorker] job ${job?.id} failed:`, err.message);
});
console.log("[ocrWorker] started");
return worker;
}

View File

@@ -0,0 +1,104 @@
import { Worker, Job } from "bullmq";
import { redisConnection } from "../connection";
import { SeleniumJobData } from "../queues";
import { io } from "../../socket";
import { runEligibilityProcessor } from "../processors/eligibilityProcessor";
import { runClaimStatusProcessor } from "../processors/claimStatusProcessor";
import { runClaimSubmitProcessor } from "../processors/claimSubmitProcessor";
/**
* Emit a job-status event to the socket that enqueued the job (if any).
* Falls back to broadcasting when socketId is absent.
*/
function emitJobUpdate(
socketId: string | undefined,
jobId: string,
jobType: string,
status: "active" | "completed" | "failed",
payload: Record<string, any>
) {
const event = "job:update";
const data = { jobId, jobType, status, ...payload };
if (socketId && io) {
io.to(socketId).emit(event, data);
} else if (io) {
io.emit(event, data);
}
}
async function processSeleniumJob(job: Job<SeleniumJobData>) {
const { jobType, userId, socketId, enrichedPayload } = job.data;
const jobId = job.id ?? job.name;
emitJobUpdate(socketId, jobId, jobType, "active", {
message: "Selenium browser starting…",
});
try {
let result: any;
if (jobType === "eligibility-check") {
result = await runEligibilityProcessor({
enrichedPayload,
userId,
insuranceId: job.data.insuranceId!,
formFirstName: job.data.formFirstName,
formLastName: job.data.formLastName,
formDob: job.data.formDob,
});
} else if (jobType === "claim-status-check") {
result = await runClaimStatusProcessor({
enrichedPayload,
insuranceId: job.data.insuranceId!,
});
} else if (jobType === "claim-submit") {
result = await runClaimSubmitProcessor({
enrichedPayload,
files: job.data.files ?? [],
claimId: job.data.claimId,
variant: "claimsubmit",
});
} else if (jobType === "claim-pre-auth") {
result = await runClaimSubmitProcessor({
enrichedPayload,
files: job.data.files ?? [],
claimId: job.data.claimId,
variant: "claim-pre-auth",
});
} else {
throw new Error(`Unknown selenium jobType: ${jobType}`);
}
emitJobUpdate(socketId, jobId, jobType, "completed", { result });
return result;
} catch (err: any) {
const errorMsg = err?.message ?? String(err);
emitJobUpdate(socketId, jobId, jobType, "failed", { error: errorMsg });
throw err; // let BullMQ mark job as failed / retry
}
}
export function startSeleniumWorker() {
const worker = new Worker<SeleniumJobData>(
"selenium-jobs",
processSeleniumJob,
{
connection: redisConnection,
concurrency: 1, // mirror the Python semaphore(1) — 1 browser at a time
}
);
worker.on("completed", (job) => {
console.log(`[seleniumWorker] job ${job.id} (${job.data.jobType}) completed`);
});
worker.on("failed", (job, err) => {
console.error(
`[seleniumWorker] job ${job?.id} (${job?.data.jobType}) failed:`,
err.message
);
});
console.log("[seleniumWorker] started");
return worker;
}

View File

@@ -0,0 +1,78 @@
/**
* useJobStatus — tracks a BullMQ job via WebSocket `job:update` events.
*
* Usage:
* const { status, result, error } = useJobStatus(jobId);
*
* The hook listens for `job:update` events emitted by the backend workers.
* When the jobId changes, the previous listener is removed and a fresh one
* is registered for the new job.
*/
import { useEffect, useState } from "react";
import { socket } from "@/lib/socket";
export type JobStatus = "queued" | "active" | "completed" | "failed" | null;
export interface JobUpdatePayload {
jobId: string;
jobType: string;
status: JobStatus;
message?: string;
result?: any;
error?: string;
}
export interface UseJobStatusReturn {
status: JobStatus;
message: string;
result: any;
error: string | null;
socketId: string | null;
}
export function useJobStatus(jobId: string | null): UseJobStatusReturn {
const [status, setStatus] = useState<JobStatus>(jobId ? "queued" : null);
const [message, setMessage] = useState("");
const [result, setResult] = useState<any>(null);
const [error, setError] = useState<string | null>(null);
const [socketId, setSocketId] = useState<string | null>(
socket.id ?? null
);
// Keep socketId in sync with the socket connection
useEffect(() => {
const onConnect = () => setSocketId(socket.id ?? null);
socket.on("connect", onConnect);
if (socket.connected) setSocketId(socket.id ?? null);
return () => { socket.off("connect", onConnect); };
}, []);
// Reset state when the jobId changes
useEffect(() => {
if (!jobId) {
setStatus(null);
setMessage("");
setResult(null);
setError(null);
return;
}
setStatus("queued");
setMessage("");
setResult(null);
setError(null);
const handler = (payload: JobUpdatePayload) => {
if (payload.jobId !== jobId) return;
setStatus(payload.status);
if (payload.message) setMessage(payload.message);
if (payload.result !== undefined) setResult(payload.result);
if (payload.error) setError(payload.error);
};
socket.on("job:update", handler);
return () => { socket.off("job:update", handler); };
}, [jobId]);
return { status, message, result, error, socketId };
}

View File

@@ -0,0 +1,24 @@
/**
* Shared Socket.IO client singleton.
*
* Import `socket` anywhere in the frontend to use the shared connection.
* The socket connects lazily — the first import triggers the connection.
*/
import { io, Socket } from "socket.io-client";
const SOCKET_URL =
import.meta.env.VITE_API_BASE_URL_BACKEND ||
(typeof window !== "undefined" ? window.location.origin : "");
export const socket: Socket = io(SOCKET_URL, {
withCredentials: true,
autoConnect: true,
});
socket.on("connect", () => {
console.log("[socket] connected:", socket.id);
});
socket.on("disconnect", () => {
console.log("[socket] disconnected");
});