GDPRScanner/tests/test_route_integration.py
StyxX65 68076eba52 Show all open (unactioned) items by default, not just the last scan
The default results view loaded only the latest scan session (±300s
window), so items dropped out of sight once a newer scan started — and
a long scheduled scan could show little or nothing on browser open.

Add get_open_items(): every flagged item with no disposition (or status
'unreviewed') across all scans, deduped by id to the latest finished
scan. GET /api/db/flagged now serves it when no ?ref is given; ?ref=N
still loads a specific past session. Frontend loadHistorySession(null)
routes to a new loadOpenItems() loader. Rename the banner button to
"Open items" (da/de/en).

get_session_items() default is unchanged — export.py and
scan_scheduler.py still rely on latest-session for the current scan's
report/email.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 09:19:55 +02:00

664 lines
26 KiB
Python

"""
Route integration tests — security-sensitive paths and data-correctness contracts.
Covers:
- Viewer token CRUD and scope validation
- GET /api/db/flagged role and user scope enforcement
- POST /api/db/disposition/bulk — only updates selected items
- Viewer PIN set / verify / rate-limit / clear
- Interface PIN set / gate / clear
- Scan lock always released (even when run_scan raises)
- GET /api/db/sessions basic shape
- Profile routes CRUD and rename
"""
from __future__ import annotations
import time
from unittest.mock import MagicMock
import pytest
# ---------------------------------------------------------------------------
# Module-level app fixture (shared with test_routes.py via flask_app)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def flask_app():
    """Return ``gdpr_scanner.app`` configured for testing.

    The module is imported lazily inside the fixture. ``TESTING`` is switched
    on and ``WTF_CSRF_ENABLED`` is switched off in the app config before the
    app object is handed to the tests of this module.
    """
    import gdpr_scanner

    application = gdpr_scanner.app
    application.config["TESTING"] = True
    application.config["WTF_CSRF_ENABLED"] = False
    return application
@pytest.fixture()
def client(flask_app):
    """Yield a test client for ``flask_app``; its context closes after the test."""
    with flask_app.test_client() as test_client:
        yield test_client
@pytest.fixture()
def db_patch(tmp_path, monkeypatch):
    """Point the database and export routes at a throw-away ``ScanDB``.

    A fresh ``ScanDB`` is created at ``tmp_path / "test.db"``. For both
    ``routes.database`` and ``routes.export`` the ``_get_db`` accessor is
    replaced with one returning that instance and ``DB_OK`` is forced to
    ``True``. ``monkeypatch`` undoes the patches after the test.

    Returns:
        The temporary ``ScanDB`` instance, so tests can seed and inspect it.
    """
    from gdpr_db import ScanDB
    import routes.database
    import routes.export

    scratch_db = ScanDB(str(tmp_path / "test.db"))

    def _use_scratch_db():
        return scratch_db

    # Same patch order as before: database module first, then export.
    for module in (routes.database, routes.export):
        monkeypatch.setattr(module, "_get_db", _use_scratch_db)
        monkeypatch.setattr(module, "DB_OK", True)
    return scratch_db
@pytest.fixture()
def mock_connector(monkeypatch):
    """Replace ``routes.state.connector`` with a ``MagicMock`` and return it."""
    from routes import state

    fake_connector = MagicMock()
    monkeypatch.setattr(state, "connector", fake_connector)
    return fake_connector
@pytest.fixture(autouse=True)
def clean_state():
    """Reset shared route state after every test (autouse).

    Nothing happens before the test. After it, ``state.flagged_items`` is
    emptied and the scan lock is probed with a non-blocking acquire.
    """
    from routes import state

    yield

    state.flagged_items.clear()
    # If the probe succeeds the lock was free, so hand it straight back.
    # If it fails the lock is held elsewhere and is deliberately left alone.
    if state._scan_lock.acquire(blocking=False):
        state._scan_lock.release()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _seed_scan(db, items: list[dict]) -> int:
