# GDPRScanner/tests/test_db.py
# 2026-04-11 04:38:11 +02:00
#
# 268 lines
# 13 KiB
# Python

"""
test_db.py — Tests for gdpr_db.py (ScanDB).
Covers:
- begin_scan / finish_scan round-trip
- save_item and retrieval
- CPR index stores hash, never plaintext
- lookup_data_subject returns matching items
- set_disposition / get_disposition
- Deletion log
- Export / import cycle (merge and replace modes)
"""
import sys
import hashlib
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from gdpr_db import ScanDB
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _make_card(item_id="abc123", cpr_count=1, source_type="email", role="staff"):
    """Return a fresh flagged-item card dict for the tests in this module.

    Only ``id``, ``name`` (``"<item_id>.docx"``), ``cpr_count``,
    ``source_type`` and ``user_role`` depend on the arguments.  Every other
    field is a fixed placeholder -- including ``source``, which is always
    ``"email"`` regardless of *source_type*.

    Args:
        item_id: Value for ``id``; also used as the stem of ``name``.
        cpr_count: Value for ``cpr_count``.
        source_type: Value for ``source_type``.
        role: Value for ``user_role``.

    Returns:
        A new dict on every call (the list/dict-valued fields are not shared
        between calls).
    """
    card = dict(
        # Identity and origin of the item.
        id=item_id,
        name=f"{item_id}.docx",
        source="email",
        source_type=source_type,
        cpr_count=cpr_count,
        url="https://example.com/item",
        # File metadata and preview placeholders.
        size_kb=12.5,
        modified="2024-03-01",
        thumb_b64="",
        thumb_mime="image/svg+xml",
        risk=None,
        # Owning account.
        account_id="user-1",
        account_name="Test User",
        user_role=role,
        # Remaining fields are left empty.
        drive_id="",
        attachments=[],
        folder="",
        transfer_risk="",
        special_category=[],
        face_count=0,
        exif={},
    )
    return card
# ─────────────────────────────────────────────────────────────────────────────
# 1. Scan lifecycle
# ─────────────────────────────────────────────────────────────────────────────
class TestScanLifecycle:
    """Tests for ``begin_scan`` / ``finish_scan`` on the ``tmp_db`` fixture."""

    @staticmethod
    def _begin(db, source):
        """Start a scan on *db* for a single *source* and an empty user list."""
        return db.begin_scan({"sources": [source], "user_ids": []})

    def test_begin_scan_returns_int(self, tmp_db):
        """``begin_scan`` returns a positive ``int``."""
        new_id = self._begin(tmp_db, "email")
        assert isinstance(new_id, int)
        assert new_id > 0

    def test_begin_scan_increments(self, tmp_db):
        """A later scan gets a larger id than an earlier one."""
        first = self._begin(tmp_db, "email")
        second = self._begin(tmp_db, "onedrive")
        assert second > first

    def test_finish_scan_does_not_raise(self, tmp_db):
        """Smoke test: finishing a freshly started scan must not raise."""
        started = self._begin(tmp_db, "email")
        tmp_db.finish_scan(started, 42)

    def test_multiple_scans_independent(self, tmp_db):
        """Two scans can be open at once, each take an item, and both finish.

        This only checks that none of the calls raise; nothing is read back.
        """
        plan = (
            ("email", "item-a", "290472-1234"),
            ("onedrive", "item-b", "010185-4321"),
        )
        open_scans = []
        for source, item_id, cpr in plan:
            current = self._begin(tmp_db, source)
            tmp_db.save_item(current, _make_card(item_id), [cpr])
            open_scans.append(current)
        # Finish only after both scans have been started and populated.
        for current in open_scans:
            tmp_db.finish_scan(current, 1)
# ─────────────────────────────────────────────────────────────────────────────
# 2. save_item
# ─────────────────────────────────────────────────────────────────────────────
class TestSaveItem:
    """Smoke tests for ``save_item``: each one passes if no call raises."""

    @staticmethod
    def _email_scan(db):
        """Start an email-only scan on *db* and return its id."""
        return db.begin_scan({"sources": ["email"], "user_ids": []})

    def test_save_item_does_not_raise(self, tmp_db):
        """A default card with one CPR can be saved."""
        current = self._email_scan(tmp_db)
        tmp_db.save_item(current, _make_card(), ["290472-1234"])

    def test_save_item_without_cprs(self, tmp_db):
        """A card with ``cpr_count=0`` and an empty CPR list can be saved."""
        current = self._email_scan(tmp_db)
        card = _make_card(cpr_count=0)
        tmp_db.save_item(current, card, [])

    def test_save_multiple_items(self, tmp_db):
        """Five distinct items sharing one CPR can be saved in the same scan."""
        current = self._email_scan(tmp_db)
        for n in range(5):
            card = _make_card(f"item-{n}")
            tmp_db.save_item(current, card, ["290472-1234"])

    def test_save_item_with_pii_counts(self, tmp_db):
        """``save_item`` accepts the optional ``pii_counts`` keyword."""
        current = self._email_scan(tmp_db)
        tmp_db.save_item(
            current,
            _make_card(),
            ["290472-1234"],
            pii_counts={"cpr": 1, "name": 2, "email": 0},
        )
