GDPRScanner/tests/test_db.py
StyxX65 b661a94f98 Restore user/group badges on DB-loaded result cards
The card badge only rendered when f.account_name was set, and the
group (role) badge was nested inside that same check. But save_item
never persisted account_name — only account_id (a GUID) and user_role.
Live SSE cards carried account_name so badges showed during a scan;
now that the grid loads finalized scans from the DB, the gap is exposed
and both badges vanish for earlier scans.

- Persist account_name (migration 11 + save_item) so future scans show
  the user badge. Both M365 and Google cards already carry it.
- _accountPill() in results.js drives the group badge off user_role
  alone (shows for legacy rows) and resolves a best-effort user label:
  account_name → S._allUsers (id/email) → email-style account_id → omit.
  Both card layouts share the one helper.

Legacy rows still lack account_name (never captured), but now show the
group badge and a resolved/email user label where possible.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-22 10:15:19 +02:00

336 lines
16 KiB
Python

"""
test_db.py — Tests for gdpr_db.py (ScanDB).
Covers:
- begin_scan / finish_scan round-trip
- save_item and retrieval
- CPR index stores hash, never plaintext
- lookup_data_subject returns matching items
- set_disposition / get_disposition
- Deletion log
- Export / import cycle (merge and replace modes)
"""
import sys
import hashlib
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from gdpr_db import ScanDB
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _make_card(item_id="abc123", cpr_count=1, source_type="email", role="staff"):
return {
"id": item_id,
"name": f"{item_id}.docx",
"source": "email",
"source_type": source_type,
"cpr_count": cpr_count,
"url": "https://example.com/item",
"size_kb": 12.5,
"modified": "2024-03-01",
"thumb_b64": "",
"thumb_mime": "image/svg+xml",
"risk": None,
"account_id": "user-1",
"account_name": "Test User",
"user_role": role,
"drive_id": "",
"attachments": [],
"folder": "",
"transfer_risk": "",
"special_category": [],
"face_count": 0,
"exif": {},
}
# ─────────────────────────────────────────────────────────────────────────────
# 1. Scan lifecycle
# ─────────────────────────────────────────────────────────────────────────────
class TestScanLifecycle:
def test_begin_scan_returns_int(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
assert isinstance(scan_id, int)
assert scan_id > 0
def test_begin_scan_increments(self, tmp_db):
id1 = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
id2 = tmp_db.begin_scan({"sources": ["onedrive"], "user_ids": []})
assert id2 > id1
def test_finish_scan_does_not_raise(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.finish_scan(scan_id, 42) # must not raise
def test_multiple_scans_independent(self, tmp_db):
id1 = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(id1, _make_card("item-a"), ["290472-1234"])
id2 = tmp_db.begin_scan({"sources": ["onedrive"], "user_ids": []})
tmp_db.save_item(id2, _make_card("item-b"), ["010185-4321"])
tmp_db.finish_scan(id1, 1)
tmp_db.finish_scan(id2, 1)
# ─────────────────────────────────────────────────────────────────────────────
# 2. save_item
# ─────────────────────────────────────────────────────────────────────────────
class TestSaveItem:
def test_save_item_does_not_raise(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card(), ["290472-1234"])
def test_save_item_without_cprs(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card(cpr_count=0), [])
def test_save_multiple_items(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
for i in range(5):
tmp_db.save_item(scan_id, _make_card(f"item-{i}"), ["290472-1234"])
def test_save_item_with_pii_counts(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
pii = {"cpr": 1, "name": 2, "email": 0}
tmp_db.save_item(scan_id, _make_card(), ["290472-1234"], pii_counts=pii)
# ─────────────────────────────────────────────────────────────────────────────
# 3. CPR index — hash only, never plaintext
# ─────────────────────────────────────────────────────────────────────────────
class TestCprIndex:
def test_cpr_not_stored_in_plaintext(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card(), ["290472-1234"])
# Read the raw DB and confirm plaintext CPR is absent
import sqlite3
with sqlite3.connect(tmp_db._path) as con:
rows = con.execute("SELECT cpr_hash FROM cpr_index").fetchall()
assert len(rows) == 1
stored = rows[0][0]
assert stored != "290472-1234"
assert "290472" not in stored
def test_cpr_hash_is_sha256(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card(), ["290472-1234"])
import sqlite3
with sqlite3.connect(tmp_db._path) as con:
rows = con.execute("SELECT cpr_hash FROM cpr_index").fetchall()
stored = rows[0][0]
expected = hashlib.sha256("290472-1234".encode()).hexdigest()
assert stored == expected
def test_lookup_finds_item(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("item-x"), ["290472-1234"])
results = tmp_db.lookup_data_subject("290472-1234")
assert len(results) >= 1
def test_lookup_returns_correct_item(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("target-item"), ["290472-1234"])
results = tmp_db.lookup_data_subject("290472-1234")
ids = [r.get("id") or r.get("item_id") for r in results]
assert "target-item" in ids
def test_lookup_different_cpr_returns_empty(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card(), ["290472-1234"])
results = tmp_db.lookup_data_subject("010185-4321")
assert results == []
def test_lookup_multiple_items_for_same_cpr(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("item-a"), ["290472-1234"])
tmp_db.save_item(scan_id, _make_card("item-b"), ["290472-1234"])
results = tmp_db.lookup_data_subject("290472-1234")
assert len(results) >= 2
# ─────────────────────────────────────────────────────────────────────────────
# 4. Dispositions
# ─────────────────────────────────────────────────────────────────────────────
class TestDispositions:
def test_get_disposition_returns_none_for_unknown(self, tmp_db):
assert tmp_db.get_disposition("nonexistent") is None
def test_set_and_get_disposition(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("disp-item"), ["290472-1234"])
tmp_db.set_disposition("disp-item", "retain-legal", "Bogfoeringsloven", "", "admin")
disp = tmp_db.get_disposition("disp-item")
assert disp is not None
assert disp["status"] == "retain-legal"
def test_disposition_legal_basis_stored(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("disp-2"), [])
tmp_db.set_disposition("disp-2", "delete-scheduled", "Data minimisation", "", "reviewer")
disp = tmp_db.get_disposition("disp-2")
assert disp["legal_basis"] == "Data minimisation"
def test_disposition_overwrite(self, tmp_db):
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(scan_id, _make_card("disp-3"), [])
tmp_db.set_disposition("disp-3", "unreviewed", "", "", "")
tmp_db.set_disposition("disp-3", "deleted", "", "", "admin")
disp = tmp_db.get_disposition("disp-3")
assert disp["status"] == "deleted"
def test_all_disposition_values_accepted(self, tmp_db):
statuses = ["unreviewed", "retain-legal", "retain-legitimate",
"retain-contract", "delete-scheduled", "deleted"]
scan_id = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
for i, status in enumerate(statuses):
item_id = f"disp-status-{i}"
tmp_db.save_item(scan_id, _make_card(item_id), [])
tmp_db.set_disposition(item_id, status, "", "", "test")
disp = tmp_db.get_disposition(item_id)
assert disp["status"] == status
# ─────────────────────────────────────────────────────────────────────────────
# 5. Export / import
# ─────────────────────────────────────────────────────────────────────────────
class TestExportImport:
def _populate(self, db):
scan_id = db.begin_scan({"sources": ["email"], "user_ids": []})
db.save_item(scan_id, _make_card("exp-1"), ["290472-1234"])
db.save_item(scan_id, _make_card("exp-2"), ["010185-4321"])
db.set_disposition("exp-1", "retain-legal", "Bogfoeringsloven", "", "admin")
db.finish_scan(scan_id, 2)
def test_export_creates_zip(self, tmp_db, tmp_path):
if not hasattr(tmp_db, "export_db"):
pytest.skip("export_db not implemented")
self._populate(tmp_db)
export_path = tmp_path / "export.zip"
tmp_db.export_db(str(export_path))
assert export_path.exists()
assert export_path.stat().st_size > 0
def test_export_zip_contains_expected_files(self, tmp_db, tmp_path):
if not hasattr(tmp_db, "export_db"):
pytest.skip("export_db not implemented")
self._populate(tmp_db)
export_path = tmp_path / "export.zip"
tmp_db.export_db(str(export_path))
import zipfile
with zipfile.ZipFile(export_path) as zf:
names = zf.namelist()
for expected in ["export_meta.json", "flagged_items.json", "dispositions.json"]:
assert expected in names
def test_import_merge_adds_dispositions(self, tmp_path):
if not hasattr(ScanDB, "export_db"):
pytest.skip("export_db not implemented")
# Source DB
src = ScanDB(str(tmp_path / "src.db"))
self._populate(src)
export_path = tmp_path / "export.zip"
src.export_db(str(export_path))
# Target DB (fresh)
tgt = ScanDB(str(tmp_path / "tgt.db"))
tgt.import_db(str(export_path), mode="merge")
# Disposition for exp-1 should now exist in tgt
disp = tgt.get_disposition("exp-1")
assert disp is not None
def test_import_replace_restores_items(self, tmp_path):
if not hasattr(ScanDB, "export_db"):
pytest.skip("export_db not implemented")
src = ScanDB(str(tmp_path / "src2.db"))
self._populate(src)
export_path = tmp_path / "export2.zip"
src.export_db(str(export_path))
tgt = ScanDB(str(tmp_path / "tgt2.db"))
tgt.import_db(str(export_path), mode="replace")
results = tgt.lookup_data_subject("290472-1234")
assert len(results) >= 1
# ─────────────────────────────────────────────────────────────────────────────
# Orphan-scan recovery (crash / kill / mid-scan restart)
# ─────────────────────────────────────────────────────────────────────────────
class TestOrphanScanRecovery:
def _start_unfinished_scan(self, db, item_id):
"""Begin a scan and save an item but never call finish_scan."""
sid = db.begin_scan({"sources": ["email"], "user_ids": []})
db.save_item(sid, _make_card(item_id=item_id))
return sid
def test_unfinished_scan_items_hidden_until_recovery(self, tmp_db):
self._start_unfinished_scan(tmp_db, "orphan-1")
# Not finalised → invisible to the open-items view
assert tmp_db.get_open_items() == []
def test_recovery_finalises_and_reveals_items(self, tmp_db):
self._start_unfinished_scan(tmp_db, "orphan-1")
self._start_unfinished_scan(tmp_db, "orphan-2")
recovered = tmp_db.finalize_orphan_scans()
assert recovered == 2
ids = {row["id"] for row in tmp_db.get_open_items()}
assert ids == {"orphan-1", "orphan-2"}
def test_recovery_leaves_finished_scans_untouched(self, tmp_db):
sid = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(sid, _make_card(item_id="done-1"))
tmp_db.finish_scan(sid, total_scanned=1)
before = tmp_db._connect().execute(
"SELECT finished_at FROM scans WHERE id=?", (sid,)
).fetchone()[0]
assert tmp_db.finalize_orphan_scans() == 0 # nothing to recover
after = tmp_db._connect().execute(
"SELECT finished_at FROM scans WHERE id=?", (sid,)
).fetchone()[0]
assert after == before # finished_at not rewritten
def test_recovery_is_idempotent(self, tmp_db):
self._start_unfinished_scan(tmp_db, "orphan-1")
assert tmp_db.finalize_orphan_scans() == 1
assert tmp_db.finalize_orphan_scans() == 0
# ─────────────────────────────────────────────────────────────────────────────
# account_name persistence (user/group badge data)
# ─────────────────────────────────────────────────────────────────────────────
class TestAccountNamePersistence:
def test_account_name_round_trips(self, tmp_db):
sid = tmp_db.begin_scan({"sources": ["email"], "user_ids": []})
tmp_db.save_item(sid, _make_card(item_id="an-1")) # account_name="Test User"
tmp_db.finish_scan(sid, total_scanned=1)
row = [r for r in tmp_db.get_open_items() if r["id"] == "an-1"][0]
assert row.get("account_name") == "Test User"
def test_account_name_column_exists(self, tmp_db):
cols = [r[1] for r in tmp_db._connect().execute(
"PRAGMA table_info(flagged_items)").fetchall()]
assert "account_name" in cols