229 lines
8.8 KiB
Python
229 lines
8.8 KiB
Python
from selenium import webdriver
|
|
from selenium.common import TimeoutException
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
import time
|
|
import os
|
|
import base64
|
|
|
|
class AutomationMassHealthClaimStatusCheck:
|
|
def __init__(self, data):
|
|
self.headless = False
|
|
self.driver = None
|
|
|
|
self.data = data.get("data")
|
|
|
|
# Flatten values for convenience
|
|
self.memberId = self.data.get("memberId", "")
|
|
self.dateOfBirth = self.data.get("dateOfBirth", "")
|
|
self.massdhp_username = self.data.get("massdhpUsername", "")
|
|
self.massdhp_password = self.data.get("massdhpPassword", "")
|
|
|
|
self.download_dir = os.path.abspath("seleniumDownloads")
|
|
os.makedirs(self.download_dir, exist_ok=True)
|
|
|
|
|
|
def config_driver(self):
|
|
options = webdriver.ChromeOptions()
|
|
if self.headless:
|
|
options.add_argument("--headless")
|
|
|
|
# Add PDF download preferences
|
|
prefs = {
|
|
"download.default_directory": self.download_dir,
|
|
"plugins.always_open_pdf_externally": True,
|
|
"download.prompt_for_download": False,
|
|
"download.directory_upgrade": True
|
|
}
|
|
options.add_experimental_option("prefs", prefs)
|
|
|
|
s = Service(ChromeDriverManager().install())
|
|
driver = webdriver.Chrome(service=s, options=options)
|
|
self.driver = driver
|
|
|
|
def login(self):
|
|
wait = WebDriverWait(self.driver, 30)
|
|
|
|
try:
|
|
# Enter email
|
|
email_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='Email' and @type='text']")))
|
|
email_field.clear()
|
|
email_field.send_keys(self.massdhp_username)
|
|
|
|
# Enter password
|
|
password_field = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='Pass' and @type='password']")))
|
|
password_field.clear()
|
|
password_field.send_keys(self.massdhp_password)
|
|
|
|
# Click login
|
|
login_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Login']")))
|
|
login_button.click()
|
|
|
|
return "Success"
|
|
|
|
except Exception as e:
|
|
print(f"Error while logging in: {e}")
|
|
return "ERROR:LOGIN FAILED"
|
|
|
|
def step1(self):
|
|
wait = WebDriverWait(self.driver, 30)
|
|
|
|
try:
|
|
eligibility_link = wait.until(
|
|
EC.element_to_be_clickable((By.XPATH, "//a[text()='Claim Status']"))
|
|
)
|
|
eligibility_link.click()
|
|
|
|
time.sleep(3)
|
|
|
|
# Fill Member ID
|
|
member_id_input = wait.until(EC.presence_of_element_located((By.XPATH, '//input[@name="MAMedicaidID"]')))
|
|
member_id_input.clear()
|
|
member_id_input.send_keys(self.memberId)
|
|
|
|
# Click Search button
|
|
search_btn = wait.until(EC.element_to_be_clickable((By.XPATH, '//input[@id="Submit1"]')))
|
|
search_btn.click()
|
|
|
|
time.sleep(2)
|
|
|
|
# Check for error message
|
|
try:
|
|
error_msg = WebDriverWait(self.driver, 5).until(EC.presence_of_element_located(
|
|
(By.XPATH, "//font[contains(text(), 'Your search did not return any results. Please try again.')]")
|
|
))
|
|
if error_msg:
|
|
return "ERROR: THIS MEMBERID HAS NO CLAIM RESULTS"
|
|
except TimeoutException:
|
|
pass
|
|
|
|
# check success message
|
|
try:
|
|
success_msg = WebDriverWait(self.driver, 5).until(EC.presence_of_element_located(
|
|
(By.XPATH, "//td[contains(text(), 'Your search returned')]")
|
|
))
|
|
if not success_msg:
|
|
return "ERROR: THIS MEMBERID HAS NO CLAIM RESULTS"
|
|
except TimeoutException:
|
|
pass
|
|
|
|
return "Success"
|
|
|
|
except Exception as e:
|
|
print(f"Error while step1 i.e Cheking the MemberId and DOB in: {e}")
|
|
return "ERROR:STEP1"
|
|
|
|
def step2(self):
|
|
try:
|
|
try:
|
|
WebDriverWait(self.driver, 30).until(
|
|
lambda d: d.execute_script("return document.readyState") == "complete"
|
|
)
|
|
except Exception:
|
|
print("Warning: document.readyState did not become 'complete' within timeout")
|
|
|
|
# Give some time for lazy content to finish rendering (adjust if needed)
|
|
time.sleep(0.6)
|
|
|
|
# Get total page size and DPR
|
|
total_width = int(self.driver.execute_script(
|
|
"return Math.max(document.body.scrollWidth, document.documentElement.scrollWidth, document.documentElement.clientWidth);"
|
|
))
|
|
total_height = int(self.driver.execute_script(
|
|
"return Math.max(document.body.scrollHeight, document.documentElement.scrollHeight, document.documentElement.clientHeight);"
|
|
))
|
|
dpr = float(self.driver.execute_script("return window.devicePixelRatio || 1;"))
|
|
|
|
# Set device metrics to the full page size so Page.captureScreenshot captures everything
|
|
# Note: Some pages are extremely tall; if you hit memory limits, you can capture in chunks.
|
|
self.driver.execute_cdp_cmd('Emulation.setDeviceMetricsOverride', {
|
|
"mobile": False,
|
|
"width": total_width,
|
|
"height": total_height,
|
|
"deviceScaleFactor": dpr,
|
|
"screenOrientation": {"angle": 0, "type": "portraitPrimary"}
|
|
})
|
|
|
|
# Small pause for layout to settle after emulation change
|
|
time.sleep(0.15)
|
|
|
|
# Capture screenshot (base64 PNG)
|
|
result = self.driver.execute_cdp_cmd("Page.captureScreenshot", {"format": "png", "fromSurface": True})
|
|
image_data = base64.b64decode(result.get('data', ''))
|
|
screenshot_path = os.path.join(self.download_dir, f"ss_{self.memberId}.png")
|
|
with open(screenshot_path, "wb") as f:
|
|
f.write(image_data)
|
|
|
|
# Restore original metrics to avoid affecting further interactions
|
|
try:
|
|
self.driver.execute_cdp_cmd('Emulation.clearDeviceMetricsOverride', {})
|
|
except Exception:
|
|
# non-fatal: continue
|
|
pass
|
|
|
|
print("Screenshot saved at:", screenshot_path)
|
|
return {"status": "success", "ss_path": screenshot_path}
|
|
|
|
except Exception as e:
|
|
print("ERROR in step2:", e)
|
|
# Empty the download folder (remove files / symlinks only)
|
|
try:
|
|
dl = os.path.abspath(self.download_dir)
|
|
if os.path.isdir(dl):
|
|
for name in os.listdir(dl):
|
|
item = os.path.join(dl, name)
|
|
try:
|
|
if os.path.isfile(item) or os.path.islink(item):
|
|
os.remove(item)
|
|
print(f"[cleanup] removed: {item}")
|
|
except Exception as rm_err:
|
|
print(f"[cleanup] failed to remove {item}: {rm_err}")
|
|
print(f"[cleanup] emptied download dir: {dl}")
|
|
else:
|
|
print(f"[cleanup] download dir does not exist: {dl}")
|
|
except Exception as cleanup_exc:
|
|
print(f"[cleanup] unexpected error while cleaning downloads dir: {cleanup_exc}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
finally:
|
|
# Keep your existing quit behavior; if you want the driver to remain open for further
|
|
# actions, remove or change this.
|
|
if self.driver:
|
|
try:
|
|
self.driver.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main_workflow(self, url):
|
|
try:
|
|
self.config_driver()
|
|
self.driver.maximize_window()
|
|
self.driver.get(url)
|
|
time.sleep(3)
|
|
|
|
login_result = self.login()
|
|
if login_result.startswith("ERROR"):
|
|
return {"status": "error", "message": login_result}
|
|
|
|
step1_result = self.step1()
|
|
if step1_result.startswith("ERROR"):
|
|
return {"status": "error", "message": step1_result}
|
|
|
|
step2_result = self.step2()
|
|
if step2_result.get("status") == "error":
|
|
return {"status": "error", "message": step2_result.get("message")}
|
|
|
|
return step2_result
|
|
except Exception as e:
|
|
return {
|
|
"status": "error",
|
|
"message": e
|
|
}
|
|
|
|
finally:
|
|
self.driver.quit()
|