90 lines
3.2 KiB
Python
90 lines
3.2 KiB
Python
"""
|
|
checkpoint.py — Scan checkpoint and delta-token persistence for GDPRScanner.
|
|
|
|
Provides save/load/clear for mid-scan checkpoints (so interrupted scans can
|
|
resume) and load/save for Microsoft Graph delta-link tokens.
|
|
"""
|
|
from __future__ import annotations
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Directory under the user's home where checkpoint and delta files are kept.
# It is created at import time if it does not exist yet.
_DATA_DIR = Path.home().joinpath(".gdprscanner")
_DATA_DIR.mkdir(exist_ok=True)
|
|
|
|
def _cp_path(prefix: str) -> Path:
    """Return the checkpoint file path inside the data directory for *prefix*."""
    filename = f"checkpoint_{prefix}.json"
    return _DATA_DIR.joinpath(filename)
|
|
|
|
def _checkpoint_key(options: dict) -> str:
    """Return a stable 16-hex-character fingerprint of the scan options.

    The fingerprint is used to detect when a checkpoint belongs to a different
    scan configuration and should be ignored.  Only ``sources``, ``user_ids``
    and ``options["older_than_days"]`` take part in it; the first two are
    sorted before hashing, so their order does not matter.

    Args:
        options: Scan request mapping.  Entries of ``user_ids`` may be plain
            ids or dicts carrying an ``"id"`` key.  A key that is absent or
            explicitly ``None`` is treated as empty.

    Returns:
        The first 16 hex digits of the SHA-256 of the canonical JSON signature.
    """
    # `or` (rather than a .get() default) also covers keys that are present
    # but null, which would otherwise make sorted()/.get() raise TypeError.
    sources = options.get("sources") or []
    users = options.get("user_ids") or []
    scan_opts = options.get("options") or {}

    sig = json.dumps({
        "sources": sorted(sources),
        "user_ids": sorted(u["id"] if isinstance(u, dict) else u for u in users),
        "older_than_days": scan_opts.get("older_than_days", 0),
    }, sort_keys=True)
    return hashlib.sha256(sig.encode()).hexdigest()[:16]
|
|
|
|
def _save_checkpoint(key: str, scanned_ids: set, flagged: list, meta: dict, *, prefix: str = "m365") -> None:
    """Persist the current scan progress for *prefix*.

    The payload holds the scan key, the scanned ids (as a list), the flagged
    items and *meta* minus its ``"options"`` entry.  It is written to a
    sibling ``.tmp`` file first and then renamed over the checkpoint file.
    Any failure is logged and swallowed; nothing is raised to the caller.
    """
    try:
        id_list = list(scanned_ids)

        # Copy meta without its "options" entry.
        meta_subset = {}
        for name, value in meta.items():
            if name == "options":
                continue
            meta_subset[name] = value

        payload = {
            "key": key,
            "scanned_ids": id_list,
            "flagged": flagged,
            "meta": meta_subset,
        }

        target = _cp_path(prefix)
        staging = target.with_suffix(".tmp")
        # default=str: values json cannot encode natively are stored as str().
        serialized = json.dumps(payload, ensure_ascii=False, default=str)
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(target)
    except Exception as err:
        logger.error("[checkpoint] save failed: %s", err)
|
|
|
|
def _load_checkpoint(key: str, *, prefix: str = "m365") -> dict | None:
    """Load the checkpoint for *prefix* if it matches the current scan key.

    Args:
        key: Scan key to compare against the ``"key"`` stored in the file.
        prefix: Selects which checkpoint file to read.

    Returns:
        The stored payload dict, or ``None`` when no checkpoint file exists,
        the file cannot be read or parsed, the payload is not a JSON object,
        or the stored key differs from *key*.  Never raises.
    """
    try:
        payload = json.loads(_cp_path(prefix).read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No checkpoint on disk: nothing to resume and nothing to report.
        return None
    except Exception as e:
        # Unreadable or corrupt file: still best-effort, but leave a trace
        # instead of silently discarding the checkpoint.
        logger.warning("[checkpoint] load failed: %s", e)
        return None

    if not isinstance(payload, dict):
        logger.warning(
            "[checkpoint] ignoring malformed checkpoint: expected object, got %s",
            type(payload).__name__,
        )
        return None

    if payload.get("key") != key:
        # Checkpoint was written for a different scan key.
        return None
    return payload
|
|
|
|
def _clear_checkpoint(*, prefix: str = "m365") -> None:
    """Delete the checkpoint file for *prefix*, if there is one.

    Best-effort: never raises.  A missing file is not an error.  Any other
    failure is logged, because a checkpoint that could not be removed stays
    on disk.
    """
    try:
        _cp_path(prefix).unlink()
    except FileNotFoundError:
        # Already gone: nothing to do.
        pass
    except Exception as e:
        logger.warning("[checkpoint] clear failed: %s", e)
|
|
|
|
# File inside the data directory that stores the delta-token map.
_DELTA_PATH = _DATA_DIR.joinpath("delta.json")
|
|
|
|
def _load_delta_tokens() -> dict:
    """Return the saved delta token map ``{key: deltaLink_url}``.

    Returns:
        The dict stored in the delta file, or an empty dict when the file
        does not exist, cannot be read or parsed, or does not contain a JSON
        object.  Never raises.
    """
    try:
        tokens = json.loads(_DELTA_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No tokens saved yet.
        return {}
    except Exception as e:
        # Unreadable or corrupt file: fall back to "no tokens", but log it
        # so the loss of saved tokens is visible.
        logger.warning("[delta] load failed: %s", e)
        return {}

    if not isinstance(tokens, dict):
        # Honour the declared return type even if the file holds other JSON.
        logger.warning(
            "[delta] ignoring malformed token file: expected object, got %s",
            type(tokens).__name__,
        )
        return {}
    return tokens
|
|
|
|
def _save_delta_tokens(tokens: dict) -> None:
    """Write the delta-token map to disk.

    The JSON is written to a sibling ``.tmp`` file first and then renamed
    over the delta file.  Any failure is logged and swallowed; nothing is
    raised to the caller.
    """
    try:
        staging = _DELTA_PATH.with_suffix(".tmp")
        serialized = json.dumps(tokens, ensure_ascii=False)
        staging.write_text(serialized, encoding="utf-8")
        staging.replace(_DELTA_PATH)
    except Exception as err:
        logger.error("[delta] save failed: %s", err)
|
|
|
|
# ── Broadcast ─────────────────────────────────────────────────────────────────
|