import io
import re

import pdfplumber

DCODE_RE = re.compile(r'^D\d{4}$')
MEMBER_RE = re.compile(r'\b(\d{12})\b')  # MassHealth member IDs are always 12 digits

# ── Page-1 header patterns ────────────────────────────────────────────────────
_H = {
    "Payee ID": re.compile(r'Payee ID:\s*(\S+)'),
    "Business NPI": re.compile(r'Business NPI:\s*(\S+)'),
    "Run #": re.compile(r'Run #:\s*(\S+)'),
    "RA #": re.compile(r'RA #:\s*(\S+)'),
    "RA Date": re.compile(r'RA Date:\s*(\S+)'),
    "Claim Detail Amount": re.compile(r'Claim Detail Amount:\s*([\$\d,\.]+)'),
    "Claim Adjustment Amount": re.compile(r'Claim Adjustment Amount:\s*([\$\(\)\d,\.]+)'),
    "Misc. Adjustment Amount": re.compile(r'Misc\. Adjustment Amount:\s*([\$\(\)\d,\.]+)'),
    "Payment Amount": re.compile(r'Payment Amount:\s*([\$\d,\.]+)'),
}


def _header_from_pdf(pdf, filename: str) -> dict:
    """Collect the `_H` fields from the first two pages of an open PDF.

    The first non-empty match for each field wins; fields that never match
    are simply absent from the result. "Source File" is always set to
    *filename*.
    """
    header: dict[str, str] = {"Source File": filename}
    # Header info lives on pages 1 and 2 — scan both to be safe
    for page in pdf.pages[:2]:
        text = page.extract_text() or ""
        for field, pattern in _H.items():
            if header.get(field):
                continue  # already captured on an earlier page
            m = pattern.search(text)
            if m:
                header[field] = m.group(1).strip()
    return header


def extract_ra_header(pdf_bytes: bytes, filename: str) -> dict:
    """Extract the cover-page summary (Payee ID, RA #, Payment Amount, etc.).

    Args:
        pdf_bytes: Raw bytes of the PDF.
        filename: Stored verbatim under the "Source File" key.

    Returns:
        A dict with "Source File" plus every `_H` field found on the first
        two pages.
    """
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
        return _header_from_pdf(pdf, filename)


def _c(val) -> str:
    """Return cell text on one line (newlines → spaces, trimmed); falsy → ''."""
    return str(val).replace("\n", " ").strip() if val else ""


def _amt(val: str) -> str:
    """Strip "$" and "," from an amount string; falsy → ''."""
    return val.replace("$", "").replace(",", "").strip() if val else ""


def _cell(row: list, idx: int | None) -> str:
    """Return the cleaned text of row[idx], or '' if idx is None or out of range."""
    if idx is None or idx >= len(row):
        return ""
    return _c(row[idx])


def _col(headers: list[str], *keywords) -> int | None:
    """Return the index of the first header containing every keyword.

    Matching is case-insensitive substring matching; returns None when no
    header qualifies.
    """
    for i, h in enumerate(headers):
        hl = h.lower()
        if all(k.lower() in hl for k in keywords):
            return i
    return None


def _is_summary(h: list[str]) -> bool:
    """True if the header cells look like a summary table (patient name + ICN, no code)."""
    j = " ".join(h).lower()
    return "patient name" in j and "icn" in j and "code" not in j


def _is_detail(h: list[str]) -> bool:
    """True if the header cells look like a detail table (submitted/paid code columns)."""
    j = " ".join(h).lower()
    return ("submitted" in j and "code" in j) or "paid code" in j


def _find_header_row(table: list[list]) -> tuple[int | None, list[str]]:
    """Locate the first row that is a summary or detail header.

    Returns:
        (row index, cleaned cell texts) for the header row, or (None, [])
        if no row qualifies.
    """
    for i, row in enumerate(table):
        # Skip merged context rows (only one non-None cell)
        if sum(1 for c in row if c) <= 1:
            continue
        flat = [_c(c) for c in row]
        if _is_summary(flat) or _is_detail(flat):
            return i, flat
    return None, []


def _merge_headers(table: list[list], hdr_idx: int) -> list[str]:
    """Join, per column, the non-empty cells of rows 0..hdr_idx with spaces.

    This folds multi-row headers into a single label per column.
    """
    n = max(len(r) for r in table[: hdr_idx + 1])
    merged = []
    for ci in range(n):
        parts = [
            _c(table[ri][ci])
            for ri in range(hdr_idx + 1)
            if ci < len(table[ri]) and table[ri][ci]
        ]
        merged.append(" ".join(parts))
    return merged


