GDPRScanner/tests/test_routes.py
StyxX65 f84c8516df Reliably restore last session on refresh after a server restart
The page-load restore was one-shot and bailed when a completed scan's
replayed scan_phase left a running flag set; sse_replay_done (the other
retry) only fires for a non-empty replay buffer, which is empty after a
restart — so refreshing post-update showed a blank grid despite the
results being in the DB. The watchdog now retries the restore on each
4s poll while nothing is shown and no scan runs, clearing stale flags
first. /api/scan/status also reports google_running separately so a
refresh during a live Google scan is no longer treated as idle.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-16 11:53:07 +02:00

294 lines
11 KiB
Python

"""
Integration tests for Flask routes — uses the real Flask test client.
Strategy
--------
- ``flask_app`` (module-scope) — imports gdpr_scanner once, enables TESTING mode.
- ``client`` (function-scope) — fresh test_client() per test.
- ``db_patch`` (function-scope) — replaces ``_get_db`` in routes.database and
routes.export with a ScanDB backed by a tmp_path so tests never touch
~/.gdprscanner. Also sets ``DB_OK = True`` on both modules.
- ``mock_connector`` — sets routes.state.connector to a MagicMock so routes
that require authentication pass the ``if not state.connector``
guard.
- ``clean_state`` — autouse, clears routes.state.flagged_items after each test
and probes the scan lock; a lock left held by a test is not force-released.
"""
import io
import threading
import time
from unittest.mock import MagicMock
import pytest
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def flask_app():
    """Import ``gdpr_scanner`` and return its app configured for testing.

    Module-scoped, so the import and the config changes happen once per
    test module. Sets ``TESTING`` to True and ``WTF_CSRF_ENABLED`` to False.
    """
    import gdpr_scanner

    config = gdpr_scanner.app.config
    config["TESTING"] = True
    config["WTF_CSRF_ENABLED"] = False
    return gdpr_scanner.app
@pytest.fixture()
def client(flask_app):
    """Yield a fresh test client for each test; the context exits afterwards."""
    with flask_app.test_client() as http_client:
        yield http_client
@pytest.fixture()
def db_patch(tmp_path, monkeypatch):
    """Redirect routes.database and routes.export to a throwaway ScanDB.

    A ScanDB is created at ``tmp_path / "test.db"``. On both modules
    ``_get_db`` is replaced with a stub returning that instance and
    ``DB_OK`` is set to True. monkeypatch undoes the patches at teardown.

    Returns:
        The ScanDB instance the patched modules will use.
    """
    from gdpr_db import ScanDB
    import routes.database
    import routes.export

    scan_db = ScanDB(str(tmp_path / "test.db"))

    def _fake_get_db():
        return scan_db

    # Same patch order as before: database first, then export.
    for module in (routes.database, routes.export):
        monkeypatch.setattr(module, "_get_db", _fake_get_db)
        monkeypatch.setattr(module, "DB_OK", True)
    return scan_db
@pytest.fixture()
def mock_connector(monkeypatch):
    """Install a MagicMock as ``routes.state.connector`` and return it.

    Per the original note on this fixture, /api/scan/start is served only by
    the blueprint in routes/scan.py, which checks ``state.connector`` — so
    patching that single attribute is enough to pass the guard.
    """
    from routes import state

    fake_connector = MagicMock()
    monkeypatch.setattr(state, "connector", fake_connector)
    return fake_connector
@pytest.fixture(autouse=True)
def clean_state():
    """Reset shared in-memory scan state after every test (autouse).

    Teardown clears ``state.flagged_items`` and then probes the scan lock
    without blocking. A free lock is acquired and released straight away.
    A lock that is still held is left untouched — the test that took it
    is responsible for releasing it.
    """
    from routes import state

    yield

    # Drop in-memory results so export tests don't bleed into each other.
    state.flagged_items.clear()

    # Non-blocking probe: only release what this fixture itself acquired.
    if state._scan_lock.acquire(blocking=False):
        state._scan_lock.release()