# ─────────────────────────────────────────────────────────────────────────────
# 3. CPR index — hash only, never plaintext
# ─────────────────────────────────────────────────────────────────────────────
class TestCprIndex:
    """Tests for the CPR index: only a hash is stored, and lookups work.

    Two tests read the ``cpr_index`` table directly from the database file;
    the rest go through ``lookup_data_subject``.
    """

    _CPR = "290472-1234"
    _OTHER_CPR = "010185-4321"

    @staticmethod
    def _stored_cpr_hashes(db):
        """Return every ``cpr_hash`` value found in the file at ``db._path``."""
        import sqlite3
        from contextlib import closing

        # ``with sqlite3.connect(...)`` only scopes a transaction and leaves
        # the connection open; closing() guarantees the handle is released.
        with closing(sqlite3.connect(db._path)) as con:
            return [row[0] for row in con.execute("SELECT cpr_hash FROM cpr_index")]

    def _store(self, db, *cards):
        """Start an email scan on *db* and save each card under ``_CPR``."""
        scan_id = db.begin_scan({"sources": ["email"], "user_ids": []})
        for card in cards:
            db.save_item(scan_id, card, [self._CPR])

    @staticmethod
    def _ids(results):
        """Extract item ids from lookup rows, accepting ``id`` or ``item_id``."""
        return [r.get("id") or r.get("item_id") for r in results]

    def test_cpr_not_stored_in_plaintext(self, tmp_db):
        """The single stored index value must not contain the plaintext CPR."""
        self._store(tmp_db, _make_card())
        hashes = self._stored_cpr_hashes(tmp_db)
        assert len(hashes) == 1
        stored = hashes[0]
        assert stored != self._CPR
        assert "290472" not in stored

    def test_cpr_hash_is_sha256(self, tmp_db):
        """The stored value is the SHA-256 hex digest of the CPR string."""
        self._store(tmp_db, _make_card())
        expected = hashlib.sha256(self._CPR.encode()).hexdigest()
        # Comparing the whole list fails cleanly (instead of IndexError)
        # when no row was written.
        assert self._stored_cpr_hashes(tmp_db) == [expected]

    def test_lookup_finds_item(self, tmp_db):
        """Looking up a saved CPR returns at least one row."""
        self._store(tmp_db, _make_card("item-x"))
        results = tmp_db.lookup_data_subject(self._CPR)
        assert len(results) >= 1

    def test_lookup_returns_correct_item(self, tmp_db):
        """The lookup result includes the id of the item that was saved."""
        self._store(tmp_db, _make_card("target-item"))
        results = tmp_db.lookup_data_subject(self._CPR)
        assert "target-item" in self._ids(results)

    def test_lookup_different_cpr_returns_empty(self, tmp_db):
        """Looking up a CPR that was never saved returns an empty list."""
        self._store(tmp_db, _make_card())
        results = tmp_db.lookup_data_subject(self._OTHER_CPR)
        assert results == []

    def test_lookup_multiple_items_for_same_cpr(self, tmp_db):
        """Two items saved under one CPR both show up in the lookup."""
        self._store(tmp_db, _make_card("item-a"), _make_card("item-b"))
        results = tmp_db.lookup_data_subject(self._CPR)
        assert len(results) >= 2