"""Create a completed scan and persist items. Returns the scan_id."""
scan_id = db.begin_scan({"sources": ["email"], "user_ids": [], "options": {}})
for item in items:
db.save_item(scan_id, item)
db.finish_scan(scan_id, total_scanned=len(items))
return scan_id
def _item(item_id: str, role: str = "staff", account_id: str = "") -> dict:
return {
"id": item_id,
"name": f"{item_id}.docx",
"source": "email",
"source_type": "email",
"account_id": account_id or f"{item_id}@school.dk",
"user_role": role,
"cpr_count": 1,
"face_count": 0,
"size_kb": 10,
"modified": "2025-01-01T00:00:00",
}
def _clear_viewer_pins():
    """Clear the viewer PIN and then the interface PIN via ``app_config``."""
    from app_config import clear_viewer_pin, clear_interface_pin

    for reset_pin in (clear_viewer_pin, clear_interface_pin):
        reset_pin()
# ---------------------------------------------------------------------------
# Viewer token CRUD
# ---------------------------------------------------------------------------
class TestViewerTokenCRUD:
    """Create, list, delete and validate viewer tokens under /api/viewer/tokens."""

    def test_create_and_list(self, client):
        """A created token comes back with 201 and appears in the listing."""
        created = client.post(
            "/api/viewer/tokens",
            json={"label": "Test token", "expires_days": 7},
        )
        assert created.status_code == 201
        payload = created.get_json()
        assert "token" in payload
        new_token = payload["token"]

        listing = client.get("/api/viewer/tokens")
        assert listing.status_code == 200
        assert any(entry["token"] == new_token for entry in listing.get_json())

    def test_delete_existing_token(self, client):
        """Deleting a token answers ok and removes it from the listing."""
        created = client.post("/api/viewer/tokens", json={"label": "to-delete"})
        new_token = created.get_json()["token"]

        deleted = client.delete(f"/api/viewer/tokens/{new_token}")
        assert deleted.status_code == 200
        assert deleted.get_json()["ok"] is True

        remaining = client.get("/api/viewer/tokens").get_json()
        assert all(entry["token"] != new_token for entry in remaining)

    def test_delete_nonexistent_token_returns_404(self, client):
        """Deleting an unknown token yields 404."""
        resp = client.delete("/api/viewer/tokens/doesnotexist123")
        assert resp.status_code == 404

    def test_validate_valid_token(self, client):
        """A freshly created token validates as valid."""
        created = client.post("/api/viewer/tokens", json={})
        new_token = created.get_json()["token"]

        checked = client.post("/api/viewer/tokens/validate", json={"token": new_token})
        assert checked.status_code == 200
        assert checked.get_json()["valid"] is True

    def test_validate_invalid_token(self, client):
        """An unknown token is rejected with 401 and ``valid: False``."""
        checked = client.post(
            "/api/viewer/tokens/validate",
            json={"token": "notarealtoken00000000"},
        )
        assert checked.status_code == 401
        assert checked.get_json()["valid"] is False