# ---------------------------------------------------------------------------
# /api/scan/status
# ---------------------------------------------------------------------------
class TestScanStatus:
    """GET /api/scan/status while idle and while the Google scan lock is held."""

    def test_idle_returns_not_running(self, client):
        response = client.get("/api/scan/status")
        assert response.status_code == 200
        assert response.get_json()["running"] is False

    def test_scan_id_is_none_when_idle(self, client):
        payload = client.get("/api/scan/status").get_json()
        assert "scan_id" in payload
        assert payload["scan_id"] is None

    def test_idle_reports_google_not_running(self, client):
        # The refresh/restore path relies on google_running being reported
        # separately — running alone misses live Google scans.
        response = client.get("/api/scan/status")
        payload = response.get_json()
        assert payload["google_running"] is False

    def test_google_lock_held_reports_google_running(self, client):
        from routes import state

        google_lock = state._google_scan_lock
        assert google_lock.acquire(blocking=False)
        try:
            payload = client.get("/api/scan/status").get_json()
            assert payload["google_running"] is True
            # The M365/file lock is still free, so "running" stays False.
            assert payload["running"] is False
        finally:
            google_lock.release()
# ---------------------------------------------------------------------------
# /api/scan/start
# ---------------------------------------------------------------------------
class TestScanStart:
    """POST /api/scan/start: auth guard, already-running guard and happy path."""

    def test_unauthenticated_returns_401(self, client, monkeypatch):
        """With ``state.connector`` set to None the route answers 401."""
        from routes import state

        monkeypatch.setattr(state, "connector", None)
        r = client.post("/api/scan/start", json={})
        assert r.status_code == 401
        assert "not authenticated" in r.get_json()["error"]

    def test_lock_held_returns_409(self, client, mock_connector):
        """With the scan lock already held the route answers 409."""
        from routes import state

        # Hold the lock as if a scan were already running
        acquired = state._scan_lock.acquire(blocking=False)
        assert acquired, "Lock should be free at test start"
        try:
            r = client.post("/api/scan/start", json={})
            assert r.status_code == 409
            assert "already running" in r.get_json()["error"]
        finally:
            state._scan_lock.release()

    def test_authenticated_returns_started(self, client, mock_connector, monkeypatch):
        """With a connector present the route answers 200 / "started"."""
        import scan_engine
        from routes import state

        # Stub run_scan so the background thread finishes instantly
        monkeypatch.setattr(scan_engine, "run_scan", lambda opts: None)
        r = client.post("/api/scan/start", json={"sources": ["email"]})
        assert r.status_code == 200
        assert r.get_json()["status"] == "started"

        # Wait up to 2 s for the background thread to release the lock.
        # Blocking on the lock with a timeout replaces the old polling loop,
        # whose deadline used wall-clock time.time() and so was sensitive to
        # clock adjustments.
        # NOTE(review): assumes state._scan_lock is a threading lock that
        # accepts timeout= (it already supports acquire(blocking=False)).
        released = state._scan_lock.acquire(timeout=2.0)
        assert released, "scan lock was never released"
        state._scan_lock.release()
# ---------------------------------------------------------------------------
# /api/scan/stop
# ---------------------------------------------------------------------------
class TestScanStop:
    """POST /api/scan/stop."""

    def test_stop_always_returns_200(self, client):
        response = client.post("/api/scan/stop")
        assert response.status_code == 200
        body = response.get_json()
        assert body["status"] == "stopping"
# ---------------------------------------------------------------------------
# /api/db/stats
# ---------------------------------------------------------------------------
class TestDbStats:
    """GET /api/db/stats with and without a usable database."""

    def test_without_db_returns_503(self, client, monkeypatch):
        import routes.database

        monkeypatch.setattr(routes.database, "DB_OK", False)
        response = client.get("/api/db/stats")
        assert response.status_code == 503

    def test_with_db_returns_200(self, client, db_patch):
        # The direct route in gdpr_scanner.py (which takes precedence over the
        # blueprint) returns get_stats() directly — an empty dict for a fresh DB.
        response = client.get("/api/db/stats")
        assert response.status_code == 200
        stats = response.get_json()
        assert isinstance(stats, dict)
