feat(eligibility-check) - enhance United SCO workflows with improved patient creation and update logic; added eligibility status handling and detailed logging; implemented browser cache clearing and anti-detection measures in Selenium service
This commit is contained in:
@@ -73,8 +73,9 @@ async function createOrUpdatePatientByInsuranceId(options: {
|
||||
lastName?: string | null;
|
||||
dob?: string | Date | null;
|
||||
userId: number;
|
||||
eligibilityStatus?: string; // "ACTIVE" or "INACTIVE"
|
||||
}) {
|
||||
const { insuranceId, firstName, lastName, dob, userId } = options;
|
||||
const { insuranceId, firstName, lastName, dob, userId, eligibilityStatus } = options;
|
||||
if (!insuranceId) throw new Error("Missing insuranceId");
|
||||
|
||||
const incomingFirst = (firstName || "").trim();
|
||||
@@ -101,14 +102,17 @@ async function createOrUpdatePatientByInsuranceId(options: {
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
console.log(`[unitedsco-eligibility] Creating new patient: ${incomingFirst} ${incomingLast} with status: ${eligibilityStatus || "UNKNOWN"}`);
|
||||
const createPayload: any = {
|
||||
firstName: incomingFirst,
|
||||
lastName: incomingLast,
|
||||
dateOfBirth: dob,
|
||||
gender: "",
|
||||
gender: "Unknown",
|
||||
phone: "",
|
||||
userId,
|
||||
insuranceId,
|
||||
insuranceProvider: "United SCO",
|
||||
status: eligibilityStatus || "UNKNOWN",
|
||||
};
|
||||
let patientData: InsertPatient;
|
||||
try {
|
||||
@@ -118,7 +122,8 @@ async function createOrUpdatePatientByInsuranceId(options: {
|
||||
delete (safePayload as any).dateOfBirth;
|
||||
patientData = insertPatientSchema.parse(safePayload);
|
||||
}
|
||||
await storage.createPatient(patientData);
|
||||
const newPatient = await storage.createPatient(patientData);
|
||||
console.log(`[unitedsco-eligibility] Created new patient: ${newPatient.id} with status: ${eligibilityStatus || "UNKNOWN"}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,6 +176,10 @@ async function handleUnitedSCOCompletedJob(
|
||||
lastName = parsedName.lastName || lastName;
|
||||
}
|
||||
|
||||
// Determine eligibility status from Selenium result
|
||||
const eligibilityStatus = seleniumResult.eligibility === "active" ? "ACTIVE" : "INACTIVE";
|
||||
console.log(`[unitedsco-eligibility] Eligibility status from United SCO: ${eligibilityStatus}`);
|
||||
|
||||
// 3) Create or update patient
|
||||
if (insuranceId) {
|
||||
await createOrUpdatePatientByInsuranceId({
|
||||
@@ -179,6 +188,7 @@ async function handleUnitedSCOCompletedJob(
|
||||
lastName,
|
||||
dob: insuranceEligibilityData.dateOfBirth,
|
||||
userId: job.userId,
|
||||
eligibilityStatus,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -187,9 +197,61 @@ async function handleUnitedSCOCompletedJob(
|
||||
? await storage.getPatientByInsuranceId(insuranceId)
|
||||
: null;
|
||||
|
||||
// If no patient found by insuranceId, try to find by firstName + lastName
|
||||
if (!patient?.id && firstName && lastName) {
|
||||
const patients = await storage.getAllPatients(job.userId);
|
||||
patient = patients.find(
|
||||
(p) =>
|
||||
p.firstName?.toLowerCase() === firstName.toLowerCase() &&
|
||||
p.lastName?.toLowerCase() === lastName.toLowerCase()
|
||||
) ?? null;
|
||||
if (patient) {
|
||||
console.log(`[unitedsco-eligibility] Found patient by name: ${patient.id}`);
|
||||
}
|
||||
}
|
||||
|
||||
// If still not found, create new patient
|
||||
console.log(`[unitedsco-eligibility] Patient creation check: patient=${patient?.id || 'null'}, firstName='${firstName}', lastName='${lastName}'`);
|
||||
if (!patient && firstName && lastName) {
|
||||
console.log(`[unitedsco-eligibility] Creating new patient: ${firstName} ${lastName} with status: ${eligibilityStatus}`);
|
||||
try {
|
||||
let parsedDob: Date | undefined = undefined;
|
||||
if (insuranceEligibilityData.dateOfBirth) {
|
||||
try {
|
||||
parsedDob = new Date(insuranceEligibilityData.dateOfBirth);
|
||||
if (isNaN(parsedDob.getTime())) parsedDob = undefined;
|
||||
} catch {
|
||||
parsedDob = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
const newPatientData: InsertPatient = {
|
||||
firstName,
|
||||
lastName,
|
||||
dateOfBirth: parsedDob || new Date(), // Required field
|
||||
insuranceId: insuranceId || undefined,
|
||||
insuranceProvider: "United SCO",
|
||||
gender: "Unknown",
|
||||
phone: "",
|
||||
userId: job.userId,
|
||||
status: eligibilityStatus,
|
||||
};
|
||||
|
||||
const validation = insertPatientSchema.safeParse(newPatientData);
|
||||
if (validation.success) {
|
||||
patient = await storage.createPatient(validation.data);
|
||||
console.log(`[unitedsco-eligibility] Created new patient: ${patient.id} with status: ${eligibilityStatus}`);
|
||||
} else {
|
||||
console.log(`[unitedsco-eligibility] Patient validation failed: ${validation.error.message}`);
|
||||
}
|
||||
} catch (createErr: any) {
|
||||
console.log(`[unitedsco-eligibility] Failed to create patient: ${createErr.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!patient?.id) {
|
||||
outputResult.patientUpdateStatus =
|
||||
"Patient not found; no update performed";
|
||||
"Patient not found and could not be created; no update performed";
|
||||
return {
|
||||
patientUpdateStatus: outputResult.patientUpdateStatus,
|
||||
pdfUploadStatus: "none",
|
||||
@@ -197,11 +259,20 @@ async function handleUnitedSCOCompletedJob(
|
||||
};
|
||||
}
|
||||
|
||||
// Update patient status
|
||||
const newStatus =
|
||||
seleniumResult.eligibility === "active" ? "ACTIVE" : "INACTIVE";
|
||||
await storage.updatePatient(patient.id, { status: newStatus });
|
||||
outputResult.patientUpdateStatus = `Patient status updated to ${newStatus}`;
|
||||
// Update patient status and name from United SCO eligibility result
|
||||
const updatePayload: Record<string, any> = { status: eligibilityStatus };
|
||||
|
||||
// Also update first/last name if we extracted them and patient has empty names
|
||||
if (firstName && (!patient.firstName || patient.firstName.trim() === "")) {
|
||||
updatePayload.firstName = firstName;
|
||||
}
|
||||
if (lastName && (!patient.lastName || patient.lastName.trim() === "")) {
|
||||
updatePayload.lastName = lastName;
|
||||
}
|
||||
|
||||
await storage.updatePatient(patient.id, updatePayload);
|
||||
outputResult.patientUpdateStatus = `Patient ${patient.id} updated: status=${eligibilityStatus}, name=${firstName} ${lastName} (United SCO eligibility: ${seleniumResult.eligibility})`;
|
||||
console.log(`[unitedsco-eligibility] ${outputResult.patientUpdateStatus}`);
|
||||
|
||||
// Handle PDF or convert screenshot -> pdf if available
|
||||
let pdfBuffer: Buffer | null = null;
|
||||
|
||||
@@ -118,6 +118,10 @@ export function UnitedSCOEligibilityButton({
|
||||
}: UnitedSCOEligibilityButtonProps) {
|
||||
const { toast } = useToast();
|
||||
const dispatch = useAppDispatch();
|
||||
|
||||
// Flexible validation: require DOB + at least one identifier (memberId OR firstName OR lastName)
|
||||
const isUnitedSCOFormIncomplete =
|
||||
!dateOfBirth || (!memberId && !firstName && !lastName);
|
||||
|
||||
const socketRef = useRef<Socket | null>(null);
|
||||
const connectingRef = useRef<Promise<void> | null>(null);
|
||||
@@ -370,10 +374,20 @@ export function UnitedSCOEligibilityButton({
|
||||
};
|
||||
|
||||
const startUnitedSCOEligibility = async () => {
|
||||
if (!memberId || !dateOfBirth) {
|
||||
// Flexible: require DOB + at least one identifier (memberId OR firstName OR lastName)
|
||||
if (!dateOfBirth) {
|
||||
toast({
|
||||
title: "Missing fields",
|
||||
description: "Member ID and Date of Birth are required.",
|
||||
description: "Date of Birth is required for United SCO eligibility.",
|
||||
variant: "destructive",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!memberId && !firstName && !lastName) {
|
||||
toast({
|
||||
title: "Missing fields",
|
||||
description: "Member ID, First Name, or Last Name is required for United SCO eligibility.",
|
||||
variant: "destructive",
|
||||
});
|
||||
return;
|
||||
@@ -382,11 +396,11 @@ export function UnitedSCOEligibilityButton({
|
||||
const formattedDob = dateOfBirth ? formatLocalDate(dateOfBirth) : "";
|
||||
|
||||
const payload = {
|
||||
memberId,
|
||||
memberId: memberId || "",
|
||||
dateOfBirth: formattedDob,
|
||||
firstName,
|
||||
lastName,
|
||||
insuranceSiteKey: "UNITEDSCO", // for backend credential lookup (uses DENTAQUEST)
|
||||
firstName: firstName || "",
|
||||
lastName: lastName || "",
|
||||
insuranceSiteKey: "UNITEDSCO",
|
||||
};
|
||||
|
||||
try {
|
||||
@@ -538,7 +552,7 @@ export function UnitedSCOEligibilityButton({
|
||||
<Button
|
||||
className="w-full"
|
||||
variant="outline"
|
||||
disabled={isFormIncomplete || isStarting}
|
||||
disabled={isUnitedSCOFormIncomplete || isStarting}
|
||||
onClick={startUnitedSCOEligibility}
|
||||
>
|
||||
{isStarting ? (
|
||||
|
||||
@@ -73,6 +73,32 @@ async def _remove_session_later(sid: str, delay: int = 20):
|
||||
await cleanup_session(sid)
|
||||
|
||||
|
||||
def _minimize_browser(bot):
|
||||
"""Hide the browser window so it doesn't stay in the user's way."""
|
||||
try:
|
||||
if bot and bot.driver:
|
||||
# Navigate to blank page first
|
||||
try:
|
||||
bot.driver.get("about:blank")
|
||||
except Exception:
|
||||
pass
|
||||
# Try minimize
|
||||
try:
|
||||
bot.driver.minimize_window()
|
||||
print("[UnitedSCO] Browser minimized after error")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
# Fallback: move off-screen
|
||||
try:
|
||||
bot.driver.set_window_position(-10000, -10000)
|
||||
print("[UnitedSCO] Browser moved off-screen after error")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO] Could not hide browser: {e}")
|
||||
|
||||
|
||||
async def start_unitedsco_run(sid: str, data: dict, url: str):
|
||||
"""
|
||||
Run the United SCO workflow for a session (WITHOUT managing semaphore/counters).
|
||||
@@ -266,7 +292,11 @@ async def start_unitedsco_run(sid: str, data: dict, url: str):
|
||||
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
|
||||
s["status"] = "error"
|
||||
s["message"] = step1_result
|
||||
await cleanup_session(sid)
|
||||
s["result"] = {"status": "error", "message": step1_result}
|
||||
# Minimize browser on error
|
||||
_minimize_browser(bot)
|
||||
# Keep session alive for backend to poll, then clean up
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": step1_result}
|
||||
|
||||
# Step 2 (PDF)
|
||||
@@ -283,13 +313,24 @@ async def start_unitedsco_run(sid: str, data: dict, url: str):
|
||||
s["message"] = step2_result.get("message", "unknown error")
|
||||
else:
|
||||
s["message"] = str(step2_result)
|
||||
await cleanup_session(sid)
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
# Minimize browser on error
|
||||
_minimize_browser(bot)
|
||||
# Keep session alive for backend to poll, then clean up
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
except Exception as e:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"worker exception: {e}"
|
||||
await cleanup_session(sid)
|
||||
# Minimize browser on exception
|
||||
try:
|
||||
if bot and bot.driver:
|
||||
bot.driver.minimize_window()
|
||||
except Exception:
|
||||
pass
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
|
||||
@@ -319,5 +360,5 @@ def get_session_status(sid: str) -> Dict[str, Any]:
|
||||
"message": s.get("message"),
|
||||
"created_at": s.get("created_at"),
|
||||
"last_activity": s.get("last_activity"),
|
||||
"result": s.get("result") if s.get("status") == "completed" else None,
|
||||
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
|
||||
}
|
||||
|
||||
@@ -191,7 +191,59 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
|
||||
time.sleep(5) # Wait for login to process
|
||||
|
||||
# Check for OTP input after login
|
||||
# Check for MFA method selection page
|
||||
# DentalHub shows: "Phone" / "Authenticator App" radio buttons + "Continue" button
|
||||
try:
|
||||
continue_btn = self.driver.find_element(By.XPATH,
|
||||
"//button[contains(text(),'Continue')]"
|
||||
)
|
||||
# Check if "Phone" radio is present (MFA selection page)
|
||||
phone_elements = self.driver.find_elements(By.XPATH,
|
||||
"//*[contains(text(),'Phone')]"
|
||||
)
|
||||
if continue_btn and phone_elements:
|
||||
print("[UnitedSCO login] MFA method selection page detected")
|
||||
# Select "Phone" radio button if not already selected
|
||||
try:
|
||||
phone_radio = self.driver.find_element(By.XPATH,
|
||||
"//input[@type='radio' and (contains(@value,'phone') or contains(@value,'Phone'))] | "
|
||||
"//label[contains(text(),'Phone')]/preceding-sibling::input[@type='radio'] | "
|
||||
"//label[contains(text(),'Phone')]//input[@type='radio'] | "
|
||||
"//input[@type='radio'][following-sibling::*[contains(text(),'Phone')]] | "
|
||||
"//input[@type='radio']"
|
||||
)
|
||||
if phone_radio and not phone_radio.is_selected():
|
||||
phone_radio.click()
|
||||
print("[UnitedSCO login] Selected 'Phone' radio button")
|
||||
else:
|
||||
print("[UnitedSCO login] 'Phone' already selected")
|
||||
except Exception as radio_err:
|
||||
print(f"[UnitedSCO login] Could not click Phone radio (may already be selected): {radio_err}")
|
||||
# Try clicking the label text instead
|
||||
try:
|
||||
phone_label = self.driver.find_element(By.XPATH, "//*[contains(text(),'Phone') and not(contains(text(),'Authenticator'))]")
|
||||
phone_label.click()
|
||||
print("[UnitedSCO login] Clicked 'Phone' label")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
time.sleep(1)
|
||||
# Click Continue
|
||||
continue_btn.click()
|
||||
print("[UnitedSCO login] Clicked 'Continue' on MFA selection page")
|
||||
time.sleep(5) # Wait for OTP to be sent
|
||||
except Exception:
|
||||
pass # No MFA selection page - proceed normally
|
||||
|
||||
# Check if login succeeded (redirected back to dentalhub dashboard)
|
||||
current_url_after_login = self.driver.current_url.lower()
|
||||
print(f"[UnitedSCO login] After login URL: {current_url_after_login}")
|
||||
|
||||
if "app.dentalhub.com" in current_url_after_login and "login" not in current_url_after_login:
|
||||
print("[UnitedSCO login] Login successful - redirected to dashboard")
|
||||
return "SUCCESS"
|
||||
|
||||
# Check for OTP input after login / after MFA selection
|
||||
try:
|
||||
otp_input = WebDriverWait(self.driver, 15).until(
|
||||
EC.presence_of_element_located((By.XPATH,
|
||||
@@ -207,10 +259,8 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
except TimeoutException:
|
||||
print("[UnitedSCO login] No OTP input detected")
|
||||
|
||||
# Check if login succeeded (redirected back to dentalhub dashboard)
|
||||
# Re-check dashboard after waiting for OTP check
|
||||
current_url_after_login = self.driver.current_url.lower()
|
||||
print(f"[UnitedSCO login] After login URL: {current_url_after_login}")
|
||||
|
||||
if "app.dentalhub.com" in current_url_after_login and "login" not in current_url_after_login:
|
||||
print("[UnitedSCO login] Login successful - redirected to dashboard")
|
||||
return "SUCCESS"
|
||||
@@ -254,6 +304,55 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
print(f"[UnitedSCO login] Exception: {e}")
|
||||
return f"ERROR:LOGIN FAILED: {e}"
|
||||
|
||||
def _check_for_error_dialog(self):
|
||||
"""Check for and dismiss common error dialogs. Returns error message string or None."""
|
||||
error_patterns = [
|
||||
("Patient Not Found", "Patient Not Found - please check the Subscriber ID, DOB, and Payer selection"),
|
||||
("Insufficient Information", "Insufficient Information - need Subscriber ID + DOB, or First Name + Last Name + DOB"),
|
||||
("No Eligibility", "No eligibility information found for this patient"),
|
||||
("Error", None), # Generic error - will use the dialog text
|
||||
]
|
||||
|
||||
for pattern, default_msg in error_patterns:
|
||||
try:
|
||||
dialog_elem = self.driver.find_element(By.XPATH,
|
||||
f"//modal-container//*[contains(text(),'{pattern}')] | "
|
||||
f"//div[contains(@class,'modal')]//*[contains(text(),'{pattern}')]"
|
||||
)
|
||||
if dialog_elem.is_displayed():
|
||||
# Get the full dialog text for logging
|
||||
try:
|
||||
modal = self.driver.find_element(By.XPATH, "//modal-container | //div[contains(@class,'modal-dialog')]")
|
||||
dialog_text = modal.text.strip()[:200]
|
||||
except Exception:
|
||||
dialog_text = dialog_elem.text.strip()[:200]
|
||||
|
||||
print(f"[UnitedSCO step1] Error dialog detected: {dialog_text}")
|
||||
|
||||
# Click OK/Close to dismiss
|
||||
try:
|
||||
dismiss_btn = self.driver.find_element(By.XPATH,
|
||||
"//modal-container//button[contains(text(),'Ok') or contains(text(),'OK') or contains(text(),'Close')] | "
|
||||
"//div[contains(@class,'modal')]//button[contains(text(),'Ok') or contains(text(),'OK') or contains(text(),'Close')]"
|
||||
)
|
||||
dismiss_btn.click()
|
||||
print("[UnitedSCO step1] Dismissed error dialog")
|
||||
time.sleep(1)
|
||||
except Exception:
|
||||
# Try clicking the X button
|
||||
try:
|
||||
close_btn = self.driver.find_element(By.XPATH, "//modal-container//button[@class='close']")
|
||||
close_btn.click()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
error_msg = default_msg if default_msg else f"ERROR: {dialog_text}"
|
||||
return f"ERROR: {error_msg}"
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return None
|
||||
|
||||
def _format_dob(self, dob_str):
|
||||
"""Convert DOB from YYYY-MM-DD to MM/DD/YYYY format"""
|
||||
if dob_str and "-" in dob_str:
|
||||
@@ -301,25 +400,81 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
print("[UnitedSCO step1] Patient Information form not found")
|
||||
return "ERROR: Patient Information form not found"
|
||||
|
||||
# Fill First Name (id='firstName_Back')
|
||||
try:
|
||||
first_name_input = self.driver.find_element(By.ID, "firstName_Back")
|
||||
first_name_input.clear()
|
||||
first_name_input.send_keys(self.firstName)
|
||||
print(f"[UnitedSCO step1] Entered First Name: {self.firstName}")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error entering First Name: {e}")
|
||||
return "ERROR: Could not enter First Name"
|
||||
# Fill Subscriber ID / Medicaid ID if memberId is provided
|
||||
# The field is labeled "Subscriber ID or Medicaid ID" on the DentalHub form
|
||||
# Actual DOM field id is 'subscriberId_Front' (not 'subscriberId_Back')
|
||||
if self.memberId:
|
||||
try:
|
||||
subscriber_id_selectors = [
|
||||
"//input[@id='subscriberId_Front']",
|
||||
"//input[@id='subscriberId_Back' or @id='subscriberID_Back']",
|
||||
"//input[@id='memberId_Back' or @id='memberid_Back']",
|
||||
"//input[@id='medicaidId_Back']",
|
||||
"//label[contains(text(),'Subscriber ID')]/..//input[not(@id='firstName_Back') and not(@id='lastName_Back') and not(@id='dateOfBirth_Back')]",
|
||||
"//input[contains(@placeholder,'Subscriber') or contains(@placeholder,'subscriber')]",
|
||||
"//input[contains(@placeholder,'Medicaid') or contains(@placeholder,'medicaid')]",
|
||||
"//input[contains(@placeholder,'Member') or contains(@placeholder,'member')]",
|
||||
]
|
||||
subscriber_filled = False
|
||||
for sel in subscriber_id_selectors:
|
||||
try:
|
||||
sid_input = self.driver.find_element(By.XPATH, sel)
|
||||
if sid_input.is_displayed():
|
||||
sid_input.clear()
|
||||
sid_input.send_keys(self.memberId)
|
||||
field_id = sid_input.get_attribute("id") or "unknown"
|
||||
print(f"[UnitedSCO step1] Entered Subscriber ID: {self.memberId} (field id='{field_id}')")
|
||||
subscriber_filled = True
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not subscriber_filled:
|
||||
# Fallback: find visible input that is NOT a known field
|
||||
try:
|
||||
all_inputs = self.driver.find_elements(By.XPATH,
|
||||
"//form//input[@type='text' or not(@type)]"
|
||||
)
|
||||
known_ids = {'firstName_Back', 'lastName_Back', 'dateOfBirth_Back', 'procedureDate_Back', 'insurerId'}
|
||||
for inp in all_inputs:
|
||||
inp_id = inp.get_attribute("id") or ""
|
||||
if inp_id not in known_ids and inp.is_displayed():
|
||||
inp.clear()
|
||||
inp.send_keys(self.memberId)
|
||||
print(f"[UnitedSCO step1] Entered Subscriber ID in field id='{inp_id}': {self.memberId}")
|
||||
subscriber_filled = True
|
||||
break
|
||||
except Exception as e2:
|
||||
print(f"[UnitedSCO step1] Fallback subscriber field search error: {e2}")
|
||||
|
||||
if not subscriber_filled:
|
||||
print(f"[UnitedSCO step1] WARNING: Could not find Subscriber ID field (ID: {self.memberId})")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error entering Subscriber ID: {e}")
|
||||
|
||||
# Fill Last Name (id='lastName_Back')
|
||||
try:
|
||||
last_name_input = self.driver.find_element(By.ID, "lastName_Back")
|
||||
last_name_input.clear()
|
||||
last_name_input.send_keys(self.lastName)
|
||||
print(f"[UnitedSCO step1] Entered Last Name: {self.lastName}")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error entering Last Name: {e}")
|
||||
return "ERROR: Could not enter Last Name"
|
||||
# Fill First Name (id='firstName_Back') - only if provided
|
||||
if self.firstName:
|
||||
try:
|
||||
first_name_input = self.driver.find_element(By.ID, "firstName_Back")
|
||||
first_name_input.clear()
|
||||
first_name_input.send_keys(self.firstName)
|
||||
print(f"[UnitedSCO step1] Entered First Name: {self.firstName}")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error entering First Name: {e}")
|
||||
else:
|
||||
print("[UnitedSCO step1] No First Name provided, skipping")
|
||||
|
||||
# Fill Last Name (id='lastName_Back') - only if provided
|
||||
if self.lastName:
|
||||
try:
|
||||
last_name_input = self.driver.find_element(By.ID, "lastName_Back")
|
||||
last_name_input.clear()
|
||||
last_name_input.send_keys(self.lastName)
|
||||
print(f"[UnitedSCO step1] Entered Last Name: {self.lastName}")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error entering Last Name: {e}")
|
||||
else:
|
||||
print("[UnitedSCO step1] No Last Name provided, skipping")
|
||||
|
||||
# Fill Date of Birth (id='dateOfBirth_Back', format: MM/DD/YYYY)
|
||||
try:
|
||||
@@ -336,32 +491,135 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
|
||||
# Step 1.2: Select Payer - UnitedHealthcare Massachusetts
|
||||
print("[UnitedSCO step1] Selecting Payer...")
|
||||
|
||||
# First dismiss any blocking dialogs (e.g. Chrome password save)
|
||||
try:
|
||||
# Click the Payer ng-select dropdown
|
||||
payer_ng_select = self.driver.find_element(By.XPATH,
|
||||
"//label[contains(text(),'Payer')]/following-sibling::ng-select"
|
||||
)
|
||||
payer_ng_select.click()
|
||||
time.sleep(1)
|
||||
|
||||
# Find and click "UnitedHealthcare Massachusetts" option
|
||||
payer_options = self.driver.find_elements(By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option')]"
|
||||
)
|
||||
for opt in payer_options:
|
||||
if "UnitedHealthcare Massachusetts" in opt.text:
|
||||
opt.click()
|
||||
print("[UnitedSCO step1] Selected Payer: UnitedHealthcare Massachusetts")
|
||||
break
|
||||
|
||||
# Press Escape to close any dropdown
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
time.sleep(1)
|
||||
self.driver.execute_script("""
|
||||
// Dismiss Chrome password manager popup if present
|
||||
var dialogs = document.querySelectorAll('[role="dialog"], .cdk-overlay-container');
|
||||
dialogs.forEach(function(d) { d.style.display = 'none'; });
|
||||
""")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
payer_selected = False
|
||||
|
||||
# Strategy 1: Click the ng-select, type to search, and select the option
|
||||
try:
|
||||
# Find the Payer ng-select by multiple selectors
|
||||
payer_selectors = [
|
||||
"//label[contains(text(),'Payer')]/following-sibling::ng-select",
|
||||
"//label[contains(text(),'Payer')]/..//ng-select",
|
||||
"//ng-select[contains(@placeholder,'Payer') or contains(@placeholder,'payer')]",
|
||||
"//ng-select[.//input[contains(@placeholder,'Search by Payers')]]",
|
||||
]
|
||||
payer_ng_select = None
|
||||
for sel in payer_selectors:
|
||||
try:
|
||||
payer_ng_select = self.driver.find_element(By.XPATH, sel)
|
||||
if payer_ng_select.is_displayed():
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if payer_ng_select:
|
||||
# Scroll to it and click to open
|
||||
self.driver.execute_script("arguments[0].scrollIntoView({block:'center'});", payer_ng_select)
|
||||
time.sleep(0.5)
|
||||
payer_ng_select.click()
|
||||
time.sleep(1)
|
||||
|
||||
# Type into the search input inside ng-select to filter options
|
||||
try:
|
||||
search_input = payer_ng_select.find_element(By.XPATH, ".//input[contains(@type,'text') or contains(@role,'combobox')]")
|
||||
search_input.clear()
|
||||
search_input.send_keys("UnitedHealthcare Massachusetts")
|
||||
print("[UnitedSCO step1] Typed payer search text")
|
||||
time.sleep(2)
|
||||
except Exception:
|
||||
# If no search input, try sending keys directly to ng-select
|
||||
try:
|
||||
ActionChains(self.driver).send_keys("UnitedHealthcare Mass").perform()
|
||||
print("[UnitedSCO step1] Typed payer search via ActionChains")
|
||||
time.sleep(2)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Find and click the matching option
|
||||
payer_options = self.driver.find_elements(By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option')]"
|
||||
)
|
||||
for opt in payer_options:
|
||||
opt_text = opt.text.strip()
|
||||
if "UnitedHealthcare Massachusetts" in opt_text:
|
||||
opt.click()
|
||||
print(f"[UnitedSCO step1] Selected Payer: {opt_text}")
|
||||
payer_selected = True
|
||||
break
|
||||
|
||||
if not payer_selected and payer_options:
|
||||
# Select first visible option if it contains "United"
|
||||
for opt in payer_options:
|
||||
opt_text = opt.text.strip()
|
||||
if "United" in opt_text and opt.is_displayed():
|
||||
opt.click()
|
||||
print(f"[UnitedSCO step1] Selected first matching Payer: {opt_text}")
|
||||
payer_selected = True
|
||||
break
|
||||
|
||||
# Close dropdown
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
print("[UnitedSCO step1] Could not find Payer ng-select element")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error selecting Payer: {e}")
|
||||
# Try to continue anyway - payer might be pre-selected
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
print(f"[UnitedSCO step1] Payer selection strategy 1 error: {e}")
|
||||
try:
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 2: JavaScript direct selection if strategy 1 failed
|
||||
if not payer_selected:
|
||||
try:
|
||||
# Try clicking via JavaScript
|
||||
clicked = self.driver.execute_script("""
|
||||
// Find ng-select near the Payer label
|
||||
var labels = document.querySelectorAll('label');
|
||||
for (var i = 0; i < labels.length; i++) {
|
||||
if (labels[i].textContent.includes('Payer')) {
|
||||
var parent = labels[i].parentElement;
|
||||
var ngSelect = parent.querySelector('ng-select') || labels[i].nextElementSibling;
|
||||
if (ngSelect) {
|
||||
ngSelect.click();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
""")
|
||||
if clicked:
|
||||
time.sleep(1)
|
||||
ActionChains(self.driver).send_keys("UnitedHealthcare Mass").perform()
|
||||
time.sleep(2)
|
||||
payer_options = self.driver.find_elements(By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option')]"
|
||||
)
|
||||
for opt in payer_options:
|
||||
if "UnitedHealthcare" in opt.text and "Massachusetts" in opt.text:
|
||||
opt.click()
|
||||
print(f"[UnitedSCO step1] Selected Payer via JS: {opt.text.strip()}")
|
||||
payer_selected = True
|
||||
break
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Payer selection strategy 2 error: {e}")
|
||||
|
||||
if not payer_selected:
|
||||
print("[UnitedSCO step1] WARNING: Could not select Payer - form may fail")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# Step 1.3: Click Continue button (Step 1 - Patient Info)
|
||||
try:
|
||||
@@ -371,38 +629,84 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
continue_btn.click()
|
||||
print("[UnitedSCO step1] Clicked Continue button (Patient Info)")
|
||||
time.sleep(4)
|
||||
|
||||
# Check for error dialogs (modal) after clicking Continue
|
||||
error_result = self._check_for_error_dialog()
|
||||
if error_result:
|
||||
return error_result
|
||||
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error clicking Continue: {e}")
|
||||
return "ERROR: Could not click Continue button"
|
||||
|
||||
# Step 1.4: Handle Practitioner & Location page
|
||||
# First check if we actually moved to the Practitioner page
|
||||
# by looking for Practitioner-specific elements
|
||||
print("[UnitedSCO step1] Handling Practitioner & Location page...")
|
||||
|
||||
on_practitioner_page = False
|
||||
try:
|
||||
# Click Practitioner Taxonomy dropdown (id='paymentGroupId')
|
||||
taxonomy_input = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.ID, "paymentGroupId"))
|
||||
# Check for Practitioner page elements (paymentGroupId or treatment location)
|
||||
WebDriverWait(self.driver, 8).until(
|
||||
lambda d: d.find_element(By.ID, "paymentGroupId").is_displayed() or
|
||||
d.find_element(By.ID, "treatmentLocation").is_displayed()
|
||||
)
|
||||
taxonomy_input.click()
|
||||
print("[UnitedSCO step1] Clicked Practitioner Taxonomy dropdown")
|
||||
time.sleep(1)
|
||||
on_practitioner_page = True
|
||||
print("[UnitedSCO step1] Practitioner & Location page loaded")
|
||||
except Exception:
|
||||
# Check if we're already on results page (3rd step)
|
||||
try:
|
||||
results_elem = self.driver.find_element(By.XPATH,
|
||||
"//*[contains(text(),'Selected Patient') or contains(@id,'patient-name') or contains(@id,'eligibility')]"
|
||||
)
|
||||
if results_elem.is_displayed():
|
||||
print("[UnitedSCO step1] Already on Eligibility Results page (skipped Practitioner)")
|
||||
return "Success"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Select "Summit Dental Care" option
|
||||
summit_option = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option') and contains(.,'Summit Dental Care')]"
|
||||
))
|
||||
)
|
||||
summit_option.click()
|
||||
print("[UnitedSCO step1] Selected: Summit Dental Care")
|
||||
# Check for error dialog again
|
||||
error_result = self._check_for_error_dialog()
|
||||
if error_result:
|
||||
return error_result
|
||||
|
||||
# Press Escape to close dropdown
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
time.sleep(1)
|
||||
|
||||
except TimeoutException:
|
||||
print("[UnitedSCO step1] Practitioner Taxonomy not found or already selected")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Practitioner Taxonomy handling: {e}")
|
||||
print("[UnitedSCO step1] Practitioner page not detected, attempting to continue...")
|
||||
|
||||
if on_practitioner_page:
|
||||
try:
|
||||
# Click Practitioner Taxonomy dropdown (id='paymentGroupId')
|
||||
taxonomy_input = self.driver.find_element(By.ID, "paymentGroupId")
|
||||
if taxonomy_input.is_displayed():
|
||||
taxonomy_input.click()
|
||||
print("[UnitedSCO step1] Clicked Practitioner Taxonomy dropdown")
|
||||
time.sleep(1)
|
||||
|
||||
# Select "Summit Dental Care" option
|
||||
try:
|
||||
summit_option = WebDriverWait(self.driver, 5).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option') and contains(.,'Summit Dental Care')]"
|
||||
))
|
||||
)
|
||||
summit_option.click()
|
||||
print("[UnitedSCO step1] Selected: Summit Dental Care")
|
||||
except TimeoutException:
|
||||
# Select first available option
|
||||
try:
|
||||
first_option = self.driver.find_element(By.XPATH,
|
||||
"//ng-dropdown-panel//div[contains(@class,'ng-option')]"
|
||||
)
|
||||
option_text = first_option.text.strip()
|
||||
first_option.click()
|
||||
print(f"[UnitedSCO step1] Selected first available: {option_text}")
|
||||
except Exception:
|
||||
print("[UnitedSCO step1] No options available in Practitioner dropdown")
|
||||
|
||||
# Press Escape to close dropdown
|
||||
ActionChains(self.driver).send_keys(Keys.ESCAPE).perform()
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Practitioner Taxonomy handling: {e}")
|
||||
|
||||
# Step 1.5: Click Continue button (Step 2 - Practitioner)
|
||||
try:
|
||||
@@ -414,25 +718,15 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
time.sleep(5)
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step1] Error clicking Continue on Practitioner page: {e}")
|
||||
# Check for error dialog intercepting the click
|
||||
error_result = self._check_for_error_dialog()
|
||||
if error_result:
|
||||
return error_result
|
||||
|
||||
# Check for errors
|
||||
try:
|
||||
error_selectors = [
|
||||
"//*[contains(text(),'No results')]",
|
||||
"//*[contains(text(),'not found')]",
|
||||
"//*[contains(text(),'Invalid')]",
|
||||
]
|
||||
for sel in error_selectors:
|
||||
try:
|
||||
error_elem = self.driver.find_element(By.XPATH, sel)
|
||||
if error_elem and error_elem.is_displayed():
|
||||
error_text = error_elem.text
|
||||
print(f"[UnitedSCO step1] Error found: {error_text}")
|
||||
return f"ERROR: {error_text}"
|
||||
except:
|
||||
continue
|
||||
except:
|
||||
pass
|
||||
# Final check for error dialogs after the search
|
||||
error_result = self._check_for_error_dialog()
|
||||
if error_result:
|
||||
return error_result
|
||||
|
||||
print("[UnitedSCO step1] Patient search completed successfully")
|
||||
return "Success"
|
||||
@@ -442,16 +736,40 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
return f"ERROR:STEP1 - {e}"
|
||||
|
||||
|
||||
def _get_existing_downloads(self):
|
||||
"""Get set of existing PDF files in download dir before clicking."""
|
||||
import glob
|
||||
return set(glob.glob(os.path.join(self.download_dir, "*.pdf")))
|
||||
|
||||
def _wait_for_new_download(self, existing_files, timeout=15):
|
||||
"""Wait for a new PDF file to appear in the download dir."""
|
||||
import glob
|
||||
for _ in range(timeout * 2): # check every 0.5s
|
||||
time.sleep(0.5)
|
||||
current = set(glob.glob(os.path.join(self.download_dir, "*.pdf")))
|
||||
new_files = current - existing_files
|
||||
if new_files:
|
||||
# Also wait for download to finish (no .crdownload files)
|
||||
crdownloads = glob.glob(os.path.join(self.download_dir, "*.crdownload"))
|
||||
if not crdownloads:
|
||||
return list(new_files)[0]
|
||||
return None
|
||||
|
||||
def step2(self):
|
||||
"""
|
||||
Navigate to eligibility detail page and capture PDF.
|
||||
Extract data from Selected Patient page, click the "Eligibility" tab
|
||||
to navigate to the eligibility details page, then capture PDF.
|
||||
|
||||
At this point we should be on the "Selected Patient" page after step1.
|
||||
Workflow based on actual DOM testing:
|
||||
1. Extract eligibility status and Member ID from the page
|
||||
2. Click the "Eligibility" button (id='eligibility-link')
|
||||
3. Generate PDF using Chrome DevTools Protocol (same as other insurances)
|
||||
The "Eligibility" tab at the bottom (next to "Benefit Summary" and
|
||||
"Service History") may:
|
||||
a) Open a new browser tab with eligibility details
|
||||
b) Download a PDF file
|
||||
c) Load content dynamically on the same page
|
||||
We handle all three cases.
|
||||
"""
|
||||
import glob
|
||||
import re
|
||||
|
||||
try:
|
||||
print("[UnitedSCO step2] Starting eligibility capture")
|
||||
|
||||
@@ -470,7 +788,7 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
try:
|
||||
status_elem = WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.XPATH,
|
||||
"//*[contains(text(),'Member Eligible')]"
|
||||
"//*[contains(text(),'Member Eligible') or contains(text(),'member eligible')]"
|
||||
))
|
||||
)
|
||||
status_text = status_elem.text.strip().lower()
|
||||
@@ -488,12 +806,93 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
|
||||
print(f"[UnitedSCO step2] Eligibility status: {eligibilityText}")
|
||||
|
||||
# Extract patient name from the page
|
||||
page_text = ""
|
||||
try:
|
||||
page_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Log a snippet of page text around "Selected Patient" for debugging
|
||||
try:
|
||||
sp_idx = page_text.find("Selected Patient")
|
||||
if sp_idx >= 0:
|
||||
snippet = page_text[sp_idx:sp_idx+300]
|
||||
print(f"[UnitedSCO step2] Page text near 'Selected Patient': {repr(snippet[:200])}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 1: Try DOM element id="patient-name"
|
||||
name_extracted = False
|
||||
try:
|
||||
name_elem = self.driver.find_element(By.ID, "patient-name")
|
||||
extracted_name = name_elem.text.strip()
|
||||
if extracted_name:
|
||||
patientName = extracted_name
|
||||
name_extracted = True
|
||||
print(f"[UnitedSCO step2] Extracted patient name from DOM (id=patient-name): {patientName}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 2: Try various DOM patterns for patient name
|
||||
if not name_extracted:
|
||||
name_selectors = [
|
||||
"//*[contains(@class,'patient-name') or contains(@class,'patientName')]",
|
||||
"//*[contains(@class,'selected-patient')]//h3 | //*[contains(@class,'selected-patient')]//h4 | //*[contains(@class,'selected-patient')]//strong",
|
||||
"//div[contains(@class,'patient')]//h3 | //div[contains(@class,'patient')]//h4",
|
||||
"//*[contains(@class,'eligibility__banner')]//h3 | //*[contains(@class,'eligibility__banner')]//h4",
|
||||
"//*[contains(@class,'banner__patient')]",
|
||||
]
|
||||
for sel in name_selectors:
|
||||
try:
|
||||
elems = self.driver.find_elements(By.XPATH, sel)
|
||||
for elem in elems:
|
||||
txt = elem.text.strip()
|
||||
# Filter: must look like a name (2+ words, starts with uppercase)
|
||||
if txt and len(txt.split()) >= 2 and txt[0].isupper() and len(txt) < 60:
|
||||
patientName = txt
|
||||
name_extracted = True
|
||||
print(f"[UnitedSCO step2] Extracted patient name from DOM: {patientName}")
|
||||
break
|
||||
if name_extracted:
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Strategy 3: Regex from page text - multiple patterns
|
||||
# IMPORTANT: Use [^\n] to avoid matching across newlines (e.g. picking up "Member Eligible")
|
||||
if not name_extracted:
|
||||
name_patterns = [
|
||||
# Name on the line right after "Selected Patient"
|
||||
r'Selected Patient\s*\n\s*([A-Z][A-Za-z\-\']+(?: [A-Z][A-Za-z\-\']+)+)',
|
||||
r'Patient Name\s*[\n:]\s*([A-Z][A-Za-z\-\']+(?: [A-Z][A-Za-z\-\']+)+)',
|
||||
# "LASTNAME, FIRSTNAME" format
|
||||
r'Selected Patient\s*\n\s*([A-Z][A-Za-z\-\']+,\s*[A-Z][A-Za-z\-\']+)',
|
||||
# Name on the line right before "Member Eligible" or "Member ID"
|
||||
r'\n([A-Z][A-Za-z\-\']+(?: [A-Z]\.?)? [A-Z][A-Za-z\-\']+)\n(?:Member|Date Of Birth|DOB)',
|
||||
]
|
||||
for pattern in name_patterns:
|
||||
try:
|
||||
name_match = re.search(pattern, page_text)
|
||||
if name_match:
|
||||
candidate = name_match.group(1).strip()
|
||||
# Validate: not too long, not a header/label, and doesn't contain "Eligible"/"Member"/"Patient"
|
||||
skip_words = ("Selected Patient", "Patient Name", "Patient Information",
|
||||
"Member Eligible", "Member ID", "Date Of Birth")
|
||||
if (len(candidate) < 50 and candidate not in skip_words
|
||||
and "Eligible" not in candidate and "Member" not in candidate):
|
||||
patientName = candidate
|
||||
name_extracted = True
|
||||
print(f"[UnitedSCO step2] Extracted patient name from text: {patientName}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not name_extracted:
|
||||
print(f"[UnitedSCO step2] WARNING: Could not extract patient name from page")
|
||||
|
||||
# Extract Member ID from the page (for database storage)
|
||||
try:
|
||||
# Look for Member ID on the page
|
||||
page_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
import re
|
||||
# Look for "Member ID" followed by a number
|
||||
member_id_match = re.search(r'Member ID\s*[\n:]\s*(\d+)', page_text)
|
||||
if member_id_match:
|
||||
foundMemberId = member_id_match.group(1)
|
||||
@@ -501,44 +900,213 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Could not extract Member ID: {e}")
|
||||
|
||||
# 2) Click the "Eligibility" button (id='eligibility-link')
|
||||
print("[UnitedSCO step2] Looking for 'Eligibility' button...")
|
||||
|
||||
# Extract Date of Birth from page if available (for patient creation)
|
||||
extractedDob = ""
|
||||
try:
|
||||
eligibility_btn = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.ID, "eligibility-link"))
|
||||
)
|
||||
eligibility_btn.click()
|
||||
print("[UnitedSCO step2] Clicked 'Eligibility' button")
|
||||
time.sleep(5)
|
||||
except TimeoutException:
|
||||
print("[UnitedSCO step2] Eligibility button not found, trying alternative selectors...")
|
||||
try:
|
||||
# Alternative: find button with text "Eligibility"
|
||||
eligibility_btn = self.driver.find_element(By.XPATH,
|
||||
"//button[normalize-space(text())='Eligibility']"
|
||||
)
|
||||
eligibility_btn.click()
|
||||
print("[UnitedSCO step2] Clicked 'Eligibility' button (alternative)")
|
||||
time.sleep(5)
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Could not click Eligibility button: {e}")
|
||||
|
||||
# Wait for page to fully load
|
||||
try:
|
||||
WebDriverWait(self.driver, 30).until(
|
||||
lambda d: d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
dob_match = re.search(r'Date Of Birth\s*[\n:]\s*(\d{2}/\d{2}/\d{4})', page_text)
|
||||
if dob_match:
|
||||
extractedDob = dob_match.group(1)
|
||||
print(f"[UnitedSCO step2] Extracted DOB from page: {extractedDob}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
time.sleep(2)
|
||||
# 2) Click the "Eligibility" button to navigate to eligibility details
|
||||
# The DOM has: <button id="eligibility-link" class="btn btn-link">Eligibility</button>
|
||||
# This is near "Benefit Summary" and "Service History" buttons.
|
||||
print("[UnitedSCO step2] Looking for 'Eligibility' button (id='eligibility-link')...")
|
||||
|
||||
print(f"[UnitedSCO step2] Final URL: {self.driver.current_url}")
|
||||
# Record existing downloads BEFORE clicking (to detect new downloads)
|
||||
existing_downloads = self._get_existing_downloads()
|
||||
|
||||
# Record current window handles BEFORE clicking (to detect new tabs)
|
||||
original_window = self.driver.current_window_handle
|
||||
original_windows = set(self.driver.window_handles)
|
||||
|
||||
eligibility_clicked = False
|
||||
|
||||
# Strategy 1 (PRIMARY): Use the known button id="eligibility-link"
|
||||
try:
|
||||
# First check if the button exists and is visible
|
||||
elig_btn = WebDriverWait(self.driver, 15).until(
|
||||
EC.presence_of_element_located((By.ID, "eligibility-link"))
|
||||
)
|
||||
# Wait for it to become visible (it's hidden when no results)
|
||||
WebDriverWait(self.driver, 10).until(
|
||||
EC.visibility_of(elig_btn)
|
||||
)
|
||||
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", elig_btn)
|
||||
time.sleep(0.5)
|
||||
elig_btn.click()
|
||||
eligibility_clicked = True
|
||||
print("[UnitedSCO step2] Clicked 'Eligibility' button (id='eligibility-link')")
|
||||
time.sleep(5)
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Could not click by ID: {e}")
|
||||
|
||||
# Strategy 2: Find the button with exact "Eligibility" text (not "Eligibility Check Results" etc.)
|
||||
if not eligibility_clicked:
|
||||
try:
|
||||
buttons = self.driver.find_elements(By.XPATH, "//button")
|
||||
for btn in buttons:
|
||||
try:
|
||||
text = btn.text.strip()
|
||||
if re.match(r'^Eligibility\s*$', text, re.IGNORECASE) and btn.is_displayed():
|
||||
self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", btn)
|
||||
time.sleep(0.5)
|
||||
btn.click()
|
||||
eligibility_clicked = True
|
||||
print(f"[UnitedSCO step2] Clicked button with text 'Eligibility'")
|
||||
time.sleep(5)
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Button text search error: {e}")
|
||||
|
||||
# Strategy 3: JavaScript click on #eligibility-link
|
||||
if not eligibility_clicked:
|
||||
try:
|
||||
clicked = self.driver.execute_script("""
|
||||
var btn = document.getElementById('eligibility-link');
|
||||
if (btn) { btn.scrollIntoView({block: 'center'}); btn.click(); return true; }
|
||||
// Fallback: find any button/a with exact "Eligibility" text
|
||||
var all = document.querySelectorAll('button, a');
|
||||
for (var i = 0; i < all.length; i++) {
|
||||
if (/^\\s*Eligibility\\s*$/i.test(all[i].textContent)) {
|
||||
all[i].scrollIntoView({block: 'center'});
|
||||
all[i].click();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
""")
|
||||
if clicked:
|
||||
eligibility_clicked = True
|
||||
print("[UnitedSCO step2] Clicked via JavaScript")
|
||||
time.sleep(5)
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] JS click error: {e}")
|
||||
|
||||
if not eligibility_clicked:
|
||||
print("[UnitedSCO step2] WARNING: Could not click Eligibility button")
|
||||
|
||||
# 3) Handle the result of clicking: new tab, download, or same-page content
|
||||
pdf_path = None
|
||||
|
||||
# Check for new browser tab/window
|
||||
new_windows = set(self.driver.window_handles) - original_windows
|
||||
if new_windows:
|
||||
new_tab = list(new_windows)[0]
|
||||
print(f"[UnitedSCO step2] New tab opened! Switching to it...")
|
||||
self.driver.switch_to.window(new_tab)
|
||||
time.sleep(5)
|
||||
|
||||
# Wait for the new page to load
|
||||
try:
|
||||
WebDriverWait(self.driver, 30).until(
|
||||
lambda d: d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(2)
|
||||
|
||||
print(f"[UnitedSCO step2] New tab URL: {self.driver.current_url}")
|
||||
|
||||
# Capture PDF from the new tab
|
||||
pdf_path = self._capture_pdf(foundMemberId)
|
||||
|
||||
# Close the new tab and switch back to original
|
||||
self.driver.close()
|
||||
self.driver.switch_to.window(original_window)
|
||||
print("[UnitedSCO step2] Closed new tab, switched back to original")
|
||||
|
||||
# Check for downloaded file
|
||||
if not pdf_path:
|
||||
downloaded_file = self._wait_for_new_download(existing_downloads, timeout=10)
|
||||
if downloaded_file:
|
||||
print(f"[UnitedSCO step2] File downloaded: {downloaded_file}")
|
||||
pdf_path = downloaded_file
|
||||
|
||||
# Fallback: capture current page as PDF
|
||||
if not pdf_path:
|
||||
print("[UnitedSCO step2] No new tab or download detected - capturing current page as PDF")
|
||||
|
||||
# Wait for any dynamic content
|
||||
try:
|
||||
WebDriverWait(self.driver, 15).until(
|
||||
lambda d: d.execute_script("return document.readyState") == "complete"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(3)
|
||||
|
||||
print(f"[UnitedSCO step2] Capturing PDF from URL: {self.driver.current_url}")
|
||||
pdf_path = self._capture_pdf(foundMemberId)
|
||||
|
||||
# 3) Generate PDF using Chrome DevTools Protocol (same as other insurances)
|
||||
print("[UnitedSCO step2] Generating PDF...")
|
||||
if not pdf_path:
|
||||
return {"status": "error", "message": "STEP2 FAILED: Could not generate PDF"}
|
||||
|
||||
print(f"[UnitedSCO step2] PDF saved: {pdf_path}")
|
||||
|
||||
# Hide browser window after completion
|
||||
self._hide_browser()
|
||||
|
||||
print("[UnitedSCO step2] Eligibility capture complete")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"eligibility": eligibilityText,
|
||||
"ss_path": pdf_path,
|
||||
"pdf_path": pdf_path,
|
||||
"patientName": patientName,
|
||||
"memberId": foundMemberId
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Exception: {e}")
|
||||
return {"status": "error", "message": f"STEP2 FAILED: {str(e)}"}
|
||||
|
||||
def _hide_browser(self):
|
||||
"""Hide the browser window after task completion using multiple strategies."""
|
||||
try:
|
||||
# Strategy 1: Navigate to blank page first (clears sensitive data from view)
|
||||
try:
|
||||
self.driver.get("about:blank")
|
||||
time.sleep(0.5)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 2: Minimize window
|
||||
try:
|
||||
self.driver.minimize_window()
|
||||
print("[UnitedSCO step2] Browser window minimized")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 3: Move window off-screen
|
||||
try:
|
||||
self.driver.set_window_position(-10000, -10000)
|
||||
print("[UnitedSCO step2] Browser window moved off-screen")
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 4: Use xdotool to minimize (Linux)
|
||||
try:
|
||||
import subprocess
|
||||
subprocess.run(["xdotool", "getactivewindow", "windowminimize"],
|
||||
timeout=3, capture_output=True)
|
||||
print("[UnitedSCO step2] Browser minimized via xdotool")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Could not hide browser: {e}")
|
||||
|
||||
def _capture_pdf(self, member_id):
|
||||
"""Capture the current page as PDF using Chrome DevTools Protocol."""
|
||||
try:
|
||||
pdf_options = {
|
||||
"landscape": False,
|
||||
"displayHeaderFooter": False,
|
||||
@@ -553,31 +1121,17 @@ class AutomationUnitedSCOEligibilityCheck:
|
||||
"scale": 0.9,
|
||||
}
|
||||
|
||||
# Use foundMemberId for filename
|
||||
file_identifier = foundMemberId if foundMemberId else f"{self.firstName}_{self.lastName}"
|
||||
file_identifier = member_id if member_id else f"{self.firstName}_{self.lastName}"
|
||||
|
||||
result = self.driver.execute_cdp_cmd("Page.printToPDF", pdf_options)
|
||||
pdf_data = base64.b64decode(result.get('data', ''))
|
||||
pdf_path = os.path.join(self.download_dir, f"unitedsco_eligibility_{file_identifier}_{int(time.time())}.pdf")
|
||||
with open(pdf_path, "wb") as f:
|
||||
f.write(pdf_data)
|
||||
print(f"[UnitedSCO step2] PDF saved: {pdf_path}")
|
||||
|
||||
# Keep browser alive for next patient
|
||||
print("[UnitedSCO step2] Eligibility capture complete - session preserved")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"eligibility": eligibilityText,
|
||||
"ss_path": pdf_path,
|
||||
"pdf_path": pdf_path,
|
||||
"patientName": patientName,
|
||||
"memberId": foundMemberId # Return the Member ID found on the page
|
||||
}
|
||||
|
||||
return pdf_path
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO step2] Exception: {e}")
|
||||
return {"status": "error", "message": f"STEP2 FAILED: {str(e)}"}
|
||||
print(f"[UnitedSCO _capture_pdf] Error: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def main_workflow(self, url):
|
||||
|
||||
@@ -112,6 +112,26 @@ class UnitedSCOBrowserManager:
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO BrowserManager] Could not clear IndexedDB: {e}")
|
||||
|
||||
# Clear browser cache (prevents corrupted cached responses)
|
||||
cache_dirs = [
|
||||
os.path.join(self.profile_dir, "Default", "Cache"),
|
||||
os.path.join(self.profile_dir, "Default", "Code Cache"),
|
||||
os.path.join(self.profile_dir, "Default", "GPUCache"),
|
||||
os.path.join(self.profile_dir, "Default", "Service Worker"),
|
||||
os.path.join(self.profile_dir, "Cache"),
|
||||
os.path.join(self.profile_dir, "Code Cache"),
|
||||
os.path.join(self.profile_dir, "GPUCache"),
|
||||
os.path.join(self.profile_dir, "Service Worker"),
|
||||
os.path.join(self.profile_dir, "ShaderCache"),
|
||||
]
|
||||
for cache_dir in cache_dirs:
|
||||
if os.path.exists(cache_dir):
|
||||
try:
|
||||
shutil.rmtree(cache_dir)
|
||||
print(f"[UnitedSCO BrowserManager] Cleared {os.path.basename(cache_dir)}")
|
||||
except Exception as e:
|
||||
print(f"[UnitedSCO BrowserManager] Could not clear {os.path.basename(cache_dir)}: {e}")
|
||||
|
||||
# Set flag to clear session via JavaScript after browser opens
|
||||
self._needs_session_clear = True
|
||||
|
||||
@@ -233,11 +253,21 @@ class UnitedSCOBrowserManager:
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
|
||||
# Anti-detection options (prevent bot detection)
|
||||
options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
options.add_experimental_option("useAutomationExtension", False)
|
||||
options.add_argument("--disable-infobars")
|
||||
|
||||
prefs = {
|
||||
"download.default_directory": self.download_dir,
|
||||
"plugins.always_open_pdf_externally": True,
|
||||
"download.prompt_for_download": False,
|
||||
"download.directory_upgrade": True
|
||||
"download.directory_upgrade": True,
|
||||
# Disable password save dialog that blocks page interactions
|
||||
"credentials_enable_service": False,
|
||||
"profile.password_manager_enabled": False,
|
||||
"profile.password_manager_leak_detection": False,
|
||||
}
|
||||
options.add_experimental_option("prefs", prefs)
|
||||
|
||||
@@ -245,6 +275,12 @@ class UnitedSCOBrowserManager:
|
||||
self._driver = webdriver.Chrome(service=service, options=options)
|
||||
self._driver.maximize_window()
|
||||
|
||||
# Remove webdriver property to avoid detection
|
||||
try:
|
||||
self._driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Reset the session clear flag (file-based clearing is done on startup)
|
||||
self._needs_session_clear = False
|
||||
|
||||
|
||||
Reference in New Issue
Block a user