class TestViewerTokenScopeValidation:
    """Scope payload validation when creating viewer tokens."""

    def test_role_and_user_mutually_exclusive(self, client):
        """A scope carrying both ``role`` and ``user`` is rejected."""
        scope = {"role": "student", "user": "alice@school.dk"}
        resp = client.post("/api/viewer/tokens", json={"scope": scope})
        assert resp.status_code == 400
        assert "mutually exclusive" in resp.get_json()["error"]

    def test_invalid_role_value(self, client):
        """A role of ``teacher`` is rejected and the error mentions the role."""
        resp = client.post("/api/viewer/tokens", json={"scope": {"role": "teacher"}})
        assert resp.status_code == 400
        assert "role" in resp.get_json()["error"]

    def test_user_email_must_contain_at(self, client):
        """A user scope without ``@`` is rejected with an email-related error."""
        resp = client.post("/api/viewer/tokens", json={"scope": {"user": "notanemail"}})
        assert resp.status_code == 400
        error_text = resp.get_json()["error"]
        assert "email" in error_text.lower()

    def test_valid_role_scope_stored(self, client):
        """A valid role scope is echoed back unchanged."""
        resp = client.post("/api/viewer/tokens", json={"scope": {"role": "student"}})
        assert resp.status_code == 201
        assert resp.get_json()["scope"] == {"role": "student"}

    def test_valid_user_scope_stored(self, client):
        """A multi-address user scope keeps its addresses and display name."""
        requested_scope = {
            "user": ["alice@m365.dk", "alice@gws.dk"],
            "display_name": "Alice Smith",
        }
        resp = client.post("/api/viewer/tokens", json={"scope": requested_scope})
        assert resp.status_code == 201
        stored_scope = resp.get_json()["scope"]
        assert stored_scope["user"] == ["alice@m365.dk", "alice@gws.dk"]
        assert stored_scope["display_name"] == "Alice Smith"
