Merge remote-tracking branch 'kai-gitea/develop' into develop

This commit is contained in:
2026-01-28 03:46:51 +05:30
16 changed files with 2793 additions and 25 deletions

View File

@@ -9,10 +9,26 @@ from selenium_preAuthWorker import AutomationMassHealthPreAuth
import os
import time
import helpers_ddma_eligibility as hddma
import helpers_dentaquest_eligibility as hdentaquest
# Import session clear functions for startup
from ddma_browser_manager import clear_ddma_session_on_startup
from dentaquest_browser_manager import clear_dentaquest_session_on_startup
from dotenv import load_dotenv
load_dotenv()
# Clear all sessions on startup (after PC restart)
# This ensures users must login again after PC restart
print("=" * 50)
print("SELENIUM AGENT STARTING - CLEARING ALL SESSIONS")
print("=" * 50)
clear_ddma_session_on_startup()
clear_dentaquest_session_on_startup()
print("=" * 50)
print("SESSION CLEAR COMPLETE - FRESH LOGINS REQUIRED")
print("=" * 50)
app = FastAPI()
# Allow 1 selenium session at a time
semaphore = asyncio.Semaphore(1)
@@ -186,6 +202,79 @@ async def ddma_eligibility(request: Request):
return {"status": "started", "session_id": sid}
# Endpoint:6 - DentaQuest eligibility (background, OTP)
async def _dentaquest_worker_wrapper(sid: str, data: dict, url: str):
"""
Background worker that:
- acquires semaphore (to keep 1 selenium at a time),
- updates active/queued counters,
- runs the DentaQuest flow via helpers.start_dentaquest_run.
"""
global active_jobs, waiting_jobs
async with semaphore:
async with lock:
waiting_jobs -= 1
active_jobs += 1
try:
await hdentaquest.start_dentaquest_run(sid, data, url)
finally:
async with lock:
active_jobs -= 1
@app.post("/dentaquest-eligibility")
async def dentaquest_eligibility(request: Request):
"""
Starts a DentaQuest eligibility session in the background.
Body: { "data": { ... }, "url"?: string }
Returns: { status: "started", session_id: "<uuid>" }
"""
global waiting_jobs
body = await request.json()
data = body.get("data", {})
# create session
sid = hdentaquest.make_session_entry()
hdentaquest.sessions[sid]["type"] = "dentaquest_eligibility"
hdentaquest.sessions[sid]["last_activity"] = time.time()
async with lock:
waiting_jobs += 1
# run in background (queued under semaphore)
asyncio.create_task(_dentaquest_worker_wrapper(sid, data, url="https://providers.dentaquest.com/onboarding/start/"))
return {"status": "started", "session_id": sid}
@app.post("/dentaquest-submit-otp")
async def dentaquest_submit_otp(request: Request):
"""
Body: { "session_id": "<sid>", "otp": "123456" }
Node / frontend call this when user provides OTP for DentaQuest.
"""
body = await request.json()
sid = body.get("session_id")
otp = body.get("otp")
if not sid or not otp:
raise HTTPException(status_code=400, detail="session_id and otp required")
res = hdentaquest.submit_otp(sid, otp)
if res.get("status") == "error":
raise HTTPException(status_code=400, detail=res.get("message"))
return res
@app.get("/dentaquest-session/{sid}/status")
async def dentaquest_session_status(sid: str):
s = hdentaquest.get_session_status(sid)
if s.get("status") == "not_found":
raise HTTPException(status_code=404, detail="session not found")
return s
@app.post("/submit-otp")
async def submit_otp(request: Request):
"""

View File

@@ -1,19 +1,28 @@
"""
Minimal browser manager for DDMA - only handles persistent profile and keeping browser alive.
Does NOT modify any login/OTP logic.
Clears session cookies on startup (after PC restart) to force fresh login.
Tracks credentials to detect changes mid-session.
"""
import os
import glob
import shutil
import hashlib
import threading
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Ensure DISPLAY is set for Chrome to work (needed when running from SSH/background)
if not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = ":0"
class DDMABrowserManager:
"""
Singleton that manages a persistent Chrome browser instance.
- Uses --user-data-dir for persistent profile (device trust tokens, cookies)
- Keeps browser alive between patient runs
- Uses --user-data-dir for persistent profile (device trust tokens)
- Clears session cookies on startup (after PC restart)
- Tracks credentials to detect changes mid-session
"""
_instance = None
_lock = threading.Lock()
@@ -25,31 +34,188 @@ class DDMABrowserManager:
cls._instance._driver = None
cls._instance.profile_dir = os.path.abspath("chrome_profile_ddma")
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
cls._instance._needs_session_clear = False # Flag to clear session on next driver creation
os.makedirs(cls._instance.profile_dir, exist_ok=True)
os.makedirs(cls._instance.download_dir, exist_ok=True)
return cls._instance
def clear_session_on_startup(self):
"""
Clear session cookies from Chrome profile on startup.
This forces a fresh login after PC restart.
Preserves device trust tokens (LocalStorage, IndexedDB) to avoid OTPs.
"""
print("[DDMA BrowserManager] Clearing session on startup...")
try:
# Clear the credentials tracking file
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
print("[DDMA BrowserManager] Cleared credentials tracking file")
# Clear session-related files from Chrome profile
# These are the files that store login session cookies
session_files = [
"Cookies",
"Cookies-journal",
"Login Data",
"Login Data-journal",
"Web Data",
"Web Data-journal",
]
for filename in session_files:
filepath = os.path.join(self.profile_dir, "Default", filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DDMA BrowserManager] Removed {filename}")
except Exception as e:
print(f"[DDMA BrowserManager] Could not remove {filename}: {e}")
# Also try root level (some Chrome versions)
for filename in session_files:
filepath = os.path.join(self.profile_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DDMA BrowserManager] Removed root {filename}")
except Exception as e:
print(f"[DDMA BrowserManager] Could not remove root {filename}: {e}")
# Clear Session Storage (contains login state)
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
if os.path.exists(session_storage_dir):
try:
shutil.rmtree(session_storage_dir)
print("[DDMA BrowserManager] Cleared Session Storage")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear Session Storage: {e}")
# Clear Local Storage (may contain auth tokens)
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
if os.path.exists(local_storage_dir):
try:
shutil.rmtree(local_storage_dir)
print("[DDMA BrowserManager] Cleared Local Storage")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear Local Storage: {e}")
# Clear IndexedDB (may contain auth tokens)
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
if os.path.exists(indexeddb_dir):
try:
shutil.rmtree(indexeddb_dir)
print("[DDMA BrowserManager] Cleared IndexedDB")
except Exception as e:
print(f"[DDMA BrowserManager] Could not clear IndexedDB: {e}")
# Set flag to clear session via JavaScript after browser opens
self._needs_session_clear = True
print("[DDMA BrowserManager] Session cleared - will require fresh login")
except Exception as e:
print(f"[DDMA BrowserManager] Error clearing session: {e}")
def _hash_credentials(self, username: str) -> str:
"""Create a hash of the username to track credential changes."""
return hashlib.sha256(username.encode()).hexdigest()[:16]
def get_last_credentials_hash(self) -> str | None:
"""Get the hash of the last-used credentials."""
try:
if os.path.exists(self._credentials_file):
with open(self._credentials_file, 'r') as f:
return f.read().strip()
except Exception:
pass
return None
def save_credentials_hash(self, username: str):
"""Save the hash of the current credentials."""
try:
cred_hash = self._hash_credentials(username)
with open(self._credentials_file, 'w') as f:
f.write(cred_hash)
except Exception as e:
print(f"[DDMA BrowserManager] Failed to save credentials hash: {e}")
def credentials_changed(self, username: str) -> bool:
"""Check if the credentials have changed since last login."""
last_hash = self.get_last_credentials_hash()
if last_hash is None:
return False # No previous credentials, not a change
current_hash = self._hash_credentials(username)
changed = last_hash != current_hash
if changed:
print(f"[DDMA BrowserManager] Credentials changed - logout required")
return changed
def clear_credentials_hash(self):
"""Clear the saved credentials hash (used after logout)."""
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
except Exception as e:
print(f"[DDMA BrowserManager] Failed to clear credentials hash: {e}")
def _kill_existing_chrome_for_profile(self):
"""Kill any existing Chrome processes using this profile and clean up locks."""
import subprocess
import time as time_module
try:
# Find and kill Chrome processes using this profile
result = subprocess.run(
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
capture_output=True, text=True
)
if result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
try:
subprocess.run(["kill", "-9", pid], check=False)
except:
pass
time_module.sleep(1)
except Exception:
pass
# Remove lock files if they exist
for lock_file in ["SingletonLock", "SingletonSocket", "SingletonCookie"]:
lock_path = os.path.join(self.profile_dir, lock_file)
try:
if os.path.islink(lock_path) or os.path.exists(lock_path):
os.remove(lock_path)
except:
pass
def get_driver(self, headless=False):
"""Get or create the persistent browser instance."""
with self._lock:
if self._driver is None:
print("[BrowserManager] Driver is None, creating new driver")
print("[DDMA BrowserManager] Driver is None, creating new driver")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
elif not self._is_alive():
print("[BrowserManager] Driver not alive, recreating")
print("[DDMA BrowserManager] Driver not alive, recreating")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
else:
print("[BrowserManager] Reusing existing driver")
print("[DDMA BrowserManager] Reusing existing driver")
return self._driver
def _is_alive(self):
"""Check if browser is still responsive."""
try:
if self._driver is None:
return False
url = self._driver.current_url
print(f"[BrowserManager] Driver alive, current URL: {url[:50]}...")
print(f"[DDMA BrowserManager] Driver alive, current URL: {url[:50]}...")
return True
except Exception as e:
print(f"[BrowserManager] Driver not alive: {e}")
print(f"[DDMA BrowserManager] Driver not alive: {e}")
return False
def _create_driver(self, headless=False):
@@ -80,6 +246,9 @@ class DDMABrowserManager:
service = Service(ChromeDriverManager().install())
self._driver = webdriver.Chrome(service=service, options=options)
self._driver.maximize_window()
# Reset the session clear flag (file-based clearing is done on startup)
self._needs_session_clear = False
def quit_driver(self):
"""Quit browser (only call on shutdown)."""
@@ -100,3 +269,9 @@ def get_browser_manager():
if _manager is None:
_manager = DDMABrowserManager()
return _manager
def clear_ddma_session_on_startup():
"""Called by agent.py on startup to clear session."""
manager = get_browser_manager()
manager.clear_session_on_startup()

View File

@@ -0,0 +1,277 @@
"""
Minimal browser manager for DentaQuest - only handles persistent profile and keeping browser alive.
Clears session cookies on startup (after PC restart) to force fresh login.
Tracks credentials to detect changes mid-session.
"""
import os
import shutil
import hashlib
import threading
import subprocess
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Ensure DISPLAY is set for Chrome to work (needed when running from SSH/background)
if not os.environ.get("DISPLAY"):
os.environ["DISPLAY"] = ":0"
class DentaQuestBrowserManager:
"""
Singleton that manages a persistent Chrome browser instance for DentaQuest.
- Uses --user-data-dir for persistent profile (device trust tokens)
- Clears session cookies on startup (after PC restart)
- Tracks credentials to detect changes mid-session
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._driver = None
cls._instance.profile_dir = os.path.abspath("chrome_profile_dentaquest")
cls._instance.download_dir = os.path.abspath("seleniumDownloads")
cls._instance._credentials_file = os.path.join(cls._instance.profile_dir, ".last_credentials")
cls._instance._needs_session_clear = False # Flag to clear session on next driver creation
os.makedirs(cls._instance.profile_dir, exist_ok=True)
os.makedirs(cls._instance.download_dir, exist_ok=True)
return cls._instance
def clear_session_on_startup(self):
"""
Clear session cookies from Chrome profile on startup.
This forces a fresh login after PC restart.
Preserves device trust tokens (LocalStorage, IndexedDB) to avoid OTPs.
"""
print("[DentaQuest BrowserManager] Clearing session on startup...")
try:
# Clear the credentials tracking file
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
print("[DentaQuest BrowserManager] Cleared credentials tracking file")
# Clear session-related files from Chrome profile
# These are the files that store login session cookies
session_files = [
"Cookies",
"Cookies-journal",
"Login Data",
"Login Data-journal",
"Web Data",
"Web Data-journal",
]
for filename in session_files:
filepath = os.path.join(self.profile_dir, "Default", filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DentaQuest BrowserManager] Removed {filename}")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not remove {filename}: {e}")
# Also try root level (some Chrome versions)
for filename in session_files:
filepath = os.path.join(self.profile_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
print(f"[DentaQuest BrowserManager] Removed root {filename}")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not remove root {filename}: {e}")
# Clear Session Storage (contains login state)
session_storage_dir = os.path.join(self.profile_dir, "Default", "Session Storage")
if os.path.exists(session_storage_dir):
try:
shutil.rmtree(session_storage_dir)
print("[DentaQuest BrowserManager] Cleared Session Storage")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear Session Storage: {e}")
# Clear Local Storage (may contain auth tokens)
local_storage_dir = os.path.join(self.profile_dir, "Default", "Local Storage")
if os.path.exists(local_storage_dir):
try:
shutil.rmtree(local_storage_dir)
print("[DentaQuest BrowserManager] Cleared Local Storage")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear Local Storage: {e}")
# Clear IndexedDB (may contain auth tokens)
indexeddb_dir = os.path.join(self.profile_dir, "Default", "IndexedDB")
if os.path.exists(indexeddb_dir):
try:
shutil.rmtree(indexeddb_dir)
print("[DentaQuest BrowserManager] Cleared IndexedDB")
except Exception as e:
print(f"[DentaQuest BrowserManager] Could not clear IndexedDB: {e}")
# Set flag to clear session via JavaScript after browser opens
self._needs_session_clear = True
print("[DentaQuest BrowserManager] Session cleared - will require fresh login")
except Exception as e:
print(f"[DentaQuest BrowserManager] Error clearing session: {e}")
def _hash_credentials(self, username: str) -> str:
"""Create a hash of the username to track credential changes."""
return hashlib.sha256(username.encode()).hexdigest()[:16]
def get_last_credentials_hash(self) -> str | None:
"""Get the hash of the last-used credentials."""
try:
if os.path.exists(self._credentials_file):
with open(self._credentials_file, 'r') as f:
return f.read().strip()
except Exception:
pass
return None
def save_credentials_hash(self, username: str):
"""Save the hash of the current credentials."""
try:
cred_hash = self._hash_credentials(username)
with open(self._credentials_file, 'w') as f:
f.write(cred_hash)
except Exception as e:
print(f"[DentaQuest BrowserManager] Failed to save credentials hash: {e}")
def credentials_changed(self, username: str) -> bool:
"""Check if the credentials have changed since last login."""
last_hash = self.get_last_credentials_hash()
if last_hash is None:
return False # No previous credentials, not a change
current_hash = self._hash_credentials(username)
changed = last_hash != current_hash
if changed:
print(f"[DentaQuest BrowserManager] Credentials changed - logout required")
return changed
def clear_credentials_hash(self):
"""Clear the saved credentials hash (used after logout)."""
try:
if os.path.exists(self._credentials_file):
os.remove(self._credentials_file)
except Exception as e:
print(f"[DentaQuest BrowserManager] Failed to clear credentials hash: {e}")
def _kill_existing_chrome_for_profile(self):
"""Kill any existing Chrome processes using this profile."""
try:
# Find and kill Chrome processes using this profile
result = subprocess.run(
["pgrep", "-f", f"user-data-dir={self.profile_dir}"],
capture_output=True, text=True
)
if result.stdout.strip():
pids = result.stdout.strip().split('\n')
for pid in pids:
try:
subprocess.run(["kill", "-9", pid], check=False)
except:
pass
time.sleep(1)
except Exception as e:
pass
# Remove SingletonLock if exists
lock_file = os.path.join(self.profile_dir, "SingletonLock")
try:
if os.path.islink(lock_file) or os.path.exists(lock_file):
os.remove(lock_file)
except:
pass
def get_driver(self, headless=False):
"""Get or create the persistent browser instance."""
with self._lock:
if self._driver is None:
print("[DentaQuest BrowserManager] Driver is None, creating new driver")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
elif not self._is_alive():
print("[DentaQuest BrowserManager] Driver not alive, recreating")
self._kill_existing_chrome_for_profile()
self._create_driver(headless)
else:
print("[DentaQuest BrowserManager] Reusing existing driver")
return self._driver
def _is_alive(self):
"""Check if browser is still responsive."""
try:
if self._driver is None:
return False
url = self._driver.current_url
return True
except Exception as e:
return False
def _create_driver(self, headless=False):
"""Create browser with persistent profile."""
if self._driver:
try:
self._driver.quit()
except:
pass
self._driver = None
time.sleep(1)
options = webdriver.ChromeOptions()
if headless:
options.add_argument("--headless")
# Persistent profile - THIS IS THE KEY for device trust
options.add_argument(f"--user-data-dir={self.profile_dir}")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
prefs = {
"download.default_directory": self.download_dir,
"plugins.always_open_pdf_externally": True,
"download.prompt_for_download": False,
"download.directory_upgrade": True
}
options.add_experimental_option("prefs", prefs)
service = Service(ChromeDriverManager().install())
self._driver = webdriver.Chrome(service=service, options=options)
self._driver.maximize_window()
# Reset the session clear flag (file-based clearing is done on startup)
self._needs_session_clear = False
def quit_driver(self):
"""Quit browser (only call on shutdown)."""
with self._lock:
if self._driver:
try:
self._driver.quit()
except:
pass
self._driver = None
# Also clean up any orphaned processes
self._kill_existing_chrome_for_profile()
# Singleton accessor
_manager = None
def get_browser_manager():
global _manager
if _manager is None:
_manager = DentaQuestBrowserManager()
return _manager
def clear_dentaquest_session_on_startup():
"""Called by agent.py on startup to clear session."""
manager = get_browser_manager()
manager.clear_session_on_startup()

View File

@@ -0,0 +1,264 @@
import os
import time
import asyncio
from typing import Dict, Any
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import WebDriverException
from selenium_DentaQuest_eligibilityCheckWorker import AutomationDentaQuestEligibilityCheck
# In-memory session store
sessions: Dict[str, Dict[str, Any]] = {}
SESSION_OTP_TIMEOUT = int(os.getenv("SESSION_OTP_TIMEOUT", "120")) # seconds
def make_session_entry() -> str:
"""Create a new session entry and return its ID."""
import uuid
sid = str(uuid.uuid4())
sessions[sid] = {
"status": "created", # created -> running -> waiting_for_otp -> otp_submitted -> completed / error
"created_at": time.time(),
"last_activity": time.time(),
"bot": None, # worker instance
"driver": None, # selenium webdriver
"otp_event": asyncio.Event(),
"otp_value": None,
"result": None,
"message": None,
"type": None,
}
return sid
async def cleanup_session(sid: str, message: str | None = None):
"""
Close driver (if any), wake OTP waiter, set final state, and remove session entry.
Idempotent: safe to call multiple times.
"""
s = sessions.get(sid)
if not s:
return
try:
# Ensure final state
try:
if s.get("status") not in ("completed", "error", "not_found"):
s["status"] = "error"
if message:
s["message"] = message
except Exception:
pass
# Wake any OTP waiter (so awaiting coroutines don't hang)
try:
ev = s.get("otp_event")
if ev and not ev.is_set():
ev.set()
except Exception:
pass
# NOTE: Do NOT quit driver - keep browser alive for next patient
# Browser manager handles the persistent browser instance
finally:
# Remove session entry from map
sessions.pop(sid, None)
async def _remove_session_later(sid: str, delay: int = 20):
await asyncio.sleep(delay)
await cleanup_session(sid)
async def start_dentaquest_run(sid: str, data: dict, url: str):
"""
Run the DentaQuest workflow for a session (WITHOUT managing semaphore/counters).
Called by agent.py inside a wrapper that handles queue/counters.
"""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
s["status"] = "running"
s["last_activity"] = time.time()
try:
bot = AutomationDentaQuestEligibilityCheck({"data": data})
bot.config_driver()
s["bot"] = bot
s["driver"] = bot.driver
s["last_activity"] = time.time()
# Navigate to login URL
try:
if not url:
raise ValueError("URL not provided for DentaQuest run")
bot.driver.maximize_window()
bot.driver.get(url)
await asyncio.sleep(1)
except Exception as e:
s["status"] = "error"
s["message"] = f"Navigation failed: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
# Login
try:
login_result = bot.login(url)
except WebDriverException as wde:
s["status"] = "error"
s["message"] = f"Selenium driver error during login: {wde}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
except Exception as e:
s["status"] = "error"
s["message"] = f"Unexpected error during login: {e}"
await cleanup_session(sid, s["message"])
return {"status": "error", "message": s["message"]}
# Already logged in - session persisted from profile, skip to step1
if isinstance(login_result, str) and login_result == "ALREADY_LOGGED_IN":
s["status"] = "running"
s["message"] = "Session persisted"
# Continue to step1 below
# OTP required path
elif isinstance(login_result, str) and login_result == "OTP_REQUIRED":
s["status"] = "waiting_for_otp"
s["message"] = "OTP required for login"
s["last_activity"] = time.time()
try:
await asyncio.wait_for(s["otp_event"].wait(), timeout=SESSION_OTP_TIMEOUT)
except asyncio.TimeoutError:
s["status"] = "error"
s["message"] = "OTP timeout"
await cleanup_session(sid)
return {"status": "error", "message": "OTP not provided in time"}
otp_value = s.get("otp_value")
if not otp_value:
s["status"] = "error"
s["message"] = "OTP missing after event"
await cleanup_session(sid)
return {"status": "error", "message": "OTP missing after event"}
# Submit OTP
try:
driver = s["driver"]
wait = WebDriverWait(driver, 30)
# Check if there's a popup window and switch to it
original_window = driver.current_window_handle
all_windows = driver.window_handles
if len(all_windows) > 1:
for window in all_windows:
if window != original_window:
driver.switch_to.window(window)
break
# DentaQuest OTP input field - adjust selectors as needed
otp_input = wait.until(
EC.presence_of_element_located(
(By.XPATH, "//input[contains(@name,'otp') or contains(@name,'code') or contains(@placeholder,'code') or contains(@id,'otp') or @type='tel']")
)
)
otp_input.clear()
otp_input.send_keys(otp_value)
try:
submit_btn = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//button[contains(text(),'Verify') or contains(text(),'Submit') or contains(text(),'Continue') or @type='submit']")
)
)
submit_btn.click()
except Exception:
otp_input.send_keys("\n")
# Wait for verification and switch back to main window if needed
await asyncio.sleep(2)
if len(driver.window_handles) > 0:
driver.switch_to.window(driver.window_handles[0])
s["status"] = "otp_submitted"
s["last_activity"] = time.time()
await asyncio.sleep(0.5)
except Exception as e:
s["status"] = "error"
s["message"] = f"Failed to submit OTP into page: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
elif isinstance(login_result, str) and login_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = login_result
await cleanup_session(sid)
return {"status": "error", "message": login_result}
# Step 1
step1_result = bot.step1()
if isinstance(step1_result, str) and step1_result.startswith("ERROR"):
s["status"] = "error"
s["message"] = step1_result
await cleanup_session(sid)
return {"status": "error", "message": step1_result}
# Step 2 (PDF)
step2_result = bot.step2()
if isinstance(step2_result, dict) and step2_result.get("status") == "success":
s["status"] = "completed"
s["result"] = step2_result
s["message"] = "completed"
asyncio.create_task(_remove_session_later(sid, 30))
return step2_result
else:
s["status"] = "error"
if isinstance(step2_result, dict):
s["message"] = step2_result.get("message", "unknown error")
else:
s["message"] = str(step2_result)
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
except Exception as e:
s["status"] = "error"
s["message"] = f"worker exception: {e}"
await cleanup_session(sid)
return {"status": "error", "message": s["message"]}
def submit_otp(sid: str, otp: str) -> Dict[str, Any]:
"""Set OTP for a session and wake waiting runner."""
s = sessions.get(sid)
if not s:
return {"status": "error", "message": "session not found"}
if s.get("status") != "waiting_for_otp":
return {"status": "error", "message": f"session not waiting for otp (state={s.get('status')})"}
s["otp_value"] = otp
s["last_activity"] = time.time()
try:
s["otp_event"].set()
except Exception:
pass
return {"status": "ok", "message": "otp accepted"}
def get_session_status(sid: str) -> Dict[str, Any]:
s = sessions.get(sid)
if not s:
return {"status": "not_found"}
return {
"session_id": sid,
"status": s.get("status"),
"message": s.get("message"),
"created_at": s.get("created_at"),
"last_activity": s.get("last_activity"),
"result": s.get("result") if s.get("status") == "completed" else None,
}

