feat: rewire routes to BullMQ and speed up documents page

This commit is contained in:
ff
2026-04-13 20:43:21 -04:00
parent 4e981c644f
commit 05a8a220bd
5 changed files with 171 additions and 526 deletions

View File

@@ -2,6 +2,8 @@ import app from "./app";
import dotenv from "dotenv";
import http from "http";
import { initSocket } from "./socket";
import { startSeleniumWorker } from "./queue/workers/seleniumWorker";
import { startOcrWorker } from "./queue/workers/ocrWorker";
dotenv.config();
@@ -19,6 +21,10 @@ const server = http.createServer(app);
// Initialize socket.io on this server
initSocket(server);
// Start BullMQ workers (requires Redis at localhost:6379)
startSeleniumWorker();
startOcrWorker();
server.listen(PORT, HOST, () => {
console.log(
`✅ Server running in ${NODE_ENV} mode at http://${HOST}:${PORT}`

View File

@@ -6,6 +6,7 @@ import multer from "multer";
import { forwardToSeleniumClaimAgent } from "../services/seleniumClaimClient";
import path from "path";
import axios from "axios";
import { seleniumQueue } from "../queue/queues";
import { Prisma } from "@repo/db/generated/prisma";
import { Decimal } from "decimal.js";
import {
@@ -138,31 +139,27 @@ router.post(
massdhpPassword: credentials.password,
};
const result = await forwardToSeleniumClaimAgent(enrichedData, [
...pdfs,
...images,
]);
// Encode file buffers as base64 so they can be stored in Redis
const filesForQueue = [...pdfs, ...images].map((f) => ({
originalname: f.originalname,
bufferBase64: f.buffer.toString("base64"),
mimetype: f.mimetype,
}));
// Store claimNumber if returned from Selenium
if (result?.claimNumber && claimData.claimId) {
try {
await storage.updateClaim(claimData.claimId, {
claimNumber: result.claimNumber,
});
console.log(`Updated claim ${claimData.claimId} with claimNumber: ${result.claimNumber}`);
} catch (updateErr) {
console.error("Failed to update claim with claimNumber:", updateErr);
}
}
res.json({
...result,
const job = await seleniumQueue.add("claim-submit", {
jobType: "claim-submit",
userId: req.user.id,
socketId: req.body.socketId,
enrichedPayload: enrichedData,
files: filesForQueue,
claimId: claimData.claimId,
});
return res.json({ jobId: job.id, status: "queued" });
} catch (err: any) {
console.error(err);
return res.status(500).json({
error: err.message || "Failed to forward to selenium agent",
error: err.message || "Failed to enqueue selenium claim job",
});
}
}
@@ -319,19 +316,26 @@ router.post(
massdhpPassword: credentials.password,
};
const result = await forwardToSeleniumClaimPreAuthAgent(enrichedData, [
...pdfs,
...images,
]);
const filesForQueue = [...pdfs, ...images].map((f) => ({
originalname: f.originalname,
bufferBase64: f.buffer.toString("base64"),
mimetype: f.mimetype,
}));
res.json({
...result,
const job = await seleniumQueue.add("claim-pre-auth", {
jobType: "claim-pre-auth",
userId: req.user.id,
socketId: req.body.socketId,
enrichedPayload: enrichedData,
files: filesForQueue,
claimId: claimData.claimId,
});
return res.json({ jobId: job.id, status: "queued" });
} catch (err: any) {
console.error(err);
return res.status(500).json({
error: err.message || "Failed to forward to selenium agent",
error: err.message || "Failed to enqueue selenium pre-auth job",
});
}
}

View File

@@ -14,6 +14,7 @@ import {
insertPatientSchema,
} from "../../../../packages/db/types/patient-types";
import { formatDobForAgent } from "../utils/dateUtils";
import { seleniumQueue } from "../queue/queues";
const router = Router();
@@ -119,241 +120,49 @@ router.post(
.status(400)
.json({ error: "Missing Insurance Eligibility data for selenium" });
}
if (!req.user || !req.user.id) {
return res.status(401).json({ error: "Unauthorized: user info missing" });
}
let seleniumResult: any = undefined;
let createdPdfFileId: number | null = null;
let outputResult: any = {};
const extracted: any = {};
try {
// const insuranceEligibilityData = JSON.parse(req.body.data);
// Handle both string and object data
const insuranceEligibilityData = typeof req.body.data === 'string'
? JSON.parse(req.body.data)
const insuranceEligibilityData =
typeof req.body.data === "string"
? JSON.parse(req.body.data)
: req.body.data;
const credentials = await storage.getInsuranceCredentialByUserAndSiteKey(
req.user.id,
insuranceEligibilityData.insuranceSiteKey
);
if (!credentials) {
return res.status(404).json({
error:
"No insurance credentials found for this provider, Kindly Update this at Settings Page.",
});
}
const enrichedData = {
...insuranceEligibilityData,
massdhpUsername: credentials.username,
massdhpPassword: credentials.password,
};
// 1) Run selenium agent
try {
seleniumResult =
await forwardToSeleniumInsuranceEligibilityAgent(enrichedData);
} catch (seleniumErr: any) {
return res.status(502).json({
error: "Selenium service failed",
detail: seleniumErr?.message ?? String(seleniumErr),
});
}
// 2) Extract data from selenium result (page extraction) and PDF
let extracted: any = {};
// First, try to get data from selenium's page extraction
if (seleniumResult.firstName || seleniumResult.lastName) {
extracted.firstName = seleniumResult.firstName || null;
extracted.lastName = seleniumResult.lastName || null;
console.log('[eligibility-check] Using name from selenium extraction:', {
firstName: extracted.firstName,
lastName: extracted.lastName
});
}
// Also check for combined name field (fallback)
else if (seleniumResult.name) {
const parts = splitName(seleniumResult.name);
extracted.firstName = parts.firstName;
extracted.lastName = parts.lastName;
console.log('[eligibility-check] Using combined name from selenium extraction:', parts);
}
// If no name from selenium, try PDF extraction
if (!extracted.firstName && !extracted.lastName &&
seleniumResult?.pdf_path &&
seleniumResult.pdf_path.endsWith(".pdf")
) {
try {
const pdfPath = seleniumResult.pdf_path;
console.log('[eligibility-check] Extracting data from PDF:', pdfPath);
const pdfBuffer = await fs.readFile(pdfPath);
const extraction = await forwardToPatientDataExtractorService({
buffer: pdfBuffer,
originalname: path.basename(pdfPath),
mimetype: "application/pdf",
} as any);
console.log('[eligibility-check] PDF Extraction result:', extraction);
if (extraction.name) {
const parts = splitName(extraction.name);
extracted.firstName = parts.firstName;
extracted.lastName = parts.lastName;
console.log('[eligibility-check] Split name from PDF:', parts);
} else {
console.warn('[eligibility-check] No name extracted from PDF');
}
} catch (extractErr: any) {
console.error('[eligibility-check] Patient data extraction failed:', extractErr);
// Continue without extracted names - we'll use form names or create patient with empty names
}
}
// Step-3) Create or update patient name using extracted info (prefer extractor -> request)
const insuranceId = String(
insuranceEligibilityData.memberId ?? ""
).trim();
if (!insuranceId) {
return res.status(400).json({ error: "Missing memberId" });
}
// Always prioritize extracted data from MassHealth over form input
// Form input is only used as fallback when extraction fails
const preferFirst = extracted.firstName || null;
const preferLast = extracted.lastName || null;
console.log('[eligibility-check] Name priority:', {
extracted: { firstName: extracted.firstName, lastName: extracted.lastName },
fromForm: { firstName: insuranceEligibilityData.firstName, lastName: insuranceEligibilityData.lastName },
using: { firstName: preferFirst, lastName: preferLast }
const credentials = await storage.getInsuranceCredentialByUserAndSiteKey(
req.user.id,
insuranceEligibilityData.insuranceSiteKey
);
if (!credentials) {
return res.status(404).json({
error:
"No insurance credentials found for this provider, Kindly Update this at Settings Page.",
});
let patient;
try {
patient = await createOrUpdatePatientByInsuranceId({
insuranceId,
firstName: preferFirst,
lastName: preferLast,
dob: insuranceEligibilityData.dateOfBirth,
userId: req.user.id,
});
console.log('[eligibility-check] Patient after create/update:', patient);
} catch (patientOpErr: any) {
return res.status(500).json({
error: "Failed to create/update patient",
detail: patientOpErr?.message ?? String(patientOpErr),
});
}
// ✅ Step 4: Update patient status based on selenium result
if (patient && patient.id !== undefined) {
// Use eligibility from selenium extraction if available, otherwise default to UNKNOWN
let newStatus = "UNKNOWN";
if (seleniumResult.eligibility === "Y") {
newStatus = "ACTIVE";
} else if (seleniumResult.eligibility === "N") {
newStatus = "INACTIVE";
}
// Prepare updates object
const updates: any = { status: newStatus };
// Update insurance provider if extracted
if (seleniumResult.insurance) {
updates.insuranceProvider = seleniumResult.insurance;
console.log('[eligibility-check] Updating insurance provider:', seleniumResult.insurance);
}
await storage.updatePatient(patient.id, updates);
outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}${seleniumResult.insurance ? ', insurance updated' : ''}`;
console.log('[eligibility-check] Status updated:', {
patientId: patient.id,
newStatus,
eligibility: seleniumResult.eligibility,
insurance: seleniumResult.insurance
});
// ✅ Step 5: Handle PDF Upload
if (
seleniumResult.pdf_path &&
seleniumResult.pdf_path.endsWith(".pdf")
) {
const pdfBuffer = await fs.readFile(seleniumResult.pdf_path);
const groupTitle = "Eligibility Status";
const groupTitleKey = "ELIGIBILITY_STATUS";
let group = await storage.findPdfGroupByPatientTitleKey(
patient.id,
groupTitleKey
);
// Step 5b: Create group if it doesnt exist
if (!group) {
group = await storage.createPdfGroup(
patient.id,
groupTitle,
groupTitleKey
);
}
if (!group?.id) {
throw new Error("PDF group creation failed: missing group ID");
}
const created = await storage.createPdfFile(
group.id,
path.basename(seleniumResult.pdf_path),
pdfBuffer
);
// created could be { id, filename } or just id, adapt to your storage API.
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
outputResult.pdfUploadStatus = `PDF saved to group: ${group.title}`;
} else {
outputResult.pdfUploadStatus =
"No valid PDF path provided by Selenium, Couldn't upload pdf to server.";
}
} else {
outputResult.patientUpdateStatus =
"Patient not found or missing ID; no update performed";
}
res.json({
patientUpdateStatus: outputResult.patientUpdateStatus,
pdfUploadStatus: outputResult.pdfUploadStatus,
pdfFileId: createdPdfFileId,
});
} catch (err: any) {
console.error(err);
return res.status(500).json({
error: err.message || "Failed to forward to selenium agent",
});
} finally {
try {
if (seleniumResult && seleniumResult.pdf_path) {
await emptyFolderContainingFile(seleniumResult.pdf_path);
} else {
console.log(`[eligibility-check] no pdf_path available to cleanup`);
}
} catch (cleanupErr) {
console.error(
`[eligibility-check cleanup failed for ${seleniumResult?.pdf_path}`,
cleanupErr
);
}
}
const insuranceId = String(insuranceEligibilityData.memberId ?? "").trim();
if (!insuranceId) {
return res.status(400).json({ error: "Missing memberId" });
}
const enrichedData = {
...insuranceEligibilityData,
massdhpUsername: credentials.username,
massdhpPassword: credentials.password,
};
const job = await seleniumQueue.add("eligibility-check", {
jobType: "eligibility-check",
userId: req.user.id,
socketId: req.body.socketId,
enrichedPayload: enrichedData,
insuranceId,
formFirstName: insuranceEligibilityData.firstName,
formLastName: insuranceEligibilityData.lastName,
formDob: insuranceEligibilityData.dateOfBirth,
});
return res.json({ jobId: job.id, status: "queued" });
}
);
@@ -365,170 +174,41 @@ router.post(
.status(400)
.json({ error: "Missing Insurance Status data for selenium" });
}
if (!req.user || !req.user.id) {
return res.status(401).json({ error: "Unauthorized: user info missing" });
}
let result: any = undefined;
const insuranceClaimStatusData =
typeof req.body.data === "string"
? JSON.parse(req.body.data)
: req.body.data;
async function imageToPdfBuffer(imagePath: string): Promise<Buffer> {
return new Promise<Buffer>((resolve, reject) => {
try {
const doc = new PDFDocument({ autoFirstPage: false });
const chunks: Uint8Array[] = [];
// collect data chunks
doc.on("data", (chunk: any) => chunks.push(chunk));
doc.on("end", () => resolve(Buffer.concat(chunks)));
doc.on("error", (err: any) => reject(err));
const A4_WIDTH = 595.28; // points
const A4_HEIGHT = 841.89; // points
doc.addPage({ size: [A4_WIDTH, A4_HEIGHT] });
doc.image(imagePath, 0, 0, {
fit: [A4_WIDTH, A4_HEIGHT],
align: "center",
valign: "center",
});
doc.end();
} catch (err) {
reject(err);
}
const credentials = await storage.getInsuranceCredentialByUserAndSiteKey(
req.user.id,
insuranceClaimStatusData.insuranceSiteKey
);
if (!credentials) {
return res.status(404).json({
error:
"No insurance credentials found for this provider, Kindly Update this at Settings Page.",
});
}
try {
const insuranceClaimStatusData = JSON.parse(req.body.data);
const credentials = await storage.getInsuranceCredentialByUserAndSiteKey(
req.user.id,
insuranceClaimStatusData.insuranceSiteKey
);
if (!credentials) {
return res.status(404).json({
error:
"No insurance credentials found for this provider, Kindly Update this at Settings Page.",
});
}
const enrichedData = {
...insuranceClaimStatusData,
massdhpUsername: credentials.username,
massdhpPassword: credentials.password,
};
const enrichedData = {
...insuranceClaimStatusData,
massdhpUsername: credentials.username,
massdhpPassword: credentials.password,
};
const job = await seleniumQueue.add("claim-status-check", {
jobType: "claim-status-check",
userId: req.user.id,
socketId: req.body.socketId,
enrichedPayload: enrichedData,
insuranceId: String(insuranceClaimStatusData.memberId ?? "").trim(),
});
result = await forwardToSeleniumInsuranceClaimStatusAgent(enrichedData);
let createdPdfFileId: number | null = null;
// ✅ Step 1: Check result
const patient = await storage.getPatientByInsuranceId(
insuranceClaimStatusData.memberId
);
if (patient && patient.id !== undefined) {
let pdfBuffer: Buffer | null = null;
let generatedPdfPath: string | null = null;
if (
result.ss_path &&
(result.ss_path.endsWith(".png") ||
result.ss_path.endsWith(".jpg") ||
result.ss_path.endsWith(".jpeg"))
) {
try {
// Ensure file exists
if (!fsSync.existsSync(result.ss_path)) {
throw new Error(`Screenshot file not found: ${result.ss_path}`);
}
// Convert image to PDF buffer
pdfBuffer = await imageToPdfBuffer(result.ss_path);
// Optionally write generated PDF to temp path (so name is available for createPdfFile)
const pdfFileName = `claimStatus_${insuranceClaimStatusData.memberId}_${Date.now()}.pdf`;
generatedPdfPath = path.join(
path.dirname(result.ss_path),
pdfFileName
);
await fs.writeFile(generatedPdfPath, pdfBuffer);
} catch (err) {
console.error("Failed to convert screenshot to PDF:", err);
result.pdfUploadStatus = `Failed to convert screenshot to PDF: ${String(err)}`;
}
} else {
result.pdfUploadStatus =
"No valid PDF or screenshot path provided by Selenium; nothing to upload.";
}
if (pdfBuffer && generatedPdfPath) {
const groupTitle = "Claim Status";
const groupTitleKey = "CLAIM_STATUS";
let group = await storage.findPdfGroupByPatientTitleKey(
patient.id,
groupTitleKey
);
// Create group if missing
if (!group) {
group = await storage.createPdfGroup(
patient.id,
groupTitle,
groupTitleKey
);
}
if (!group?.id) {
throw new Error("PDF group creation failed: missing group ID");
}
// Use the basename for storage
const basename = path.basename(generatedPdfPath);
const created = await storage.createPdfFile(
group.id,
basename,
pdfBuffer
);
if (created && typeof created === "object" && "id" in created) {
createdPdfFileId = Number(created.id);
}
result.pdfUploadStatus = `PDF saved to group: ${group.title}`;
}
} else {
result.patientUpdateStatus =
"Patient not found or missing ID; no update performed";
}
res.json({
pdfUploadStatus: result.pdfUploadStatus,
pdfFileId: createdPdfFileId,
});
return;
} catch (err: any) {
console.error(err);
return res.status(500).json({
error: err.message || "Failed to forward to selenium agent",
});
} finally {
try {
if (result && result.ss_path) {
await emptyFolderContainingFile(result.ss_path);
} else {
console.log(`claim-status-check] no pdf_path available to cleanup`);
}
} catch (cleanupErr) {
console.error(
`[claim-status-check cleanup failed for ${result?.ss_path}`,
cleanupErr
);
}
}
return res.json({ jobId: job.id, status: "queued" });
}
);

View File

@@ -1,49 +1,56 @@
import { Router, Request, Response } from "express";
import multer from "multer";
import { forwardToPaymentOCRService } from "../services/paymentOCRService";
import { ocrQueue } from "../queue/queues";
const router = Router();
// keep files in memory; FastAPI accepts them as multipart bytes
const upload = multer({ storage: multer.memoryStorage() });
const ALLOWED_MIMES = new Set([
"image/jpeg",
"image/png",
"image/tiff",
"image/bmp",
"image/jpg",
]);
// POST /payment-ocr/extract (field name: "files")
router.post(
"/extract",
upload.array("files"), // allow multiple images
upload.array("files"),
async (req: Request, res: Response): Promise<any> => {
try {
const files = req.files as Express.Multer.File[] | undefined;
const files = req.files as Express.Multer.File[] | undefined;
if (!files || files.length === 0) {
return res
.status(400)
.json({ error: "No image files uploaded. Use field name 'files'." });
}
// (optional) basic client-side MIME guard
const allowed = new Set([
"image/jpeg",
"image/png",
"image/tiff",
"image/bmp",
"image/jpg",
]);
const bad = files.filter((f) => !allowed.has(f.mimetype.toLowerCase()));
if (bad.length) {
return res.status(415).json({
error: `Unsupported file types: ${bad
.map((b) => b.originalname)
.join(", ")}`,
});
}
const rows = await forwardToPaymentOCRService(files);
return res.json({ rows });
} catch (err) {
console.error(err);
return res.status(500).json({ error: "Payment OCR extraction failed" });
if (!files || files.length === 0) {
return res
.status(400)
.json({ error: "No image files uploaded. Use field name 'files'." });
}
const bad = files.filter((f) => !ALLOWED_MIMES.has(f.mimetype.toLowerCase()));
if (bad.length) {
return res.status(415).json({
error: `Unsupported file types: ${bad.map((b) => b.originalname).join(", ")}`,
});
}
const filesForQueue = files.map((f) => ({
originalname: f.originalname,
bufferBase64: f.buffer.toString("base64"),
mimetype: f.mimetype,
}));
const socketId: string | undefined =
(req.body?.socketId as string) ?? undefined;
const job = await ocrQueue.add("ocr", {
userId: (req.user as any)?.id ?? 0,
socketId,
files: filesForQueue,
});
return res.json({ jobId: job.id, status: "queued" });
}
);