selenium eligibility done till now
This commit is contained in:
@@ -5,11 +5,7 @@ from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
from selenium.webdriver.support.ui import Select
|
||||
import time
|
||||
from datetime import datetime
|
||||
import tempfile
|
||||
import base64
|
||||
import os
|
||||
|
||||
class AutomationMassHealthEligibilityCheck:
|
||||
@@ -17,25 +13,32 @@ class AutomationMassHealthEligibilityCheck:
|
||||
self.headless = False
|
||||
self.driver = None
|
||||
|
||||
self.data = data
|
||||
self.claim = data.get("claim", {})
|
||||
self.upload_files = data.get("pdfs", []) + data.get("images", [])
|
||||
self.data = data.get("data")
|
||||
|
||||
# Flatten values for convenience
|
||||
self.memberId = self.claim.get("memberId", "")
|
||||
self.dateOfBirth = self.claim.get("dateOfBirth", "")
|
||||
self.remarks = self.claim.get("remarks", "")
|
||||
self.massdhp_username = self.claim.get("massdhpUsername", "")
|
||||
self.massdhp_password = self.claim.get("massdhpPassword", "")
|
||||
self.serviceLines = self.claim.get("serviceLines", [])
|
||||
self.missingTeethStatus = self.claim.get("missingTeethStatus", "")
|
||||
self.missingTeeth = self.claim.get("missingTeeth", {})
|
||||
self.memberId = self.data.get("memberId", "")
|
||||
self.dateOfBirth = self.data.get("dateOfBirth", "")
|
||||
self.massdhp_username = self.data.get("massdhpUsername", "")
|
||||
self.massdhp_password = self.data.get("massdhpPassword", "")
|
||||
|
||||
self.download_dir = os.path.abspath("seleniumDownloads")
|
||||
os.makedirs(self.download_dir, exist_ok=True)
|
||||
|
||||
|
||||
def config_driver(self):
|
||||
options = webdriver.ChromeOptions()
|
||||
if self.headless:
|
||||
options.add_argument("--headless")
|
||||
|
||||
# Add PDF download preferences
|
||||
prefs = {
|
||||
"download.default_directory": self.download_dir,
|
||||
"plugins.always_open_pdf_externally": True,
|
||||
"download.prompt_for_download": False,
|
||||
"download.directory_upgrade": True
|
||||
}
|
||||
options.add_experimental_option("prefs", prefs)
|
||||
|
||||
s = Service(ChromeDriverManager().install())
|
||||
driver = webdriver.Chrome(service=s, options=options)
|
||||
self.driver = driver
|
||||
@@ -68,10 +71,10 @@ class AutomationMassHealthEligibilityCheck:
|
||||
wait = WebDriverWait(self.driver, 30)
|
||||
|
||||
try:
|
||||
claim_upload_link = wait.until(
|
||||
EC.element_to_be_clickable((By.XPATH, "//a[text()='Claim Upload']"))
|
||||
eligibility_link = wait.until(
|
||||
EC.element_to_be_clickable((By.XPATH, "//a[text()='Member Eligibility']"))
|
||||
)
|
||||
claim_upload_link.click()
|
||||
eligibility_link.click()
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
@@ -82,38 +85,26 @@ class AutomationMassHealthEligibilityCheck:
|
||||
|
||||
# Fill DOB parts
|
||||
try:
|
||||
dob_parts = self.dateOfBirth.split("/")
|
||||
month= dob_parts[0].zfill(2) # "12"
|
||||
day= dob_parts[1].zfill(2) # "13"
|
||||
year = dob_parts[2] # "1965"
|
||||
dob_parts = self.dateOfBirth.split("-")
|
||||
year = dob_parts[0] # "1964"
|
||||
month = dob_parts[1].zfill(2) # "04"
|
||||
day = dob_parts[2].zfill(2) # "17"
|
||||
except Exception as e:
|
||||
print(f"Error parsing DOB: {e}")
|
||||
return "ERROR: PARSING DOB"
|
||||
|
||||
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="Text2"]'))).send_keys(month)
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="Text3"]'))).send_keys(day)
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="Text4"]'))).send_keys(year)
|
||||
|
||||
# Rendering Provider NPI dropdown
|
||||
npi_dropdown = wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="Select1"]')))
|
||||
select_npi = Select(npi_dropdown)
|
||||
select_npi.select_by_index(1)
|
||||
|
||||
# Office Location dropdown
|
||||
location_dropdown = wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="Select2"]')))
|
||||
select_location = Select(location_dropdown)
|
||||
select_location.select_by_index(1)
|
||||
|
||||
# Click Continue button
|
||||
continue_btn = wait.until(EC.element_to_be_clickable((By.XPATH, '//input[@type="submit" and @value="Continue"]')))
|
||||
continue_btn = wait.until(EC.element_to_be_clickable((By.XPATH, '//input[@type="submit" and @value="Add Member"]')))
|
||||
continue_btn.click()
|
||||
|
||||
|
||||
# Check for error message
|
||||
try:
|
||||
error_msg = WebDriverWait(self.driver, 5).until(EC.presence_of_element_located(
|
||||
(By.XPATH, "//td[@class='text_err_msg' and contains(text(), 'Invalid Member ID or Date of Birth')]")
|
||||
(By.XPATH, "//td[@class='text_err_msg' and contains(text(), 'Invalid Medicaid ID or Date of Birth')]")
|
||||
))
|
||||
if error_msg:
|
||||
print("Error: Invalid Member ID or Date of Birth.")
|
||||
@@ -127,197 +118,34 @@ class AutomationMassHealthEligibilityCheck:
|
||||
print(f"Error while step1 i.e Cheking the MemberId and DOB in: {e}")
|
||||
return "ERROR:STEP1"
|
||||
|
||||
def step2(self):
|
||||
|
||||
wait = WebDriverWait(self.driver, 30)
|
||||
|
||||
# already waiting in step1 last part, so no time sleep.
|
||||
|
||||
# 1 - Procedure Codes part
|
||||
try:
|
||||
for proc in self.serviceLines:
|
||||
# Wait for Procedure Code dropdown and select code
|
||||
select_element = wait.until(EC.presence_of_element_located((By.XPATH, "//select[@id='Select3']")))
|
||||
Select(select_element).select_by_value(proc['procedureCode'])
|
||||
|
||||
# Fill Procedure Date if present
|
||||
if proc.get("procedureDate"):
|
||||
try:
|
||||
# Try to normalize date to MM/DD/YYYY format
|
||||
parsed_date = datetime.strptime(proc["procedureDate"], "%Y-%m-%d")
|
||||
formatted_date = parsed_date.strftime("%m/%d/%Y")
|
||||
|
||||
date_xpath = "//input[@name='ProcedureDate']"
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, date_xpath))).clear()
|
||||
self.driver.find_element(By.XPATH, date_xpath).send_keys(formatted_date)
|
||||
|
||||
except ValueError:
|
||||
# Invalid date format - skip filling ProcedureDate field
|
||||
pass
|
||||
|
||||
# Fill Oral Cavity Area if present
|
||||
if proc.get("oralCavityArea"):
|
||||
oral_xpath = "//input[@name='OralCavityArea']"
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, oral_xpath))).clear()
|
||||
self.driver.find_element(By.XPATH, oral_xpath).send_keys(proc["oralCavityArea"])
|
||||
|
||||
# Fill Tooth Number if present
|
||||
if proc.get("toothNumber"):
|
||||
tooth_num_dropdown = wait.until(EC.presence_of_element_located((By.XPATH, "//select[@name='ToothNumber']")))
|
||||
select_tooth = Select(tooth_num_dropdown)
|
||||
select_tooth.select_by_value(proc["toothNumber"])
|
||||
|
||||
|
||||
# Fill Tooth Surface if present
|
||||
if proc.get("toothSurface"):
|
||||
surfaces = proc["toothSurface"].split(",")
|
||||
for surface in surfaces:
|
||||
surface = surface.strip()
|
||||
checkbox_xpath = f"//input[@type='checkbox' and @name='TS_{surface}']"
|
||||
checkbox = wait.until(EC.element_to_be_clickable((By.XPATH, checkbox_xpath)))
|
||||
if not checkbox.is_selected():
|
||||
checkbox.click()
|
||||
|
||||
|
||||
# Fill Fees if present
|
||||
if proc.get("billedAmount"):
|
||||
fees_xpath = "//input[@name='ProcedureFee']"
|
||||
wait.until(EC.presence_of_element_located((By.XPATH, fees_xpath))).clear()
|
||||
self.driver.find_element(By.XPATH, fees_xpath).send_keys(proc["billedAmount"])
|
||||
|
||||
# Click "Add Procedure" button
|
||||
add_proc_xpath = "//input[@type='submit' and @value='Add Procedure']"
|
||||
wait.until(EC.element_to_be_clickable((By.XPATH, add_proc_xpath))).click()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error while filling Procedure Codes: {e}")
|
||||
return "ERROR:PROCEDURE CODES"
|
||||
|
||||
# 2 - Upload PDFs:
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
for file_obj in self.upload_files:
|
||||
base64_data = file_obj["bufferBase64"]
|
||||
file_name = file_obj.get("originalname", "tempfile.bin")
|
||||
|
||||
# Ensure valid extension fallback if missing
|
||||
if not any(file_name.lower().endswith(ext) for ext in [".pdf", ".jpg", ".jpeg", ".png", ".webp"]):
|
||||
file_name += ".bin"
|
||||
|
||||
# Full path with original filename inside temp dir
|
||||
tmp_file_path = os.path.join(tmp_dir, file_name)
|
||||
|
||||
# Decode and save
|
||||
with open(tmp_file_path, "wb") as tmp_file:
|
||||
tmp_file.write(base64.b64decode(base64_data))
|
||||
|
||||
# Upload using Selenium
|
||||
file_input = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='FileName' and @type='file']")))
|
||||
file_input.send_keys(tmp_file_path)
|
||||
|
||||
upload_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Upload File']")))
|
||||
upload_button.click()
|
||||
|
||||
time.sleep(3)
|
||||
except Exception as e:
|
||||
print(f"Error while uploading PDFs: {e}")
|
||||
return "ERROR:PDF FAILED"
|
||||
|
||||
# 3 - Indicate Missing Teeth Part
|
||||
try:
|
||||
# Handle the missing teeth section based on the status
|
||||
missing_status = self.missingTeethStatus.strip() if self.missingTeethStatus else "No_missing"
|
||||
|
||||
if missing_status == "No_missing":
|
||||
missing_teeth_no = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@type='checkbox' and @name='PAU_Step3_Checkbox1']")))
|
||||
missing_teeth_no.click()
|
||||
|
||||
elif missing_status == "endentulous":
|
||||
missing_teeth_edentulous = wait.until(EC.presence_of_element_located((By.XPATH, "//input[@type='checkbox' and @name='PAU_Step3_Checkbox2']")))
|
||||
missing_teeth_edentulous.click()
|
||||
|
||||
elif missing_status == "Yes_missing":
|
||||
missing_teeth_dict = self.missingTeeth
|
||||
|
||||
# For each tooth in the missing teeth dict, select the dropdown option
|
||||
for tooth_name, value in missing_teeth_dict.items():
|
||||
if value: # only if there's a value to select
|
||||
select_element = wait.until(EC.presence_of_element_located((By.XPATH, f"//select[@name='{tooth_name}']")))
|
||||
select_obj = Select(select_element)
|
||||
select_obj.select_by_value(value)
|
||||
|
||||
|
||||
# Wait for upload button and click it
|
||||
update_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Update Missing Teeth']")))
|
||||
update_button.click()
|
||||
|
||||
time.sleep(3)
|
||||
except Exception as e:
|
||||
print(f"Error while filling missing teeth: {e}")
|
||||
return "ERROR:MISSING TEETH FAILED"
|
||||
|
||||
|
||||
# 4 - Update Remarks
|
||||
try:
|
||||
if self.remarks.strip():
|
||||
textarea = wait.until(EC.presence_of_element_located((By.XPATH, "//textarea[@name='Remarks']")))
|
||||
textarea.clear()
|
||||
textarea.send_keys(self.remarks)
|
||||
|
||||
# Wait for update button and click it
|
||||
update_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Update Remarks']")))
|
||||
update_button.click()
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error while filling remarks: {e}")
|
||||
return "ERROR:REMARKS FAILED"
|
||||
|
||||
# 5 - close buton
|
||||
try:
|
||||
close_button = wait.until(EC.element_to_be_clickable((By.XPATH, "//input[@type='submit' and @value='Submit Request']")))
|
||||
close_button.click()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
# Switch to alert and accept it
|
||||
try:
|
||||
wait.until(EC.alert_is_present())
|
||||
|
||||
alert = self.driver.switch_to.alert
|
||||
alert.accept()
|
||||
except TimeoutException:
|
||||
print("No alert appeared after clicking the button.")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error while Closing: {e}")
|
||||
return "ERROR:CLOSE FAILED"
|
||||
|
||||
return "Success"
|
||||
|
||||
|
||||
def reach_to_pdf(self):
|
||||
def step2(self):
|
||||
wait = WebDriverWait(self.driver, 90)
|
||||
|
||||
def wait_for_pdf_download(timeout=60):
|
||||
for _ in range(timeout):
|
||||
files = [f for f in os.listdir(self.download_dir) if f.endswith(".pdf")]
|
||||
if files:
|
||||
return os.path.join(self.download_dir, files[0])
|
||||
time.sleep(1)
|
||||
raise TimeoutError("PDF did not download in time")
|
||||
try:
|
||||
pdf_link_element = wait.until(
|
||||
EC.element_to_be_clickable((By.XPATH, "//a[contains(@href, '.pdf')]"))
|
||||
)
|
||||
time.sleep(5)
|
||||
pdf_relative_url = pdf_link_element.get_attribute("href")
|
||||
|
||||
if not pdf_relative_url.startswith("http"):
|
||||
full_pdf_url = f"https://providers.massdhp.com{pdf_relative_url}"
|
||||
else:
|
||||
full_pdf_url = pdf_relative_url
|
||||
|
||||
print("FULL PDF LINK: ",full_pdf_url)
|
||||
return full_pdf_url
|
||||
eligibilityElement = wait.until(EC.presence_of_element_located((By.XPATH,
|
||||
f"//table[@id='Table3']//tr[td[contains(text(), '{self.memberId}')]]/td[3]")))
|
||||
eligibilityText = eligibilityElement.text
|
||||
|
||||
report_link = wait.until(EC.element_to_be_clickable((By.XPATH, "//a[contains(text(), 'Click here')]")))
|
||||
report_link.click()
|
||||
|
||||
pdf_path = wait_for_pdf_download()
|
||||
print("PDF downloaded at:", pdf_path)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"eligibility": eligibilityText,
|
||||
"pdf_path": pdf_path
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"ERROR: {str(e)}")
|
||||
return {
|
||||
@@ -332,7 +160,6 @@ class AutomationMassHealthEligibilityCheck:
|
||||
def main_workflow(self, url):
|
||||
try:
|
||||
self.config_driver()
|
||||
print("Reaching Site :", url)
|
||||
self.driver.maximize_window()
|
||||
self.driver.get(url)
|
||||
time.sleep(3)
|
||||
@@ -346,17 +173,10 @@ class AutomationMassHealthEligibilityCheck:
|
||||
return {"status": "error", "message": step1_result}
|
||||
|
||||
step2_result = self.step2()
|
||||
if step2_result.startswith("ERROR"):
|
||||
return {"status": "error", "message": step2_result}
|
||||
|
||||
reachToPdf_result = self.reach_to_pdf()
|
||||
if reachToPdf_result.startswith("ERROR"):
|
||||
return {"status": "error", "message": reachToPdf_result}
|
||||
if step2_result.get("status") == "error":
|
||||
return {"status": "error", "message": step2_result.get("message")}
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"pdf_url": reachToPdf_result
|
||||
}
|
||||
return step2_result
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "error",
|
||||
|
||||
Reference in New Issue
Block a user