View File

@@ -31,9 +31,63 @@ class AutomationDeltaDentalMAEligibilityCheck:
# Use persistent browser from manager (keeps device trust tokens)
self.driver = get_browser_manager().get_driver(self.headless)
def _force_logout(self):
"""Force logout by clearing cookies for Delta Dental domain."""
try:
print("[DDMA login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
# First try to click logout button if visible
try:
self.driver.get("https://providers.deltadentalma.com/")
time.sleep(2)
logout_selectors = [
"//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
"//*[contains(@class, 'logout') or contains(@class, 'signout')]"
]
for selector in logout_selectors:
try:
logout_btn = WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
logout_btn.click()
print("[DDMA login] Clicked logout button")
time.sleep(2)
break
except TimeoutException:
continue
except Exception as e:
print(f"[DDMA login] Could not click logout button: {e}")
# Clear cookies as backup
try:
self.driver.delete_all_cookies()
print("[DDMA login] Cleared all cookies")
except Exception as e:
print(f"[DDMA login] Error clearing cookies: {e}")
browser_manager.clear_credentials_hash()
print("[DDMA login] Logout complete")
return True
except Exception as e:
print(f"[DDMA login] Error during forced logout: {e}")
return False
def login(self, url):
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
# Check if credentials have changed - if so, force logout first
if self.massddma_username and browser_manager.credentials_changed(self.massddma_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# First check if we're already on a logged-in page (from previous run)
try:
current_url = self.driver.current_url
@@ -175,6 +229,10 @@ class AutomationDeltaDentalMAEligibilityCheck:
login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']")))
login_button.click()
# Save credentials hash after login attempt
if self.massddma_username:
browser_manager.save_credentials_hash(self.massddma_username)
# OTP detection
try:

View File

@@ -0,0 +1,479 @@
from selenium import webdriver
from selenium.common.exceptions import WebDriverException, TimeoutException
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
import time
import os
import base64
from dentaquest_browser_manager import get_browser_manager
class AutomationDentaQuestEligibilityCheck:
def __init__(self, data):
self.headless = False
self.driver = None
self.data = data.get("data", {}) if isinstance(data, dict) else {}
# Flatten values for convenience
self.memberId = self.data.get("memberId", "")
self.dateOfBirth = self.data.get("dateOfBirth", "")
self.dentaquest_username = self.data.get("dentaquestUsername", "")
self.dentaquest_password = self.data.get("dentaquestPassword", "")
# Use browser manager's download dir
self.download_dir = get_browser_manager().download_dir
os.makedirs(self.download_dir, exist_ok=True)
def config_driver(self):
# Use persistent browser from manager (keeps device trust tokens)
self.driver = get_browser_manager().get_driver(self.headless)
def _force_logout(self):
"""Force logout by clearing cookies for DentaQuest domain."""
try:
print("[DentaQuest login] Forcing logout due to credential change...")
browser_manager = get_browser_manager()
# First try to click logout button if visible
try:
self.driver.get("https://providers.dentaquest.com/")
time.sleep(2)
logout_selectors = [
"//button[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//a[contains(text(), 'Log out') or contains(text(), 'Logout') or contains(text(), 'Sign out')]",
"//button[@aria-label='Log out' or @aria-label='Logout' or @aria-label='Sign out']",
"//*[contains(@class, 'logout') or contains(@class, 'signout')]"
]
for selector in logout_selectors:
try:
logout_btn = WebDriverWait(self.driver, 3).until(
EC.element_to_be_clickable((By.XPATH, selector))
)
logout_btn.click()
print("[DentaQuest login] Clicked logout button")
time.sleep(2)
break
except TimeoutException:
continue
except Exception as e:
print(f"[DentaQuest login] Could not click logout button: {e}")
# Clear cookies as backup
try:
self.driver.delete_all_cookies()
print("[DentaQuest login] Cleared all cookies")
except Exception as e:
print(f"[DentaQuest login] Error clearing cookies: {e}")
browser_manager.clear_credentials_hash()
print("[DentaQuest login] Logout complete")
return True
except Exception as e:
print(f"[DentaQuest login] Error during forced logout: {e}")
return False
def login(self, url):
wait = WebDriverWait(self.driver, 30)
browser_manager = get_browser_manager()
try:
# Check if credentials have changed - if so, force logout first
if self.dentaquest_username and browser_manager.credentials_changed(self.dentaquest_username):
self._force_logout()
self.driver.get(url)
time.sleep(2)
# First check if we're already on a logged-in page (from previous run)
try:
current_url = self.driver.current_url
print(f"[DentaQuest login] Current URL: {current_url}")
# Check if we're already on dashboard with member search
if "dashboard" in current_url.lower() or "member" in current_url.lower():
try:
member_search = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DentaQuest login] Already on dashboard with member search")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
except Exception as e:
print(f"[DentaQuest login] Error checking current state: {e}")
# Navigate to login URL
self.driver.get(url)
time.sleep(3)
current_url = self.driver.current_url
print(f"[DentaQuest login] After navigation URL: {current_url}")
# If already on dashboard, we're logged in
if "dashboard" in current_url.lower():
print("[DentaQuest login] Already on dashboard")
return "ALREADY_LOGGED_IN"
# Try to dismiss the modal by clicking OK
try:
ok_button = WebDriverWait(self.driver, 5).until(
EC.element_to_be_clickable((By.XPATH, "//button[normalize-space(text())='Ok' or normalize-space(text())='OK' or normalize-space(text())='Continue']"))
)
ok_button.click()
print("[DentaQuest login] Clicked OK modal button")
time.sleep(3)
except TimeoutException:
print("[DentaQuest login] No OK modal button found")
# Check if we're now on dashboard (session was valid)
current_url = self.driver.current_url
print(f"[DentaQuest login] After modal click URL: {current_url}")
if "dashboard" in current_url.lower():
# Check for member search input to confirm logged in
try:
member_search = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, '//input[@placeholder="Search by member ID"]'))
)
print("[DentaQuest login] Session valid - on dashboard with member search")
return "ALREADY_LOGGED_IN"
except TimeoutException:
pass
# Check if OTP is required (popup window or OTP input)
if len(self.driver.window_handles) > 1:
original_window = self.driver.current_window_handle
for window in self.driver.window_handles:
if window != original_window:
self.driver.switch_to.window(window)
print("[DentaQuest login] Switched to popup window")
break
try:
otp_input = WebDriverWait(self.driver, 5).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='tel' or contains(@placeholder,'code') or contains(@aria-label,'Verification')]"))
)
print("[DentaQuest login] OTP input found in popup")
return "OTP_REQUIRED"
except TimeoutException:
self.driver.switch_to.window(original_window)
# Check for OTP input on main page
try:
otp_input = WebDriverWait(self.driver, 3).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='tel' or contains(@placeholder,'code') or contains(@aria-label,'Verification')]"))
)
print("[DentaQuest login] OTP input found")
return "OTP_REQUIRED"
except TimeoutException:
pass
# If still on login page, need to fill credentials
if "onboarding" in current_url.lower() or "login" in current_url.lower():
print("[DentaQuest login] Need to fill login credentials")
try:
email_field = WebDriverWait(self.driver, 10).until(
EC.element_to_be_clickable((By.XPATH, "//input[@name='username' or @type='text']"))
)
email_field.clear()
email_field.send_keys(self.dentaquest_username)
password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@type='password']")))
password_field.clear()
password_field.send_keys(self.dentaquest_password)
# Click login button
login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit']")))
login_button.click()
print("[DentaQuest login] Submitted login form")
# Save credentials hash after login attempt
if self.dentaquest_username:
browser_manager.save_credentials_hash(self.dentaquest_username)
time.sleep(5)
# Check for OTP after login
try:
otp_input = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@type='tel' or contains(@placeholder,'code') or contains(@aria-label,'Verification')]"))
)
return "OTP_REQUIRED"
except TimeoutException:
pass
# Check if login succeeded
if "dashboard" in self.driver.current_url.lower():
return "SUCCESS"
except TimeoutException:
print("[DentaQuest login] Login form elements not found")
return "ERROR: Login form not found"
return "SUCCESS"
except Exception as e:
print(f"[DentaQuest login] Exception: {e}")
return f"ERROR:LOGIN FAILED: {e}"
def step1(self):
"""Navigate to member search and enter member ID + DOB"""
wait = WebDriverWait(self.driver, 30)
try:
print(f"[DentaQuest step1] Starting member search for ID: {self.memberId}, DOB: {self.dateOfBirth}")
# Wait for page to be ready
time.sleep(2)
# Parse DOB - format: YYYY-MM-DD
try:
dob_parts = self.dateOfBirth.split("-")
dob_year = dob_parts[0]
dob_month = dob_parts[1].zfill(2)
dob_day = dob_parts[2].zfill(2)
print(f"[DentaQuest step1] Parsed DOB: {dob_month}/{dob_day}/{dob_year}")
except Exception as e:
print(f"[DentaQuest step1] Error parsing DOB: {e}")
return "ERROR: PARSING DOB"
# Get today's date for Date of Service
from datetime import datetime
today = datetime.now()
service_month = str(today.month).zfill(2)
service_day = str(today.day).zfill(2)
service_year = str(today.year)
print(f"[DentaQuest step1] Service date: {service_month}/{service_day}/{service_year}")
# Helper function to fill contenteditable date spans within a specific container
def fill_date_by_testid(testid, month_val, day_val, year_val, field_name):
try:
container = self.driver.find_element(By.XPATH, f"//div[@data-testid='{testid}']")
month_elem = container.find_element(By.XPATH, ".//span[@data-type='month' and @contenteditable='true']")
day_elem = container.find_element(By.XPATH, ".//span[@data-type='day' and @contenteditable='true']")
year_elem = container.find_element(By.XPATH, ".//span[@data-type='year' and @contenteditable='true']")
def replace_with_sendkeys(el, value):
el.click()
time.sleep(0.05)
el.send_keys(Keys.CONTROL, "a")
el.send_keys(Keys.BACKSPACE)
el.send_keys(value)
replace_with_sendkeys(month_elem, month_val)
time.sleep(0.1)
replace_with_sendkeys(day_elem, day_val)
time.sleep(0.1)
replace_with_sendkeys(year_elem, year_val)
print(f"[DentaQuest step1] Filled {field_name}: {month_val}/{day_val}/{year_val}")
return True
except Exception as e:
print(f"[DentaQuest step1] Error filling {field_name}: {e}")
return False
# 1. Fill Date of Service with TODAY's date using specific data-testid
fill_date_by_testid("member-search_date-of-service", service_month, service_day, service_year, "Date of Service")
time.sleep(0.3)
# 2. Fill Date of Birth with patient's DOB using specific data-testid
fill_date_by_testid("member-search_date-of-birth", dob_month, dob_day, dob_year, "Date of Birth")
time.sleep(0.3)
# 3. Fill Member ID
member_id_input = wait.until(EC.presence_of_element_located(
(By.XPATH, '//input[@placeholder="Search by member ID"]')
))
member_id_input.clear()
member_id_input.send_keys(self.memberId)
print(f"[DentaQuest step1] Entered member ID: {self.memberId}")
time.sleep(0.3)
# 4. Click Search button
try:
search_btn = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//button[@data-testid="member-search_search-button"]')
))
search_btn.click()
print("[DentaQuest step1] Clicked search button")
except TimeoutException:
# Fallback
try:
search_btn = self.driver.find_element(By.XPATH, '//button[contains(text(),"Search")]')
search_btn.click()
print("[DentaQuest step1] Clicked search button (fallback)")
except:
member_id_input.send_keys(Keys.RETURN)
print("[DentaQuest step1] Pressed Enter to search")
time.sleep(5)
# Check for "no results" error
try:
error_msg = WebDriverWait(self.driver, 3).until(EC.presence_of_element_located(
(By.XPATH, '//*[contains(@data-testid,"no-results") or contains(@class,"no-results") or contains(text(),"No results") or contains(text(),"not found") or contains(text(),"No member found") or contains(text(),"Nothing was found")]')
))
if error_msg and error_msg.is_displayed():
print("[DentaQuest step1] No results found")
return "ERROR: INVALID MEMBERID OR DOB"
except TimeoutException:
pass
print("[DentaQuest step1] Search completed successfully")
return "Success"
except Exception as e:
print(f"[DentaQuest step1] Exception: {e}")
return f"ERROR:STEP1 - {e}"
def step2(self):
"""Get eligibility status and capture screenshot"""
wait = WebDriverWait(self.driver, 90)
try:
print("[DentaQuest step2] Starting eligibility capture")
# Wait for results to load
time.sleep(3)
# Try to find eligibility status from the results
eligibilityText = "unknown"
try:
# Look for a link or element with eligibility status
status_elem = wait.until(EC.presence_of_element_located((
By.XPATH,
"//a[contains(@href,'eligibility')] | //*[contains(@class,'status')] | //*[contains(text(),'Active') or contains(text(),'Inactive') or contains(text(),'Eligible')]"
)))
eligibilityText = status_elem.text.strip().lower()
print(f"[DentaQuest step2] Found status element: {eligibilityText}")
# Normalize status
if "active" in eligibilityText or "eligible" in eligibilityText:
eligibilityText = "active"
elif "inactive" in eligibilityText or "ineligible" in eligibilityText:
eligibilityText = "inactive"
except TimeoutException:
print("[DentaQuest step2] Could not find specific eligibility status")
# Try to find patient name
patientName = ""
try:
# Look for the patient name in the results
name_elem = self.driver.find_element(By.XPATH, "//h1 | //div[contains(@class,'name')] | //*[contains(@class,'member-name') or contains(@class,'patient-name')]")
patientName = name_elem.text.strip()
print(f"[DentaQuest step2] Found patient name: {patientName}")
except:
pass
# Wait for page to fully load
try:
WebDriverWait(self.driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
except Exception:
pass
time.sleep(1)
# Capture full page screenshot
print("[DentaQuest step2] Capturing screenshot")
total_width = int(self.driver.execute_script(
"return Math.max(document.body.scrollWidth, document.documentElement.scrollWidth, document.documentElement.clientWidth);"
))
total_height = int(self.driver.execute_script(
"return Math.max(document.body.scrollHeight, document.documentElement.scrollHeight, document.documentElement.clientHeight);"
))
dpr = float(self.driver.execute_script("return window.devicePixelRatio || 1;"))
self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', {
"mobile": False,
"width": total_width,
"height": total_height,
"deviceScaleFactor": dpr,
"screenOrientation": {"angle": 0, "type": "portraitPrimary"}
})
time.sleep(0.2)
# Capture screenshot
result = self.driver.execute_cdp_cmd("Page.captureScreenshot", {"format": "png", "fromSurface": True})
image_data = base64.b64decode(result.get('data', ''))
screenshot_path = os.path.join(self.download_dir, f"dentaquest_ss_{self.memberId}_{int(time.time())}.png")
with open(screenshot_path, "wb") as f:
f.write(image_data)
print(f"[DentaQuest step2] Screenshot saved: {screenshot_path}")
# Restore original metrics
try:
self.driver.execute_cdp_cmd('Emulation.clearDeviceMetricsOverride', {})
except Exception:
pass
# Close the browser window after screenshot
try:
from dentaquest_browser_manager import get_browser_manager
get_browser_manager().quit_driver()
print("[DentaQuest step2] Browser closed")
except Exception as e:
print(f"[DentaQuest step2] Error closing browser: {e}")
output = {
"status": "success",
"eligibility": eligibilityText,
"ss_path": screenshot_path,
"patientName": patientName
}
print(f"[DentaQuest step2] Success: {output}")
return output
except Exception as e:
print(f"[DentaQuest step2] Exception: {e}")
# Cleanup download folder on error
try:
dl = os.path.abspath(self.download_dir)
if os.path.isdir(dl):
for name in os.listdir(dl):
item = os.path.join(dl, name)
try:
if os.path.isfile(item) or os.path.islink(item):
os.remove(item)
except Exception:
pass
except Exception:
pass
return {"status": "error", "message": str(e)}
def main_workflow(self, url):
try:
self.config_driver()
self.driver.maximize_window()
time.sleep(3)
login_result = self.login(url)
if login_result.startswith("ERROR"):
return {"status": "error", "message": login_result}
if login_result == "OTP_REQUIRED":
return {"status": "otp_required", "message": "OTP required after login"}
step1_result = self.step1()
if step1_result.startswith("ERROR"):
return {"status": "error", "message": step1_result}
step2_result = self.step2()
if step2_result.get("status") == "error":
return {"status": "error", "message": step2_result.get("message")}
return step2_result
except Exception as e:
return {
"status": "error",
"message": str(e)
}