feat(ddma-eligibility) - v6 updated file
This commit is contained in:
@@ -22,6 +22,7 @@ const router = Router();
|
||||
interface DdmaJobContext {
|
||||
userId: number;
|
||||
insuranceEligibilityData: any; // parsed, enriched (includes username/password)
|
||||
socketId?: string;
|
||||
}
|
||||
|
||||
const ddmaJobs: Record<string, DdmaJobContext> = {};
|
||||
@@ -133,11 +134,10 @@ async function handleDdmaCompletedJob(
|
||||
let createdPdfFileId: number | null = null;
|
||||
const outputResult: any = {};
|
||||
|
||||
const insuranceEligibilityData = job.insuranceEligibilityData;
|
||||
|
||||
// We'll wrap the processing in try/catch/finally so cleanup always runs
|
||||
try {
|
||||
// 1) ensuring memberid.
|
||||
const insuranceEligibilityData = job.insuranceEligibilityData;
|
||||
const insuranceId = String(insuranceEligibilityData.memberId ?? "").trim();
|
||||
if (!insuranceId) {
|
||||
throw new Error("Missing memberId for ddma job");
|
||||
@@ -154,14 +154,23 @@ async function handleDdmaCompletedJob(
|
||||
const patient = await storage.getPatientByInsuranceId(
|
||||
insuranceEligibilityData.memberId
|
||||
);
|
||||
if (!patient?.id) {
|
||||
outputResult.patientUpdateStatus =
|
||||
"Patient not found; no update performed";
|
||||
return {
|
||||
patientUpdateStatus: outputResult.patientUpdateStatus,
|
||||
pdfUploadStatus: "none",
|
||||
pdfFileId: null,
|
||||
};
|
||||
}
|
||||
|
||||
if (patient && patient.id !== undefined) {
|
||||
// update patient status.
|
||||
const newStatus =
|
||||
seleniumResult.eligibility === "active" ? "ACTIVE" : "INACTIVE";
|
||||
await storage.updatePatient(patient.id, { status: newStatus });
|
||||
outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}`;
|
||||
|
||||
// Expect only ss_path (screenshot)
|
||||
// convert screenshot -> pdf if available
|
||||
let pdfBuffer: Buffer | null = null;
|
||||
let generatedPdfPath: string | null = null;
|
||||
|
||||
@@ -232,10 +241,6 @@ async function handleDdmaCompletedJob(
|
||||
outputResult.pdfUploadStatus =
|
||||
"No valid PDF path provided by Selenium, Couldn't upload pdf to server.";
|
||||
}
|
||||
} else {
|
||||
outputResult.patientUpdateStatus =
|
||||
"Patient not found or missing ID; no update performed";
|
||||
}
|
||||
|
||||
return {
|
||||
patientUpdateStatus: outputResult.patientUpdateStatus,
|
||||
@@ -273,7 +278,8 @@ async function handleDdmaCompletedJob(
|
||||
}
|
||||
|
||||
// --- top of file, alongside ddmaJobs ---
|
||||
const finalResults: Record<string, any> = {};
|
||||
let currentFinalSessionId: string | null = null;
|
||||
let currentFinalResult: any = null;
|
||||
|
||||
function now() {
|
||||
return new Date().toISOString();
|
||||
@@ -307,20 +313,39 @@ function emitSafe(socketId: string | undefined, event: string, payload: any) {
|
||||
* Polls Python agent for session status and emits socket events:
|
||||
* - 'selenium:otp_required' when waiting_for_otp
|
||||
* - 'selenium:session_update' when completed/error
|
||||
* - rabsolute timeout + transient error handling.
|
||||
* - pollTimeoutMs default = 2 minutes (adjust where invoked)
|
||||
*/
|
||||
async function pollAgentSessionAndProcess(
|
||||
sessionId: string,
|
||||
socketId?: string
|
||||
socketId?: string,
|
||||
pollTimeoutMs = 2 * 60 * 1000
|
||||
) {
|
||||
const maxAttempts = 300; // ~5 minutes @ 1s base (adjust if needed)
|
||||
const maxAttempts = 300;
|
||||
const baseDelayMs = 1000;
|
||||
const maxTransientErrors = 12; // tolerate more transient errors
|
||||
const maxTransientErrors = 12;
|
||||
|
||||
// NEW: give up if same non-terminal status repeats this many times
|
||||
const noProgressLimit = 10;
|
||||
|
||||
const job = ddmaJobs[sessionId];
|
||||
let transientErrorCount = 0;
|
||||
let consecutiveNoProgress = 0;
|
||||
let lastStatus: string | null = null;
|
||||
const deadline = Date.now() + pollTimeoutMs;
|
||||
|
||||
for (let attempt = 0; attempt < maxAttempts; attempt++) {
|
||||
const attemptTs = new Date().toISOString();
|
||||
// absolute deadline check
|
||||
if (Date.now() > deadline) {
|
||||
emitSafe(socketId, "selenium:session_update", {
|
||||
session_id: sessionId,
|
||||
status: "error",
|
||||
message: `Polling timeout reached (${Math.round(pollTimeoutMs / 1000)}s).`,
|
||||
});
|
||||
delete ddmaJobs[sessionId];
|
||||
return;
|
||||
}
|
||||
|
||||
log(
|
||||
"poller",
|
||||
`attempt=${attempt} session=${sessionId} transientErrCount=${transientErrorCount}`
|
||||
@@ -328,7 +353,7 @@ async function pollAgentSessionAndProcess(
|
||||
|
||||
try {
|
||||
const st = await getSeleniumDdmaSessionStatus(sessionId);
|
||||
const status = st?.status;
|
||||
const status = st?.status ?? null;
|
||||
log("poller", "got status", {
|
||||
sessionId,
|
||||
status,
|
||||
@@ -339,6 +364,32 @@ async function pollAgentSessionAndProcess(
|
||||
// reset transient errors on success
|
||||
transientErrorCount = 0;
|
||||
|
||||
// if status unchanged and non-terminal, increment no-progress counter
|
||||
const isTerminalLike =
|
||||
status === "completed" || status === "error" || status === "not_found";
|
||||
if (status === lastStatus && !isTerminalLike) {
|
||||
consecutiveNoProgress++;
|
||||
} else {
|
||||
consecutiveNoProgress = 0;
|
||||
}
|
||||
lastStatus = status;
|
||||
|
||||
// if no progress for too many consecutive polls -> abort
|
||||
if (consecutiveNoProgress >= noProgressLimit) {
|
||||
emitSafe(socketId, "selenium:session_update", {
|
||||
session_id: sessionId,
|
||||
status: "error",
|
||||
message: `No progress from selenium agent (status="${status}") after ${consecutiveNoProgress} polls; aborting.`,
|
||||
});
|
||||
emitSafe(socketId, "selenium:session_error", {
|
||||
session_id: sessionId,
|
||||
status: "error",
|
||||
message: "No progress from selenium agent",
|
||||
});
|
||||
delete ddmaJobs[sessionId];
|
||||
return;
|
||||
}
|
||||
|
||||
// always emit debug to client if socket exists
|
||||
emitSafe(socketId, "selenium:debug", {
|
||||
session_id: sessionId,
|
||||
@@ -366,7 +417,8 @@ async function pollAgentSessionAndProcess(
|
||||
});
|
||||
|
||||
// Persist raw result so frontend can fetch if socket disconnects
|
||||
finalResults[sessionId] = {
|
||||
currentFinalSessionId = sessionId;
|
||||
currentFinalResult = {
|
||||
rawSelenium: st.result,
|
||||
processedAt: null,
|
||||
final: null,
|
||||
@@ -380,22 +432,24 @@ async function pollAgentSessionAndProcess(
|
||||
job,
|
||||
st.result
|
||||
);
|
||||
finalResults[sessionId].final = finalResult;
|
||||
finalResults[sessionId].processedAt = Date.now();
|
||||
currentFinalResult.final = finalResult;
|
||||
currentFinalResult.processedAt = Date.now();
|
||||
} catch (err: any) {
|
||||
finalResults[sessionId].final = {
|
||||
currentFinalResult.final = {
|
||||
error: "processing_failed",
|
||||
detail: err?.message ?? String(err),
|
||||
};
|
||||
finalResults[sessionId].processedAt = Date.now();
|
||||
currentFinalResult.processedAt = Date.now();
|
||||
log("poller", "handleDdmaCompletedJob failed", {
|
||||
sessionId,
|
||||
err: err?.message ?? err,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
finalResults[sessionId].final = { error: "no_job_or_no_result" };
|
||||
finalResults[sessionId].processedAt = Date.now();
|
||||
currentFinalResult[sessionId].final = {
|
||||
error: "no_job_or_no_result",
|
||||
};
|
||||
currentFinalResult[sessionId].processedAt = Date.now();
|
||||
}
|
||||
|
||||
// Emit final update (if socket present)
|
||||
@@ -403,7 +457,7 @@ async function pollAgentSessionAndProcess(
|
||||
session_id: sessionId,
|
||||
status: "completed",
|
||||
rawSelenium: st.result,
|
||||
final: finalResults[sessionId].final,
|
||||
final: currentFinalResult.final,
|
||||
});
|
||||
|
||||
// cleanup job context
|
||||
@@ -456,17 +510,6 @@ async function pollAgentSessionAndProcess(
|
||||
|
||||
// Detailed transient error logging
|
||||
transientErrorCount++;
|
||||
const backoffMs = Math.min(
|
||||
30_000,
|
||||
baseDelayMs * Math.pow(2, transientErrorCount - 1)
|
||||
);
|
||||
console.warn(
|
||||
`${new Date().toISOString()} [poller] transient error (#${transientErrorCount}) for ${sessionId}: code=${errCode} status=${axiosStatus} msg=${errMsg} data=${JSON.stringify(errData)}`
|
||||
);
|
||||
console.warn(
|
||||
`${new Date().toISOString()} [poller] backing off ${backoffMs}ms before next attempt`
|
||||
);
|
||||
|
||||
if (transientErrorCount > maxTransientErrors) {
|
||||
const emitPayload = {
|
||||
session_id: sessionId,
|
||||
@@ -479,6 +522,18 @@ async function pollAgentSessionAndProcess(
|
||||
delete ddmaJobs[sessionId];
|
||||
return;
|
||||
}
|
||||
|
||||
const backoffMs = Math.min(
|
||||
30_000,
|
||||
baseDelayMs * Math.pow(2, transientErrorCount - 1)
|
||||
);
|
||||
console.warn(
|
||||
`${new Date().toISOString()} [poller] transient error (#${transientErrorCount}) for ${sessionId}: code=${errCode} status=${axiosStatus} msg=${errMsg} data=${JSON.stringify(errData)}`
|
||||
);
|
||||
console.warn(
|
||||
`${new Date().toISOString()} [poller] backing off ${backoffMs}ms before next attempt`
|
||||
);
|
||||
|
||||
await new Promise((r) => setTimeout(r, backoffMs));
|
||||
continue;
|
||||
}
|
||||
@@ -561,6 +616,7 @@ router.post(
|
||||
ddmaJobs[sessionId] = {
|
||||
userId: req.user.id,
|
||||
insuranceEligibilityData: enrichedData,
|
||||
socketId,
|
||||
};
|
||||
|
||||
// start polling in background to notify client via socket and process job
|
||||
@@ -621,9 +677,13 @@ router.get(
|
||||
async (req: Request, res: Response) => {
|
||||
const sid = req.params.sid;
|
||||
if (!sid) return res.status(400).json({ error: "session id required" });
|
||||
const f = finalResults[sid];
|
||||
if (!f) return res.status(404).json({ error: "final result not found" });
|
||||
return res.json(f);
|
||||
|
||||
// Only the current in-memory result is available
|
||||
if (currentFinalSessionId !== sid || !currentFinalResult) {
|
||||
return res.status(404).json({ error: "final result not found" });
|
||||
}
|
||||
|
||||
return res.json(currentFinalResult);
|
||||
}
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user