# ---------------------------------------------------------------------------
# GET /api/db/flagged — scope enforcement
# ---------------------------------------------------------------------------
class TestFlaggedScopeEnforcement:
    """GET /api/db/flagged: viewer scoping, the no-ref open-items view, and ?ref=."""

    @staticmethod
    def _as_viewer(client, scope):
        """Mark the client session as an authenticated viewer with ``scope``."""
        with client.session_transaction() as sess:
            sess["viewer_ok"] = True
            sess["viewer_scope"] = scope

    @staticmethod
    def _flagged_ids(client, url="/api/db/flagged"):
        """Return the set of item ids in the JSON list served at ``url``."""
        return {row["id"] for row in client.get(url).get_json()}

    @staticmethod
    def _push_into_past(db, scan_id):
        """Move a scan's ``started_at`` back by 400 and commit the change."""
        db._connect().execute(
            "UPDATE scans SET started_at = started_at - 400 WHERE id = ?", (scan_id,)
        )
        db._connect().commit()

    def test_no_scope_returns_all_items(self, client, db_patch):
        """Without a viewer scope both student and staff items are returned."""
        _seed_scan(db_patch, [
            _item("s1", role="student"),
            _item("s2", role="staff"),
        ])
        resp = client.get("/api/db/flagged")
        assert resp.status_code == 200
        seen = {row["id"] for row in resp.get_json()}
        assert "s1" in seen
        assert "s2" in seen

    def test_role_scope_student_excludes_staff(self, client, db_patch):
        """A student-role scope hides staff items."""
        _seed_scan(db_patch, [
            _item("r1", role="student"),
            _item("r2", role="staff"),
        ])
        self._as_viewer(client, {"role": "student"})
        seen = self._flagged_ids(client)
        assert "r1" in seen
        assert "r2" not in seen

    def test_role_scope_staff_excludes_students(self, client, db_patch):
        """A staff-role scope hides student items."""
        _seed_scan(db_patch, [
            _item("t1", role="student"),
            _item("t2", role="staff"),
        ])
        self._as_viewer(client, {"role": "staff"})
        seen = self._flagged_ids(client)
        assert "t2" in seen
        assert "t1" not in seen

    def test_user_scope_returns_only_matching_account_id(self, client, db_patch):
        """A user scope returns only items owned by the listed account."""
        _seed_scan(db_patch, [
            _item("u1", account_id="alice@m365.dk"),
            _item("u2", account_id="bob@m365.dk"),
        ])
        self._as_viewer(client, {"user": ["alice@m365.dk"]})
        seen = self._flagged_ids(client)
        assert "u1" in seen
        assert "u2" not in seen

    def test_user_scope_matches_both_platform_emails(self, client, db_patch):
        """Both addresses of one person (M365 UPN and GWS email) are in scope."""
        _seed_scan(db_patch, [
            _item("p1", account_id="alice@m365.dk"),
            _item("p2", account_id="alice@gws.dk"),
            _item("p3", account_id="bob@m365.dk"),
        ])
        self._as_viewer(client, {"user": ["alice@m365.dk", "alice@gws.dk"]})
        seen = self._flagged_ids(client)
        assert "p1" in seen
        assert "p2" in seen
        assert "p3" not in seen

    def test_user_scope_case_insensitive(self, client, db_patch):
        """Account matching ignores letter case."""
        _seed_scan(db_patch, [_item("ci1", account_id="Alice@M365.dk")])
        self._as_viewer(client, {"user": ["alice@m365.dk"]})
        assert "ci1" in self._flagged_ids(client)

    def test_no_ref_returns_open_items_across_all_sessions(self, client, db_patch):
        """The default view shows unactioned items from every session.

        The first scan is backdated so the two scans sit in separate session
        windows; items from both must still be returned.
        """
        earlier_scan = _seed_scan(db_patch, [_item("o1")])
        self._push_into_past(db_patch, earlier_scan)
        _seed_scan(db_patch, [_item("o2")])
        assert self._flagged_ids(client) == {"o1", "o2"}

    def test_no_ref_excludes_items_with_a_disposition(self, client, db_patch):
        """An item with a disposition drops out of the default view."""
        _seed_scan(db_patch, [_item("d1"), _item("d2")])
        db_patch.set_disposition("d1", "kept")
        seen = self._flagged_ids(client)
        assert "d2" in seen  # untouched, so still open
        assert "d1" not in seen  # action taken, so hidden

    def test_no_ref_unreviewed_disposition_stays_open(self, client, db_patch):
        """An ``unreviewed`` status does not count as an action."""
        _seed_scan(db_patch, [_item("u1")])
        db_patch.set_disposition("u1", "unreviewed")
        assert "u1" in self._flagged_ids(client)

    def test_no_ref_dedupes_rescanned_item_to_latest(self, client, db_patch):
        """An item flagged by two scans is listed exactly once."""
        earlier_scan = _seed_scan(db_patch, [_item("k1")])
        self._push_into_past(db_patch, earlier_scan)
        _seed_scan(db_patch, [_item("k1")])
        listing = client.get("/api/db/flagged").get_json()
        matches = [row for row in listing if row["id"] == "k1"]
        assert len(matches) == 1

    def test_ref_param_loads_historical_session(self, client, db_patch):
        """``?ref=<scan id>`` loads that past session only.

        The first scan is pushed more than 300 s into the past so that it
        occupies its own session window.
        """
        earlier_scan = _seed_scan(db_patch, [_item("h1")])
        self._push_into_past(db_patch, earlier_scan)
        _seed_scan(db_patch, [_item("h2")])
        seen = self._flagged_ids(client, f"/api/db/flagged?ref={earlier_scan}")
        assert "h1" in seen
        # h2 belongs to a different (newer) session window and must not appear.
        assert "h2" not in seen