# ---------------------------------------------------------------------------
# /api/db/disposition
# ---------------------------------------------------------------------------
class TestDisposition:
    """POST /api/db/disposition and GET /api/db/disposition/<item_id>."""

    def test_set_disposition_missing_item_id_returns_400(self, client, db_patch):
        response = client.post("/api/db/disposition", json={"status": "retain-legal"})
        assert response.status_code == 400
        assert "item_id" in response.get_json()["error"]

    def test_set_disposition_saves_and_get_returns_it(self, client, db_patch):
        item_id = "test-item-abc123"
        disposition = {
            "item_id": item_id,
            "status": "retain-legal",
            "legal_basis": "GDPR Art. 6(1)(c)",
            "notes": "Required by law",
        }

        # Store the disposition ...
        saved = client.post("/api/db/disposition", json=disposition)
        assert saved.status_code == 200
        assert saved.get_json()["status"] == "saved"

        # ... then read it back through the GET route.
        fetched = client.get(f"/api/db/disposition/{item_id}")
        assert fetched.status_code == 200
        assert fetched.get_json()["status"] == "retain-legal"

    def test_get_disposition_unknown_id_returns_unreviewed(self, client, db_patch):
        response = client.get("/api/db/disposition/no-such-item")
        assert response.status_code == 200
        assert response.get_json()["status"] == "unreviewed"

    def test_without_db_returns_503(self, client, monkeypatch):
        import routes.database

        monkeypatch.setattr(routes.database, "DB_OK", False)
        body = {"item_id": "x", "status": "retain-legal"}
        response = client.post("/api/db/disposition", json=body)
        assert response.status_code == 503
# ---------------------------------------------------------------------------
# /api/export_excel
# ---------------------------------------------------------------------------
class TestExportExcel:
    """GET /api/export_excel with an empty DB and with in-memory items."""

    XLSX_MIME = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

    def test_empty_db_returns_workbook(self, client, db_patch):
        response = client.get("/api/export_excel")
        assert response.status_code == 200
        assert self.XLSX_MIME in response.content_type
        # Must be a valid zip/xlsx (PK magic bytes)
        assert response.data[:2] == b"PK"

    def test_with_items_in_memory_includes_data(self, client, db_patch):
        from routes import state

        flagged = {
            "id": "item-001",
            "name": "test_file.docx",
            "source": "onedrive",
            "cpr_count": 2,
            "face_count": 0,
            "account_name": "Anna Hansen",
            "user_role": "staff",
            "modified": "2025-01-15T10:00:00",
            "size_kb": 42,
            "url": "https://example.com/file",
        }
        state.flagged_items.append(flagged)

        response = client.get("/api/export_excel")
        assert response.status_code == 200
        workbook_bytes = response.data
        assert workbook_bytes[:2] == b"PK"
        # Workbook with data is larger than a skeleton workbook
        assert len(workbook_bytes) > 4096
# ---------------------------------------------------------------------------
# /api/export_article30
# ---------------------------------------------------------------------------
class TestExportArticle30:
    """GET /api/export_article30 with and without flagged items."""

    DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    def test_no_items_returns_400(self, client, db_patch):
        """Article 30 export requires at least one flagged item."""
        response = client.get("/api/export_article30")
        assert response.status_code == 400
        error_text = response.get_json()["error"]
        assert "scan first" in error_text.lower()

    def test_with_items_returns_docx(self, client, db_patch):
        from routes import state

        flagged = {
            "id": "item-002",
            "name": "payroll.xlsx",
            "source": "email",
            "cpr_count": 1,
            "account_name": "Test User",
            "user_role": "staff",
            "modified": "2025-03-01T09:00:00",
            "size_kb": 10,
        }
        state.flagged_items.append(flagged)

        response = client.get("/api/export_article30")
        assert response.status_code == 200
        assert self.DOCX_MIME in response.content_type
        # DOCX is a zip — check PK magic bytes
        assert response.data[:2] == b"PK"