GDPRScanner/checkpoint.py

90 lines
3.2 KiB
Python

"""
checkpoint.py — Scan checkpoint and delta-token persistence for GDPRScanner.
Provides save/load/clear for mid-scan checkpoints (so interrupted scans can
resume) and load/save for Microsoft Graph delta-link tokens.
"""
from __future__ import annotations
import hashlib
import json
import logging
import time
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory under the user's home that holds the checkpoint and delta-token files.
_DATA_DIR = Path.home() / ".gdprscanner"
# Created at import time; exist_ok=True makes this a no-op when it already exists.
_DATA_DIR.mkdir(exist_ok=True)
def _cp_path(prefix: str) -> Path:
    """Return the checkpoint file path inside the data directory for *prefix*."""
    filename = f"checkpoint_{prefix}.json"
    return _DATA_DIR / filename
def _checkpoint_key(options: dict) -> str:
    """Return a stable 16-hex-character fingerprint of the scan options.

    The fingerprint covers the selected sources, the user ids and the
    ``older_than_days`` setting, so a checkpoint written for a different
    scan configuration can be detected and ignored.
    """
    sources = sorted(options.get("sources", []))

    user_ids = []
    for entry in options.get("user_ids", []):
        # Users may be supplied as dicts carrying an "id" or as bare ids.
        user_ids.append(entry["id"] if isinstance(entry, dict) else entry)
    user_ids.sort()

    nested = options.get("options", {})
    signature = {
        "sources": sources,
        "user_ids": user_ids,
        "older_than_days": nested.get("older_than_days", 0),
    }

    encoded = json.dumps(signature, sort_keys=True).encode()
    digest = hashlib.sha256(encoded).hexdigest()
    return digest[:16]
def _save_checkpoint(key: str, scanned_ids: set, flagged: list, meta: dict, *, prefix: str = "m365") -> None:
    """Persist the current scan progress to the checkpoint file for *prefix*.

    The payload is first written to a sibling ``.tmp`` file, which then
    replaces the real checkpoint file. The ``"options"`` entry of *meta* is
    left out of the stored payload. Any failure is logged and swallowed.
    """
    try:
        id_list = list(scanned_ids)
        slim_meta = {}
        for name, value in meta.items():
            if name != "options":
                slim_meta[name] = value
        document = {
            "key": key,
            "scanned_ids": id_list,
            "flagged": flagged,
            "meta": slim_meta,
        }

        target = _cp_path(prefix)
        staging = target.with_suffix(".tmp")
        # default=str stringifies any value json cannot serialise natively.
        serialized = json.dumps(document, ensure_ascii=False, default=str)
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(target)
    except Exception as e:
        logger.error("[checkpoint] save failed: %s", e)
def _load_checkpoint(key: str, *, prefix: str = "m365") -> dict | None:
    """Return the stored checkpoint for *prefix* when its key equals *key*.

    Returns ``None`` when no checkpoint file exists, when the stored key
    differs from *key*, or when reading or parsing the file fails.
    """
    try:
        source = _cp_path(prefix)
        if source.exists():
            stored = json.loads(source.read_text(encoding="utf-8"))
            if stored.get("key") == key:
                return stored
        return None
    except Exception:
        return None
def _clear_checkpoint(*, prefix: str = "m365") -> None:
    """Delete the checkpoint file for *prefix* if it exists; errors are ignored."""
    try:
        target = _cp_path(prefix)
        if not target.exists():
            return
        target.unlink()
    except Exception:
        pass
# File holding the saved Microsoft Graph delta-link tokens, kept in the same data directory as the checkpoints.
_DELTA_PATH = _DATA_DIR / "delta.json"
def _load_delta_tokens() -> dict:
    """Return the saved delta token map {key: deltaLink_url}.

    Always returns a dict. An empty dict is returned when the token file
    does not exist, cannot be read or parsed, or contains valid JSON that
    is not an object (for example ``null`` or a list). Every failure other
    than a missing file is logged as a warning; nothing is raised.
    """
    try:
        tokens = json.loads(_DELTA_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No tokens saved yet: the normal first-run case, not worth logging.
        return {}
    except Exception as e:
        # Loading stays best-effort, but leave a trace like the save path does.
        logger.warning("[delta] load failed: %s", e)
        return {}

    if not isinstance(tokens, dict):
        # json.loads accepts any JSON value; only an object fits the contract.
        logger.warning(
            "[delta] ignoring token file: expected a JSON object, got %s",
            type(tokens).__name__,
        )
        return {}
    return tokens
def _save_delta_tokens(tokens: dict) -> None:
    """Write the delta token map to disk through a temp file and replace.

    The JSON is written to a sibling ``.tmp`` file, which then replaces the
    real token file. Failures are logged and not re-raised.
    """
    try:
        staging = _DELTA_PATH.with_suffix(".tmp")
        serialized = json.dumps(tokens, ensure_ascii=False)
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(_DELTA_PATH)
    except Exception as e:
        logger.error("[delta] save failed: %s", e)
# ── Broadcast ─────────────────────────────────────────────────────────────────