# ---------------------------------------------------------------------------
# POST /api/db/disposition/bulk
# ---------------------------------------------------------------------------
class TestBulkDisposition:
    """POST /api/db/disposition/bulk updates only the items it is given."""

    def test_updates_selected_items(self, client, db_patch):
        """Both selected items receive the requested status."""
        _seed_scan(db_patch, [_item("b1"), _item("b2"), _item("b3")])
        body = {"item_ids": ["b1", "b2"], "status": "retain-legal"}
        resp = client.post("/api/db/disposition/bulk", json=body)
        assert resp.status_code == 200
        assert resp.get_json()["saved"] == 2
        for item_id in ("b1", "b2"):
            assert db_patch.get_disposition(item_id)["status"] == "retain-legal"

    def test_unselected_item_unchanged(self, client, db_patch):
        """An item left out of the request stays unreviewed."""
        _seed_scan(db_patch, [_item("c1"), _item("c2")])
        body = {"item_ids": ["c1"], "status": "delete-scheduled"}
        client.post("/api/db/disposition/bulk", json=body)
        untouched = db_patch.get_disposition("c2")
        # c2 was not in the bulk request, so it must remain unreviewed.
        assert untouched is None or untouched.get("status", "unreviewed") == "unreviewed"

    def test_missing_item_ids_returns_400(self, client, db_patch):
        """A request without ``item_ids`` is rejected."""
        resp = client.post("/api/db/disposition/bulk", json={"status": "retain-legal"})
        assert resp.status_code == 400

    def test_missing_status_returns_400(self, client, db_patch):
        """A request without ``status`` is rejected."""
        resp = client.post("/api/db/disposition/bulk", json={"item_ids": ["x"]})
        assert resp.status_code == 400

    def test_without_db_returns_503(self, client, monkeypatch):
        """With ``DB_OK`` forced to False the route answers 503."""
        import routes.database

        monkeypatch.setattr(routes.database, "DB_OK", False)
        body = {"item_ids": ["x"], "status": "retain-legal"}
        resp = client.post("/api/db/disposition/bulk", json=body)
        assert resp.status_code == 503
# ---------------------------------------------------------------------------
# Viewer PIN
# ---------------------------------------------------------------------------
class TestViewerPin:
    """Viewer PIN routes: status, set, input validation, verify, rate limit, change, clear.

    Both PINs are cleared with ``_clear_viewer_pins`` before and after every
    test so that no test depends on a PIN left behind by another.
    """

    def setup_method(self):
        _clear_viewer_pins()

    def teardown_method(self):
        _clear_viewer_pins()

    def test_status_no_pin(self, client):
        """With no PIN configured the status route reports ``pin_set: False``."""
        r = client.get("/api/viewer/pin")
        assert r.status_code == 200
        assert r.get_json()["pin_set"] is False

    def test_set_and_status_reflects_set(self, client):
        """After setting a PIN the status route reports ``pin_set: True``."""
        client.post("/api/viewer/pin", json={"pin": "1234"})
        r = client.get("/api/viewer/pin")
        assert r.get_json()["pin_set"] is True

    def test_set_too_short_rejected(self, client):
        """A three-digit PIN is rejected."""
        r = client.post("/api/viewer/pin", json={"pin": "123"})
        assert r.status_code == 400

    def test_set_too_long_rejected(self, client):
        """A nine-digit PIN is rejected."""
        r = client.post("/api/viewer/pin", json={"pin": "123456789"})
        assert r.status_code == 400

    def test_set_non_digits_rejected(self, client):
        """A PIN made of letters is rejected."""
        r = client.post("/api/viewer/pin", json={"pin": "abcd"})
        assert r.status_code == 400

    def test_verify_correct_pin_sets_session(self, client):
        """Verifying the right PIN answers 200 with ``ok: True``."""
        client.post("/api/viewer/pin", json={"pin": "4321"})
        r = client.post("/api/viewer/pin/verify", json={"pin": "4321"})
        assert r.status_code == 200
        assert r.get_json()["ok"] is True

    def test_verify_wrong_pin_returns_401(self, client):
        """Verifying a wrong PIN answers 401."""
        client.post("/api/viewer/pin", json={"pin": "4321"})
        r = client.post("/api/viewer/pin/verify", json={"pin": "9999"})
        assert r.status_code == 401

    def test_verify_rate_limit_after_5_failures(self, client):
        """The sixth consecutive wrong PIN is answered with 429."""
        client.post("/api/viewer/pin", json={"pin": "5678"})
        from routes.viewer import _pin_attempts

        # Start from a clean attempt counter so earlier tests cannot skew the count.
        _pin_attempts.clear()
        try:
            for _ in range(5):
                client.post("/api/viewer/pin/verify", json={"pin": "0000"})
            r = client.post("/api/viewer/pin/verify", json={"pin": "0000"})
            assert r.status_code == 429
        finally:
            # Always reset the shared counter, even when the assertion fails,
            # so a failure here cannot rate-limit the tests that run afterwards.
            _pin_attempts.clear()

    def test_change_pin_requires_current(self, client):
        """Changing the PIN with a wrong ``current_pin`` is forbidden."""
        client.post("/api/viewer/pin", json={"pin": "1111"})
        r = client.post("/api/viewer/pin",
                        json={"pin": "2222", "current_pin": "9999"})
        assert r.status_code == 403

    def test_change_pin_with_correct_current(self, client):
        """Changing the PIN with the right ``current_pin`` invalidates the old one."""
        client.post("/api/viewer/pin", json={"pin": "1111"})
        r = client.post("/api/viewer/pin",
                        json={"pin": "2222", "current_pin": "1111"})
        assert r.status_code == 200
        # Old PIN no longer valid
        r2 = client.post("/api/viewer/pin/verify", json={"pin": "1111"})
        assert r2.status_code == 401

    def test_clear_pin_requires_current(self, client):
        """Clearing the PIN with a wrong ``current_pin`` is forbidden."""
        client.post("/api/viewer/pin", json={"pin": "3333"})
        r = client.delete("/api/viewer/pin", json={"current_pin": "0000"})
        assert r.status_code == 403

    def test_clear_pin_with_correct_current(self, client):
        """Clearing the PIN with the right ``current_pin`` unsets it."""
        client.post("/api/viewer/pin", json={"pin": "3333"})
        r = client.delete("/api/viewer/pin", json={"current_pin": "3333"})
        assert r.status_code == 200
        assert client.get("/api/viewer/pin").get_json()["pin_set"] is False
