From ef02d6454e9ada718a3bff707a658e676ebe9fdf Mon Sep 17 00:00:00 2001 From: Potenz Date: Fri, 12 Dec 2025 00:28:41 +0530 Subject: [PATCH] ddma script - with cookies (not working though with cookies) --- .../selenium_DDMA_eligibilityCheckWorker.py | 326 +++++++++++++----- 1 file changed, 233 insertions(+), 93 deletions(-) diff --git a/apps/SeleniumService/selenium_DDMA_eligibilityCheckWorker.py b/apps/SeleniumService/selenium_DDMA_eligibilityCheckWorker.py index 033d09b..2483d41 100644 --- a/apps/SeleniumService/selenium_DDMA_eligibilityCheckWorker.py +++ b/apps/SeleniumService/selenium_DDMA_eligibilityCheckWorker.py @@ -1,5 +1,5 @@ from selenium import webdriver -from selenium.common import TimeoutException +from selenium.common.exceptions import WebDriverException, TimeoutException from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys @@ -10,6 +10,9 @@ import time import os import base64 import stat +import pickle +import traceback +import urllib class AutomationDeltaDentalMAEligibilityCheck: def __init__(self, data): @@ -27,6 +30,8 @@ class AutomationDeltaDentalMAEligibilityCheck: self.download_dir = os.path.abspath("seleniumDownloads") os.makedirs(self.download_dir, exist_ok=True) + + self.cookies_path = os.path.abspath("cookies.pkl") def config_driver(self): @@ -47,16 +52,145 @@ class AutomationDeltaDentalMAEligibilityCheck: driver = webdriver.Chrome(service=s, options=options) self.driver = driver - def login(self): - """ - Attempts login and detects OTP. - Returns: - - "Success" -> logged in - - "OTP_REQUIRED" -> page requires OTP (we do NOT block here) - - "ERROR:..." -> error occurred - """ + def _origin_from_url(self, url): + p = urllib.parse.urlparse(url) + origin = f"{p.scheme}://{p.netloc}" + return origin + + def try_cookie_login(self, url): + try: + if not os.path.exists(self.cookies_path): + print(f"[try_cookie_login] cookies file not found at: {self.cookies_path}") + return "NO_COOKIES" + + origin = self._origin_from_url(url) + print(f"origin {origin}") + + print(f"[try_cookie_login] Attempting cookie-login. origin={origin}, cookies_path={self.cookies_path}") + + # load origin first + self.driver.get(origin) + time.sleep(1) + + try: + cookies = pickle.load(open(self.cookies_path, "rb")) + print(f"[try_cookie_login] Loaded {len(cookies)} cookies from file.") + except Exception as e: + print("[try_cookie_login] Failed to load cookies.pkl:", e) + traceback.print_exc() + return "FAILED" + + added = 0 + for c in cookies: + try: + self.driver.add_cookie(c) + added += 1 + except Exception as ex: + print(f"[try_cookie_login] warning adding cookie {c.get('name')}: {ex}") + print(f"[try_cookie_login] Added {added}/{len(cookies)} cookies to browser.") + + # now load target url + time.sleep(2) + self.driver.get(origin) + time.sleep(2) + # detect logged-in state + try: + WebDriverWait(self.driver, 8).until( + EC.presence_of_element_located((By.XPATH, "//a[text()='Member Eligibility' or contains(., 'Member Eligibility')]")) + ) + # refresh cookie file with any updates + try: + current = self.driver.get_cookies() + pickle.dump(current, open(self.cookies_path, "wb")) + print("[try_cookie_login] Cookie-login success; cookies.pkl refreshed.") + except Exception as e: + print("[try_cookie_login] Warning saving refreshed cookies:", e) + return "Success" + except TimeoutException: + print("[try_cookie_login] Cookie-login did not find dashboard element.") + return "FAILED" + except Exception as e: + print("[try_cookie_login] Unexpected exception:", e) + traceback.print_exc() + return "FAILED" + + def handle_cookies(self, url, create_if_missing=True): + try: + origin = self._origin_from_url(url) + + self.driver.get(origin) + time.sleep(1) + + if not os.path.exists(self.cookies_path): + if not create_if_missing: + print("[handle_cookies] cookies file missing and create_if_missing=False") + return False + try: + input("add ?") + cookies = self.driver.get_cookies() + pickle.dump(cookies, open(self.cookies_path, "wb")) + print(f"[handle_cookies] Saved {len(cookies)} cookies to {self.cookies_path}") + return True + except Exception as e: + print("[handle_cookies] failed to save cookies:", e) + traceback.print_exc() + return False + else: + try: + cookies = pickle.load(open(self.cookies_path, "rb")) + print(f"[handle_cookies] Loaded {len(cookies)} cookies from {self.cookies_path}") + except Exception as e: + print("[handle_cookies] failed to load cookies.pkl:", e) + traceback.print_exc() + return False + + # ensure on origin to add cookies + self.driver.get(origin) + time.sleep(1) + + added = 0 + for c in cookies: + try: + self.driver.add_cookie(c) + added += 1 + except Exception: + pass + print(f"[handle_cookies] Re-applied {added}/{len(cookies)} cookies.") + + time.sleep(1) + self.driver.refresh() + time.sleep(2) + + try: + new_cookies = self.driver.get_cookies() + pickle.dump(new_cookies, open(self.cookies_path, "wb")) + print(f"[handle_cookies] Refreshed cookies.pkl with {len(new_cookies)} cookies.") + except Exception as e: + print("[handle_cookies] failed to refresh cookies.pkl:", e) + traceback.print_exc() + return True + except Exception as e: + print("[handle_cookies] Unexpected exception:", e) + traceback.print_exc() + return False + + def login(self, url): wait = WebDriverWait(self.driver, 30) try: + print(f"[login] Starting login for url: {url}") + cookie_attempt = self.try_cookie_login(url) + print(f"[login] cookie_attempt result: {cookie_attempt}") + if cookie_attempt == "Success": + print("[login] Logged in via cookies.") + return "Success" + elif cookie_attempt == "NO_COOKIES": + print("[login] No cookies present; will do credential login.") + else: + print("[login] Cookie-login failed; falling back to username/password login.") + + # credential login flow + self.driver.get(url) + print("[login] On login page, attempting to fill credentials.") email_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='username' and @type='text']"))) email_field.clear() email_field.send_keys(self.massddma_username) @@ -64,18 +198,20 @@ class AutomationDeltaDentalMAEligibilityCheck: password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='password' and @type='password']"))) password_field.clear() password_field.send_keys(self.massddma_password) - - # Click Remember me checkbox - remember_me_checkbox = wait.until(EC.element_to_be_clickable( - (By.XPATH, "//label[.//span[contains(text(),'Remember me')]]") - )) - remember_me_checkbox.click() - + + # remember me + try: + remember_me_checkbox = wait.until(EC.element_to_be_clickable( + (By.XPATH, "//label[.//span[contains(text(),'Remember me')]]") + )) + remember_me_checkbox.click() + except: + print("[login] Remember me checkbox not found (continuing).") + login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@type='submit' and @aria-label='Sign in']"))) login_button.click() - - # 1) Detect OTP presence (adjust the XPath/attributes to the actual site) + # OTP detection try: otp_candidate = WebDriverWait(self.driver, 30).until( EC.presence_of_element_located( @@ -83,26 +219,38 @@ class AutomationDeltaDentalMAEligibilityCheck: ) ) if otp_candidate: - print("[DDMA worker] OTP input detected -> OTP_REQUIRED") + print("[login] OTP input detected -> OTP_REQUIRED") return "OTP_REQUIRED" except TimeoutException: - pass + print("[login] No OTP input detected in allowed time.") - # 2) Detect successful login by presence of a known post-login element + # check for dashboard element try: - logged_in_el = WebDriverWait(self.driver, 5).until( + logged_in_el = WebDriverWait(self.driver, 8).until( EC.presence_of_element_located((By.XPATH, "//a[text()='Member Eligibility' or contains(., 'Member Eligibility')]")) ) if logged_in_el: + print("[login] Credential login succeeded; calling handle_cookies to save/refresh cookies.") + try: + self.handle_cookies(self.driver.current_url, create_if_missing=True) + except Exception as e: + print("[login] handle_cookies raised:", e) + traceback.print_exc() return "Success" except TimeoutException: - # last chance: see if URL changed - if "dashboard" in self.driver.current_url or "providers" in self.driver.current_url: + cur = self.driver.current_url or "" + print(f"[login] Post-login element not found. Current URL: {cur}") + if "dashboard" in cur or "providers" in cur: + print("[login] URL looks like dashboard; treating as success and saving cookies.") + try: + self.handle_cookies(self.driver.current_url, create_if_missing=True) + except Exception: + pass return "Success" + return "ERROR:LOGIN FAILED - unable to detect success or OTP" - return "ERROR:LOGIN FAILED - unable to detect success or OTP" except Exception as e: - print(f"[DDMA worker] login exception: {e}") + print("[login] Exception during login:", e) return f"ERROR:LOGIN FAILED: {e}" def step1(self): @@ -178,81 +326,74 @@ class AutomationDeltaDentalMAEligibilityCheck: def step2(self): - def wait_for_pdf_download(timeout=60): - for _ in range(timeout): - files = [f for f in os.listdir(self.download_dir) if f.endswith(".pdf")] - if files: - return os.path.join(self.download_dir, files[0]) - time.sleep(1) - raise TimeoutError("PDF did not download in time") - - def _unique_target_path(): - """ - Create a unique filename using memberId. - """ - safe_member = "".join(c for c in str(self.memberId) if c.isalnum() or c in "-_.") - filename = f"eligibility_{safe_member}.pdf" - return os.path.join(self.download_dir, filename) - wait = WebDriverWait(self.driver, 90) - tmp_created_path = None try: # 1) find the eligibility inside the correct cell status_link = wait.until(EC.presence_of_element_located(( By.XPATH, - ".//td[contains(@id,'memberEligibilityStatus')]//a" + "(//tbody//tr)[1]//a[contains(@href, 'member-eligibility-search')]" ))) - eligibilityText = status_link.text.strip() + eligibilityText = status_link.text.strip().lower() - # locate and click the Print button - print_button = wait.until(EC.element_to_be_clickable( - (By.XPATH, "//button[@data-testid='print-button']") + try: + WebDriverWait(self.driver, 30).until( + lambda d: d.execute_script("return document.readyState") == "complete" + ) + except Exception: + print("Warning: document.readyState did not become 'complete' within timeout") + + # Give some time for lazy content to finish rendering (adjust if needed) + time.sleep(0.6) + + # Get total page size and DPR + total_width = int(self.driver.execute_script( + "return Math.max(document.body.scrollWidth, document.documentElement.scrollWidth, document.documentElement.clientWidth);" )) - print_button.click() + total_height = int(self.driver.execute_script( + "return Math.max(document.body.scrollHeight, document.documentElement.scrollHeight, document.documentElement.clientHeight);" + )) + dpr = float(self.driver.execute_script("return window.devicePixelRatio || 1;")) - # Increase if your app needs more time to render print preview HTML. - time.sleep(5) + # Set device metrics to the full page size so Page.captureScreenshot captures everything + # Note: Some pages are extremely tall; if you hit memory limits, you can capture in chunks. + self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', { + "mobile": False, + "width": total_width, + "height": total_height, + "deviceScaleFactor": dpr, + "screenOrientation": {"angle": 0, "type": "portraitPrimary"} + }) - # Use Chrome DevTools Protocol to print the page to PDF: - # options you can tweak: landscape, displayHeaderFooter, printBackground, scale, paperWidth, paperHeight, margins - print_options = { - "landscape": False, - "displayHeaderFooter": False, - "printBackground": True, - "preferCSSPageSize": True, - # optional: "scale": 1.0, - # optional: specify paper size if preferCSSPageSize is False, e.g. A4: {"paperWidth": 8.27, "paperHeight": 11.69} - } + # Small pause for layout to settle after emulation change + time.sleep(0.15) - # execute_cdp_cmd returns a dict with 'data' being base64 PDF - result = self.driver.execute_cdp_cmd("Page.printToPDF", print_options) - pdf_b64 = result.get("data") - if not pdf_b64: - raise RuntimeError("CDP returned no PDF data") + # Capture screenshot (base64 PNG) + result = self.driver.execute_cdp_cmd("Page.captureScreenshot", {"format": "png", "fromSurface": True}) + image_data = base64.b64decode(result.get('data', '')) + screenshot_path = os.path.join(self.download_dir, f"ss_{self.memberId}.png") + with open(screenshot_path, "wb") as f: + f.write(image_data) - # decode and write to unique path - target_path = _unique_target_path() - os.makedirs(os.path.dirname(target_path), exist_ok=True) - with open(target_path, "wb") as f: - f.write(base64.b64decode(pdf_b64)) + # Restore original metrics to avoid affecting further interactions + try: + self.driver.execute_cdp_cmd('Emulation.clearDeviceMetricsOverride', {}) + except Exception: + # non-fatal: continue + pass - # ensure the copied file is writable / stable - os.chmod(target_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) - - - print("PDF downloaded at:", target_path) - - return { - "status": "success", - "eligibility": eligibilityText, - "pdf_path": target_path - } + print("Screenshot saved at:", screenshot_path) + output = { + "status": "success", + "eligibility": eligibilityText, + "ss_path": screenshot_path + } + print(output) + return output except Exception as e: - print(f"ERROR: {str(e)}") - + print("ERROR in step2:", e) # Empty the download folder (remove files / symlinks only) try: dl = os.path.abspath(self.download_dir) @@ -270,25 +411,24 @@ class AutomationDeltaDentalMAEligibilityCheck: print(f"[cleanup] download dir does not exist: {dl}") except Exception as cleanup_exc: print(f"[cleanup] unexpected error while cleaning downloads dir: {cleanup_exc}") - - - return { - "status": "error", - "message": str(e), - } + return {"status": "error", "message": str(e)} finally: + # Keep your existing quit behavior; if you want the driver to remain open for further + # actions, remove or change this. if self.driver: - self.driver.quit() + try: + self.driver.quit() + except Exception: + pass def main_workflow(self, url): try: self.config_driver() self.driver.maximize_window() - self.driver.get(url) time.sleep(3) - login_result = self.login() + login_result = self.login(url) if login_result.startswith("ERROR"): return {"status": "error", "message": login_result} if login_result == "OTP_REQUIRED":