GDPRScanner/routes/viewer.py
StyxX65 744813f4ac Add compliance audit log
Immutable audit_log table in the scanner DB records every significant
admin action (profile save/delete, token create/revoke, PIN changes,
source add/update/delete, scheduler job changes, scan start/stop, SMTP
save, dispositions, item delete/redact). GET /api/audit_log exposes
entries newest-first. New Audit Log tab in the Settings modal renders
the table on demand. Settings modal widened 540→640 px and tab labels
set to white-space:nowrap so the six-tab row fits on one line.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 10:51:23 +02:00

251 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Read-only viewer token + PIN management routes (#33).
"""
from __future__ import annotations
import time
from flask import Blueprint, jsonify, request, session
from app_config import (
create_viewer_token,
validate_viewer_token,
revoke_viewer_token,
cleanup_expired_viewer_tokens,
_load_viewer_tokens,
get_viewer_pin_hash,
set_viewer_pin,
verify_viewer_pin,
clear_viewer_pin,
get_interface_pin_hash,
set_interface_pin,
verify_interface_pin,
clear_interface_pin,
)
try:
from gdpr_db import log_audit_event as _audit
except ImportError:
def _audit(*a, **kw): pass # type: ignore[misc]
bp = Blueprint("viewer", __name__)
# Simple brute-force guard: keyed by remote IP.
_pin_attempts: dict[str, list[float]] = {}
_MAX_ATTEMPTS = 5
_WINDOW_S = 300 # 5 minutes
def _pin_rate_limit(ip: str) -> bool:
"""Return True if the IP is rate-limited (too many recent failures)."""
now = time.time()
times = [t for t in _pin_attempts.get(ip, []) if now - t < _WINDOW_S]
_pin_attempts[ip] = times
return len(times) >= _MAX_ATTEMPTS
def _pin_record_failure(ip: str) -> None:
now = time.time()
_pin_attempts.setdefault(ip, []).append(now)
def _pin_clear_failures(ip: str) -> None:
_pin_attempts.pop(ip, None)
# ── Token endpoints ───────────────────────────────────────────────────────────
@bp.route("/api/viewer/tokens", methods=["GET"])
def list_tokens():
cleanup_expired_viewer_tokens()
tokens = _load_viewer_tokens()
safe = [
{
"token_hint": t["token"][:8] + "",
"token": t["token"],
"label": t.get("label", ""),
"scope": t.get("scope", {}),
"created_at": t.get("created_at"),
"expires_at": t.get("expires_at"),
"last_used_at": t.get("last_used_at"),
}
for t in tokens
]
return jsonify(safe)
@bp.route("/api/viewer/tokens", methods=["POST"])
def create_token():
body = request.get_json(silent=True) or {}
label = str(body.get("label", "")).strip()
expires_days = body.get("expires_days")
if expires_days is not None:
try:
expires_days = int(expires_days)
if expires_days <= 0:
return jsonify({"error": "expires_days must be a positive integer"}), 400
except (TypeError, ValueError):
return jsonify({"error": "expires_days must be a positive integer"}), 400
raw_scope = body.get("scope", {})
if not isinstance(raw_scope, dict):
return jsonify({"error": "scope must be an object"}), 400
role = str(raw_scope.get("role", "")).strip()
# user may be a single email string (legacy) or a list of email strings
raw_user = raw_scope.get("user", "")
if isinstance(raw_user, str):
user_emails = [raw_user.strip().lower()] if raw_user.strip() else []
elif isinstance(raw_user, list):
user_emails = [str(e).strip().lower() for e in raw_user if str(e).strip()]
else:
user_emails = []
display_name = str(raw_scope.get("display_name", "")).strip()
if role and user_emails:
return jsonify({"error": "scope.role and scope.user are mutually exclusive"}), 400
if role not in ("", "student", "staff"):
return jsonify({"error": "scope.role must be '', 'student', or 'staff'"}), 400
if user_emails and not all("@" in e for e in user_emails):
return jsonify({"error": "scope.user entries must be valid email addresses"}), 400
valid_from = str(raw_scope.get("valid_from", "")).strip()
valid_to = str(raw_scope.get("valid_to", "")).strip()
from datetime import datetime as _dt
for _d, _lbl in ((valid_from, "valid_from"), (valid_to, "valid_to")):
if _d:
try:
_dt.strptime(_d, "%Y-%m-%d")
except ValueError:
return jsonify({"error": f"scope.{_lbl} must be YYYY-MM-DD"}), 400
if valid_from and valid_to and valid_from > valid_to:
return jsonify({"error": "scope.valid_from must be ≤ scope.valid_to"}), 400
if user_emails:
scope = {"user": user_emails, "display_name": display_name or user_emails[0]}
elif role:
scope = {"role": role}
else:
scope = {}
if valid_from:
scope["valid_from"] = valid_from
if valid_to:
scope["valid_to"] = valid_to
entry = create_viewer_token(label=label, expires_days=expires_days, scope=scope)
_audit("token_create", f"label={label!r} scope={scope}",
ip=request.remote_addr or "")
return jsonify(entry), 201
@bp.route("/api/viewer/tokens/<token>", methods=["DELETE"])
def delete_token(token: str):
if not token:
return jsonify({"error": "token required"}), 400
removed = revoke_viewer_token(token)
if not removed:
return jsonify({"error": "token not found"}), 404
_audit("token_revoke", f"token={token[:8]}...", ip=request.remote_addr or "")
return jsonify({"ok": True})
@bp.route("/api/viewer/tokens/validate", methods=["POST"])
def validate_token():
body = request.get_json(silent=True) or {}
token = str(body.get("token", "")).strip()
entry = validate_viewer_token(token)
if entry is None:
return jsonify({"valid": False}), 401
return jsonify({"valid": True, "label": entry.get("label", ""), "expires_at": entry.get("expires_at")})
# ── PIN endpoints ─────────────────────────────────────────────────────────────
@bp.route("/api/viewer/pin", methods=["GET"])
def pin_status():
"""Return whether a viewer PIN is currently set."""
return jsonify({"pin_set": bool(get_viewer_pin_hash())})
@bp.route("/api/viewer/pin", methods=["POST"])
def pin_set():
"""Set or change the viewer PIN.
Body: {pin: "...", current_pin: "..."}
current_pin required only when a PIN is already set.
"""
body = request.get_json(silent=True) or {}
new_pin = str(body.get("pin", "")).strip()
if not new_pin:
return jsonify({"error": "pin required"}), 400
if not new_pin.isdigit() or not (4 <= len(new_pin) <= 8):
return jsonify({"error": "PIN must be 48 digits"}), 400
had_pin = bool(get_viewer_pin_hash())
if had_pin:
if not verify_viewer_pin(str(body.get("current_pin", "")).strip()):
return jsonify({"error": "current PIN is incorrect"}), 403
set_viewer_pin(new_pin)
_audit("viewer_pin_change" if had_pin else "viewer_pin_set", "",
ip=request.remote_addr or "")
return jsonify({"ok": True})
@bp.route("/api/viewer/pin", methods=["DELETE"])
def pin_clear():
"""Remove the viewer PIN. Requires current PIN if one is set."""
body = request.get_json(silent=True) or {}
if get_viewer_pin_hash():
if not verify_viewer_pin(str(body.get("current_pin", "")).strip()):
return jsonify({"error": "current PIN is incorrect"}), 403
clear_viewer_pin()
_audit("viewer_pin_clear", "", ip=request.remote_addr or "")
return jsonify({"ok": True})
# ── Interface PIN management endpoints ───────────────────────────────────────
@bp.route("/api/interface/pin", methods=["GET"])
def interface_pin_status():
"""Return whether an interface PIN is currently set."""
return jsonify({"pin_set": bool(get_interface_pin_hash())})
@bp.route("/api/interface/pin", methods=["POST"])
def interface_pin_set():
"""Set or change the interface PIN.
Body: {pin: "...", current_pin: "..."}
current_pin required only when a PIN is already set.
"""
body = request.get_json(silent=True) or {}
new_pin = str(body.get("pin", "")).strip()
if not new_pin:
return jsonify({"error": "pin required"}), 400
if not new_pin.isdigit() or not (4 <= len(new_pin) <= 8):
return jsonify({"error": "PIN must be 48 digits"}), 400
had_ipin = bool(get_interface_pin_hash())
if had_ipin:
if not verify_interface_pin(str(body.get("current_pin", "")).strip()):
return jsonify({"error": "current PIN is incorrect"}), 403
set_interface_pin(new_pin)
_audit("interface_pin_change" if had_ipin else "interface_pin_set", "",
ip=request.remote_addr or "")
return jsonify({"ok": True})
@bp.route("/api/interface/pin", methods=["DELETE"])
def interface_pin_clear():
"""Remove the interface PIN. Requires current PIN if one is set."""
body = request.get_json(silent=True) or {}
if get_interface_pin_hash():
if not verify_interface_pin(str(body.get("current_pin", "")).strip()):
return jsonify({"error": "current PIN is incorrect"}), 403
clear_interface_pin()
_audit("interface_pin_clear", "", ip=request.remote_addr or "")
return jsonify({"ok": True})
@bp.route("/api/viewer/pin/verify", methods=["POST"])
def pin_verify():
"""Verify a PIN submission and set a viewer session on success."""
ip = request.remote_addr or "unknown"
if _pin_rate_limit(ip):
return jsonify({"error": "Too many failed attempts. Try again later."}), 429
body = request.get_json(silent=True) or {}
pin = str(body.get("pin", "")).strip()
if not verify_viewer_pin(pin):
_pin_record_failure(ip)
remaining = _MAX_ATTEMPTS - len(_pin_attempts.get(ip, []))
return jsonify({"error": "Incorrect PIN", "remaining": max(0, remaining)}), 401
_pin_clear_failures(ip)
session["viewer_ok"] = True
return jsonify({"ok": True})