# ---------------------------------------------------------------------------
# Interface PIN
# ---------------------------------------------------------------------------
class TestInterfacePin:
    """Interface PIN routes: status, set, gate, change and clear.

    Both PINs are cleared before and after every test.
    """

    @staticmethod
    def _authenticate(client):
        """Flag the client session as having passed the interface gate."""
        with client.session_transaction() as sess:
            sess["interface_ok"] = True

    def setup_method(self):
        _clear_viewer_pins()

    def teardown_method(self):
        _clear_viewer_pins()

    def test_status_no_pin(self, client):
        """With no PIN configured the status route reports ``pin_set: False``."""
        status = client.get("/api/interface/pin").get_json()
        assert status["pin_set"] is False

    def test_set_and_verify(self, client):
        """Setting a PIN succeeds and the status then reports it as set."""
        resp = client.post("/api/interface/pin", json={"pin": "7777"})
        assert resp.status_code == 200
        # The gate is now active, so authenticate before the status check.
        self._authenticate(client)
        status = client.get("/api/interface/pin").get_json()
        assert status["pin_set"] is True

    def test_non_digit_rejected(self, client):
        """A PIN made of letters is rejected."""
        resp = client.post("/api/interface/pin", json={"pin": "abcd"})
        assert resp.status_code == 400

    def test_set_requires_current_when_set(self, client):
        """Replacing an existing PIN with a wrong ``current_pin`` is forbidden."""
        client.post("/api/interface/pin", json={"pin": "7777"})
        self._authenticate(client)
        resp = client.post(
            "/api/interface/pin",
            json={"pin": "8888", "current_pin": "0000"},
        )
        assert resp.status_code == 403

    def test_clear_requires_current(self, client):
        """Clearing the PIN with a wrong ``current_pin`` is forbidden."""
        client.post("/api/interface/pin", json={"pin": "7777"})
        self._authenticate(client)
        resp = client.delete("/api/interface/pin", json={"current_pin": "0000"})
        assert resp.status_code == 403

    def test_clear_with_correct_current(self, client):
        """Clearing the PIN with the right ``current_pin`` unsets it."""
        client.post("/api/interface/pin", json={"pin": "7777"})
        self._authenticate(client)
        resp = client.delete("/api/interface/pin", json={"current_pin": "7777"})
        assert resp.status_code == 200
        status = client.get("/api/interface/pin").get_json()
        assert status["pin_set"] is False