# ── Pass 1: summary pages → {icn: patient_name} ──────────────────────────────
def _build_patient_map(pdf) -> dict[str, str]:
    """Map each ICN found in summary tables to its patient name.

    Rows are skipped when the patient or ICN cell is empty, when the patient
    cell contains "Total", or when the ICN is not all digits once spaces are
    removed.
    """
    patient_map: dict[str, str] = {}
    for page in pdf.pages:
        for tobj in page.find_tables():
            table = tobj.extract()
            if not table or len(table) < 2:
                continue
            hdr_idx, headers = _find_header_row(table)
            if hdr_idx is None or not _is_summary(headers):
                continue
            pi = _col(headers, "Patient Name")
            ii = _col(headers, "ICN")
            if pi is None or ii is None:
                continue
            for row in table[hdr_idx + 1:]:
                if not row:
                    continue
                patient = _cell(row, pi)
                icn = _cell(row, ii)
                if not patient or not icn:
                    continue
                # An ICN that wrapped inside its cell arrives with embedded
                # spaces (newlines are turned into spaces by _c). Key on the
                # space-free form so it matches the contiguous-digit ICN that
                # _build_detail_map extracts.
                icn_key = icn.replace(" ", "")
                if "Total" in patient or not icn_key.isdigit():
                    continue
                patient_map[icn_key] = patient
    return patient_map


# ── Pass 2: detail pages → {icn: procedure_dict} ─────────────────────────────
def _build_detail_map(pdf) -> dict[str, dict]:
    """Map each ICN found in detail tables to one procedure record.

    The ICN and member number are read from the table's first cell; tables
    without an "ICN: <digits>" marker there are ignored. Only rows whose
    submitted code matches `DCODE_RE` produce a record.
    """
    detail_map: dict[str, dict] = {}
    for page in pdf.pages:
        for tobj in page.find_tables():
            table = tobj.extract()
            if not table or len(table) < 2:
                continue
            hdr_idx, headers = _find_header_row(table)
            if hdr_idx is None or not _is_detail(headers):
                continue

            # ICN is in the merged first cell (row 0)
            context_cell = str(table[0][0]) if table[0] and table[0][0] else ""
            icn_m = re.search(r'ICN:\s*(\d+)', context_cell)
            member_m = MEMBER_RE.search(context_cell)
            icn = icn_m.group(1) if icn_m else ""
            member = member_m.group(1) if member_m else ""
            if not icn:
                continue

            h = _merge_headers(table, hdr_idx)
            sub_code_i = _col(h, "Submitted", "Code")
            paid_code_i = _col(h, "Paid", "Code")
            tooth_i = _col(h, "Tooth")
            date_i = _col(h, "Date")
            allowed_i = _col(h, "Allowed")

            # Amount columns: first non-code "submitted" column; for "paid",
            # the first non-code column unless a later one mentions "amount".
            sub_amt_i = paid_amt_i = None
            for i, col_h in enumerate(h):
                lh = col_h.lower()
                if "submitted" in lh and "code" not in lh and sub_amt_i is None:
                    sub_amt_i = i
                if "paid" in lh and "code" not in lh and ("amount" in lh or paid_amt_i is None):
                    paid_amt_i = i

            for row in table[hdr_idx + 1:]:
                if not row:
                    continue
                cdt = _cell(row, sub_code_i)
                if not DCODE_RE.match(cdt):
                    continue
                # NOTE(review): every matching row overwrites the entry for
                # this ICN, so only the last D-code row seen is kept —
                # confirm a claim never carries more than one procedure line.
                detail_map[icn] = {
                    "Member #": member,
                    "Submitted Code": cdt,
                    "Paid Code": _cell(row, paid_code_i),
                    "Tooth": _cell(row, tooth_i),
                    "Date of Service": _cell(row, date_i),
                    "Submitted Amount": _amt(_cell(row, sub_amt_i)),
                    "Allowed Amount": _amt(_cell(row, allowed_i)),
                    "Paid Amount": _amt(_cell(row, paid_amt_i)),
                }
    return detail_map


# ── Main: join on ICN ─────────────────────────────────────────────────────────
def extract_ra_pdf(pdf_bytes: bytes, filename: str) -> dict:
    """
    Two-pass extraction of a MassHealth Remittance Advice PDF.

    The PDF is opened once; the header, the summary pass and the detail pass
    all read from the same document object. One output row is produced per
    ICN in the summary tables; detail fields are empty strings when no detail
    record exists for that ICN.

    Returns:
        {
          "header": { Payee ID, Business NPI, Run #, RA #, RA Date,
                      Claim Detail Amount, Claim Adjustment Amount,
                      Misc. Adjustment Amount, Payment Amount },
          "rows":   [ one dict per ICN … ]
        }
    """
    with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
        header = _header_from_pdf(pdf, filename)
        patient_map = _build_patient_map(pdf)
        detail_map = _build_detail_map(pdf)

    rows = []
    for icn, patient_name in patient_map.items():
        detail = detail_map.get(icn, {})
        rows.append({
            "Patient Name": patient_name,
            "Member #": detail.get("Member #", ""),
            "ICN": icn,
            "Submitted Code": detail.get("Submitted Code", ""),
            "Paid Code": detail.get("Paid Code", ""),
            "Tooth": detail.get("Tooth", ""),
            "Date of Service": detail.get("Date of Service", ""),
            "Submitted Amount": detail.get("Submitted Amount", ""),
            "Allowed Amount": detail.get("Allowed Amount", ""),
            "Paid Amount": detail.get("Paid Amount", ""),
            "Source File": filename,
        })

    return {"header": header, "rows": rows}