# ─────────────────────────────────────────────────────────────────────────────
# 4. Dispositions
# ─────────────────────────────────────────────────────────────────────────────
class TestDispositions:
    """Tests for ``set_disposition`` / ``get_disposition``."""

    @staticmethod
    def _email_scan(db):
        """Start an email-only scan on *db* and return its id."""
        return db.begin_scan({"sources": ["email"], "user_ids": []})

    def test_get_disposition_returns_none_for_unknown(self, tmp_db):
        """An id that was never given a disposition yields ``None``."""
        found = tmp_db.get_disposition("nonexistent")
        assert found is None

    def test_set_and_get_disposition(self, tmp_db):
        """A disposition that was set can be read back with its status."""
        current = self._email_scan(tmp_db)
        tmp_db.save_item(current, _make_card("disp-item"), ["290472-1234"])
        tmp_db.set_disposition("disp-item", "retain-legal", "Bogfoeringsloven", "", "admin")
        record = tmp_db.get_disposition("disp-item")
        assert record is not None
        assert record["status"] == "retain-legal"

    def test_disposition_legal_basis_stored(self, tmp_db):
        """The legal-basis text is returned under ``legal_basis``."""
        current = self._email_scan(tmp_db)
        tmp_db.save_item(current, _make_card("disp-2"), [])
        tmp_db.set_disposition("disp-2", "delete-scheduled", "Data minimisation", "", "reviewer")
        record = tmp_db.get_disposition("disp-2")
        assert record["legal_basis"] == "Data minimisation"

    def test_disposition_overwrite(self, tmp_db):
        """Setting a disposition twice leaves the second status in place."""
        current = self._email_scan(tmp_db)
        tmp_db.save_item(current, _make_card("disp-3"), [])
        tmp_db.set_disposition("disp-3", "unreviewed", "", "", "")
        tmp_db.set_disposition("disp-3", "deleted", "", "", "admin")
        record = tmp_db.get_disposition("disp-3")
        assert record["status"] == "deleted"

    def test_all_disposition_values_accepted(self, tmp_db):
        """Each listed status round-trips through set/get on its own item."""
        statuses = [
            "unreviewed",
            "retain-legal",
            "retain-legitimate",
            "retain-contract",
            "delete-scheduled",
            "deleted",
        ]
        current = self._email_scan(tmp_db)
        for index, wanted in enumerate(statuses):
            item_id = f"disp-status-{index}"
            tmp_db.save_item(current, _make_card(item_id), [])
            tmp_db.set_disposition(item_id, wanted, "", "", "test")
            record = tmp_db.get_disposition(item_id)
            assert record["status"] == wanted
# ─────────────────────────────────────────────────────────────────────────────
# 5. Export / import
# ─────────────────────────────────────────────────────────────────────────────
class TestExportImport:
    """Tests for ``export_db`` / ``import_db``.

    Every test is skipped when ``export_db`` is not available.
    """

    @staticmethod
    def _skip_unless_exportable(target):
        """Skip the running test if *target* has no ``export_db`` attribute."""
        if not hasattr(target, "export_db"):
            pytest.skip("export_db not implemented")

    def _populate(self, db):
        """Fill *db* with one finished scan: two items and one disposition."""
        current = db.begin_scan({"sources": ["email"], "user_ids": []})
        for item_id, cpr in (("exp-1", "290472-1234"), ("exp-2", "010185-4321")):
            db.save_item(current, _make_card(item_id), [cpr])
        db.set_disposition("exp-1", "retain-legal", "Bogfoeringsloven", "", "admin")
        db.finish_scan(current, 2)

    def _export_populated(self, db, archive):
        """Populate *db*, export it to the path *archive*, and return *archive*."""
        self._populate(db)
        db.export_db(str(archive))
        return archive

    def test_export_creates_zip(self, tmp_db, tmp_path):
        """Exporting writes a non-empty file at the requested path."""
        self._skip_unless_exportable(tmp_db)
        archive = self._export_populated(tmp_db, tmp_path / "export.zip")
        assert archive.exists()
        assert archive.stat().st_size > 0

    def test_export_zip_contains_expected_files(self, tmp_db, tmp_path):
        """The export archive contains the three expected JSON members."""
        self._skip_unless_exportable(tmp_db)
        archive = self._export_populated(tmp_db, tmp_path / "export.zip")
        import zipfile

        with zipfile.ZipFile(archive) as bundle:
            members = bundle.namelist()
        wanted = ("export_meta.json", "flagged_items.json", "dispositions.json")
        missing = [name for name in wanted if name not in members]
        assert not missing

    def test_import_merge_adds_dispositions(self, tmp_path):
        """Merging an export into a fresh DB brings over the disposition."""
        self._skip_unless_exportable(ScanDB)
        # Source database: populated, then exported.
        source_db = ScanDB(str(tmp_path / "src.db"))
        archive = self._export_populated(source_db, tmp_path / "export.zip")
        # Target database starts out fresh.
        target_db = ScanDB(str(tmp_path / "tgt.db"))
        target_db.import_db(str(archive), mode="merge")
        # The disposition set on exp-1 in the source should now exist here.
        imported = target_db.get_disposition("exp-1")
        assert imported is not None

    def test_import_replace_restores_items(self, tmp_path):
        """Importing in replace mode makes the exported items findable by CPR."""
        self._skip_unless_exportable(ScanDB)
        source_db = ScanDB(str(tmp_path / "src2.db"))
        archive = self._export_populated(source_db, tmp_path / "export2.zip")
        target_db = ScanDB(str(tmp_path / "tgt2.db"))
        target_db.import_db(str(archive), mode="replace")
        hits = target_db.lookup_data_subject("290472-1234")
        assert len(hits) >= 1