# ---------------------------------------------------------------------------
# Scan lock released on run_scan() exception
# ---------------------------------------------------------------------------
class TestScanLockReleasedOnError:
    """The scan lock must become available again when ``run_scan`` raises."""

    def test_lock_released_when_run_scan_raises(self, client, mock_connector,
                                                monkeypatch):
        """Start a scan whose engine raises, then poll until the lock is free.

        ``scan_engine.run_scan`` is replaced with a function that always
        raises ``RuntimeError``. The start request must still answer 200, and
        ``state._scan_lock`` must be acquirable within two seconds.
        """
        import scan_engine
        from routes import state

        def _boom(opts):
            raise RuntimeError("simulated scan failure")

        monkeypatch.setattr(scan_engine, "run_scan", _boom)
        r = client.post("/api/scan/start", json={"sources": ["email"]})
        assert r.status_code == 200

        # Wait for the background thread to finish and release the lock.
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + 2.0
        while not state._scan_lock.acquire(blocking=False):
            assert time.monotonic() < deadline, "scan lock was never released after exception"
            time.sleep(0.05)
        # The probe acquired the lock; release it so the test leaves it free.
        state._scan_lock.release()
# ---------------------------------------------------------------------------
# GET /api/db/sessions
# ---------------------------------------------------------------------------
class TestDbSessions:
    """GET /api/db/sessions: response shape, contents and ordering."""

    def test_returns_list(self, client, db_patch):
        """The route answers 200 with a JSON list."""
        resp = client.get("/api/db/sessions")
        assert resp.status_code == 200
        assert isinstance(resp.get_json(), list)

    def test_completed_scan_appears_in_sessions(self, client, db_patch):
        """A finished scan with one item is listed with ``flagged_count`` 1."""
        _seed_scan(db_patch, [_item("sess1")])
        listing = client.get("/api/db/sessions").get_json()
        assert len(listing) >= 1
        first = listing[0]
        assert "ref_scan_id" in first
        assert "flagged_count" in first
        assert first["flagged_count"] == 1

    def test_sessions_ordered_newest_first(self, client, db_patch):
        """Two scans more than 300 s apart form two sessions, newest first."""
        earlier_scan = _seed_scan(db_patch, [_item("old1")])
        # Backdate the first scan so each scan forms its own session window.
        db_patch._connect().execute(
            "UPDATE scans SET started_at = started_at - 400 WHERE id = ?", (earlier_scan,)
        )
        db_patch._connect().commit()
        _seed_scan(db_patch, [_item("new1")])

        listing = client.get("/api/db/sessions").get_json()
        assert len(listing) == 2
        # The newest session (highest ref_scan_id) must come first.
        newest, oldest = listing[0], listing[1]
        assert newest["ref_scan_id"] > oldest["ref_scan_id"]
