Immutable audit_log table in the scanner DB records every significant admin action (profile save/delete, token create/revoke, PIN changes, source add/update/delete, scheduler job changes, scan start/stop, SMTP save, dispositions, item delete/redact). GET /api/audit_log exposes entries newest-first. New Audit Log tab in the Settings modal renders the table on demand. Settings modal widened 540→640 px and tab labels set to white-space:nowrap so the six-tab row fits on one line. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
251 lines
9.6 KiB
Python
251 lines
9.6 KiB
Python
"""
|
||
Read-only viewer token, viewer PIN, and interface PIN management routes (#33).
|
||
"""
|
||
from __future__ import annotations
|
||
import time
|
||
from flask import Blueprint, jsonify, request, session
|
||
from app_config import (
|
||
create_viewer_token,
|
||
validate_viewer_token,
|
||
revoke_viewer_token,
|
||
cleanup_expired_viewer_tokens,
|
||
_load_viewer_tokens,
|
||
get_viewer_pin_hash,
|
||
set_viewer_pin,
|
||
verify_viewer_pin,
|
||
clear_viewer_pin,
|
||
get_interface_pin_hash,
|
||
set_interface_pin,
|
||
verify_interface_pin,
|
||
clear_interface_pin,
|
||
)
|
||
try:
    from gdpr_db import log_audit_event as _audit
except ImportError:
    # gdpr_db could not be imported: install a stub with a permissive
    # signature so the route handlers can call _audit() unconditionally.
    def _audit(*a, **kw):  # type: ignore[misc]
        """Accept any arguments, record nothing and return None."""
        return None
|
||
|
||
# Blueprint on which every route in this module is registered via @bp.route.
bp = Blueprint("viewer", __name__)
|
||
|
||
# Simple brute-force guard for PIN verification, keyed by remote IP.
# Each value is the list of time.time() stamps of that IP's recorded failures.
_pin_attempts: dict[str, list[float]] = dict()
# Number of failures inside one window at which an IP counts as rate-limited.
_MAX_ATTEMPTS = 5
# Length of the sliding window in seconds (5 minutes).
_WINDOW_S = 5 * 60
|
||
|
||
|
||
def _pin_rate_limit(ip: str) -> bool:
    """Report whether *ip* has exhausted its allowance of failed attempts.

    Failure timestamps that are ``_WINDOW_S`` seconds old or older are
    dropped, and the pruned list is stored back under *ip* before counting.

    Returns:
        True when at least ``_MAX_ATTEMPTS`` failures remain in the window.
    """
    current = time.time()
    recent: list[float] = []
    for stamp in _pin_attempts.get(ip, []):
        if current - stamp < _WINDOW_S:
            recent.append(stamp)
    # Persist the pruned history so later length checks see only live entries.
    _pin_attempts[ip] = recent
    return len(recent) >= _MAX_ATTEMPTS
|
||
|
||
|
||
def _pin_record_failure(ip: str) -> None:
    """Append the current time to the failure history kept for *ip*.

    A history list is created on first use for an IP not seen before.
    """
    stamp = time.time()
    history = _pin_attempts.setdefault(ip, [])
    history.append(stamp)
|
||
|
||
|
||
def _pin_clear_failures(ip: str) -> None:
    """Forget every failure recorded for *ip*; an unknown IP is a no-op."""
    # pop() with a default never raises, even when the key is absent.
    _pin_attempts.pop(ip, None)
|
||
|
||
|
||
# ── Token endpoints ───────────────────────────────────────────────────────────
|
||
|
||
@bp.route("/api/viewer/tokens", methods=["GET"])
def list_tokens():
    """Return all stored viewer tokens as a JSON array.

    Expired tokens are cleaned up first, then each remaining entry is
    projected onto a fixed set of fields.
    """
    cleanup_expired_viewer_tokens()

    def _describe(record):
        # NOTE(review): the full token is returned next to the 8-character
        # hint — confirm the client really needs the complete value here.
        secret = record["token"]
        return {
            "token_hint": secret[:8] + "…",
            "token": secret,
            "label": record.get("label", ""),
            "scope": record.get("scope", {}),
            "created_at": record.get("created_at"),
            "expires_at": record.get("expires_at"),
            "last_used_at": record.get("last_used_at"),
        }

    return jsonify([_describe(record) for record in _load_viewer_tokens()])
