#!/usr/bin/env python3 """ Scan PDF and Word documents (.docx) for Danish CPR numbers and dates. Handles text-based and image-based (scanned) PDFs automatically via OCR. Supports masking, full anonymisation, dry-run preview, and JSON logging. Supported formats: .pdf, .docx, .xlsx, .xlsm, .csv, .jpg, .jpeg, .png, .bmp, .tiff, .webp (.doc requires conversion: soffice --headless --convert-to docx file.doc) Usage: python document_scanner.py file.pdf python document_scanner.py file.docx python document_scanner.py file1.pdf file2.docx spreadsheet.xlsx /path/to/folder/ Options: --mask Redact CPR numbers only -> _masked.pdf/.docx --anonymise Redact all personal data -> _anonymised.pdf/.docx (CPR, names, addresses, phone numbers, emails) --dry-run Scan and report without writing any output files --log FILE Write a structured JSON log of all findings to FILE --older-than DAYS List files with CPR numbers AND dates older than DAYS --ocr Force OCR on every page (even if text is extractable) --lang LANG Tesseract language(s), default: dan+eng --dpi DPI DPI for OCR image rendering, default: 300 --poppler PATH Path to Poppler bin folder (Windows only) Dependencies: pip install pdfplumber pdf2image pytesseract pypdf reportlab spacy python-docx openpyxl opencv-python python -m spacy download da_core_news_lg # Danish NER model (~500 MB) System packages: macOS: brew install tesseract tesseract-lang poppler Linux: sudo apt install tesseract-ocr tesseract-ocr-dan poppler-utils Note: Python 3.12 recommended -- spaCy does not yet support Python 3.14. Recommended workflow: # 1. Dry run first to audit without writing anything python document_scanner.py /folder/ --anonymise --dry-run --log audit.json # 2. Run for real once satisfied python document_scanner.py /folder/ --anonymise --log run.json """ import argparse import hashlib import io import json import logging import re import sqlite3 import sys from datetime import date, datetime, timedelta from pathlib import Path # Suppress pdfminer's noisy font-descriptor warnings that appear when PDFs # contain malformed or incomplete font definitions. These do not affect text # extraction or CPR detection — the warning is informational only. logging.getLogger("pdfminer").setLevel(logging.ERROR) logging.getLogger("pdfminer.pdffont").setLevel(logging.ERROR) logging.getLogger("pdfminer.pdfpage").setLevel(logging.ERROR) logging.getLogger("pdfplumber").setLevel(logging.ERROR) # ── Dependency checks ────────────────────────────────────────────────────────── try: import pdfplumber except ImportError: print("Missing dependency. Install with: pip install pdfplumber") sys.exit(1) try: from pdf2image import convert_from_path PDF2IMAGE_OK = True except ImportError: PDF2IMAGE_OK = False try: import pytesseract TESSERACT_OK = True except ImportError: TESSERACT_OK = False OCR_AVAILABLE = PDF2IMAGE_OK and TESSERACT_OK try: from pypdf import PdfReader, PdfWriter from reportlab.pdfgen import canvas as rl_canvas from reportlab.lib.colors import black as rl_black MASK_AVAILABLE = True except ImportError: MASK_AVAILABLE = False try: import fitz as _fitz # PyMuPDF — for secure (sanitised) PDF redaction PYMUPDF_AVAILABLE = True except ImportError: PYMUPDF_AVAILABLE = False try: import spacy SPACY_OK = True except ImportError: SPACY_OK = False try: from docx import Document as DocxDocument DOCX_OK = True except ImportError: DOCX_OK = False try: import openpyxl XLSX_OK = True except ImportError: XLSX_OK = False # cv2 is imported lazily inside _get_cv2() to avoid macOS recursion errors. # Never import cv2 at module level or from server.py. CV2_OK = False def _face_log(msg: str): """Debug logging — file output disabled.""" import sys as _sys print(msg, file=_sys.stderr, flush=True) _cv2_version = None _cv2_import_error = None _cv2_mod = None _np_mod = None def _get_cv2(): """Return (cv2, numpy) tuple, importing once on first call. In a PyInstaller bundle we exclude cv2/__init__.py entirely (it causes a macOS arm64 recursion crash) and load cv2.abi3.so directly instead. Outside the bundle, plain 'import cv2' works normally. """ global CV2_OK, _cv2_version, _cv2_import_error, _cv2_mod, _np_mod if _cv2_mod is not None: return _cv2_mod, _np_mod if _cv2_import_error is not None: return None, None # already tried and failed try: import sys as _sys import numpy as _np if getattr(_sys, "frozen", False): # Bundle has cv2.abi3.so but NOT cv2/__init__.py. # Load the .so directly and register it as 'cv2'. import importlib.util as _ilu import types as _types from pathlib import Path as _Path _so = _Path(_sys._MEIPASS) / "cv2" / "cv2.abi3.so" if not _so.exists(): raise RuntimeError(f"cv2.abi3.so not found at {_so}") _spec = _ilu.spec_from_file_location("cv2", str(_so), submodule_search_locations=[]) _cv2 = _ilu.module_from_spec(_spec) _sys.modules["cv2"] = _cv2 # register before exec to break cycles _spec.loader.exec_module(_cv2) # Wire up cv2.data.haarcascades for cascade path resolution _data = _types.ModuleType("cv2.data") _data.haarcascades = str(_Path(_sys._MEIPASS) / "cv2" / "data") + "/" _sys.modules["cv2.data"] = _data _cv2.data = _data else: import cv2 as _cv2 if not hasattr(_cv2, "imread"): raise RuntimeError( f"cv2 binary not loaded (file: {getattr(_cv2, '__file__', '?')})" ) _cv2_version = getattr(_cv2, "__version__", "unknown") CV2_OK = True _cv2_mod = _cv2 _np_mod = _np _cv2_import_error = None except Exception as e: CV2_OK = False _cv2_import_error = str(e) import sys as _sys _sys.modules.pop("cv2", None) # clean up partial registration return _cv2_mod, _np_mod # spaCy model preference: large Danish → medium → small → multilingual → English fallback SPACY_MODEL_PREFERENCE = [ "da_core_news_lg", "da_core_news_md", "da_core_news_sm", "xx_ent_wiki_sm", "en_core_web_sm", ] _NLP = None # lazy-loaded singleton def load_nlp(): """Load the best available spaCy model. Returns model or None.""" global _NLP if _NLP is not None: return _NLP if not SPACY_OK: return None import sys as _sys _frozen = getattr(_sys, "frozen", False) for model_name in SPACY_MODEL_PREFERENCE: try: import importlib as _il _mod = _il.import_module(model_name) _NLP = _mod.load() print(f" [NER] Loaded spaCy model: {model_name}", flush=True) _face_log(f"[NER] Loaded spaCy model: {model_name}") return _NLP except Exception as _e: _face_log(f"[NER] {model_name} failed: {_e} (frozen={_frozen})") continue return None # ── OCR page cache ─────────────────────────────────────────────────────────── _OCR_CACHE_PATH = Path.home() / ".document_scanner_ocr_cache.db" class OCRCache: """ SQLite-backed cache for OCR text extraction. Key: SHA-256 of the raw page image bytes + lang string Value: extracted text string This means: - Rescanning the same file reuses cached text (near-instant). - Editing a file invalidates its pages (hash changes). - Different OCR language settings get separate cache entries. - The cache is shared across all processes (safe: writes are idempotent). """ def __init__(self, path: Path = _OCR_CACHE_PATH): self._path = path self._conn: sqlite3.Connection | None = None def _connect(self) -> sqlite3.Connection: if self._conn is None: conn = sqlite3.connect(str(self._path), check_same_thread=False, timeout=10) conn.execute(""" CREATE TABLE IF NOT EXISTS ocr_cache ( key TEXT PRIMARY KEY, text TEXT NOT NULL, ts INTEGER NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_ts ON ocr_cache(ts)") conn.commit() self._conn = conn return self._conn @staticmethod def _key(image_bytes: bytes, lang: str) -> str: h = hashlib.sha256(image_bytes) h.update(lang.encode()) return h.hexdigest() def get(self, image_bytes: bytes, lang: str) -> str | None: key = self._key(image_bytes, lang) try: row = self._connect().execute( "SELECT text FROM ocr_cache WHERE key=?", (key,) ).fetchone() return row[0] if row else None except Exception: return None def put(self, image_bytes: bytes, lang: str, text: str) -> None: key = self._key(image_bytes, lang) ts = int(datetime.now().timestamp()) try: self._connect().execute( "INSERT OR REPLACE INTO ocr_cache(key, text, ts) VALUES(?,?,?)", (key, text, ts), ) self._connect().commit() except Exception: pass def prune(self, max_entries: int = 50_000) -> None: """Delete oldest entries when the cache grows beyond max_entries.""" try: conn = self._connect() n = conn.execute("SELECT COUNT(*) FROM ocr_cache").fetchone()[0] if n > max_entries: to_del = n - max_entries conn.execute(""" DELETE FROM ocr_cache WHERE key IN ( SELECT key FROM ocr_cache ORDER BY ts ASC LIMIT ? ) """, (to_del,)) conn.commit() except Exception: pass def clear(self) -> None: try: self._connect().execute("DELETE FROM ocr_cache") self._connect().commit() except Exception: pass def stats(self) -> dict: try: conn = self._connect() n = conn.execute("SELECT COUNT(*) FROM ocr_cache").fetchone()[0] size = self._path.stat().st_size if self._path.exists() else 0 return {"entries": n, "size_bytes": size} except Exception: return {"entries": 0, "size_bytes": 0} # Module-level singleton — shared within a process _ocr_cache = OCRCache() def ocr_page_cached(image, lang: str) -> str: """ Run Tesseract OCR on `image`, returning cached text when available. Falls back to uncached OCR if the cache is unavailable. """ import io as _io # Serialise image to bytes for hashing (use PNG for lossless round-trip) buf = _io.BytesIO() image.save(buf, format="PNG") img_bytes = buf.getvalue() cached = _ocr_cache.get(img_bytes, lang) if cached is not None: return cached text = ocr_page(image, lang) _ocr_cache.put(img_bytes, lang, text) _ocr_cache.prune() return text # ── Patterns ────────────────────────────────────────────────────────────────── # Danish CPR: DDMMYY-XXXX or DDMMYYXXXX (optional space/dash separator) CPR_PATTERN = re.compile(r"\b(\d{2})(\d{2})(\d{2})[-\s]?(\d{4})\b") DATE_PATTERNS = [ (re.compile(r"\b(\d{4})[-/](\d{1,2})[-/](\d{1,2})\b"), "ISO YYYY-MM-DD"), (re.compile(r"\b(\d{1,2})[.\-/](\d{1,2})[.\-/](\d{4})\b"), "DD.MM.YYYY"), (re.compile(r"\b(\d{1,2})[.\-/](\d{1,2})[.\-/](\d{2})\b"), "DD.MM.YY"), (re.compile( r"\b(\d{1,2})\.\s*(januar|februar|marts|april|maj|juni|juli|" r"august|september|oktober|november|december)\s+(\d{4})\b", re.IGNORECASE), "D. maaned YYYY"), (re.compile( r"\b(\d{1,2})\s+(January|February|March|April|May|June|July|" r"August|September|October|November|December)\s+(\d{4})\b", re.IGNORECASE), "D Month YYYY"), (re.compile( r"\b(January|February|March|April|May|June|July|August|" r"September|October|November|December)\s+(\d{1,2}),?\s+(\d{4})\b", re.IGNORECASE), "Month D, YYYY"), ] # ── Regex patterns for PII beyond CPR ───────────────────────────────────────── # Danish phone: 8 digits, optionally grouped in pairs/fours with spaces or dashes # Also matches +45 prefix PHONE_PATTERN = re.compile( r"(? bool: """Return True if a DANISH_NAME_PATTERN match looks like a real person name.""" parts = m.group(0).split() if len(parts) < 2: return False # First and last parts must be at least 2 chars if len(parts[0]) < 2 or len(parts[-1]) < 2: return False # Reject if the first (capitalised) word is a document stopword if parts[0].lower() in _NAME_STOPWORDS: return False # Reject if ALL non-particle parts are stopwords real_parts = [p for p in parts if p.lower() not in _NAME_PARTICLES] if all(p.lower() in _NAME_STOPWORDS for p in real_parts): return False # Reject strings that are all-uppercase (acronyms, e.g. "CVR NR") if all(p.isupper() and len(p) > 1 for p in parts): return False # Require at least the first word to look like a name (starts uppercase, has lowercase) if not re.search(r'[a-zæøå]', parts[0]): return False return True # Words that strongly suggest a nearby 10-digit sequence is a CPR number. # Used by cpr_context_boost() to raise the risk score. CPR_CONTEXT_WORDS = re.compile( r"\b(?:cpr|personnummer|person[\-\s]?nr|cpr[\-\s]?nr|" r"f\.?d\.?t\.?|fodt|fødselsdato|fdato|" r"born|date\s+of\s+birth|dob|" r"civil\s*registration|NemID|MitID)\b", re.IGNORECASE | re.UNICODE, ) # ── False-positive exclusion: invoice / document-number context ─────────────── # If any of these words appear within ~120 characters of a candidate match, # it is very likely an invoice number, order number, or part number — not a CPR. CPR_FALSE_POSITIVE_WORDS = re.compile( r"\b(?:" # Invoice / order documents r"faktura(?:nr|nummer)?|invoice|invoicenr|invno|inv\.?\s*no" r"|ordre(?:nr|nummer)?|order(?:nr|number)?" r"|rekvisition|requisition" r"|tilbud(?:snr|snummer)?" r"|kvittering" r"|kreditnota|credit\s*note" # Item / part / product references r"|varenr|vare(?:nummer)?" r"|art(?:ikel)?(?:nr|nummer|no)?" r"|item\s*(?:nr|no|number|#)?" r"|part\s*(?:nr|no|number|#)?" r"|produkt(?:nr|nummer)?" r"|model(?:nr|number)?" r"|serial\s*(?:nr|no|number)?" r"|serie(?:nr|nummer)?" r"|lot\s*(?:nr|no|number)?" r"|batch\s*(?:nr|no|number)?" # Reference / document codes r"|referencenr|ref(?:erence)?\.?\s*(?:nr|no|number)?" r"|sagsnr|sags(?:nummer)?" r"|doc(?:ument)?\s*(?:nr|no|number|#)?" r"|bilag(?:snr|snummer)?" r"|bogf(?:øring)?" r"|kontonr|konto(?:nummer)?" r"|ean\s*(?:nr|no|number)?" r"|gln" r"|p(?:urchase)?\s*order" r"|po\s*(?:nr|no|number)?" r"|so\s*(?:nr|no)?" # sales order # Typical invoice line columns r"|antal|quantity|qty" r"|stk\.|pcs\.|units?" r"|enhedspris|unit\s*price" r"|rabat|discount" r"|moms|vat|tax" r"|subtotal|i\s*alt|total\s*(?:ekskl|inkl)" r")\b", re.IGNORECASE, ) # Characters that, if appearing immediately before the 10-digit match, # indicate it's embedded in a longer document/product code — not a CPR. # e.g. "REF-250312-4821", "ART250312-4821", "V250312-4821" _CPR_PREFIX_NOISE = re.compile(r"[A-Za-z0-9]$") def _is_false_positive(text: str, match_start: int, match_end: int, window: int = 120) -> bool: """ Return True if the 10-digit candidate is almost certainly NOT a CPR number. Two checks: 1. Invoice/order/part-number keyword within `window` chars of the match. 2. The character immediately preceding the match is alphanumeric (suggests the number is part of a product or reference code). """ # Check 1 — surrounding keyword context lo = max(0, match_start - window) hi = min(len(text), match_end + window) if CPR_FALSE_POSITIVE_WORDS.search(text[lo:hi]): return True # Check 2 — prefix character (letter or digit immediately before match) if match_start > 0 and _CPR_PREFIX_NOISE.search(text[match_start - 1]): return True return False def cpr_context_boost(text: str, cpr_match_start: int, cpr_match_end: int, window: int = 80) -> bool: """ Return True if a CPR-context keyword appears within `window` characters of the match — used to boost risk score for contextually confirmed CPRs. """ lo = max(0, cpr_match_start - window) hi = min(len(text), cpr_match_end + window) return bool(CPR_CONTEXT_WORDS.search(text[lo:hi])) # ── NER entity types to redact ───────────────────────────────────────────────── # spaCy label → human label. Covers Danish (da_core_news) and multilingual models. NER_REDACT_LABELS = { "PER": "NAME", # da_core_news "PERSON": "NAME", # en_core_web / xx_ent_wiki "LOC": "ADDRESS", # da_core_news locations (includes addresses) "GPE": "ADDRESS", # geopolitical entity (en/xx models) "FAC": "ADDRESS", # facilities / addresses "ORG": "ORG", # organisations (optional — included for thoroughness) } # ── General helpers ─────────────────────────────────────────────────────────── # Official CPR mod-11 weights applied to digits 1-10 _MOD11_WEIGHTS = (4, 3, 2, 7, 6, 5, 4, 3, 2, 1) def _passes_mod11(dd: str, mm: str, yy: str, seq: str) -> bool: """ Return True if the 10-digit CPR passes the official Danish mod-11 checksum. Note: Denmark stopped issuing mod-11-valid CPR numbers around 2007 when the number space was exhausted. Post-2007 births have CPR numbers that do NOT pass this check — so mod-11 failure does NOT prove a number is fake. Use this as a CONFIDENCE signal, not a hard gate. """ digits = [int(c) for c in (dd + mm + yy + seq)] return sum(d * w for d, w in zip(digits, _MOD11_WEIGHTS)) % 11 == 0 def is_valid_cpr(dd, mm, yy, seq): """ Validate a candidate CPR number. Returns: (False, False) — fails date/range/century check — not a CPR (True, True) — passes date check AND mod-11 checksum (high confidence) (True, False) — passes date only, not mod-11 (post-2007 numbers are legitimately valid but fail mod-11 — require context) Rules applied: - Month must be 01-12 - Day must be 01-31 (or 41-71 for protected numbers where day += 40) - The date DDMMYY must be a real calendar date (e.g. 310200 is invalid) - Sequence (last 4 digits) must not be 0000 - Century digit (first digit of seq) must be consistent with the year according to the official Danish CPR century table CPR century digit rules (7th digit → birth century): 0-3 → always 1900s 4 → 1937-1999 → 1900s ; 2000-2036 → 2000s 5-8 → 1858-1899 → 1800s ; 1900-1999 → 1900s (effectively 1900s for modern docs) 9 → 1937-1999 → 1900s ; 2000-2036 → 2000s """ try: d, m, y, s = int(dd), int(mm), int(yy), int(seq) except ValueError: return False, False # Reject all-zero sequence if s == 0: return False, False # Normalise protected numbers (day += 40) d_norm = d - 40 if d > 40 else d # Basic range checks if not (1 <= m <= 12): return False, False if not (1 <= d_norm <= 31): return False, False # Determine century from 7th digit (first digit of seq) c7 = s // 1000 if c7 in (0, 1, 2, 3): century = 1900 elif c7 == 4: century = 2000 if y <= 36 else 1900 elif c7 in (5, 6, 7, 8): century = 1900 elif c7 == 9: century = 2000 if y <= 36 else 1900 else: return False, False # Validate actual calendar date (catches 310200, 290200 in non-leap years, etc.) try: date(century + y, m, d_norm) except ValueError: return False, False return True, _passes_mod11(dd, mm, yy, seq) def is_text_page(page) -> bool: text = page.extract_text() or "" return len(text.replace(" ", "").replace("\n", "")) >= 20 def ocr_page(image, lang: str) -> str: config = "--oem 3 --psm 3" return pytesseract.image_to_string(image, lang=lang, config=config) def extract_matches(text: str, page_num: int, source: str): """Extract CPR numbers and dates. Returns (cprs, dates).""" cprs, dates = [], [] for m in CPR_PATTERN.finditer(text): dd, mm, yy, seq = m.groups() date_ok, mod11_ok = is_valid_cpr(dd, mm, yy, seq) if not date_ok: continue if _is_false_positive(text, m.start(), m.end()): continue ctx = cpr_context_boost(text, m.start(), m.end()) # Gate: require mod-11 OR explicit CPR context keyword. # This rejects ~91% of random date-valid numbers (invoice/part numbers) # while keeping real post-2007 CPRs that appear with explicit labels. if not mod11_ok and not ctx: continue cprs.append({"page": page_num, "raw": m.group(0), "formatted": f"{dd}{mm}{yy}-{seq}", "source": source, "context_confirmed": ctx, "mod11": mod11_ok}) for pattern, fmt in DATE_PATTERNS: for m in pattern.finditer(text): dates.append({"page": page_num, "raw": m.group(0), "format": fmt, "source": source}) return cprs, dates def dedup_dates(dates): seen, result = set(), [] for d in dates: key = (d["page"], d["raw"].strip()) if key not in seen: seen.add(key) result.append(d) return result def count_pii_types(text: str, use_ner: bool = True) -> dict: """ Count all PII types in text. Returns e.g. {"PHONE": 2, "EMAIL": 1, "IBAN": 0, "BANK_ACCOUNT": 1, "NAME": 3, "ADDRESS": 1, "ORG": 2}. NER (NAME/ADDRESS/ORG) is run when use_ner=True and the spaCy model is loaded. """ counts: dict[str, int] = { "PHONE": 0, "EMAIL": 0, "IBAN": 0, "BANK_ACCOUNT": 0, "NAME": 0, "ADDRESS": 0, "ORG": 0, } for m in PHONE_PATTERN.finditer(text): raw = m.group(0).replace(" ", "").replace("-", "").lstrip("+") digits = re.sub(r"\D", "", raw) if len(digits) in (8, 10, 11): counts["PHONE"] += 1 for _ in EMAIL_PATTERN.finditer(text): counts["EMAIL"] += 1 for _ in IBAN_PATTERN.finditer(text): counts["IBAN"] += 1 for m in REG_KONTO_PATTERN.finditer(text): reg, acct = m.group(1), m.group(2) if 1 <= int(reg) <= 9999 and len(acct) >= 6: counts["BANK_ACCOUNT"] += 1 # NER-based counts — only run if model is loaded and text is non-trivial if use_ner and len(text.strip()) > 20: nlp = load_nlp() if nlp: NER_LIMIT = 20_000 for chunk_start in range(0, min(len(text), NER_LIMIT * 10), NER_LIMIT): chunk = text[chunk_start:chunk_start + NER_LIMIT] if not chunk.strip(): continue doc = nlp(chunk) for ent in doc.ents: mapped = NER_REDACT_LABELS.get(ent.label_) if mapped in counts: counts[mapped] += 1 return counts # ── Date parsing (for --older-than) ────────────────────────────────────────── MONTH_DA = {"januar":1,"februar":2,"marts":3,"april":4,"maj":5,"juni":6, "juli":7,"august":8,"september":9,"oktober":10,"november":11,"december":12} MONTH_EN = {"january":1,"february":2,"march":3,"april":4,"may":5,"june":6, "july":7,"august":8,"september":9,"october":10,"november":11,"december":12} def parse_date(raw: str, fmt: str): raw = raw.strip() try: if fmt == "ISO YYYY-MM-DD": return datetime.strptime(raw, "%Y-%m-%d").date() if fmt in ("DD.MM.YYYY", "DD.MM.YY"): for sep in ".-/": try: d, m, y = raw.split(sep) y = int(y) if fmt == "DD.MM.YY": y += 2000 if y <= 30 else 1900 return date(y, int(m), int(d)) except Exception: pass if fmt == "D. maaned YYYY": mo = re.match(r"(\d{1,2})\.\s*(\w+)\s+(\d{4})", raw, re.IGNORECASE) if mo: d, mon, y = mo.groups() mn = MONTH_DA.get(mon.lower()) if mn: return date(int(y), mn, int(d)) if fmt == "D Month YYYY": mo = re.match(r"(\d{1,2})\s+(\w+)\s+(\d{4})", raw, re.IGNORECASE) if mo: d, mon, y = mo.groups() mn = MONTH_EN.get(mon.lower()) if mn: return date(int(y), mn, int(d)) if fmt == "Month D, YYYY": mo = re.match(r"(\w+)\s+(\d{1,2}),?\s+(\d{4})", raw, re.IGNORECASE) if mo: mon, d, y = mo.groups() mn = MONTH_EN.get(mon.lower()) if mn: return date(int(y), mn, int(d)) except Exception: pass return None def older_than(d, days: int) -> bool: return d <= date.today() - timedelta(days=days) def build_flagged_list(all_results, min_age_days): flagged = [] for path, results in all_results: if not results["cprs"]: continue old_dates = [] for hit in results["dates"]: d = parse_date(hit["raw"], hit["format"]) if d and older_than(d, min_age_days): old_dates.append((d, hit["raw"], hit["page"])) if old_dates: old_dates.sort(key=lambda x: x[0]) flagged.append({"path": path, "cpr_count": len(results["cprs"]), "oldest_date": old_dates[0], "old_dates": old_dates}) return flagged def print_flagged(flagged, min_age_days): print(f"\n{'#'*62}") print(f" FILES WITH CPR + DATES OLDER THAN {min_age_days} DAYS: {len(flagged)}") print(f"{'#'*62}") if not flagged: print(" None found.\n") return for i, entry in enumerate(flagged, 1): oldest_d, oldest_raw, oldest_page = entry["oldest_date"] print(f"\n {i}. {entry['path']}") print(f" CPR numbers : {entry['cpr_count']}") print(f" Oldest date : {oldest_raw} ({oldest_d.isoformat()}, page {oldest_page})") for d, raw, pg in entry["old_dates"][1:4]: print(f" {raw} ({d.isoformat()}, page {pg})") if len(entry["old_dates"]) > 4: print(f" ... and {len(entry['old_dates'])-4} more") print() # ── PII detection: text spans ───────────────────────────────────────────────── def find_pii_spans_in_text(text: str, use_ner: bool = True) -> list[tuple[int, int, str]]: """ Return list of (start, end, label) for all PII found in text. Covers: CPR, phone, email, and (if use_ner) NER entities. """ spans = [] # CPR for m in CPR_PATTERN.finditer(text): dd, mm, yy, seq = m.groups() date_ok, mod11_ok = is_valid_cpr(dd, mm, yy, seq) if not date_ok: continue if _is_false_positive(text, m.start(), m.end()): continue ctx = cpr_context_boost(text, m.start(), m.end()) if not mod11_ok and not ctx: continue spans.append((m.start(), m.end(), "CPR")) # Phone for m in PHONE_PATTERN.finditer(text): raw = m.group(0).replace(" ", "").replace("-", "").lstrip("+") digits = re.sub(r"\D", "", raw) if len(digits) in (8, 10, 11): # 8=DK, 10/11=with country code spans.append((m.start(), m.end(), "PHONE")) # Email for m in EMAIL_PATTERN.finditer(text): spans.append((m.start(), m.end(), "EMAIL")) # Danish IBAN for m in IBAN_PATTERN.finditer(text): spans.append((m.start(), m.end(), "IBAN")) # Danish REG/Konto bank account (only when plausibly formatted as account) for m in REG_KONTO_PATTERN.finditer(text): reg, acct = m.group(1), m.group(2) if 1 <= int(reg) <= 9999 and len(acct) >= 6: spans.append((m.start(), m.end(), "BANK_ACCOUNT")) # Danish postal addresses for m in DANISH_ADDRESS_PATTERN.finditer(text): # Only include if the match is long enough to avoid false positives if len(m.group(0).strip()) >= 8: spans.append((m.start(), m.end(), "ADDRESS")) # Regex-based name detection — catches isolated "Firstname Lastname" cells # where spaCy has no surrounding context to work from. if use_ner: for m in DANISH_NAME_PATTERN.finditer(text): if _is_name_match(m): spans.append((m.start(), m.end(), "NAME")) # NER (names, addresses, orgs) # Cap at 20 000 chars per call — spaCy NER is O(n) but dense tabular text # (e.g. Excel-converted PDFs) can have thousands of tokens per page and stall. # # Context boosting: spaCy needs sentence context to recognise isolated names. # For short text (< 80 chars, e.g. a single cell or line) we prepend a label # so the model sees "Navn: Peter Hansen" instead of bare "Peter Hansen". # Matches are shifted back by the prefix length before being recorded. if use_ner: nlp = load_nlp() if nlp: NER_LIMIT = 20_000 PREFIX = "Navn: " PLEN = len(PREFIX) # Only inject prefix for short/isolated text if len(text.strip()) < 80: ner_input = PREFIX + text ner_offset = -PLEN else: ner_input = text ner_offset = 0 for chunk_start in range(0, min(len(ner_input), NER_LIMIT * 10), NER_LIMIT): chunk = ner_input[chunk_start:chunk_start + NER_LIMIT] if not chunk.strip(): continue doc = nlp(chunk) for ent in doc.ents: if ent.label_ in NER_REDACT_LABELS: s = chunk_start + ent.start_char + ner_offset e = chunk_start + ent.end_char + ner_offset if e <= 0: # entity was entirely within the prefix continue spans.append((max(s, 0), e, NER_REDACT_LABELS[ent.label_])) # Merge overlapping spans spans.sort() merged = [] for start, end, label in spans: if merged and start <= merged[-1][1]: prev_s, prev_e, prev_l = merged[-1] merged[-1] = (prev_s, max(prev_e, end), prev_l) else: merged.append((start, end, label)) return merged # ── Bounding box finders ────────────────────────────────────────────────────── def find_pii_char_bboxes(page, use_ner: bool = True) -> list[tuple[float, float, float, float, str]]: """ Return (x0, top, x1, bottom, label) for all PII on a text-based pdfplumber page. Uses extract_words() for bbox lookup, but extract_text() for the NER text so that spaCy sees newlines between lines — critical for name recognition. Without newlines, names from adjacent rows run together and spaCy misses them. """ words = page.extract_words(keep_blank_chars=False, x_tolerance=3, y_tolerance=3) if not words: return [] # Build a word-span index for bbox lookup (space-separated, no newlines) word_text = "" word_spans = [] for w in words: ws = len(word_text) word_text += w["text"] word_spans.append((ws, len(word_text), w)) word_text += " " # For PII/NER detection use extract_text() which preserves newlines between # lines — spaCy needs sentence structure to reliably recognise names. ner_text = page.extract_text() or word_text spans = find_pii_spans_in_text(ner_text, use_ner=use_ner) bboxes = [] PAD = 1 for span_start, span_end, label in spans: # The matched span is in ner_text coordinates. Map to word_text by # extracting the matched surface form and fuzzy-searching in word_text. matched_surface = ner_text[span_start:span_end].strip() if not matched_surface: continue # Search for the token sequence in the word list # Split matched surface into tokens (same split as extract_words uses) import re as _re tokens = _re.split(r'\s+', matched_surface) tokens = [t for t in tokens if t] hit_words = [] if tokens: # Find the first word that starts with the first token for i, (ws, we, w) in enumerate(word_spans): if w["text"].startswith(tokens[0]) or tokens[0].startswith(w["text"]): # Try to match the full token sequence from here candidate = word_spans[i:i + len(tokens)] if len(candidate) == len(tokens): hit_words = [cw for (_, _, cw) in candidate] break # Partial match — just take as many words as match hit_words = [cw for (_, _, cw) in candidate] break if not hit_words: # Fallback: find words whose text overlaps with matched_surface tokens surface_lower = matched_surface.lower() hit_words = [w for (_, _, w) in word_spans if w["text"].lower() in surface_lower or surface_lower in w["text"].lower()] if not hit_words: continue bboxes.append(( min(w["x0"] for w in hit_words) - PAD, min(w["top"] for w in hit_words) - PAD, max(w["x1"] for w in hit_words) + PAD, max(w["bottom"] for w in hit_words) + PAD, label, )) return bboxes def find_cpr_char_bboxes(page): """ CPR-only version for --mask (no NER). Uses extract_words() to build the text string — the same tokenisation that extract_text() uses during scanning. Raw page.chars iteration fails on Excel-converted PDFs where chars have no inter-word spacing or are stored in a different order than reading order, causing CPR patterns to either not match or match at the wrong offsets. Strategy: 1. Build a word list with bboxes via extract_words(). 2. Concatenate words (space-separated) and run CPR_PATTERN on that string. 3. For each match, find which word(s) it falls in and union their bboxes. Add a small padding so the black box covers the full glyph. """ words = page.extract_words(keep_blank_chars=False, x_tolerance=3, y_tolerance=3) if not words: return [] # Build concatenated text and track each word's start offset full_text = "" word_spans = [] # (start_offset, end_offset, word_dict) for w in words: start = len(full_text) full_text += w["text"] word_spans.append((start, len(full_text), w)) full_text += " " # space separator between words bboxes = [] for m in CPR_PATTERN.finditer(full_text): dd, mm, yy, seq = m.groups() date_ok, mod11_ok = is_valid_cpr(dd, mm, yy, seq) if not date_ok: continue if _is_false_positive(full_text, m.start(), m.end()): continue ctx = cpr_context_boost(full_text, m.start(), m.end()) if not mod11_ok and not ctx: continue ms, me = m.start(), m.end() # Collect all words that overlap this match span hit_words = [w for (ws, we, w) in word_spans if ws < me and we > ms] if not hit_words: continue PAD = 1 # points of padding around the glyph bboxes.append(( min(w["x0"] for w in hit_words) - PAD, min(w["top"] for w in hit_words) - PAD, max(w["x1"] for w in hit_words) + PAD, max(w["bottom"]for w in hit_words) + PAD, )) return bboxes def find_cpr_image_bboxes(image, lang: str): """CPR-only image bboxes for --mask.""" raw_bboxes = find_pii_image_bboxes(image, lang, use_ner=False) return [(l, t, r, b) for (l, t, r, b, lbl) in raw_bboxes if lbl == "CPR"] # ── Drawing helpers ─────────────────────────────────────────────────────────── def build_redaction_overlay(page_width, page_height, bboxes_pdfplumber) -> bytes: """Build a PDF overlay with black boxes. bboxes: (x0, top, x1, bottom[, label]).""" buf = io.BytesIO() c = rl_canvas.Canvas(buf, pagesize=(page_width, page_height)) c.setFillColor(rl_black) c.setStrokeColor(rl_black) pad = 1.5 for bbox in bboxes_pdfplumber: x0, top, x1, bot = bbox[:4] rl_y = page_height - bot - pad rl_h = (bot - top) + pad * 2 c.rect(x0 - pad, rl_y, (x1 - x0) + pad * 2, rl_h, fill=1, stroke=0) c.save() buf.seek(0) return buf.read() def apply_overlay_to_page(writer, reader_page, bboxes): page_width = float(reader_page.mediabox.width) page_height = float(reader_page.mediabox.height) overlay_bytes = build_redaction_overlay(page_width, page_height, bboxes) overlay_page = PdfReader(io.BytesIO(overlay_bytes)).pages[0] reader_page.merge_page(overlay_page) writer.add_page(reader_page) def redact_image(image, bboxes_px): """Paint black rectangles over pixel bboxes in a PIL image.""" from PIL import ImageDraw img = image.copy() draw = ImageDraw.Draw(img) for bbox in bboxes_px: left, top, right, bottom = bbox[:4] draw.rectangle([left, top, right, bottom], fill="black") return img def image_to_pdf_page(image, dpi=300) -> bytes: buf = io.BytesIO() image.convert("RGB").save(buf, format="PDF", resolution=dpi) buf.seek(0) return buf.read() # ── Secure PDF redaction (PyMuPDF) ─────────────────────────────────────────── def redact_pdf_secure(input_path: Path, output_path: Path, results: dict, force_ocr: bool, lang: str, dpi: int, poppler_path, use_ner: bool = False) -> "int | bool": """ Physically-secure PDF redaction using PyMuPDF (fitz). Unlike the reportlab overlay approach, PyMuPDF: 1. Draws opaque redaction annotations over the target character bboxes. 2. Calls page.apply_redactions() which physically REMOVES the underlying text/image data — not just paints over it. 3. Saves with garbage collection and compression to strip orphaned objects. This means a user cannot recover the redacted text by: - Selecting text under the black box in a viewer - Extracting the PDF text layer programmatically - Inspecting raw PDF object streams Falls back to the reportlab overlay method if PyMuPDF is not installed. """ if not PYMUPDF_AVAILABLE: return redact_pdf(input_path, output_path, results, force_ocr, lang, dpi, poppler_path, use_ner) page_methods = results["page_methods"] images = None ocr_pages = [p for p, m in page_methods.items() if m == "ocr"] if ocr_pages and OCR_AVAILABLE: images = convert_from_path(str(input_path), dpi=dpi, poppler_path=poppler_path) total = 0 doc = _fitz.open(str(input_path)) with pdfplumber.open(input_path) as plumb_pdf: for page_num, plumb_page in enumerate(plumb_pdf.pages, start=1): method = page_methods.get(page_num, "text") fitz_page = doc[page_num - 1] # Get bboxes in pdfplumber coordinates (origin top-left, y increases down) if method == "text": bboxes = (find_pii_char_bboxes(plumb_page, use_ner=use_ner) if use_ner else find_cpr_char_bboxes(plumb_page)) elif method == "ocr" and images is not None: img = images[page_num - 1] bboxes = (find_pii_image_bboxes(img, lang, use_ner=use_ner) if use_ner else find_cpr_image_bboxes(img, lang)) else: bboxes = [] # pdfplumber char coords: origin top-left of CropBox, y increases DOWN. # fitz Rect coords: origin top-left of MediaBox, y increases DOWN. # Both already have y=0 at the top — no flip needed. # Add the CropBox offset so boxes land correctly when CropBox != MediaBox. cb = fitz_page.cropbox mb = fitz_page.mediabox crop_x0 = cb.x0 - mb.x0 crop_y0 = cb.y0 - mb.y0 for bbox in bboxes: x0, top, x1, bottom = bbox[:4] rect = _fitz.Rect( x0 + crop_x0, top + crop_y0, x1 + crop_x0, bottom + crop_y0, ) annot = fitz_page.add_redact_annot(rect, fill=(0, 0, 0)) _ = annot # silence linter # Apply redactions — physically removes text/image data under rects # PDF_REDACT_IMAGE_REMOVE / PDF_REDACT_LINE_ART_REMOVE were added in # PyMuPDF 1.22; fall back to their integer values (2) on older builds. _img_flag = getattr(_fitz, "PDF_REDACT_IMAGE_REMOVE", 2) _art_flag = getattr(_fitz, "PDF_REDACT_LINE_ART_REMOVE", 2) fitz_page.apply_redactions(images=_img_flag, graphics=_art_flag) total += len(bboxes) # Save with full garbage collection (removes orphaned objects/streams) doc.save( str(output_path), garbage=4, # maximum GC: also removes unused xref entries deflate=True, # compress streams clean=True, # sanitise content streams linear=False, ) doc.close() return total # ── Generic redact-PDF engine (reportlab overlay — visual only) ─────────────── def redact_pdf(input_path: Path, output_path: Path, results: dict, force_ocr: bool, lang: str, dpi: int, poppler_path, use_ner: bool = False) -> int | bool: """ Write a redacted PDF to output_path. If use_ner=False: CPR only (--mask). If use_ner=True: all PII (--anonymise). Returns count of redacted regions, or False on error. """ if not MASK_AVAILABLE: print(" Requires: pip install pypdf reportlab") return False page_methods = results["page_methods"] reader = PdfReader(str(input_path)) writer = PdfWriter() images = None ocr_pages = [p for p, m in page_methods.items() if m == "ocr"] if ocr_pages and OCR_AVAILABLE: images = convert_from_path(str(input_path), dpi=dpi, poppler_path=poppler_path) total = 0 with pdfplumber.open(input_path) as plumb_pdf: for page_num, plumb_page in enumerate(plumb_pdf.pages, start=1): method = page_methods.get(page_num, "text") reader_page = reader.pages[page_num - 1] if method == "text": bboxes = (find_pii_char_bboxes(plumb_page, use_ner=use_ner) if use_ner else find_cpr_char_bboxes(plumb_page)) if bboxes: apply_overlay_to_page(writer, reader_page, bboxes) total += len(bboxes) else: writer.add_page(reader_page) elif method == "ocr" and images is not None: img = images[page_num - 1] bboxes = (find_pii_image_bboxes(img, lang, use_ner=use_ner) if use_ner else find_cpr_image_bboxes(img, lang)) if bboxes: writer.add_page( PdfReader(io.BytesIO( image_to_pdf_page(redact_image(img, bboxes), dpi) )).pages[0] ) total += len(bboxes) else: writer.add_page(reader_page) else: writer.add_page(reader_page) with open(output_path, "wb") as f: writer.write(f) return total # ── Word document support ───────────────────────────────────────────────────── def _iter_docx_runs(doc): """Yield every run in a docx Document: body, tables, headers, footers.""" def _from_paragraphs(paragraphs): for para in paragraphs: for run in para.runs: yield run yield from _from_paragraphs(doc.paragraphs) for table in doc.tables: for row in table.rows: for cell in row.cells: yield from _from_paragraphs(cell.paragraphs) for section in doc.sections: for hf in [section.header, section.footer, section.even_page_header, section.even_page_footer, section.first_page_header, section.first_page_footer]: try: yield from _from_paragraphs(hf.paragraphs) except Exception: pass def scan_docx(docx_path: Path) -> dict: """ Scan a .docx file for CPR numbers and dates. Returns the same results dict shape as scan_document(), plus internal _doc / _run_map / _full_text keys used by redact_docx(). """ if not DOCX_OK: print(" .docx support requires: pip install python-docx") return {"cprs": [], "dates": [], "page_methods": {1: "docx"}, "_doc": None, "_run_map": [], "_full_text": ""} doc = DocxDocument(str(docx_path)) # Build full text + run map (global_start, global_end, run) full_text = "" run_map = [] for run in _iter_docx_runs(doc): if run.text: start = len(full_text) full_text += run.text run_map.append((start, len(full_text), run)) cprs, dates = extract_matches(full_text, 1, "docx") return { "cprs": cprs, "dates": dates, "page_methods": {1: "docx"}, "_full_text": full_text, "_run_map": run_map, "_doc": doc, } def _redact_runs(run_map: list, spans: list): """ Replace characters in the given spans with block characters (█). Modifies runs in-place. spans: list of (start, end, label) in full_text coordinates. """ if not spans: return # Build char → (run, index_within_run) lookup char_owner = [] # index = position in full_text, value = (run, char_pos_in_run) for (gs, ge, run) in run_map: for i in range(ge - gs): char_owner.append((run, i)) # Apply redactions (process in reverse so earlier spans aren't shifted) for span_start, span_end, _label in sorted(spans, key=lambda s: s[0], reverse=True): # Group by run by_run = {} for pos in range(span_start, min(span_end, len(char_owner))): run_obj, char_pos = char_owner[pos] rid = id(run_obj) if rid not in by_run: by_run[rid] = {"run": run_obj, "positions": []} by_run[rid]["positions"].append(char_pos) for entry in by_run.values(): run_obj = entry["run"] chars = list(run_obj.text) for p in entry["positions"]: if p < len(chars): chars[p] = "█" run_obj.text = "".join(chars) def redact_docx(input_path: Path, output_path: Path, results: dict, use_ner: bool = False) -> int: """ Write a redacted copy of a .docx. use_ner=False → CPR only; use_ner=True → all PII. Returns number of spans redacted. """ doc = results.get("_doc") run_map = results.get("_run_map", []) text = results.get("_full_text", "") if doc is None: return 0 spans = find_pii_spans_in_text(text, use_ner=use_ner) # If CPR-only, filter to CPR spans if not use_ner: spans = [(s, e, l) for s, e, l in spans if l == "CPR"] _redact_runs(run_map, spans) doc.save(str(output_path)) return len(spans) def print_docx_results(docx_path: Path, results: dict): cprs = results["cprs"] dates = results["dates"] print(f"\n{'='*62}") print(f"File : {docx_path} [Word document]") print(f"{'='*62}") print(f"\n CPR Numbers found: {len(cprs)}") if cprs: for hit in cprs: print(f" {hit['formatted']:<16} (raw: \"{hit['raw']}\")") else: print(" None found.") print(f"\n Dates found: {len(dates)}") if dates: for hit in dates: print(f" {hit['raw']:<28} [{hit['format']}]") else: print(" None found.") print() # ── Logging ─────────────────────────────────────────────────────────────────── # Module-level logger — handlers are added in main() based on --log argument logger = logging.getLogger("scanner") logger.setLevel(logging.DEBUG) _log_records: list[dict] = [] # in-memory log, flushed to JSON at end def _log(level: str, path: Path | None, event: str, **kwargs): """ Append a structured log record and emit to the logger. level: "INFO" | "WARNING" | "ACTION" | "DRY_RUN" | "ERROR" """ record = { "time": datetime.now().isoformat(timespec="seconds"), "level": level, "file": str(path) if path else None, "event": event, **kwargs, } _log_records.append(record) msg = f"[{level}] {path.name if path else ''} — {event}" if kwargs: extras = " " + " ".join(f"{k}={v}" for k, v in kwargs.items()) msg += extras if level == "ERROR": logger.error(msg) elif level == "WARNING": logger.warning(msg) else: logger.info(msg) def flush_log(log_path: Path): """Write all accumulated log records to a JSON file.""" with open(log_path, "w", encoding="utf-8") as f: json.dump(_log_records, f, ensure_ascii=False, indent=2, default=str) print(f"\nLog written to: {log_path} ({len(_log_records)} records)") # ── Excel / CSV support ─────────────────────────────────────────────────────── def _cell_text(cell) -> str: """Return a string representation of a cell value, or empty string.""" if cell.value is None: return "" return str(cell.value) def scan_xlsx(path: Path) -> dict: """ Scan an .xlsx / .xlsm file for CPR numbers and dates across all sheets. Returns results dict compatible with the rest of the pipeline, plus _wb (workbook) for use by redact_xlsx(). Each CPR/date hit carries sheet + row + col in the "page" field (formatted as "Sheet!R{row}C{col}"). """ if not XLSX_OK: print(" .xlsx support requires: pip install openpyxl") return {"cprs": [], "dates": [], "page_methods": {1: "xlsx"}, "_wb": None} wb = openpyxl.load_workbook(str(path), data_only=True) all_cprs, all_dates = [], [] for sheet in wb.worksheets: for row in sheet.iter_rows(): for cell in row: val = _cell_text(cell) if not val: continue location = f"{sheet.title}!R{cell.row}C{cell.column}" cprs, dates = extract_matches(val, location, "xlsx") all_cprs.extend(cprs) all_dates.extend(dates) return { "cprs": all_cprs, "dates": all_dates, "page_methods": {1: "xlsx"}, "_wb": wb, "_path": path, } def scan_csv(path: Path) -> dict: """ Scan a .csv file for CPR numbers and dates. Returns results dict compatible with the rest of the pipeline. """ import csv as _csv all_cprs, all_dates = [], [] try: with open(path, newline="", encoding="utf-8-sig", errors="replace") as f: reader = _csv.reader(f) for row_num, row in enumerate(reader, start=1): for col_num, cell in enumerate(row, start=1): if not cell.strip(): continue location = f"R{row_num}C{col_num}" cprs, dates = extract_matches(cell, location, "csv") all_cprs.extend(cprs) all_dates.extend(dates) except Exception as e: print(f" Warning: could not read CSV: {e}") return { "cprs": all_cprs, "dates": all_dates, "page_methods": {1: "csv"}, "_wb": None, "_path": path, } def scan_text(text: str, source: str = "text") -> dict: """ Scan a plain text string for CPR numbers and dates. Returns a results dict compatible with the rest of the pipeline. False-positive suppression (invoice/part-number context) is applied via extract_matches → extract_cpr_and_dates → _is_false_positive. """ cprs, dates = extract_cpr_and_dates(text, page_num=1, source=source) return { "cprs": cprs, "dates": dates, "page_methods": {1: "text"}, } def scan_image(path: Path, lang: str = "dan+eng") -> dict: """ OCR an image file and scan the resulting text for CPR numbers. Requires Tesseract and pytesseract. """ try: import pytesseract as _tess from PIL import Image as _PILImage img = _PILImage.open(path) text = _tess.image_to_string(img, lang=lang, config="--oem 3 --psm 3") return scan_text(text, source="image-ocr") except ImportError: return {"cprs": [], "dates": [], "error": "pytesseract/PIL not available"} except Exception as e: return {"cprs": [], "dates": [], "error": str(e)} def redact_xlsx(input_path: Path, output_path: Path, results: dict, use_ner: bool = False) -> int: """ Write a redacted copy of an .xlsx file. Cells containing PII are overwritten with "████████". use_ner=False -> CPR only; use_ner=True -> all PII. Returns number of cells redacted. """ wb = results.get("_wb") if wb is None: return 0 redacted = 0 for sheet in wb.worksheets: for row in sheet.iter_rows(): for cell in row: val = _cell_text(cell) if not val: continue # Wrap cell in a context sentence so spaCy NER can recognise # names that appear in isolation (e.g. a name-only cell has no # surrounding text to provide the model with PER entity context). PREFIX = "Navn: " ctx = PREFIX + val raw_spans = find_pii_spans_in_text(ctx, use_ner=use_ner) # Shift spans back by prefix length; discard any that start in prefix plen = len(PREFIX) spans = [(s - plen, e - plen, l) for s, e, l in raw_spans if e > plen] spans = [(max(s, 0), e, l) for s, e, l in spans] if not use_ner: spans = [(s, e, l) for s, e, l in spans if l == "CPR"] if spans: # Replace the whole cell value with redaction marker # (partial in-cell redaction is not reliably possible in xlsx) cell.value = "████████" redacted += 1 wb.save(str(output_path)) return redacted def redact_csv(input_path: Path, output_path: Path, use_ner: bool = False) -> int: """ Write a redacted copy of a .csv file. Cells containing PII are overwritten with "████████". Returns number of cells redacted. """ import csv as _csv rows_out = [] redacted = 0 try: with open(input_path, newline="", encoding="utf-8-sig", errors="replace") as f: reader = _csv.reader(f) for row in reader: new_row = [] for cell in row: if cell.strip(): PREFIX = "Navn: " ctx = PREFIX + cell plen = len(PREFIX) raw_spans = find_pii_spans_in_text(ctx, use_ner=use_ner) spans = [(max(s - plen, 0), e - plen, l) for s, e, l in raw_spans if e > plen] else: spans = [] if not use_ner: spans = [(s, e, l) for s, e, l in spans if l == "CPR"] if spans: new_row.append("████████") redacted += 1 else: new_row.append(cell) rows_out.append(new_row) except Exception as e: print(f" Warning: could not read CSV for redaction: {e}") return 0 with open(output_path, "w", newline="", encoding="utf-8") as f: _csv.writer(f).writerows(rows_out) return redacted def print_xlsx_results(path: Path, results: dict, file_type: str = "xlsx"): cprs = results["cprs"] dates = results["dates"] label = "Excel spreadsheet" if file_type == "xlsx" else "CSV file" print(f"\n{'='*62}") print(f"File : {path} [{label}]") print(f"{'='*62}") print(f"\n CPR Numbers found: {len(cprs)}") if cprs: for hit in cprs: print(f" {hit['page']:<20} {hit['formatted']:<16} (raw: \"{hit['raw']}\")") else: print(" None found.") print(f"\n Dates found: {len(dates)}") if dates: for hit in dates: print(f" {hit['page']:<20} {hit['raw']:<28} [{hit['format']}]") else: print(" None found.") print() # ── Face detection & pixelation ─────────────────────────────────────────────── # Use both frontal and profile cascades for better coverage _FACE_CASCADES = None def _get_face_cascades(): global _FACE_CASCADES if _FACE_CASCADES is not None: return _FACE_CASCADES cv2, np = _get_cv2() if cv2 is None: return [] def _find_cascade(name: str): """Try multiple locations to find a Haar cascade XML file.""" import sys as _sys candidates = [] # 1. PyInstaller bundle — check FIRST so bundle path wins over stale install paths if hasattr(_sys, "_MEIPASS"): candidates.append(str(Path(_sys._MEIPASS) / "cv2" / "data" / name)) candidates.append(str(Path(_sys._MEIPASS) / name)) # 2. cv2.data attribute (standard install / venv) try: candidates.append(cv2.data.haarcascades + name) except Exception: pass # 3. Relative to cv2 package directory try: candidates.append(str(Path(cv2.__file__).parent / "data" / name)) except Exception: pass # 4. Common system paths for base in ["/usr/share/opencv4", "/usr/share/opencv", "/usr/local/share/opencv4", "/usr/local/share/opencv"]: candidates.append(str(Path(base) / "haarcascades" / name)) for p in candidates: if p and Path(p).exists(): c = cv2.CascadeClassifier(p) if not c.empty(): _face_log(f" [+] Cascade: {p}") return c # Nothing worked — log all paths tried so it shows in the app console _face_log(f" [!] Cascade not found: {name}") for p in candidates: _face_log(f" {p} exists={Path(p).exists()}") return None cascades = [] for name in ["haarcascade_frontalface_default.xml", "haarcascade_profileface.xml"]: c = _find_cascade(name) if c is not None: cascades.append(c) if not cascades: _face_log(" [!] No Haar cascade XML files found — face detection disabled") _FACE_CASCADES = cascades return cascades def detect_faces_cv2(img_cv2, min_size: int = 40, neighbors: int = 4, strict: bool = False): """ Detect faces in a BGR cv2 image using Haar cascades (frontal + profile). Returns list of (x, y, w, h) in pixel coordinates. Parameters ---------- min_size : minimum face side in pixels neighbors : minNeighbors for detectMultiScale (higher = stricter, fewer detections) strict : unused, kept for API compatibility """ cv2, np = _get_cv2() if cv2 is None: return [] gray = cv2.cvtColor(img_cv2, cv2.COLOR_BGR2GRAY) # Equalise histogram to improve detection on dark or low-contrast images gray = cv2.equalizeHist(gray) cascades = _get_face_cascades() if not cascades: return [] found = [] seen = set() def _add(x, y, w, h): key = (x // 10, y // 10, w // 10, h // 10) if key not in seen: seen.add(key) found.append((x, y, w, h)) for cascade in cascades: for img in [gray, cv2.flip(gray, 1)]: faces = cascade.detectMultiScale( img, scaleFactor=1.1, minNeighbors=neighbors, minSize=(min_size, min_size), flags=cv2.CASCADE_SCALE_IMAGE ) if faces is not None and len(faces) > 0: if img is not gray: # flip back x coords w_img = img.shape[1] faces = [(w_img - x - w, y, w, h) for (x, y, w, h) in faces] for face in faces: _add(*face) return found def pixelate_region(img_cv2, x: int, y: int, w: int, h: int, blocks: int = 6): """Pixelate a rectangular region in a cv2 image. Returns modified copy. Lower blocks = larger pixels = stronger anonymisation. A Gaussian blur is applied on top to prevent edge-sharpening attacks. """ cv2, np = _get_cv2() out = img_cv2.copy() roi = out[y:y+h, x:x+w] bw = max(1, w // blocks) bh = max(1, h // blocks) small = cv2.resize(roi, (bw, bh), interpolation=cv2.INTER_LINEAR) pixelated = cv2.resize(small, (w, h), interpolation=cv2.INTER_NEAREST) ksize = max(3, (min(w, h) // blocks) | 1) pixelated = cv2.GaussianBlur(pixelated, (ksize, ksize), 0) out[y:y+h, x:x+w] = pixelated return out def blur_faces_in_image(img_cv2, min_size: int = 30, blocks: int = 6): """ Detect faces and apply pixelation to each. Returns (modified_img, face_count). """ cv2, np = _get_cv2() if cv2 is None: return img_cv2, 0 faces = detect_faces_cv2(img_cv2, min_size=min_size) out = img_cv2.copy() for (x, y, w, h) in faces: pad_x = int(w * 0.1) pad_y = int(h * 0.1) x2 = max(0, x - pad_x) y2 = max(0, y - pad_y) w2 = min(out.shape[1] - x2, w + pad_x * 2) h2 = min(out.shape[0] - y2, h + pad_y * 2) out = pixelate_region(out, x2, y2, w2, h2, blocks=blocks) return out, len(faces) def pil_to_cv2(pil_img): cv2, np = _get_cv2() return cv2.cvtColor(np.array(pil_img.convert("RGB")), cv2.COLOR_RGB2BGR) def cv2_to_pil(img_cv2): cv2, np = _get_cv2() from PIL import Image as PILImage return PILImage.fromarray(cv2.cvtColor(img_cv2, cv2.COLOR_BGR2RGB)) def cv2_to_bytes(img_cv2, fmt: str = "JPEG") -> bytes: """Encode cv2 image to bytes in given format.""" cv2, np = _get_cv2() ext = {"JPEG": ".jpg", "PNG": ".png", "WEBP": ".webp"}.get(fmt.upper(), ".jpg") ok, buf = cv2.imencode(ext, img_cv2) if not ok: raise RuntimeError(f"cv2.imencode failed for format {fmt}") return buf.tobytes() # ── Face blur: standalone image files ───────────────────────────────────────── def blur_faces_image_file(input_path: Path, output_path: Path, blocks: int = 6) -> int: """ Detect and pixelate faces in a standalone image file. Returns number of faces blurred. """ cv2, np = _get_cv2() if cv2 is None: raise RuntimeError("OpenCV not available") img = cv2.imread(str(input_path)) if img is None: raise ValueError(f"Could not read image: {input_path}") result, count = blur_faces_in_image(img, blocks=blocks) cv2.imwrite(str(output_path), result) return count # ── Face blur: PDF pages ─────────────────────────────────────────────────────── def blur_faces_pdf(input_path: Path, output_path: Path, dpi: int = 150, poppler_path=None, blocks: int = 6) -> int: """ Render each PDF page, detect faces, draw pixelated overlay back onto the original page (preserving the text layer), save as new PDF. Returns total number of faces blurred across all pages. """ if not OCR_AVAILABLE: raise RuntimeError("pdf2image required: pip install pdf2image") cv2, np = _get_cv2() if cv2 is None: raise RuntimeError("OpenCV not available") from PIL import Image as PILImage images = convert_from_path(str(input_path), dpi=dpi, poppler_path=poppler_path) reader = PdfReader(str(input_path)) writer = PdfWriter() total_faces = 0 for page_num, (pil_img, reader_page) in enumerate(zip(images, reader.pages), start=1): page_w = float(reader_page.mediabox.width) # PDF points page_h = float(reader_page.mediabox.height) img_px_w, img_px_h = pil_img.size scale_x = page_w / img_px_w scale_y = page_h / img_px_h img_cv2 = pil_to_cv2(pil_img) _, face_count = blur_faces_in_image(img_cv2, blocks=blocks) if face_count == 0: writer.add_page(reader_page) continue # Build a pixelated patch for each face and compose into a reportlab overlay faces = detect_faces_cv2(img_cv2) buf = io.BytesIO() c = rl_canvas.Canvas(buf, pagesize=(page_w, page_h)) for (x, y, w, h) in faces: pad_x = int(w * 0.1) pad_y = int(h * 0.1) x2, y2 = max(0, x - pad_x), max(0, y - pad_y) w2 = min(img_px_w - x2, w + pad_x * 2) h2 = min(img_px_h - y2, h + pad_y * 2) # Pixelate just this region from the rendered page image face_roi = img_cv2[y2:y2+h2, x2:x2+w2] bw = max(1, w2 // blocks) bh = max(1, h2 // blocks) small = cv2.resize(face_roi, (bw, bh), interpolation=cv2.INTER_LINEAR) pixelated_roi = cv2.resize(small, (w2, h2), interpolation=cv2.INTER_NEAREST) # Convert to PIL for reportlab roi_pil = cv2_to_pil(pixelated_roi) roi_buf = io.BytesIO() roi_pil.save(roi_buf, format="PNG") roi_buf.seek(0) # PDF coords: reportlab origin is bottom-left; image origin is top-left pdf_x = x2 * scale_x pdf_y = page_h - (y2 + h2) * scale_y pdf_w = w2 * scale_x pdf_h = h2 * scale_y c.drawImage( __import__("reportlab.lib.utils", fromlist=["ImageReader"]).ImageReader(roi_buf), pdf_x, pdf_y, width=pdf_w, height=pdf_h ) c.save() buf.seek(0) overlay_page = PdfReader(buf).pages[0] reader_page.merge_page(overlay_page) writer.add_page(reader_page) total_faces += face_count with open(output_path, "wb") as f: writer.write(f) return total_faces # ── Face blur: Word documents ───────────────────────────────────────────────── def blur_faces_docx(input_path: Path, output_path: Path, blocks: int = 6) -> int: """ Detect and pixelate faces in images embedded in a .docx file. Replaces the image part bytes in-place and saves as a new file. Returns number of faces blurred. """ if not DOCX_OK: raise RuntimeError("python-docx required: pip install python-docx") cv2, np = _get_cv2() if cv2 is None: raise RuntimeError("OpenCV not available") import shutil from docx import Document from docx.oxml.ns import qn from docx.enum.shape import WD_INLINE_SHAPE from PIL import Image as PILImage shutil.copy2(str(input_path), str(output_path)) doc = Document(str(output_path)) total_faces = 0 for shape in doc.inline_shapes: try: if shape.type != WD_INLINE_SHAPE.PICTURE: continue blip = shape._inline.graphic.graphicData.pic.blipFill.blip rId = blip.embed image_part = doc.part.related_parts[rId] # Decode image bytes → cv2 img_data = image_part.blob np_arr = np.frombuffer(img_data, dtype=np.uint8) img_cv2 = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) if img_cv2 is None: continue result, count = blur_faces_in_image(img_cv2, blocks=blocks) if count == 0: continue # Re-encode with same format (try JPEG first, fall back to PNG) ct = image_part.content_type fmt = "PNG" if "png" in ct.lower() else "JPEG" new_bytes = cv2_to_bytes(result, fmt=fmt) # Monkey-patch blob on the part object image_part._blob = new_bytes total_faces += count except Exception as e: pass # skip shapes that can't be processed doc.save(str(output_path)) return total_faces # ── Face blur: Excel workbooks ──────────────────────────────────────────────── def blur_faces_xlsx(input_path: Path, output_path: Path, blocks: int = 6) -> int: """ Detect and pixelate faces in images embedded in an .xlsx workbook. Returns number of faces blurred. """ if not XLSX_OK: raise RuntimeError("openpyxl required: pip install openpyxl") cv2, np = _get_cv2() if cv2 is None: raise RuntimeError("OpenCV not available") import shutil shutil.copy2(str(input_path), str(output_path)) # openpyxl stores images as _images list on each worksheet wb = openpyxl.load_workbook(str(output_path)) total_faces = 0 for sheet in wb.worksheets: for img_obj in getattr(sheet, "_images", []): try: # img_obj.ref is the image data (BytesIO or bytes) raw = img_obj.ref if hasattr(raw, "read"): raw = raw.read() np_arr = np.frombuffer(raw, dtype=np.uint8) img_cv2 = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) if img_cv2 is None: continue result, count = blur_faces_in_image(img_cv2, blocks=blocks) if count == 0: continue # Re-encode and replace new_bytes = cv2_to_bytes(result, fmt="PNG") img_obj.ref = io.BytesIO(new_bytes) total_faces += count except Exception: pass wb.save(str(output_path)) return total_faces # ── Core scanner ────────────────────────────────────────────────────────────── def scan_pdf(pdf_path: Path, force_ocr=False, lang="dan+eng", dpi=300, poppler_path=None) -> dict: results = {"cprs": [], "dates": [], "page_methods": {}} with pdfplumber.open(pdf_path) as pdf: images = None if OCR_AVAILABLE: needs_ocr = (list(range(len(pdf.pages))) if force_ocr else [i for i, p in enumerate(pdf.pages) if not is_text_page(p)]) if needs_ocr: print(f" Rendering pages to images for OCR (DPI={dpi})...", flush=True) images = convert_from_path(str(pdf_path), dpi=dpi, poppler_path=poppler_path) for page_num, page in enumerate(pdf.pages, start=1): use_text = not force_ocr and is_text_page(page) if use_text: method = "text" text = page.extract_text() or "" cprs, dates = extract_matches(text, page_num, "text") elif OCR_AVAILABLE and images is not None: method = "ocr" _img = images[page_num-1] images[page_num-1] = None # release PIL image as soon as OCR is done cprs, dates = extract_matches(ocr_page_cached(_img, lang), page_num, "ocr") del _img else: method = "skipped" if not OCR_AVAILABLE: print(f" Page {page_num}: image-based but OCR unavailable.") cprs, dates = [], [] results["page_methods"][page_num] = method results["cprs"].extend(cprs) results["dates"].extend(dates) results["dates"] = dedup_dates(results["dates"]) return results # ── Output ──────────────────────────────────────────────────────────────────── def print_results(pdf_path: Path, results: dict): methods = results["page_methods"] text_pages = [p for p, m in methods.items() if m == "text"] ocr_pages = [p for p, m in methods.items() if m == "ocr"] skip_pages = [p for p, m in methods.items() if m == "skipped"] print(f"\n{'='*62}") print(f"File : {pdf_path}") print(f"Pages: {len(methods)} | text: {len(text_pages)} | OCR: {len(ocr_pages)} | skipped: {len(skip_pages)}") print(f"{'='*62}") if ocr_pages: print(f" [OCR] Applied to page(s): {', '.join(map(str, ocr_pages))}") if skip_pages: print(f" [SKIP] Skipped page(s): {', '.join(map(str, skip_pages))}") cprs = results["cprs"] dates = results["dates"] print(f"\n CPR Numbers found: {len(cprs)}") if cprs: for hit in cprs: tag = " [OCR]" if hit["source"] == "ocr" else "" print(f" Page {hit['page']:>3}: {hit['formatted']:<16} (raw: \"{hit['raw']}\"){tag}") else: print(" None found.") print(f"\n Dates found: {len(dates)}") if dates: for hit in dates: tag = " [OCR]" if hit["source"] == "ocr" else "" print(f" Page {hit['page']:>3}: {hit['raw']:<28} [{hit['format']}]{tag}") else: print(" None found.") print() # ── Entry point ─────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Scan PDF and Word documents for Danish CPR numbers, dates and personal data.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) parser.add_argument("pdfs", nargs="+", metavar="FILE", help="PDF/Word file(s) or folder(s) to scan") parser.add_argument("--ocr", action="store_true", help="Force OCR on every page") parser.add_argument("--lang", default="dan+eng", metavar="LANG", help="Tesseract language(s), default: dan+eng") parser.add_argument("--dpi", type=int, default=300, metavar="DPI", help="Rendering DPI for OCR, default: 300") parser.add_argument("--poppler", default=None, metavar="PATH", help="Path to Poppler bin folder (Windows)") parser.add_argument("--older-than", type=int, default=None, metavar="DAYS", help="List files with CPR numbers AND dates older than DAYS") parser.add_argument("--mask", action="store_true", help="Black out CPR numbers -> _masked.pdf/.docx") parser.add_argument("--anonymise", action="store_true", help="Black out ALL personal data -> _anonymised.pdf/.docx") parser.add_argument("--dry-run", action="store_true", help="Scan and report findings without writing any output files") parser.add_argument("--log", default=None, metavar="FILE", help="Write a structured JSON log of all findings to FILE") parser.add_argument("--blur-faces", action="store_true", help="Detect and pixelate portrait photos -> _faces.pdf/.docx/.xlsx/.jpg") parser.add_argument("--blur-strength", type=int, default=6, metavar="N", help="Face blur strength: lower = stronger (default: 6, range: 2-20)") args = parser.parse_args() dry_run = args.dry_run # Logging setup console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(logging.Formatter("%(message)s")) logger.addHandler(console_handler) if dry_run: print("=" * 62) print(" DRY RUN - no files will be written") print("=" * 62 + "\n") _log("INFO", None, "dry_run_started") # Dependency warnings if not OCR_AVAILABLE: missing = [m for m, ok in [("pdf2image", PDF2IMAGE_OK), ("pytesseract", TESSERACT_OK)] if not ok] msg = f"OCR disabled - pip install {' '.join(missing)}" print(f"WARNING: {msg}\n") _log("WARNING", None, msg) if (args.mask or args.anonymise) and not MASK_AVAILABLE: msg = "--mask/--anonymise require: pip install pypdf reportlab" print(f"WARNING: {msg}\n") _log("WARNING", None, msg) if not DOCX_OK: print("INFO: python-docx not installed - .docx files will be skipped.") print(" Install with: pip install python-docx\n") _log("WARNING", None, "python-docx not installed - .docx files skipped") if not XLSX_OK: print("INFO: openpyxl not installed - .xlsx/.csv files will be skipped.") print(" Install with: pip install openpyxl\n") _log("WARNING", None, "openpyxl not installed - .xlsx files skipped") if args.blur_faces and not CV2_OK: print("WARNING: --blur-faces requires OpenCV: pip install opencv-python\n") if args.anonymise: if not SPACY_OK: msg = "--anonymise requires spaCy: pip install spacy" print(f"WARNING: {msg}\n") _log("WARNING", None, msg) else: nlp = load_nlp() if nlp is None: msg = "No spaCy model found - falling back to regex-only" print(f"WARNING: {msg}\n") _log("WARNING", None, msg) # Collect files SUPPORTED = {".pdf", ".docx", ".xlsx", ".xlsm", ".csv", ".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp"} all_paths = [] for entry in args.pdfs: path = Path(entry) if not path.exists(): print(f"Not found: {path}") _log("WARNING", path, "file_not_found") elif path.is_dir(): found = sorted(p for p in path.rglob("*") if p.suffix.lower() in SUPPORTED) pdf_count = sum(1 for p in found if p.suffix.lower() == ".pdf") docx_count = sum(1 for p in found if p.suffix.lower() == ".docx") xlsx_count = sum(1 for p in found if p.suffix.lower() in {".xlsx", ".xlsm", ".csv"}) img_count = sum(1 for p in found if p.suffix.lower() in {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp"}) print(f"Found {pdf_count} PDF(s), {docx_count} Word doc(s), {xlsx_count} spreadsheet(s) and {img_count} image(s) in: {path}") _log("INFO", path, "folder_scanned", pdf_count=pdf_count, docx_count=docx_count, xlsx_count=xlsx_count, img_count=img_count) all_paths.extend(found) elif path.suffix.lower() in SUPPORTED: all_paths.append(path) else: print(f"Unsupported file type, skipping: {path}") _log("WARNING", path, "unsupported_type") if not all_paths: print("No supported files to process.") _log("INFO", None, "no_files_found") if args.log: flush_log(Path(args.log)) return # Process files all_results = [] for path in all_paths: try: ext = path.suffix.lower() if ext in {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp"}: # Standalone image — face blur only (triggered by --blur-faces, --mask, or --anonymise) print(f"\n{'='*62}") print(f"File : {path} [image]") print(f"{'='*62}") _log("INFO", path, "scanned", file_type="image") do_blur = args.blur_faces or args.mask or args.anonymise if do_blur: if not CV2_OK: print(f" [FACE] Skipping - opencv-python not installed.") print(f" pip install opencv-python\n") _log("WARNING", path, "skipped_no_opencv") continue out = path.with_stem(path.stem + "_faces") if dry_run: print(f" [DRY-RUN] Would write -> {out.name} (face blur)\n") _log("DRY_RUN", path, "face_blur_skipped_dry_run", output=str(out)) else: print(f" [FACE] Scanning for faces ...", flush=True) n = blur_faces_image_file(path, out, blocks=args.blur_strength) if n: print(f" [FACE] Done - {n} face(s) blurred -> {out.name}\n") _log("ACTION", path, "faces_blurred", output=str(out), faces=n) else: out.unlink(missing_ok=True) print(f" [FACE] No faces detected - no output written.\n") _log("INFO", path, "no_faces_detected") else: print(f" Image file: use --blur-faces, --mask, or --anonymise to pixelate portraits.\n") _log("INFO", path, "image_no_action_requested") # Images have no CPR/date data — don't add to all_results continue elif ext == ".docx": if not DOCX_OK: print(f"Skipping {path.name} - python-docx not installed.") _log("WARNING", path, "skipped_no_python_docx") continue results = scan_docx(path) print_docx_results(path, results) all_results.append((path, results)) _log("INFO", path, "scanned", file_type="docx", cpr_count=len(results["cprs"]), date_count=len(results["dates"]), cprs=[h["formatted"] for h in results["cprs"]]) if args.mask: out = path.with_stem(path.stem + "_masked") if results["cprs"]: if dry_run: print(f" [DRY-RUN] Would write -> {out.name} ({len(results['cprs'])} CPR region(s))") _log("DRY_RUN", path, "mask_skipped_dry_run", output=str(out), cpr_count=len(results["cprs"])) else: print(f" [MASK] Writing -> {out.name} ...", flush=True) n = redact_docx(path, out, results, use_ner=False) print(f" [MASK] Done - {n} region(s) redacted.\n") _log("ACTION", path, "masked", output=str(out), regions=n) else: print(" [MASK] No CPR numbers found - skipping.\n") _log("INFO", path, "mask_skipped_no_cpr") if args.anonymise: out = path.with_stem(path.stem + "_anonymised") if dry_run: spans = find_pii_spans_in_text(results["_full_text"], use_ner=True) label_counts = {} for _, _, lbl in spans: label_counts[lbl] = label_counts.get(lbl, 0) + 1 summary = " ".join(f"{lbl}:{c}" for lbl, c in sorted(label_counts.items())) print(f" [DRY-RUN] Would write -> {out.name} ({len(spans)} region(s): {summary})") _log("DRY_RUN", path, "anonymise_skipped_dry_run", output=str(out), total_regions=len(spans), by_label=label_counts) else: print(f" [ANON] Writing -> {out.name} ...", flush=True) n = redact_docx(path, out, results, use_ner=True) print(f" [ANON] Done - {n} region(s) redacted.\n") _log("ACTION", path, "anonymised", output=str(out), regions=n) if args.blur_faces: if not CV2_OK: print(f" [FACE] Skipping - opencv-python not installed.") else: out = path.with_stem(path.stem + "_faces") if dry_run: print(f" [DRY-RUN] Would write -> {out.name} (face blur)") _log("DRY_RUN", path, "face_blur_skipped_dry_run", output=str(out)) else: print(f" [FACE] Scanning for faces ...", flush=True) n = blur_faces_docx(path, out, blocks=args.blur_strength) if n: print(f" [FACE] Done - {n} face(s) blurred -> {out.name}\n") _log("ACTION", path, "faces_blurred", output=str(out), faces=n) else: out.unlink(missing_ok=True) print(f" [FACE] No faces detected.\n") _log("INFO", path, "no_faces_detected") elif ext in {".xlsx", ".xlsm"}: if not XLSX_OK: print(f"Skipping {path.name} - openpyxl not installed.") _log("WARNING", path, "skipped_no_openpyxl") continue results = scan_xlsx(path) print_xlsx_results(path, results, "xlsx") all_results.append((path, results)) _log("INFO", path, "scanned", file_type="xlsx", cpr_count=len(results["cprs"]), date_count=len(results["dates"]), cprs=[h["formatted"] for h in results["cprs"]]) if args.mask: out = path.with_stem(path.stem + "_masked") if results["cprs"]: if dry_run: print(f" [DRY-RUN] Would write -> {out.name} ({len(results['cprs'])} CPR cell(s))") _log("DRY_RUN", path, "mask_skipped_dry_run", output=str(out), cpr_count=len(results["cprs"])) else: print(f" [MASK] Writing -> {out.name} ...", flush=True) n = redact_xlsx(path, out, results, use_ner=False) print(f" [MASK] Done - {n} cell(s) redacted.\n") _log("ACTION", path, "masked", output=str(out), regions=n) else: print(" [MASK] No CPR numbers found - skipping.\n") _log("INFO", path, "mask_skipped_no_cpr") if args.anonymise: out = path.with_stem(path.stem + "_anonymised") if dry_run: full_text = " ".join( _cell_text(c) for s in results["_wb"].worksheets for row in s.iter_rows() for c in row ) spans = find_pii_spans_in_text(full_text, use_ner=True) label_counts = {} for _, _, lbl in spans: label_counts[lbl] = label_counts.get(lbl, 0) + 1 summary = " ".join(f"{lbl}:{c}" for lbl, c in sorted(label_counts.items())) print(f" [DRY-RUN] Would write -> {out.name} ({len(spans)} region(s): {summary})") _log("DRY_RUN", path, "anonymise_skipped_dry_run", output=str(out), total_regions=len(spans), by_label=label_counts) else: print(f" [ANON] Writing -> {out.name} ...", flush=True) n = redact_xlsx(path, out, results, use_ner=True) print(f" [ANON] Done - {n} cell(s) redacted.\n") _log("ACTION", path, "anonymised", output=str(out), regions=n) if args.blur_faces: if not CV2_OK: print(f" [FACE] Skipping - opencv-python not installed.") else: out = path.with_stem(path.stem + "_faces") if dry_run: print(f" [DRY-RUN] Would write -> {out.name} (face blur)") _log("DRY_RUN", path, "face_blur_skipped_dry_run", output=str(out)) else: print(f" [FACE] Scanning for faces ...", flush=True) n = blur_faces_xlsx(path, out, blocks=args.blur_strength) if n: print(f" [FACE] Done - {n} face(s) blurred -> {out.name}\n") _log("ACTION", path, "faces_blurred", output=str(out), faces=n) else: out.unlink(missing_ok=True) print(f" [FACE] No faces detected.\n") _log("INFO", path, "no_faces_detected") elif ext == ".csv": results = scan_csv(path) print_xlsx_results(path, results, "csv") all_results.append((path, results)) _log("INFO", path, "scanned", file_type="csv", cpr_count=len(results["cprs"]), date_count=len(results["dates"]), cprs=[h["formatted"] for h in results["cprs"]]) if args.mask: out = path.with_stem(path.stem + "_masked") if results["cprs"]: if dry_run: print(f" [DRY-RUN] Would write -> {out.name} ({len(results['cprs'])} CPR cell(s))") _log("DRY_RUN", path, "mask_skipped_dry_run", output=str(out), cpr_count=len(results["cprs"])) else: print(f" [MASK] Writing -> {out.name} ...", flush=True) n = redact_csv(path, out, use_ner=False) print(f" [MASK] Done - {n} cell(s) redacted.\n") _log("ACTION", path, "masked", output=str(out), regions=n) else: print(" [MASK] No CPR numbers found - skipping.\n") _log("INFO", path, "mask_skipped_no_cpr") if args.anonymise: out = path.with_stem(path.stem + "_anonymised") if dry_run: import csv as _csv full_text = "" with open(path, newline="", encoding="utf-8-sig", errors="replace") as f: for row in _csv.reader(f): full_text += " ".join(row) + " " spans = find_pii_spans_in_text(full_text, use_ner=True) label_counts = {} for _, _, lbl in spans: label_counts[lbl] = label_counts.get(lbl, 0) + 1 summary = " ".join(f"{lbl}:{c}" for lbl, c in sorted(label_counts.items())) print(f" [DRY-RUN] Would write -> {out.name} ({len(spans)} region(s): {summary})") _log("DRY_RUN", path, "anonymise_skipped_dry_run", output=str(out), total_regions=len(spans), by_label=label_counts) else: print(f" [ANON] Writing -> {out.name} ...", flush=True) n = redact_csv(path, out, use_ner=True) print(f" [ANON] Done - {n} cell(s) redacted.\n") _log("ACTION", path, "anonymised", output=str(out), regions=n) else: results = scan_pdf(path, force_ocr=args.ocr, lang=args.lang, dpi=args.dpi, poppler_path=args.poppler) print_results(path, results) all_results.append((path, results)) _log("INFO", path, "scanned", file_type="pdf", pages=len(results["page_methods"]), ocr_pages=sum(1 for m in results["page_methods"].values() if m == "ocr"), cpr_count=len(results["cprs"]), date_count=len(results["dates"]), cprs=[h["formatted"] for h in results["cprs"]]) if args.mask: out = path.with_stem(path.stem + "_masked") if results["cprs"]: if dry_run: print(f" [DRY-RUN] Would write -> {out.name} ({len(results['cprs'])} CPR region(s))") _log("DRY_RUN", path, "mask_skipped_dry_run", output=str(out), cpr_count=len(results["cprs"])) else: print(f" [MASK] Writing -> {out.name} ...", flush=True) n = redact_pdf(path, out, results, args.ocr, args.lang, args.dpi, args.poppler, use_ner=False) if n is not False: print(f" [MASK] Done - {n} region(s) redacted.\n") _log("ACTION", path, "masked", output=str(out), regions=n) else: print(" [MASK] No CPR numbers found - skipping.\n") _log("INFO", path, "mask_skipped_no_cpr") if args.anonymise: out = path.with_stem(path.stem + "_anonymised") if dry_run: full_text = "" with pdfplumber.open(path) as _pdf: for _page in _pdf.pages: full_text += (_page.extract_text() or "") + " " spans = find_pii_spans_in_text(full_text, use_ner=True) label_counts = {} for _, _, lbl in spans: label_counts[lbl] = label_counts.get(lbl, 0) + 1 summary = " ".join(f"{lbl}:{c}" for lbl, c in sorted(label_counts.items())) print(f" [DRY-RUN] Would write -> {out.name} ({len(spans)} region(s): {summary})") _log("DRY_RUN", path, "anonymise_skipped_dry_run", output=str(out), total_regions=len(spans), by_label=label_counts) else: print(f" [ANON] Writing -> {out.name} ...", flush=True) n = redact_pdf(path, out, results, args.ocr, args.lang, args.dpi, args.poppler, use_ner=True) if n is not False: print(f" [ANON] Done - {n} region(s) redacted.\n") _log("ACTION", path, "anonymised", output=str(out), regions=n) if args.blur_faces: if not CV2_OK: print(f" [FACE] Skipping - opencv-python not installed.") elif not OCR_AVAILABLE: print(f" [FACE] Skipping - pdf2image required for PDF face blur.") else: out = path.with_stem(path.stem + "_faces") if dry_run: print(f" [DRY-RUN] Would write -> {out.name} (face blur)") _log("DRY_RUN", path, "face_blur_skipped_dry_run", output=str(out)) else: print(f" [FACE] Scanning pages for faces ...", flush=True) n = blur_faces_pdf(path, out, poppler_path=args.poppler, blocks=args.blur_strength) if n: print(f" [FACE] Done - {n} face(s) blurred -> {out.name}\n") _log("ACTION", path, "faces_blurred", output=str(out), faces=n) else: out.unlink(missing_ok=True) print(f" [FACE] No faces detected.\n") _log("INFO", path, "no_faces_detected") except Exception as e: print(f"Error processing {path}: {e}") _log("ERROR", path, str(e)) if args.older_than is not None: flagged = build_flagged_list(all_results, args.older_than) print_flagged(flagged, args.older_than) _log("INFO", None, "flagged_summary", older_than_days=args.older_than, flagged_count=len(flagged), flagged_files=[str(f["path"]) for f in flagged]) # Final summary total_cprs = sum(len(r["cprs"]) for _, r in all_results) total_dates = sum(len(r["dates"]) for _, r in all_results) files_with_cpr = sum(1 for _, r in all_results if r["cprs"]) print(f"{'--'*31}") print(f" Scanned : {len(all_results)} file(s)") print(f" CPR nos : {total_cprs} found in {files_with_cpr} file(s)") print(f" Dates : {total_dates} found") if dry_run: print(" Mode : DRY RUN - no files written") print(f"{'--'*31}\n") _log("INFO", None, "scan_complete", files_scanned=len(all_results), total_cprs=total_cprs, total_dates=total_dates, files_with_cpr=files_with_cpr, dry_run=dry_run) if args.log: flush_log(Path(args.log)) if __name__ == "__main__": main() def count_faces_in_file(path, poppler_path=None, neighbors: int = 4) -> int: """ Return the number of faces detected in a file (image, PDF, docx, xlsx). Uses only this module's cv2/numpy — never triggers a second import from outside (avoids the 'recursion detected during loading cv2' error on macOS). neighbors controls detection strictness: higher = fewer false positives. """ import sys as _sys cv2, np = _get_cv2() if cv2 is None: _face_log(f"[face] cv2 unavailable: {_cv2_import_error}") return 0 ext = Path(path).suffix.lower() total = 0 cascades = _get_face_cascades() _face_log(f"[face] {Path(path).name} ext={ext} cascades={len(cascades)} neighbors={neighbors}") try: if ext in {".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp"}: img = cv2.imread(str(path)) _face_log(f"[face] imread={img is not None} shape={getattr(img, 'shape', None)}") if img is not None: total = len(detect_faces_cv2(img, neighbors=neighbors)) _face_log(f"[face] detected={total}") elif ext == ".pdf": if PYMUPDF_AVAILABLE: import fitz as _fitz doc = _fitz.open(str(path)) for page_idx in range(min(5, len(doc))): pix = doc[page_idx].get_pixmap(dpi=100) arr = cv2.imdecode( np.frombuffer(pix.tobytes("jpeg"), np.uint8), cv2.IMREAD_COLOR) if arr is not None: total += len(detect_faces_cv2(arr, neighbors=neighbors)) if total > 0: break doc.close() else: from pdf2image import convert_from_path pages = convert_from_path(str(path), dpi=100, first_page=1, last_page=5, poppler_path=poppler_path) for page in pages: arr = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR) total += len(detect_faces_cv2(arr, neighbors=neighbors)) if total > 0: break elif ext == ".docx": from docx import Document from docx.enum.shape import WD_INLINE_SHAPE doc = Document(str(path)) for shape in doc.inline_shapes: try: if shape.type != WD_INLINE_SHAPE.PICTURE: continue blip = shape._inline.graphic.graphicData.pic.blipFill.blip blob = doc.part.related_parts[blip.embed].blob arr = np.frombuffer(blob, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is not None: total += len(detect_faces_cv2(img, neighbors=neighbors)) except Exception: pass elif ext in {".xlsx", ".xlsm"}: import openpyxl wb = openpyxl.load_workbook(str(path), read_only=False, data_only=True) for sname in wb.sheetnames: for img_obj in wb[sname]._images: try: blob = img_obj._data() arr = np.frombuffer(blob, dtype=np.uint8) img = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img is not None: total += len(detect_faces_cv2(img, neighbors=neighbors)) except Exception: pass wb.close() except Exception: pass return total