# ---------------------------------------------------------------------------
# Profile routes
# ---------------------------------------------------------------------------
class TestProfileRoutes:
    """
    Exercises GET /api/profiles, POST /api/profiles/save,
    GET /api/profiles/get and POST /api/profiles/delete.

    An autouse fixture repoints the profile storage path at a tmp directory
    for every test, keeping the tests fully isolated from the real
    ~/.gdprscanner/settings.json.
    """

    @pytest.fixture(autouse=True)
    def _isolate(self, tmp_path, monkeypatch):
        """Redirect ``app_config._SETTINGS_PATH`` to a per-test settings file."""
        import app_config

        settings_file = tmp_path / "settings.json"
        monkeypatch.setattr(app_config, "_SETTINGS_PATH", settings_file)

    def test_list_returns_empty_list_initially(self, client):
        """With fresh settings the profile list is empty."""
        resp = client.get("/api/profiles")
        assert resp.status_code == 200
        assert resp.get_json()["profiles"] == []

    def test_save_missing_name_returns_400(self, client):
        """Saving without a name is rejected with an error payload."""
        resp = client.post("/api/profiles/save", json={"sources": ["email"]})
        assert resp.status_code == 400
        assert "error" in resp.get_json()

    def test_save_creates_profile_and_returns_it(self, client):
        """Saving with an empty id creates the profile and assigns an id."""
        body = {"id": "", "name": "Alpha", "sources": ["email"], "options": {}}
        resp = client.post("/api/profiles/save", json=body)
        assert resp.status_code == 200
        payload = resp.get_json()
        assert payload["status"] == "saved"
        saved = payload["profile"]
        assert saved["name"] == "Alpha"
        assert saved["id"]  # server assigned a non-empty id

    def test_saved_profile_appears_in_list(self, client):
        """A saved profile shows up in the listing."""
        body = {"name": "Beta", "sources": [], "options": {}}
        client.post("/api/profiles/save", json=body)
        listed = client.get("/api/profiles").get_json()["profiles"]
        assert any(entry["name"] == "Beta" for entry in listed)

    def test_rename_updates_name_in_list(self, client):
        """Regression: _pmgmtSaveFullEdit renames the copy, so the API must
        persist the new name for loadProfiles() to return fresh data for the
        left-column re-render."""
        first_save = client.post("/api/profiles/save", json={
            "id": "", "name": "LOCAL-TEST (copy)", "sources": [], "options": {}
        })
        profile_id = first_save.get_json()["profile"]["id"]

        # Simulate the user renaming the copy in the editor and clicking Save.
        renamed = client.post("/api/profiles/save", json={
            "id": profile_id, "name": "LOCAL-TEST-2", "sources": [], "options": {}
        })
        assert renamed.status_code == 200
        assert renamed.get_json()["profile"]["name"] == "LOCAL-TEST-2"

        listed = client.get("/api/profiles").get_json()["profiles"]
        listed_names = [entry["name"] for entry in listed]
        assert "LOCAL-TEST-2" in listed_names
        assert "LOCAL-TEST (copy)" not in listed_names

    def test_get_by_id(self, client):
        """A saved profile can be fetched by the id the save call returned."""
        saved = client.post("/api/profiles/save", json={
            "id": "fixed-id-1", "name": "Gamma", "sources": [], "options": {}
        })
        profile_id = saved.get_json()["profile"]["id"]
        fetched = client.get(f"/api/profiles/get?id={profile_id}")
        assert fetched.status_code == 200
        assert fetched.get_json()["profile"]["name"] == "Gamma"

    def test_get_nonexistent_returns_404(self, client):
        """Fetching an unknown id yields 404."""
        resp = client.get("/api/profiles/get?id=does-not-exist")
        assert resp.status_code == 404

    def test_delete_removes_profile(self, client):
        """Deleting by name reports ``deleted`` and removes the profile."""
        body = {"name": "ToDelete", "sources": [], "options": {}}
        client.post("/api/profiles/save", json=body)
        resp = client.post("/api/profiles/delete", json={"name": "ToDelete"})
        assert resp.status_code == 200
        assert resp.get_json()["status"] == "deleted"
        listed = client.get("/api/profiles").get_json()["profiles"]
        assert all(entry["name"] != "ToDelete" for entry in listed)

    def test_delete_nonexistent_returns_not_found(self, client):
        """Deleting an unknown name answers 200 with status ``not_found``."""
        resp = client.post("/api/profiles/delete", json={"name": "Ghost"})
        assert resp.status_code == 200
        assert resp.get_json()["status"] == "not_found"

    def test_delete_missing_key_returns_400(self, client):
        """A delete request with an empty body is rejected."""
        resp = client.post("/api/profiles/delete", json={})
        assert resp.status_code == 400