|
||
|
||
|
||
@bp.route("/api/viewer/tokens", methods=["POST"])
def create_token():
    """Create a viewer token from a JSON request body.

    Body fields (all optional):
        label: free-text label; surrounding whitespace is stripped.
        expires_days: positive integer passed on to ``create_viewer_token``.
        scope: object holding either ``role`` ('student' or 'staff') or
            ``user`` (a single email string or a list of them) with an
            optional ``display_name``, plus optional ``valid_from`` /
            ``valid_to`` dates in YYYY-MM-DD form.

    Returns:
        ``(json, 201)`` with the entry returned by ``create_viewer_token``,
        or ``(json, 400)`` carrying an ``error`` message when validation
        fails.
    """
    from datetime import datetime as _dt

    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        # A truthy non-object payload (e.g. a JSON array) has no .get().
        return jsonify({"error": "request body must be a JSON object"}), 400

    label = str(body.get("label", "")).strip()

    expires_days = body.get("expires_days")
    if expires_days is not None:
        try:
            expires_days = int(expires_days)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int() of an infinite float.
            return jsonify({"error": "expires_days must be a positive integer"}), 400
        if expires_days <= 0:
            return jsonify({"error": "expires_days must be a positive integer"}), 400

    raw_scope = body.get("scope", {})
    if not isinstance(raw_scope, dict):
        return jsonify({"error": "scope must be an object"}), 400

    role = str(raw_scope.get("role", "")).strip()
    # user may be a single email string (legacy) or a list of email strings
    raw_user = raw_scope.get("user", "")
    if isinstance(raw_user, str):
        user_emails = [raw_user.strip().lower()] if raw_user.strip() else []
    elif isinstance(raw_user, list):
        user_emails = [str(e).strip().lower() for e in raw_user if str(e).strip()]
    else:
        user_emails = []
    display_name = str(raw_scope.get("display_name", "")).strip()

    if role and user_emails:
        return jsonify({"error": "scope.role and scope.user are mutually exclusive"}), 400
    if role not in ("", "student", "staff"):
        return jsonify({"error": "scope.role must be '', 'student', or 'staff'"}), 400
    if user_emails and not all("@" in e for e in user_emails):
        return jsonify({"error": "scope.user entries must be valid email addresses"}), 400

    # Parse the validity bounds into real dates. strptime accepts month/day
    # without zero padding ("2024-9-01"), so ordering must be decided on the
    # parsed values rather than on the raw strings.
    bounds = {}
    for field in ("valid_from", "valid_to"):
        text = str(raw_scope.get(field, "")).strip()
        if not text:
            continue
        try:
            bounds[field] = _dt.strptime(text, "%Y-%m-%d").date()
        except ValueError:
            return jsonify({"error": f"scope.{field} must be YYYY-MM-DD"}), 400
    if len(bounds) == 2 and bounds["valid_from"] > bounds["valid_to"]:
        return jsonify({"error": "scope.valid_from must be ≤ scope.valid_to"}), 400

    if user_emails:
        scope = {"user": user_emails, "display_name": display_name or user_emails[0]}
    elif role:
        scope = {"role": role}
    else:
        scope = {}
    # Store the canonical zero-padded form so the saved values sort correctly
    # as plain strings.
    for field, day in bounds.items():
        scope[field] = day.isoformat()

    entry = create_viewer_token(label=label, expires_days=expires_days, scope=scope)
    _audit("token_create", f"label={label!r} scope={scope}",
           ip=request.remote_addr or "")
    return jsonify(entry), 201
|
||
|
||
|
||
@bp.route("/api/viewer/tokens/<token>", methods=["DELETE"])
def delete_token(token: str):
    """Revoke the viewer token named in the URL path.

    Returns:
        400 when *token* is empty, 404 when ``revoke_viewer_token`` reports
        nothing was removed, otherwise ``{"ok": true}``. A successful
        revocation is audited with only the first 8 characters of the token.
    """
    if token:
        was_removed = revoke_viewer_token(token)
        if was_removed:
            _audit("token_revoke", f"token={token[:8]}...", ip=request.remote_addr or "")
            return jsonify({"ok": True})
        return jsonify({"error": "token not found"}), 404
    return jsonify({"error": "token required"}), 400
|
||
|
||
|
||
@bp.route("/api/viewer/tokens/validate", methods=["POST"])
def validate_token():
    """Check the ``token`` field of the JSON body against stored tokens.

    Returns:
        ``{"valid": true, "label", "expires_at"}`` when
        ``validate_viewer_token`` yields an entry, otherwise
        ``{"valid": false}`` with status 401.
    """
    payload = request.get_json(silent=True) or {}
    candidate = str(payload.get("token", "")).strip()
    match = validate_viewer_token(candidate)
    if match is not None:
        return jsonify({"valid": True, "label": match.get("label", ""), "expires_at": match.get("expires_at")})
    return jsonify({"valid": False}), 401
|
||
|
||
|
||
# ── PIN endpoints ─────────────────────────────────────────────────────────────
|
||
|
||
@bp.route("/api/viewer/pin", methods=["GET"])
def pin_status():
    """Report as ``{"pin_set": bool}`` whether a viewer PIN hash is stored."""
    has_pin = bool(get_viewer_pin_hash())
    return jsonify({"pin_set": has_pin})
