feat(eligibility-check) - implement Delta Dental Ins eligibility workflow with OTP handling; added routes, services, and frontend components for patient data processing and eligibility status retrieval; enhanced browser session management and logging
This commit is contained in:
@@ -11,11 +11,13 @@ import time
|
||||
import helpers_ddma_eligibility as hddma
|
||||
import helpers_dentaquest_eligibility as hdentaquest
|
||||
import helpers_unitedsco_eligibility as hunitedsco
|
||||
import helpers_deltains_eligibility as hdeltains
|
||||
|
||||
# Import session clear functions for startup
|
||||
from ddma_browser_manager import clear_ddma_session_on_startup
|
||||
from dentaquest_browser_manager import clear_dentaquest_session_on_startup
|
||||
from unitedsco_browser_manager import clear_unitedsco_session_on_startup
|
||||
from deltains_browser_manager import clear_deltains_session_on_startup
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
@@ -28,6 +30,7 @@ print("=" * 50)
|
||||
clear_ddma_session_on_startup()
|
||||
clear_dentaquest_session_on_startup()
|
||||
clear_unitedsco_session_on_startup()
|
||||
clear_deltains_session_on_startup()
|
||||
print("=" * 50)
|
||||
print("SESSION CLEAR COMPLETE - FRESH LOGINS REQUIRED")
|
||||
print("=" * 50)
|
||||
@@ -351,6 +354,77 @@ async def unitedsco_session_status(sid: str):
|
||||
return s
|
||||
|
||||
|
||||
# Endpoint:8 - DeltaIns eligibility (background, OTP)
|
||||
|
||||
async def _deltains_worker_wrapper(sid: str, data: dict, url: str):
|
||||
"""
|
||||
Background worker that:
|
||||
- acquires semaphore (to keep 1 selenium at a time),
|
||||
- updates active/queued counters,
|
||||
- runs the DeltaIns flow via helpers.start_deltains_run.
|
||||
"""
|
||||
global active_jobs, waiting_jobs
|
||||
async with semaphore:
|
||||
async with lock:
|
||||
waiting_jobs -= 1
|
||||
active_jobs += 1
|
||||
try:
|
||||
await hdeltains.start_deltains_run(sid, data, url)
|
||||
finally:
|
||||
async with lock:
|
||||
active_jobs -= 1
|
||||
|
||||
|
||||
@app.post("/deltains-eligibility")
|
||||
async def deltains_eligibility(request: Request):
|
||||
"""
|
||||
Starts a DeltaIns eligibility session in the background.
|
||||
Body: { "data": { ... }, "url"?: string }
|
||||
Returns: { status: "started", session_id: "<uuid>" }
|
||||
"""
|
||||
global waiting_jobs
|
||||
|
||||
body = await request.json()
|
||||
data = body.get("data", {})
|
||||
|
||||
sid = hdeltains.make_session_entry()
|
||||
hdeltains.sessions[sid]["type"] = "deltains_eligibility"
|
||||
hdeltains.sessions[sid]["last_activity"] = time.time()
|
||||
|
||||
async with lock:
|
||||
waiting_jobs += 1
|
||||
|
||||
asyncio.create_task(_deltains_worker_wrapper(sid, data, url="https://www.deltadentalins.com/ciam/login?TARGET=%2Fprovider-tools%2Fv2"))
|
||||
|
||||
return {"status": "started", "session_id": sid}
|
||||
|
||||
|
||||
@app.post("/deltains-submit-otp")
|
||||
async def deltains_submit_otp(request: Request):
|
||||
"""
|
||||
Body: { "session_id": "<sid>", "otp": "123456" }
|
||||
Node / frontend call this when user provides OTP for DeltaIns.
|
||||
"""
|
||||
body = await request.json()
|
||||
sid = body.get("session_id")
|
||||
otp = body.get("otp")
|
||||
if not sid or not otp:
|
||||
raise HTTPException(status_code=400, detail="session_id and otp required")
|
||||
|
||||
res = hdeltains.submit_otp(sid, otp)
|
||||
if res.get("status") == "error":
|
||||
raise HTTPException(status_code=400, detail=res.get("message"))
|
||||
return res
|
||||
|
||||
|
||||
@app.get("/deltains-session/{sid}/status")
|
||||
async def deltains_session_status(sid: str):
|
||||
s = hdeltains.get_session_status(sid)
|
||||
if s.get("status") == "not_found":
|
||||
raise HTTPException(status_code=404, detail="session not found")
|
||||
return s
|
||||
|
||||
|
||||
@app.post("/submit-otp")
|
||||
async def submit_otp(request: Request):
|
||||
"""
|
||||
@@ -425,6 +499,18 @@ async def clear_unitedsco_session():
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
|
||||
@app.post("/clear-deltains-session")
|
||||
async def clear_deltains_session():
|
||||
"""
|
||||
Clears the Delta Dental Ins browser session. Called when DeltaIns credentials are deleted.
|
||||
"""
|
||||
try:
|
||||
clear_deltains_session_on_startup()
|
||||
return {"status": "success", "message": "DeltaIns session cleared"}
|
||||
except Exception as e:
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
host = os.getenv("HOST")
|
||||
port = int(os.getenv("PORT"))
|
||||
|
||||
376
apps/SeleniumService/deltains_browser_manager.py
Normal file
376
apps/SeleniumService/deltains_browser_manager.py
Normal file
@@ -0,0 +1,376 @@
|
||||
"""
|
||||
Browser manager for Delta Dental Ins - handles persistent profile, cookie
|
||||
save/restore (for Okta session-only cookies), and keeping browser alive.
|
||||
Tracks credentials to detect changes mid-session.
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import hashlib
|
||||
import threading
|
||||
import subprocess
|
||||
import time
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
|
||||
if not os.environ.get("DISPLAY"):
|
||||
os.environ["DISPLAY"] = ":0"
|
||||
|
||||
DELTAINS_DOMAIN = ".deltadentalins.com"
|
||||
OKTA_DOMAINS = [".okta.com", ".oktacdn.com"]
|
||||
|
||||
|
||||
class DeltaInsBrowserManager:
|
||||
"""
|
||||
Singleton that manages a persistent Chrome browser instance for Delta Dental Ins.
|
||||
- Uses --user-data-dir for persistent profile
|
||||
- Saves/restores Okta session cookies to survive browser restarts
|
||||
- Tracks credentials to detect changes mid-session
|
||||
"""
|
||||
_instance = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._driver = None
|
||||
cls._instance.profile_dir = os.path.abspath("chrome_profile_deltains")
|
||||
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
|
||||
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
|
||||
cls._instance._cookies_file = os.path.join(cls._instance.profile_dir, ".saved_cookies.json")
|
||||
cls._instance._needs_session_clear = False
|
||||
os.makedirs(cls._instance.profile_dir, exist_ok=True)
|
||||
os.makedirs(cls._instance.download_dir, exist_ok=True)
|
||||
return cls._instance
|
||||
|
||||
# ── Cookie save / restore ──────────────────────────────────────────
|
||||
|
||||
def save_cookies(self):
|
||||
"""Save all browser cookies to a JSON file so they survive browser restart."""
|
||||
try:
|
||||
if not self._driver:
|
||||
return
|
||||
cookies = self._driver.get_cookies()
|
||||
if not cookies:
|
||||
return
|
||||
with open(self._cookies_file, "w") as f:
|
||||
json.dump(cookies, f)
|
||||
print(f"[DeltaIns BrowserManager] Saved {len(cookies)} cookies to disk")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Failed to save cookies: {e}")
|
||||
|
||||
def restore_cookies(self):
|
||||
"""Restore saved cookies into the current browser session."""
|
||||
if not os.path.exists(self._cookies_file):
|
||||
print("[DeltaIns BrowserManager] No saved cookies file found")
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(self._cookies_file, "r") as f:
|
||||
cookies = json.load(f)
|
||||
|
||||
if not cookies:
|
||||
print("[DeltaIns BrowserManager] Saved cookies file is empty")
|
||||
return False
|
||||
|
||||
# Navigate to the DeltaIns domain first so we can set cookies for it
|
||||
try:
|
||||
self._driver.get("https://www.deltadentalins.com/favicon.ico")
|
||||
time.sleep(2)
|
||||
except Exception:
|
||||
self._driver.get("https://www.deltadentalins.com")
|
||||
time.sleep(3)
|
||||
|
||||
restored = 0
|
||||
for cookie in cookies:
|
||||
try:
|
||||
# Remove problematic fields that Selenium doesn't accept
|
||||
for key in ["sameSite", "storeId", "hostOnly", "session"]:
|
||||
cookie.pop(key, None)
|
||||
# sameSite must be one of: Strict, Lax, None
|
||||
cookie["sameSite"] = "None"
|
||||
self._driver.add_cookie(cookie)
|
||||
restored += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print(f"[DeltaIns BrowserManager] Restored {restored}/{len(cookies)} cookies")
|
||||
return restored > 0
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Failed to restore cookies: {e}")
|
||||
return False
|
||||
|
||||
def clear_saved_cookies(self):
|
||||
"""Delete the saved cookies file."""
|
||||
try:
|
||||
if os.path.exists(self._cookies_file):
|
||||
os.remove(self._cookies_file)
|
||||
print("[DeltaIns BrowserManager] Cleared saved cookies file")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Failed to clear saved cookies: {e}")
|
||||
|
||||
# ── Session clear ──────────────────────────────────────────────────
|
||||
|
||||
def clear_session_on_startup(self):
|
||||
"""
|
||||
Clear session cookies from Chrome profile on startup.
|
||||
This forces a fresh login after PC restart.
|
||||
"""
|
||||
print("[DeltaIns BrowserManager] Clearing session on startup...")
|
||||
|
||||
try:
|
||||
if os.path.exists(self._credentials_file):
|
||||
os.remove(self._credentials_file)
|
||||
print("[DeltaIns BrowserManager] Cleared credentials tracking file")
|
||||
|
||||
# Also clear saved cookies
|
||||
self.clear_saved_cookies()
|
||||
|
||||
session_files = [
|
||||
"Cookies",
|
||||
"Cookies-journal",
|
||||
"Login Data",
|
||||
"Login Data-journal",
|
||||
"Web Data",
|
||||
"Web Data-journal",
|
||||
]
|
||||
|
||||
for filename in session_files:
|
||||
filepath = os.path.join(self.profile_dir, "Default", filename)
|
||||
if os.path.exists(filepath):
|
||||
try:
|
||||
os.remove(filepath)
|
||||
print(f"[DeltaIns BrowserManager] Removed {filename}")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not remove {filename}: {e}")
|
||||
|
||||
for filename in session_files:
|
||||
filepath = os.path.join(self.profile_dir, filename)
|
||||
if os.path.exists(filepath):
|
||||
try:
|
||||
os.remove(filepath)
|
||||
print(f"[DeltaIns BrowserManager] Removed root {filename}")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not remove root {filename}: {e}")
|
||||
|
||||
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
|
||||
if os.path.exists(session_storage_dir):
|
||||
try:
|
||||
shutil.rmtree(session_storage_dir)
|
||||
print("[DeltaIns BrowserManager] Cleared Session Storage")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not clear Session Storage: {e}")
|
||||
|
||||
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
|
||||
if os.path.exists(local_storage_dir):
|
||||
try:
|
||||
shutil.rmtree(local_storage_dir)
|
||||
print("[DeltaIns BrowserManager] Cleared Local Storage")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not clear Local Storage: {e}")
|
||||
|
||||
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
|
||||
if os.path.exists(indexeddb_dir):
|
||||
try:
|
||||
shutil.rmtree(indexeddb_dir)
|
||||
print("[DeltaIns BrowserManager] Cleared IndexedDB")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not clear IndexedDB: {e}")
|
||||
|
||||
cache_dirs = [
|
||||
os.path.join(self.profile_dir, "Default", "Cache"),
|
||||
os.path.join(self.profile_dir, "Default", "Code Cache"),
|
||||
os.path.join(self.profile_dir, "Default", "GPUCache"),
|
||||
os.path.join(self.profile_dir, "Default", "Service Worker"),
|
||||
os.path.join(self.profile_dir, "Cache"),
|
||||
os.path.join(self.profile_dir, "Code Cache"),
|
||||
os.path.join(self.profile_dir, "GPUCache"),
|
||||
os.path.join(self.profile_dir, "Service Worker"),
|
||||
os.path.join(self.profile_dir, "ShaderCache"),
|
||||
]
|
||||
for cache_dir in cache_dirs:
|
||||
if os.path.exists(cache_dir):
|
||||
try:
|
||||
shutil.rmtree(cache_dir)
|
||||
print(f"[DeltaIns BrowserManager] Cleared {os.path.basename(cache_dir)}")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Could not clear {os.path.basename(cache_dir)}: {e}")
|
||||
|
||||
self._needs_session_clear = True
|
||||
print("[DeltaIns BrowserManager] Session cleared - will require fresh login")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Error clearing session: {e}")
|
||||
|
||||
# ── Credential tracking ────────────────────────────────────────────
|
||||
|
||||
def _hash_credentials(self, username: str) -> str:
|
||||
return hashlib.sha256(username.encode()).hexdigest()[:16]
|
||||
|
||||
def get_last_credentials_hash(self) -> str | None:
|
||||
try:
|
||||
if os.path.exists(self._credentials_file):
|
||||
with open(self._credentials_file, 'r') as f:
|
||||
return f.read().strip()
|
||||
except Exception:
|
||||
pass
|
||||
return None
|
||||
|
||||
def save_credentials_hash(self, username: str):
|
||||
try:
|
||||
cred_hash = self._hash_credentials(username)
|
||||
with open(self._credentials_file, 'w') as f:
|
||||
f.write(cred_hash)
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Failed to save credentials hash: {e}")
|
||||
|
||||
def credentials_changed(self, username: str) -> bool:
|
||||
last_hash = self.get_last_credentials_hash()
|
||||
if last_hash is None:
|
||||
return False
|
||||
current_hash = self._hash_credentials(username)
|
||||
changed = last_hash != current_hash
|
||||
if changed:
|
||||
print("[DeltaIns BrowserManager] Credentials changed - logout required")
|
||||
return changed
|
||||
|
||||
def clear_credentials_hash(self):
|
||||
try:
|
||||
if os.path.exists(self._credentials_file):
|
||||
os.remove(self._credentials_file)
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns BrowserManager] Failed to clear credentials hash: {e}")
|
||||
|
||||
# ── Chrome process management ──────────────────────────────────────
|
||||
|
||||
def _kill_existing_chrome_for_profile(self):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
if result.stdout.strip():
|
||||
pids = result.stdout.strip().split('\n')
|
||||
for pid in pids:
|
||||
try:
|
||||
subprocess.run(["kill", "-9", pid], check=False)
|
||||
except:
|
||||
pass
|
||||
time.sleep(1)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for lock_file in ["SingletonLock", "SingletonSocket", "SingletonCookie"]:
|
||||
lock_path = os.path.join(self.profile_dir, lock_file)
|
||||
try:
|
||||
if os.path.islink(lock_path) or os.path.exists(lock_path):
|
||||
os.remove(lock_path)
|
||||
except:
|
||||
pass
|
||||
|
||||
# ── Driver lifecycle ───────────────────────────────────────────────
|
||||
|
||||
def get_driver(self, headless=False):
|
||||
with self._lock:
|
||||
need_cookie_restore = False
|
||||
|
||||
if self._driver is None:
|
||||
print("[DeltaIns BrowserManager] Driver is None, creating new driver")
|
||||
self._kill_existing_chrome_for_profile()
|
||||
self._create_driver(headless)
|
||||
need_cookie_restore = True
|
||||
elif not self._is_alive():
|
||||
print("[DeltaIns BrowserManager] Driver not alive, recreating")
|
||||
# Save cookies from the dead session if possible (usually can't)
|
||||
self._kill_existing_chrome_for_profile()
|
||||
self._create_driver(headless)
|
||||
need_cookie_restore = True
|
||||
else:
|
||||
print("[DeltaIns BrowserManager] Reusing existing driver")
|
||||
|
||||
if need_cookie_restore and os.path.exists(self._cookies_file):
|
||||
print("[DeltaIns BrowserManager] Restoring saved cookies into new browser...")
|
||||
self.restore_cookies()
|
||||
|
||||
return self._driver
|
||||
|
||||
def _is_alive(self):
|
||||
try:
|
||||
if self._driver is None:
|
||||
return False
|
||||
_ = self._driver.current_url
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _create_driver(self, headless=False):
|
||||
if self._driver:
|
||||
try:
|
||||
self._driver.quit()
|
||||
except:
|
||||
pass
|
||||
self._driver = None
|
||||
time.sleep(1)
|
||||
|
||||
options = webdriver.ChromeOptions()
|
||||
if headless:
|
||||
options.add_argument("--headless")
|
||||
|
||||
options.add_argument(f"--user-data-dir={self.profile_dir}")
|
||||
options.add_argument("--no-sandbox")
|
||||
options.add_argument("--disable-dev-shm-usage")
|
||||
|
||||
options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
options.add_experimental_option("useAutomationExtension", False)
|
||||
options.add_argument("--disable-infobars")
|
||||
|
||||
prefs = {
|
||||
"download.default_directory": self.download_dir,
|
||||
"plugins.always_open_pdf_externally": True,
|
||||
"download.prompt_for_download": False,
|
||||
"download.directory_upgrade": True,
|
||||
"credentials_enable_service": False,
|
||||
"profile.password_manager_enabled": False,
|
||||
"profile.password_manager_leak_detection": False,
|
||||
}
|
||||
options.add_experimental_option("prefs", prefs)
|
||||
|
||||
service = Service(ChromeDriverManager().install())
|
||||
self._driver = webdriver.Chrome(service=service, options=options)
|
||||
self._driver.maximize_window()
|
||||
|
||||
try:
|
||||
self._driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._needs_session_clear = False
|
||||
|
||||
def quit_driver(self):
|
||||
with self._lock:
|
||||
if self._driver:
|
||||
try:
|
||||
self._driver.quit()
|
||||
except:
|
||||
pass
|
||||
self._driver = None
|
||||
self._kill_existing_chrome_for_profile()
|
||||
|
||||
|
||||
_manager = None
|
||||
|
||||
def get_browser_manager():
|
||||
global _manager
|
||||
if _manager is None:
|
||||
_manager = DeltaInsBrowserManager()
|
||||
return _manager
|
||||
|
||||
|
||||
def clear_deltains_session_on_startup():
|
||||
"""Called by agent.py on startup to clear session."""
|
||||
manager = get_browser_manager()
|
||||
manager.clear_session_on_startup()
|
||||
300
apps/SeleniumService/helpers_deltains_eligibility.py
Normal file
300
apps/SeleniumService/helpers_deltains_eligibility.py
Normal file
@@ -0,0 +1,300 @@
|
||||
import os
|
||||
import time
|
||||
import asyncio
|
||||
from typing import Dict, Any
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.common.exceptions import WebDriverException, TimeoutException
|
||||
|
||||
from selenium_DeltaIns_eligibilityCheckWorker import AutomationDeltaInsEligibilityCheck
|
||||
from deltains_browser_manager import get_browser_manager
|
||||
|
||||
# In-memory session store
|
||||
sessions: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "240"))
|
||||
|
||||
|
||||
def make_session_entry() -> str:
|
||||
import uuid
|
||||
sid = str(uuid.uuid4())
|
||||
sessions[sid] = {
|
||||
"status": "created",
|
||||
"created_at": time.time(),
|
||||
"last_activity": time.time(),
|
||||
"bot": None,
|
||||
"driver": None,
|
||||
"otp_event": asyncio.Event(),
|
||||
"otp_value": None,
|
||||
"result": None,
|
||||
"message": None,
|
||||
"type": None,
|
||||
}
|
||||
return sid
|
||||
|
||||
|
||||
async def cleanup_session(sid: str, message: str | None = None):
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return
|
||||
try:
|
||||
try:
|
||||
if s.get("status") not in ("completed", "error", "not_found"):
|
||||
s["status"] = "error"
|
||||
if message:
|
||||
s["message"] = message
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
ev = s.get("otp_event")
|
||||
if ev and not ev.is_set():
|
||||
ev.set()
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
sessions.pop(sid, None)
|
||||
|
||||
|
||||
async def _remove_session_later(sid: str, delay: int = 30):
|
||||
await asyncio.sleep(delay)
|
||||
await cleanup_session(sid)
|
||||
|
||||
|
||||
def _close_browser(bot):
|
||||
"""Save cookies and close the browser after task completion."""
|
||||
try:
|
||||
bm = get_browser_manager()
|
||||
try:
|
||||
bm.save_cookies()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
bm.quit_driver()
|
||||
print("[DeltaIns] Browser closed")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns] Could not close browser: {e}")
|
||||
|
||||
|
||||
async def start_deltains_run(sid: str, data: dict, url: str):
|
||||
"""
|
||||
Run the DeltaIns eligibility check workflow:
|
||||
1. Login (with OTP if needed)
|
||||
2. Search patient by Member ID + DOB
|
||||
3. Extract eligibility info + PDF
|
||||
"""
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return {"status": "error", "message": "session not found"}
|
||||
|
||||
s["status"] = "running"
|
||||
s["last_activity"] = time.time()
|
||||
bot = None
|
||||
|
||||
try:
|
||||
bot = AutomationDeltaInsEligibilityCheck({"data": data})
|
||||
bot.config_driver()
|
||||
|
||||
s["bot"] = bot
|
||||
s["driver"] = bot.driver
|
||||
s["last_activity"] = time.time()
|
||||
|
||||
# Maximize window and login (bot.login handles navigation itself,
|
||||
# checking provider-tools URL first to preserve existing sessions)
|
||||
try:
|
||||
bot.driver.maximize_window()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
login_result = bot.login(url)
|
||||
except WebDriverException as wde:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"Selenium driver error during login: {wde}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
except Exception as e:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"Unexpected error during login: {e}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
# Handle login result
|
||||
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
|
||||
s["status"] = "running"
|
||||
s["message"] = "Session persisted"
|
||||
print("[DeltaIns] Session persisted - skipping OTP")
|
||||
# Re-save cookies to keep them fresh on disk
|
||||
get_browser_manager().save_cookies()
|
||||
|
||||
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
|
||||
s["status"] = "waiting_for_otp"
|
||||
s["message"] = "OTP required - please enter the code sent to your email"
|
||||
s["last_activity"] = time.time()
|
||||
|
||||
driver = s["driver"]
|
||||
max_polls = SESSION_OTP_TIMEOUT
|
||||
login_success = False
|
||||
|
||||
print(f"[DeltaIns OTP] Waiting for OTP (polling for {SESSION_OTP_TIMEOUT}s)...")
|
||||
|
||||
for poll in range(max_polls):
|
||||
await asyncio.sleep(1)
|
||||
s["last_activity"] = time.time()
|
||||
|
||||
try:
|
||||
otp_value = s.get("otp_value")
|
||||
if otp_value:
|
||||
print(f"[DeltaIns OTP] OTP received from app: {otp_value}")
|
||||
try:
|
||||
otp_input = driver.find_element(By.XPATH,
|
||||
"//input[@name='credentials.passcode' and @type='text'] | "
|
||||
"//input[contains(@name,'passcode')]")
|
||||
otp_input.clear()
|
||||
otp_input.send_keys(otp_value)
|
||||
|
||||
try:
|
||||
verify_btn = driver.find_element(By.XPATH,
|
||||
"//input[@type='submit'] | "
|
||||
"//button[@type='submit']")
|
||||
verify_btn.click()
|
||||
print("[DeltaIns OTP] Clicked verify button")
|
||||
except Exception:
|
||||
otp_input.send_keys(Keys.RETURN)
|
||||
print("[DeltaIns OTP] Pressed Enter as fallback")
|
||||
|
||||
s["otp_value"] = None
|
||||
await asyncio.sleep(8)
|
||||
except Exception as type_err:
|
||||
print(f"[DeltaIns OTP] Failed to type OTP: {type_err}")
|
||||
|
||||
current_url = driver.current_url.lower()
|
||||
if poll % 10 == 0:
|
||||
print(f"[DeltaIns OTP Poll {poll+1}/{max_polls}] URL: {current_url[:80]}")
|
||||
|
||||
if "provider-tools" in current_url and "login" not in current_url and "ciam" not in current_url:
|
||||
print("[DeltaIns OTP] Login successful!")
|
||||
login_success = True
|
||||
break
|
||||
|
||||
except Exception as poll_err:
|
||||
if poll % 10 == 0:
|
||||
print(f"[DeltaIns OTP Poll {poll+1}] Error: {poll_err}")
|
||||
|
||||
if not login_success:
|
||||
try:
|
||||
current_url = driver.current_url.lower()
|
||||
if "provider-tools" in current_url and "login" not in current_url and "ciam" not in current_url:
|
||||
login_success = True
|
||||
else:
|
||||
s["status"] = "error"
|
||||
s["message"] = "OTP timeout - login not completed"
|
||||
s["result"] = {"status": "error", "message": "OTP not completed in time"}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": "OTP not completed in time"}
|
||||
except Exception as final_err:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"OTP verification failed: {final_err}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
if login_success:
|
||||
s["status"] = "running"
|
||||
s["message"] = "Login successful after OTP"
|
||||
print("[DeltaIns OTP] Proceeding to step1...")
|
||||
# Save cookies to disk so session survives browser restart
|
||||
get_browser_manager().save_cookies()
|
||||
|
||||
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
|
||||
s["status"] = "error"
|
||||
s["message"] = login_result
|
||||
s["result"] = {"status": "error", "message": login_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": login_result}
|
||||
|
||||
elif isinstance(login_result, str) and login_result == "SUCCESS":
|
||||
print("[DeltaIns] Login succeeded without OTP")
|
||||
s["status"] = "running"
|
||||
s["message"] = "Login succeeded"
|
||||
# Save cookies to disk so session survives browser restart
|
||||
get_browser_manager().save_cookies()
|
||||
|
||||
# Step 1 - search patient
|
||||
step1_result = bot.step1()
|
||||
print(f"[DeltaIns] step1 result: {step1_result}")
|
||||
|
||||
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
|
||||
s["status"] = "error"
|
||||
s["message"] = step1_result
|
||||
s["result"] = {"status": "error", "message": step1_result}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": step1_result}
|
||||
|
||||
# Step 2 - extract eligibility info + PDF
|
||||
step2_result = bot.step2()
|
||||
print(f"[DeltaIns] step2 result: {step2_result.get('status') if isinstance(step2_result, dict) else step2_result}")
|
||||
|
||||
if isinstance(step2_result, dict):
|
||||
s["status"] = "completed"
|
||||
s["result"] = step2_result
|
||||
s["message"] = "completed"
|
||||
asyncio.create_task(_remove_session_later(sid, 60))
|
||||
return step2_result
|
||||
else:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"step2 returned unexpected result: {step2_result}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": s["message"]}
|
||||
|
||||
except Exception as e:
|
||||
if s:
|
||||
s["status"] = "error"
|
||||
s["message"] = f"worker exception: {e}"
|
||||
s["result"] = {"status": "error", "message": s["message"]}
|
||||
if bot:
|
||||
_close_browser(bot)
|
||||
asyncio.create_task(_remove_session_later(sid, 30))
|
||||
return {"status": "error", "message": f"worker exception: {e}"}
|
||||
|
||||
|
||||
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return {"status": "error", "message": "session not found"}
|
||||
if s.get("status") != "waiting_for_otp":
|
||||
return {"status": "error", "message": f"session not waiting for otp (state={s.get('status')})"}
|
||||
s["otp_value"] = otp
|
||||
s["last_activity"] = time.time()
|
||||
try:
|
||||
s["otp_event"].set()
|
||||
except Exception:
|
||||
pass
|
||||
return {"status": "ok", "message": "otp accepted"}
|
||||
|
||||
|
||||
def get_session_status(sid: str) -> Dict[str, Any]:
|
||||
s = sessions.get(sid)
|
||||
if not s:
|
||||
return {"status": "not_found"}
|
||||
return {
|
||||
"session_id": sid,
|
||||
"status": s.get("status"),
|
||||
"message": s.get("message"),
|
||||
"created_at": s.get("created_at"),
|
||||
"last_activity": s.get("last_activity"),
|
||||
"result": s.get("result") if s.get("status") in ("completed", "error") else None,
|
||||
}
|
||||
BIN
apps/SeleniumService/seleniumDownloads/EligibilityBenefit.pdf
Normal file
BIN
apps/SeleniumService/seleniumDownloads/EligibilityBenefit.pdf
Normal file
Binary file not shown.
686
apps/SeleniumService/selenium_DeltaIns_eligibilityCheckWorker.py
Normal file
686
apps/SeleniumService/selenium_DeltaIns_eligibilityCheckWorker.py
Normal file
@@ -0,0 +1,686 @@
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import WebDriverException, TimeoutException
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
import time
|
||||
import os
|
||||
import base64
|
||||
import re
|
||||
import glob
|
||||
|
||||
from deltains_browser_manager import get_browser_manager
|
||||
|
||||
LOGIN_URL = "https://www.deltadentalins.com/ciam/login?TARGET=%2Fprovider-tools%2Fv2"
|
||||
PROVIDER_TOOLS_URL = "https://www.deltadentalins.com/provider-tools/v2"
|
||||
|
||||
|
||||
class AutomationDeltaInsEligibilityCheck:
|
||||
def __init__(self, data):
|
||||
self.headless = False
|
||||
self.driver = None
|
||||
|
||||
self.data = data.get("data", {}) if isinstance(data, dict) else {}
|
||||
|
||||
self.memberId = self.data.get("memberId", "")
|
||||
self.dateOfBirth = self.data.get("dateOfBirth", "")
|
||||
self.firstName = self.data.get("firstName", "")
|
||||
self.lastName = self.data.get("lastName", "")
|
||||
self.deltains_username = self.data.get("deltains_username", "")
|
||||
self.deltains_password = self.data.get("deltains_password", "")
|
||||
|
||||
self.download_dir = get_browser_manager().download_dir
|
||||
os.makedirs(self.download_dir, exist_ok=True)
|
||||
|
||||
def config_driver(self):
|
||||
self.driver = get_browser_manager().get_driver(self.headless)
|
||||
|
||||
def _dismiss_cookie_banner(self):
|
||||
try:
|
||||
accept_btn = WebDriverWait(self.driver, 5).until(
|
||||
EC.element_to_be_clickable((By.ID, "onetrust-accept-btn-handler"))
|
||||
)
|
||||
accept_btn.click()
|
||||
print("[DeltaIns login] Dismissed cookie consent banner")
|
||||
time.sleep(1)
|
||||
except TimeoutException:
|
||||
print("[DeltaIns login] No cookie consent banner found")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Error dismissing cookie banner: {e}")
|
||||
|
||||
def _force_logout(self):
|
||||
try:
|
||||
print("[DeltaIns login] Forcing logout due to credential change...")
|
||||
browser_manager = get_browser_manager()
|
||||
try:
|
||||
self.driver.delete_all_cookies()
|
||||
print("[DeltaIns login] Cleared all cookies")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Error clearing cookies: {e}")
|
||||
browser_manager.clear_credentials_hash()
|
||||
print("[DeltaIns login] Logout complete")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Error during forced logout: {e}")
|
||||
return False
|
||||
|
||||
def login(self, url):
|
||||
"""
|
||||
Multi-step login flow for DeltaIns (Okta-based):
|
||||
1. Enter username (name='identifier') -> click Next
|
||||
2. Enter password (type='password') -> click Submit
|
||||
3. Handle MFA: click 'Send me an email' -> wait for OTP
|
||||
Returns: ALREADY_LOGGED_IN, SUCCESS, OTP_REQUIRED, or ERROR:...
|
||||
"""
|
||||
wait = WebDriverWait(self.driver, 30)
|
||||
browser_manager = get_browser_manager()
|
||||
|
||||
try:
|
||||
if self.deltains_username and browser_manager.credentials_changed(self.deltains_username):
|
||||
self._force_logout()
|
||||
self.driver.get(url)
|
||||
time.sleep(3)
|
||||
|
||||
# First, try navigating to provider-tools directly (not login URL)
|
||||
# This avoids triggering Okta password re-verification when session is valid
|
||||
try:
|
||||
current_url = self.driver.current_url
|
||||
print(f"[DeltaIns login] Current URL: {current_url}")
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
print("[DeltaIns login] Already on provider tools page - logged in")
|
||||
return "ALREADY_LOGGED_IN"
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Error checking current state: {e}")
|
||||
|
||||
# Navigate to provider-tools URL first to check if session is still valid
|
||||
print("[DeltaIns login] Trying provider-tools URL to check session...")
|
||||
self.driver.get(PROVIDER_TOOLS_URL)
|
||||
time.sleep(5)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[DeltaIns login] After provider-tools nav URL: {current_url}")
|
||||
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
print("[DeltaIns login] Session still valid - already logged in")
|
||||
return "ALREADY_LOGGED_IN"
|
||||
|
||||
# Session expired or not logged in - navigate to login URL
|
||||
print("[DeltaIns login] Session not valid, navigating to login page...")
|
||||
self.driver.get(url)
|
||||
time.sleep(3)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[DeltaIns login] After login nav URL: {current_url}")
|
||||
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
print("[DeltaIns login] Already logged in - on provider tools")
|
||||
return "ALREADY_LOGGED_IN"
|
||||
|
||||
self._dismiss_cookie_banner()
|
||||
|
||||
# Step 1: Username entry (name='identifier')
|
||||
print("[DeltaIns login] Looking for username field...")
|
||||
username_entered = False
|
||||
for sel in [
|
||||
(By.NAME, "identifier"),
|
||||
(By.ID, "okta-signin-username"),
|
||||
(By.XPATH, "//input[@type='text' and @autocomplete='username']"),
|
||||
(By.XPATH, "//input[@type='text']"),
|
||||
]:
|
||||
try:
|
||||
field = WebDriverWait(self.driver, 8).until(EC.presence_of_element_located(sel))
|
||||
if field.is_displayed():
|
||||
field.clear()
|
||||
field.send_keys(self.deltains_username)
|
||||
username_entered = True
|
||||
print(f"[DeltaIns login] Username entered via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not username_entered:
|
||||
return "ERROR: Could not find username field"
|
||||
|
||||
# Click Next/Submit
|
||||
time.sleep(1)
|
||||
for sel in [
|
||||
(By.XPATH, "//input[@type='submit' and @value='Next']"),
|
||||
(By.XPATH, "//input[@type='submit']"),
|
||||
(By.XPATH, "//button[@type='submit']"),
|
||||
]:
|
||||
try:
|
||||
btn = self.driver.find_element(*sel)
|
||||
if btn.is_displayed():
|
||||
btn.click()
|
||||
print(f"[DeltaIns login] Clicked Next via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
time.sleep(4)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
return "ALREADY_LOGGED_IN"
|
||||
|
||||
# Step 2: Password entry
|
||||
print("[DeltaIns login] Looking for password field...")
|
||||
pw_entered = False
|
||||
for sel in [
|
||||
(By.XPATH, "//input[@type='password']"),
|
||||
(By.ID, "okta-signin-password"),
|
||||
(By.NAME, "password"),
|
||||
]:
|
||||
try:
|
||||
field = WebDriverWait(self.driver, 10).until(EC.presence_of_element_located(sel))
|
||||
if field.is_displayed():
|
||||
field.clear()
|
||||
field.send_keys(self.deltains_password)
|
||||
pw_entered = True
|
||||
print(f"[DeltaIns login] Password entered via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not pw_entered:
|
||||
current_url = self.driver.current_url
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower():
|
||||
return "ALREADY_LOGGED_IN"
|
||||
return "ERROR: Password field not found"
|
||||
|
||||
# Click Sign In
|
||||
time.sleep(1)
|
||||
for sel in [
|
||||
(By.ID, "okta-signin-submit"),
|
||||
(By.XPATH, "//input[@type='submit']"),
|
||||
(By.XPATH, "//button[@type='submit']"),
|
||||
]:
|
||||
try:
|
||||
btn = self.driver.find_element(*sel)
|
||||
if btn.is_displayed():
|
||||
btn.click()
|
||||
print(f"[DeltaIns login] Clicked Sign In via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if self.deltains_username:
|
||||
browser_manager.save_credentials_hash(self.deltains_username)
|
||||
|
||||
time.sleep(6)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[DeltaIns login] After password submit URL: {current_url}")
|
||||
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
print("[DeltaIns login] Login successful - on provider tools")
|
||||
return "SUCCESS"
|
||||
|
||||
# Step 3: MFA handling
|
||||
# There are two possible MFA pages:
|
||||
# A) Method selection: "Verify it's you with a security method" with Email/Phone Select buttons
|
||||
# B) Direct: "Send me an email" button
|
||||
print("[DeltaIns login] Handling MFA...")
|
||||
|
||||
# Check for method selection page first (Email "Select" link)
|
||||
# The Okta MFA page uses <a> tags (not buttons/inputs) with class "select-factor"
|
||||
# inside <div data-se="okta_email"> for Email selection
|
||||
try:
|
||||
body_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
if "security method" in body_text.lower() or "select from the following" in body_text.lower():
|
||||
print("[DeltaIns login] MFA method selection page detected")
|
||||
email_select = None
|
||||
for sel in [
|
||||
(By.CSS_SELECTOR, "div[data-se='okta_email'] a.select-factor"),
|
||||
(By.XPATH, "//div[@data-se='okta_email']//a[contains(@class,'select-factor')]"),
|
||||
(By.XPATH, "//a[contains(@aria-label,'Select Email')]"),
|
||||
(By.XPATH, "//div[@data-se='okta_email']//a[@data-se='button']"),
|
||||
(By.CSS_SELECTOR, "a.select-factor.link-button"),
|
||||
]:
|
||||
try:
|
||||
btn = self.driver.find_element(*sel)
|
||||
if btn.is_displayed():
|
||||
email_select = btn
|
||||
print(f"[DeltaIns login] Found Email Select via {sel}")
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if email_select:
|
||||
email_select.click()
|
||||
print("[DeltaIns login] Clicked 'Select' for Email MFA")
|
||||
time.sleep(5)
|
||||
else:
|
||||
print("[DeltaIns login] Could not find Email Select button")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Error checking MFA method selection: {e}")
|
||||
|
||||
# Now look for "Send me an email" button (may appear after method selection or directly)
|
||||
try:
|
||||
send_btn = WebDriverWait(self.driver, 8).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//input[@type='submit' and @value='Send me an email'] | "
|
||||
"//input[@value='Send me an email'] | "
|
||||
"//button[contains(text(),'Send me an email')]"))
|
||||
)
|
||||
send_btn.click()
|
||||
print("[DeltaIns login] Clicked 'Send me an email'")
|
||||
time.sleep(5)
|
||||
except TimeoutException:
|
||||
print("[DeltaIns login] No 'Send me an email' button, checking for OTP input...")
|
||||
|
||||
# Step 4: OTP entry page
|
||||
try:
|
||||
otp_input = WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.XPATH,
|
||||
"//input[@name='credentials.passcode' and @type='text'] | "
|
||||
"//input[contains(@name,'passcode')]"))
|
||||
)
|
||||
print("[DeltaIns login] OTP input found -> OTP_REQUIRED")
|
||||
return "OTP_REQUIRED"
|
||||
except TimeoutException:
|
||||
pass
|
||||
|
||||
current_url = self.driver.current_url
|
||||
if "provider-tools" in current_url and "login" not in current_url.lower() and "ciam" not in current_url.lower():
|
||||
return "SUCCESS"
|
||||
|
||||
try:
|
||||
error_elem = self.driver.find_element(By.XPATH,
|
||||
"//*[contains(@class,'error') or contains(@class,'alert-error')]")
|
||||
error_text = error_elem.text.strip()[:200]
|
||||
if error_text:
|
||||
return f"ERROR: {error_text}"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print("[DeltaIns login] Could not determine login state - returning OTP_REQUIRED as fallback")
|
||||
return "OTP_REQUIRED"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns login] Exception: {e}")
|
||||
return f"ERROR:LOGIN FAILED: {e}"
|
||||
|
||||
def _format_dob(self, dob_str):
|
||||
"""Convert DOB from YYYY-MM-DD to MM/DD/YYYY format."""
|
||||
if dob_str and "-" in dob_str:
|
||||
dob_parts = dob_str.split("-")
|
||||
if len(dob_parts) == 3:
|
||||
return f"{dob_parts[1]}/{dob_parts[2]}/{dob_parts[0]}"
|
||||
return dob_str
|
||||
|
||||
def _close_browser(self):
|
||||
"""Save cookies and close the browser after task completion."""
|
||||
browser_manager = get_browser_manager()
|
||||
try:
|
||||
browser_manager.save_cookies()
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns] Failed to save cookies before close: {e}")
|
||||
try:
|
||||
browser_manager.quit_driver()
|
||||
print("[DeltaIns] Browser closed")
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns] Could not close browser: {e}")
|
||||
|
||||
def step1(self):
|
||||
"""
|
||||
Navigate to Eligibility search, enter patient info, search, and
|
||||
click 'Check eligibility and benefits' on the result card.
|
||||
|
||||
Search flow:
|
||||
1. Click 'Eligibility and benefits' link
|
||||
2. Click 'Search for a new patient' button
|
||||
3. Click 'Search by member ID' tab
|
||||
4. Enter Member ID in #memberId
|
||||
5. Enter DOB in #dob (MM/DD/YYYY)
|
||||
6. Click Search
|
||||
7. Extract patient info from result card
|
||||
8. Click 'Check eligibility and benefits'
|
||||
"""
|
||||
try:
|
||||
formatted_dob = self._format_dob(self.dateOfBirth)
|
||||
print(f"[DeltaIns step1] Starting — memberId={self.memberId}, DOB={formatted_dob}")
|
||||
|
||||
# 1. Click "Eligibility and benefits" link
|
||||
print("[DeltaIns step1] Clicking 'Eligibility and benefits'...")
|
||||
try:
|
||||
elig_link = WebDriverWait(self.driver, 15).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//a[contains(text(),'Eligibility and benefits')] | "
|
||||
"//a[contains(text(),'Eligibility')]"))
|
||||
)
|
||||
elig_link.click()
|
||||
time.sleep(5)
|
||||
print("[DeltaIns step1] Clicked Eligibility link")
|
||||
except TimeoutException:
|
||||
print("[DeltaIns step1] No Eligibility link found, checking if already on page...")
|
||||
if "patient-search" not in self.driver.current_url and "eligibility" not in self.driver.current_url:
|
||||
self.driver.get("https://www.deltadentalins.com/provider-tools/v2/patient-search")
|
||||
time.sleep(5)
|
||||
|
||||
# 2. Click "Search for a new patient" button
|
||||
print("[DeltaIns step1] Clicking 'Search for a new patient'...")
|
||||
try:
|
||||
new_patient_btn = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//button[contains(text(),'Search for a new patient')]"))
|
||||
)
|
||||
new_patient_btn.click()
|
||||
time.sleep(3)
|
||||
print("[DeltaIns step1] Clicked 'Search for a new patient'")
|
||||
except TimeoutException:
|
||||
print("[DeltaIns step1] 'Search for a new patient' button not found - may already be on search page")
|
||||
|
||||
# 3. Click "Search by member ID" tab
|
||||
print("[DeltaIns step1] Clicking 'Search by member ID' tab...")
|
||||
try:
|
||||
member_id_tab = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//button[contains(text(),'Search by member ID')]"))
|
||||
)
|
||||
member_id_tab.click()
|
||||
time.sleep(2)
|
||||
print("[DeltaIns step1] Clicked 'Search by member ID' tab")
|
||||
except TimeoutException:
|
||||
print("[DeltaIns step1] 'Search by member ID' tab not found")
|
||||
return "ERROR: Could not find 'Search by member ID' tab"
|
||||
|
||||
# 4. Enter Member ID
|
||||
print(f"[DeltaIns step1] Entering Member ID: {self.memberId}")
|
||||
try:
|
||||
mid_field = WebDriverWait(self.driver, 10).until(
|
||||
EC.presence_of_element_located((By.ID, "memberId"))
|
||||
)
|
||||
mid_field.click()
|
||||
mid_field.send_keys(Keys.CONTROL + "a")
|
||||
mid_field.send_keys(Keys.DELETE)
|
||||
time.sleep(0.3)
|
||||
mid_field.send_keys(self.memberId)
|
||||
time.sleep(0.5)
|
||||
print(f"[DeltaIns step1] Member ID entered: '{mid_field.get_attribute('value')}'")
|
||||
except TimeoutException:
|
||||
return "ERROR: Member ID field not found"
|
||||
|
||||
# 5. Enter DOB
|
||||
print(f"[DeltaIns step1] Entering DOB: {formatted_dob}")
|
||||
try:
|
||||
dob_field = self.driver.find_element(By.ID, "dob")
|
||||
dob_field.click()
|
||||
dob_field.send_keys(Keys.CONTROL + "a")
|
||||
dob_field.send_keys(Keys.DELETE)
|
||||
time.sleep(0.3)
|
||||
dob_field.send_keys(formatted_dob)
|
||||
time.sleep(0.5)
|
||||
print(f"[DeltaIns step1] DOB entered: '{dob_field.get_attribute('value')}'")
|
||||
except Exception as e:
|
||||
return f"ERROR: DOB field not found: {e}"
|
||||
|
||||
# 6. Click Search
|
||||
print("[DeltaIns step1] Clicking Search...")
|
||||
try:
|
||||
search_btn = self.driver.find_element(By.XPATH,
|
||||
"//button[@type='submit'][contains(text(),'Search')] | "
|
||||
"//button[@data-testid='searchButton']")
|
||||
search_btn.click()
|
||||
time.sleep(10)
|
||||
print("[DeltaIns step1] Search clicked")
|
||||
except Exception as e:
|
||||
return f"ERROR: Search button not found: {e}"
|
||||
|
||||
# 7. Check for results - look for patient card
|
||||
print("[DeltaIns step1] Checking for results...")
|
||||
try:
|
||||
patient_card = WebDriverWait(self.driver, 15).until(
|
||||
EC.presence_of_element_located((By.XPATH,
|
||||
"//div[contains(@class,'patient-card-root')] | "
|
||||
"//div[@data-testid='patientCard'] | "
|
||||
"//div[starts-with(@data-testid,'patientCard')]"))
|
||||
)
|
||||
print("[DeltaIns step1] Patient card found!")
|
||||
|
||||
# Extract patient name
|
||||
try:
|
||||
name_el = patient_card.find_element(By.XPATH, ".//h3")
|
||||
patient_name = name_el.text.strip()
|
||||
print(f"[DeltaIns step1] Patient name: {patient_name}")
|
||||
except Exception:
|
||||
patient_name = ""
|
||||
|
||||
# Extract eligibility dates
|
||||
try:
|
||||
elig_el = patient_card.find_element(By.XPATH,
|
||||
".//*[@data-testid='patientCardMemberEligibility']//*[contains(@class,'pt-staticfield-text')]")
|
||||
elig_text = elig_el.text.strip()
|
||||
print(f"[DeltaIns step1] Eligibility: {elig_text}")
|
||||
except Exception:
|
||||
elig_text = ""
|
||||
|
||||
# Store for step2
|
||||
self._patient_name = patient_name
|
||||
self._eligibility_text = elig_text
|
||||
|
||||
except TimeoutException:
|
||||
# Check for error messages
|
||||
try:
|
||||
body_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
if "no results" in body_text.lower() or "not found" in body_text.lower() or "no patient" in body_text.lower():
|
||||
return "ERROR: No patient found with the provided Member ID and DOB"
|
||||
# Check for specific error alerts
|
||||
alerts = self.driver.find_elements(By.XPATH, "//*[@role='alert']")
|
||||
for alert in alerts:
|
||||
if alert.is_displayed():
|
||||
return f"ERROR: {alert.text.strip()[:200]}"
|
||||
except Exception:
|
||||
pass
|
||||
return "ERROR: No patient results found within timeout"
|
||||
|
||||
# 8. Click "Check eligibility and benefits"
|
||||
print("[DeltaIns step1] Clicking 'Check eligibility and benefits'...")
|
||||
try:
|
||||
check_btn = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//button[contains(text(),'Check eligibility and benefits')] | "
|
||||
"//button[@data-testid='eligibilityBenefitsButton']"))
|
||||
)
|
||||
check_btn.click()
|
||||
time.sleep(10)
|
||||
print(f"[DeltaIns step1] Navigated to: {self.driver.current_url}")
|
||||
except TimeoutException:
|
||||
return "ERROR: 'Check eligibility and benefits' button not found"
|
||||
|
||||
return "SUCCESS"
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns step1] Exception: {e}")
|
||||
return f"ERROR: step1 failed: {e}"
|
||||
|
||||
def step2(self):
|
||||
"""
|
||||
Extract eligibility information and capture PDF from the
|
||||
Eligibility & Benefits detail page.
|
||||
|
||||
URL: .../provider-tools/v2/eligibility-benefits
|
||||
|
||||
Extracts:
|
||||
- Patient name from h3 in patient-card-header
|
||||
- DOB, Member ID, eligibility from data-testid fields
|
||||
- PDF via Page.printToPDF
|
||||
"""
|
||||
try:
|
||||
print("[DeltaIns step2] Extracting eligibility data...")
|
||||
time.sleep(3)
|
||||
|
||||
current_url = self.driver.current_url
|
||||
print(f"[DeltaIns step2] URL: {current_url}")
|
||||
|
||||
if "eligibility-benefits" not in current_url:
|
||||
print("[DeltaIns step2] Not on eligibility page, checking body text...")
|
||||
|
||||
# Extract patient name
|
||||
patientName = ""
|
||||
try:
|
||||
name_el = self.driver.find_element(By.XPATH,
|
||||
"//div[contains(@class,'patient-card-header')]//h3 | "
|
||||
"//div[starts-with(@data-testid,'patientCard')]//h3")
|
||||
patientName = name_el.text.strip()
|
||||
print(f"[DeltaIns step2] Patient name: {patientName}")
|
||||
except Exception:
|
||||
patientName = getattr(self, '_patient_name', '') or f"{self.firstName} {self.lastName}".strip()
|
||||
print(f"[DeltaIns step2] Using stored/fallback name: {patientName}")
|
||||
|
||||
# Extract DOB from card
|
||||
extractedDob = ""
|
||||
try:
|
||||
dob_el = self.driver.find_element(By.XPATH,
|
||||
"//*[@data-testid='patientCardDateOfBirth']//*[contains(@class,'pt-staticfield-text')]")
|
||||
extractedDob = dob_el.text.strip()
|
||||
print(f"[DeltaIns step2] DOB: {extractedDob}")
|
||||
except Exception:
|
||||
extractedDob = self._format_dob(self.dateOfBirth)
|
||||
|
||||
# Extract Member ID from card
|
||||
foundMemberId = ""
|
||||
try:
|
||||
mid_el = self.driver.find_element(By.XPATH,
|
||||
"//*[@data-testid='patientCardMemberId']//*[contains(@class,'pt-staticfield-text')]")
|
||||
foundMemberId = mid_el.text.strip()
|
||||
print(f"[DeltaIns step2] Member ID: {foundMemberId}")
|
||||
except Exception:
|
||||
foundMemberId = self.memberId
|
||||
|
||||
# Extract eligibility status
|
||||
eligibility = "Unknown"
|
||||
try:
|
||||
elig_el = self.driver.find_element(By.XPATH,
|
||||
"//*[@data-testid='patientCardMemberEligibility']//*[contains(@class,'pt-staticfield-text')]")
|
||||
elig_text = elig_el.text.strip()
|
||||
print(f"[DeltaIns step2] Eligibility text: {elig_text}")
|
||||
|
||||
if "present" in elig_text.lower():
|
||||
eligibility = "Eligible"
|
||||
elif elig_text:
|
||||
eligibility = elig_text
|
||||
except Exception:
|
||||
elig_text = getattr(self, '_eligibility_text', '')
|
||||
if elig_text and "present" in elig_text.lower():
|
||||
eligibility = "Eligible"
|
||||
elif elig_text:
|
||||
eligibility = elig_text
|
||||
|
||||
# Check page body for additional eligibility info
|
||||
try:
|
||||
body_text = self.driver.find_element(By.TAG_NAME, "body").text
|
||||
if "not eligible" in body_text.lower():
|
||||
eligibility = "Not Eligible"
|
||||
elif "terminated" in body_text.lower():
|
||||
eligibility = "Terminated"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Capture PDF via "Download summary" -> "Download PDF" button
|
||||
pdfBase64 = ""
|
||||
try:
|
||||
existing_files = set(glob.glob(os.path.join(self.download_dir, "*")))
|
||||
|
||||
dl_link = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//a[@data-testid='downloadBenefitSummaryLink']"))
|
||||
)
|
||||
dl_link.click()
|
||||
print("[DeltaIns step2] Clicked 'Download summary'")
|
||||
time.sleep(3)
|
||||
|
||||
dl_btn = WebDriverWait(self.driver, 10).until(
|
||||
EC.element_to_be_clickable((By.XPATH,
|
||||
"//button[@data-testid='downloadPdfButton']"))
|
||||
)
|
||||
dl_btn.click()
|
||||
print("[DeltaIns step2] Clicked 'Download PDF'")
|
||||
|
||||
pdf_path = None
|
||||
for i in range(30):
|
||||
time.sleep(2)
|
||||
current_files = set(glob.glob(os.path.join(self.download_dir, "*")))
|
||||
new_files = current_files - existing_files
|
||||
completed = [f for f in new_files
|
||||
if not f.endswith(".crdownload") and not f.endswith(".tmp")]
|
||||
if completed:
|
||||
pdf_path = completed[0]
|
||||
break
|
||||
|
||||
if pdf_path and os.path.exists(pdf_path):
|
||||
with open(pdf_path, "rb") as f:
|
||||
pdfBase64 = base64.b64encode(f.read()).decode()
|
||||
print(f"[DeltaIns step2] PDF downloaded: {os.path.basename(pdf_path)} "
|
||||
f"({os.path.getsize(pdf_path)} bytes), b64 len={len(pdfBase64)}")
|
||||
try:
|
||||
os.remove(pdf_path)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print("[DeltaIns step2] Download PDF timed out, falling back to CDP")
|
||||
cdp_result = self.driver.execute_cdp_cmd("Page.printToPDF", {
|
||||
"printBackground": True,
|
||||
"preferCSSPageSize": True,
|
||||
"scale": 0.7,
|
||||
"paperWidth": 11,
|
||||
"paperHeight": 17,
|
||||
})
|
||||
pdfBase64 = cdp_result.get("data", "")
|
||||
print(f"[DeltaIns step2] CDP fallback PDF, b64 len={len(pdfBase64)}")
|
||||
|
||||
# Dismiss the download modal
|
||||
try:
|
||||
self.driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE)
|
||||
time.sleep(1)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns step2] PDF capture failed: {e}")
|
||||
try:
|
||||
cdp_result = self.driver.execute_cdp_cmd("Page.printToPDF", {
|
||||
"printBackground": True,
|
||||
"preferCSSPageSize": True,
|
||||
"scale": 0.7,
|
||||
"paperWidth": 11,
|
||||
"paperHeight": 17,
|
||||
})
|
||||
pdfBase64 = cdp_result.get("data", "")
|
||||
print(f"[DeltaIns step2] CDP fallback PDF, b64 len={len(pdfBase64)}")
|
||||
except Exception as e2:
|
||||
print(f"[DeltaIns step2] CDP fallback also failed: {e2}")
|
||||
|
||||
# Hide browser after completion
|
||||
self._close_browser()
|
||||
|
||||
result = {
|
||||
"status": "success",
|
||||
"patientName": patientName,
|
||||
"eligibility": eligibility,
|
||||
"pdfBase64": pdfBase64,
|
||||
"extractedDob": extractedDob,
|
||||
"memberId": foundMemberId,
|
||||
}
|
||||
|
||||
print(f"[DeltaIns step2] Result: name={result['patientName']}, "
|
||||
f"eligibility={result['eligibility']}, "
|
||||
f"memberId={result['memberId']}")
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
print(f"[DeltaIns step2] Exception: {e}")
|
||||
self._close_browser()
|
||||
return {
|
||||
"status": "error",
|
||||
"patientName": getattr(self, '_patient_name', '') or f"{self.firstName} {self.lastName}".strip(),
|
||||
"eligibility": "Unknown",
|
||||
"pdfBase64": "",
|
||||
"extractedDob": self._format_dob(self.dateOfBirth),
|
||||
"memberId": self.memberId,
|
||||
"error": str(e),
|
||||
}
|
||||
Reference in New Issue
Block a user