|
||
|
||
|
||
@bp.route("/api/viewer/pin", methods=["POST"])
def pin_set():
    """Set a new viewer PIN or replace the existing one.

    Body: {pin: "...", current_pin: "..."}
    ``current_pin`` is checked only when a PIN is already stored.

    Returns:
        400 for a missing or malformed PIN, 403 when the current PIN does
        not verify, otherwise ``{"ok": true}``.
    """
    payload = request.get_json(silent=True) or {}
    candidate = str(payload.get("pin", "")).strip()
    if not candidate:
        return jsonify({"error": "pin required"}), 400

    # NOTE(review): str.isdigit() also accepts non-ASCII digit characters.
    well_formed = candidate.isdigit() and 4 <= len(candidate) <= 8
    if not well_formed:
        return jsonify({"error": "PIN must be 4–8 digits"}), 400

    already_set = bool(get_viewer_pin_hash())
    if already_set and not verify_viewer_pin(str(payload.get("current_pin", "")).strip()):
        return jsonify({"error": "current PIN is incorrect"}), 403

    set_viewer_pin(candidate)
    # The audit event distinguishes a first-time set from a change.
    event = "viewer_pin_change" if already_set else "viewer_pin_set"
    _audit(event, "", ip=request.remote_addr or "")
    return jsonify({"ok": True})
|
||
|
||
|
||
@bp.route("/api/viewer/pin", methods=["DELETE"])
def pin_clear():
    """Delete the viewer PIN.

    When a PIN is stored, the body's ``current_pin`` must verify first;
    otherwise 403 is returned. With no PIN stored, the clear and the audit
    entry still happen.
    """
    payload = request.get_json(silent=True) or {}
    pin_exists = bool(get_viewer_pin_hash())
    if pin_exists and not verify_viewer_pin(str(payload.get("current_pin", "")).strip()):
        return jsonify({"error": "current PIN is incorrect"}), 403
    clear_viewer_pin()
    _audit("viewer_pin_clear", "", ip=request.remote_addr or "")
    return jsonify({"ok": True})
|
||
|
||
|
||
# ── Interface PIN management endpoints ───────────────────────────────────────
|
||
|
||
@bp.route("/api/interface/pin", methods=["GET"])
def interface_pin_status():
    """Report as ``{"pin_set": bool}`` whether an interface PIN hash is stored."""
    has_pin = bool(get_interface_pin_hash())
    return jsonify({"pin_set": has_pin})
|
||
|
||
|
||
@bp.route("/api/interface/pin", methods=["POST"])
def interface_pin_set():
    """Set a new interface PIN or replace the existing one.

    Body: {pin: "...", current_pin: "..."}
    ``current_pin`` is checked only when a PIN is already stored.

    Returns:
        400 for a missing or malformed PIN, 403 when the current PIN does
        not verify, otherwise ``{"ok": true}``.
    """
    payload = request.get_json(silent=True) or {}
    candidate = str(payload.get("pin", "")).strip()
    if not candidate:
        return jsonify({"error": "pin required"}), 400

    # NOTE(review): str.isdigit() also accepts non-ASCII digit characters.
    well_formed = candidate.isdigit() and 4 <= len(candidate) <= 8
    if not well_formed:
        return jsonify({"error": "PIN must be 4–8 digits"}), 400

    already_set = bool(get_interface_pin_hash())
    if already_set and not verify_interface_pin(str(payload.get("current_pin", "")).strip()):
        return jsonify({"error": "current PIN is incorrect"}), 403

    set_interface_pin(candidate)
    # The audit event distinguishes a first-time set from a change.
    event = "interface_pin_change" if already_set else "interface_pin_set"
    _audit(event, "", ip=request.remote_addr or "")
    return jsonify({"ok": True})
|
||
|
||
|
||
@bp.route("/api/interface/pin", methods=["DELETE"])
def interface_pin_clear():
    """Delete the interface PIN.

    When a PIN is stored, the body's ``current_pin`` must verify first;
    otherwise 403 is returned. With no PIN stored, the clear and the audit
    entry still happen.
    """
    payload = request.get_json(silent=True) or {}
    pin_exists = bool(get_interface_pin_hash())
    if pin_exists and not verify_interface_pin(str(payload.get("current_pin", "")).strip()):
        return jsonify({"error": "current PIN is incorrect"}), 403
    clear_interface_pin()
    _audit("interface_pin_clear", "", ip=request.remote_addr or "")
    return jsonify({"ok": True})
|
||
|
||
|
||
@bp.route("/api/viewer/pin/verify", methods=["POST"])
def pin_verify():
    """Check a submitted viewer PIN and mark the session on success.

    Returns:
        429 when the caller's IP is currently rate-limited, 401 with the
        number of remaining attempts on a wrong PIN, otherwise
        ``{"ok": true}`` after setting ``session["viewer_ok"]``.
    """
    client_ip = request.remote_addr or "unknown"
    if _pin_rate_limit(client_ip):
        return jsonify({"error": "Too many failed attempts. Try again later."}), 429

    payload = request.get_json(silent=True) or {}
    submitted = str(payload.get("pin", "")).strip()

    if verify_viewer_pin(submitted):
        # A correct PIN wipes the failure history for this IP.
        _pin_clear_failures(client_ip)
        session["viewer_ok"] = True
        return jsonify({"ok": True})

    _pin_record_failure(client_ip)
    attempts_left = _MAX_ATTEMPTS - len(_pin_attempts.get(client_ip, []))
    return jsonify({"error": "Incorrect PIN", "remaining": max(0, attempts_left)}), 401
|