Added tests for Video & Audio
feat: video/audio metadata scanning, profile rename fix, route tests
- Scan .mp4/.mov/.avi/.mkv and .mp3/.flac/.ogg/.m4a/.wma (+ 7 more)
for GPS coordinates, artist/author, title, comment — metadata only,
no frame or audio analysis. Uses mutagen (added to requirements.txt).
GPS-tagged phone recordings now flag with gps_location like photos.
- Fix _extract_audio_metadata silently returning empty results:
mutagen.File() first positional arg is `filename`, not `fileobj` —
was passing BytesIO as the filename. Fixed to keyword args.
- Fix profile copy rename not reflected in left column until modal
reopen: _pmgmtSaveFullEdit called loadProfiles() but never
_renderProfileMgmt(). Added re-render and active-row highlight.
- Add TestProfileRoutes (10 tests) covering all profile API endpoints
including a rename regression test. Total: 182 tests.
- generate_fixtures.py now produces 6 audio/video fixtures (14–19):
2 MP3, 2 FLAC, 2 MP4: 3 flagged, 3 negative cases.
parent 2a2d79de90
commit d42518dc81
@ -9,6 +9,14 @@ Version numbers follow [Semantic Versioning](https://semver.org/spec/v2.0.0.html
|
||||
|
||||
## [1.6.23] — 2026-04-21
|
||||
|
||||
### Added
|
||||
|
||||
- **Video file metadata scanning** — `.mp4`, `.mov`, `.m4v`, `.avi`, `.mkv`, `.wmv`, `.flv`, `.webm` files are now included in all scan sources (M365 OneDrive/SharePoint/Teams, Google Drive, local/SMB). No frame or audio analysis is performed; only container metadata is extracted: GPS coordinates (iPhone/Android QuickTime `©xyz` atom, ISO 6709 format), author/artist, title, comment/description, and recording date. A smartphone recording with an embedded GPS location is flagged with the `gps_location` special category, exactly like a geotagged photo. AVI metadata (RIFF INFO `INAM`/`IART`/`ICMT`) is parsed without any external library. Requires `mutagen>=1.47` (added to `requirements.txt`).
|
||||
|
||||
- **Audio file metadata scanning** — `.mp3`, `.flac`, `.ogg`, `.m4a`, `.aac`, `.wma`, `.wav`, `.opus`, `.aiff` files are now scanned for PII-bearing tags across all sources. Extracted fields: title, artist, album artist, composer, lyricist, conductor, author, copyright, comment, description. No audio content is transcribed. Uses `mutagen.File(easy=True)` which normalises tag formats across ID3 (MP3), MPEG-4 (M4A/AAC), Vorbis (FLAC/OGG), and ASF (WMA) into a unified lowercase-key interface. A voice recording saved with a student's name in the artist tag will be flagged with `exif_pii`. Fixed a silent bug in `_extract_audio_metadata` where `mutagen.File(io.BytesIO(content), filename)` was passing the BytesIO as the `filename` positional argument; corrected to `mutagen.File(fileobj=..., filename=...)`.
|
||||
|
||||
- **Audio and video test fixtures** — `tests/fixtures/local_files/generate_fixtures.py` now generates 6 new fixtures: `14_audio_artist_pii.mp3`, `15_audio_artist_pii.flac` (artist name → flag), `16_audio_no_pii.mp3`, `17_audio_no_pii.flac` (no tags → no flag), `18_video_gps.mp4` (GPS + artist → flag), `19_video_no_pii.mp4` (no tags → no flag). Total fixtures: 19 (13 flagged, 6 negative).
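The GPS handling for the QuickTime `©xyz` atom can be illustrated with a small standalone sketch. It assumes the atom carries an ISO 6709 string such as `+55.6763+012.5681+005.000/`. The helper name `parse_iso6709` is hypothetical and not part of `cpr_detector.py`; the regular expression mirrors the one used in `_extract_mp4_tags`.

```python
import re

# Signed latitude followed by signed longitude.
# A trailing altitude component and the closing "/" are ignored.
_ISO6709 = re.compile(r'([+-]\d+\.?\d*)([+-]\d+\.?\d*)')


def parse_iso6709(value: str):
    """Return a GPS dict for an ISO 6709 string, or None if it does not match.

    Hypothetical helper for illustration only.
    """
    m = _ISO6709.match(value.strip())
    if not m:
        return None
    lat = round(float(m.group(1)), 7)
    lon = round(float(m.group(2)), 7)
    return {
        "lat": lat,
        "lon": lon,
        "lat_ref": "N" if lat >= 0 else "S",
        "lon_ref": "E" if lon >= 0 else "W",
        "maps_url": f"https://www.google.com/maps?q={lat},{lon}",
    }


gps = parse_iso6709("+55.6763+012.5681+005.000/")
southern = parse_iso6709("-33.8688+151.2093/")
```

The returned dict uses the same keys as the `gps` entry documented for `_extract_exif`, so a caller could treat photo and video locations alike.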
|
||||
|
||||
### Fixed
|
||||
|
||||
- **Profile copy rename not reflected in left column until modal reopen** — saving a renamed profile via the full editor (`_pmgmtSaveFullEdit`) called `loadProfiles()` to refresh `S._profiles` but never called `_renderProfileMgmt()`, so the left-column list was not repainted. The new name only appeared after closing and reopening the modal. Fixed by calling `_renderProfileMgmt()` immediately after `loadProfiles()` and re-applying the `.active` highlight to the correct row. 10 new route integration tests added for all profile API endpoints; total test count: 182.
|
||||
|
||||
@ -46,7 +46,7 @@ python -m pytest tests/ -q
|
||||
|
||||
**`tests/test_route_integration.py`** — 54 Flask test-client tests covering security-sensitive paths: viewer token CRUD and scope validation, `GET /api/db/flagged` role/user scope enforcement, bulk disposition isolation, viewer PIN (set/verify/rate-limit/change/clear), interface PIN gate (multi-step flows require `session["interface_ok"] = True` after PIN set — the `before_request` hook blocks the same endpoint once a PIN exists), scan lock release on `run_scan()` exception, `GET /api/db/sessions` shape and ordering, profile routes CRUD and rename (including the rename-after-copy regression). Uses a tmp-path `ScanDB` monkeypatched into `routes.database._get_db` — tests never touch the real database. Interface PIN tests manipulate the real `config.json` via `setup_method`/`teardown_method` calling `clear_interface_pin()`.
|
||||
|
||||
**Local-file scan fixtures** — `tests/fixtures/local_files/` holds 13 documents for manual/UI-level testing of the file scanner. 10 should be flagged; 3 are true negatives. All CPR numbers verified against `is_valid_cpr`. `generate_fixtures.py` (requires `python-docx` + `openpyxl`, already in venv) regenerates the binary `.docx`/`.xlsx` files.
|
||||
**Local-file scan fixtures** — `tests/fixtures/local_files/` holds 19 files for manual/UI-level testing of the file scanner. 13 should be flagged; 6 are true negatives. All CPR numbers verified against `is_valid_cpr`. `generate_fixtures.py` (requires `python-docx`, `openpyxl`, `mutagen`, all in venv) regenerates the binary `.docx`/`.xlsx`/`.mp3`/`.flac`/`.mp4` files. MP3 fixtures need 2 silent MPEG frames so mutagen can sync; FLAC fixtures use a hand-packed STREAMINFO block plus, for the tagged file, a Vorbis comment block; MP4 fixtures use a minimal `ftyp`+`moov`/`mvhd` base that mutagen can tag.
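The "2 silent MPEG frames" requirement can be checked by hand. The sketch below decodes the 4-byte frame header used by the fixture generator (`ff fb 90 00`) and confirms that a second header sits exactly one frame length later. The function name and lookup tables are illustrative and cover only MPEG-1 Layer III; they are not taken from the repository.

```python
# Bitrates (kbps) for MPEG-1 Layer III, indexed by the 4-bit bitrate field.
_BITRATES_KBPS = [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320]
# Sample rates (Hz) for MPEG-1, indexed by the 2-bit sample-rate field.
_SAMPLE_RATES = [44100, 48000, 32000]


def mpeg1_layer3_frame_length(header: bytes) -> int:
    """Return the frame length in bytes for a 4-byte MPEG-1 Layer III header."""
    # 0xFF then 0xFA/0xFB: frame sync, MPEG-1, Layer III (CRC bit ignored).
    if len(header) != 4 or header[0] != 0xFF or (header[1] & 0xFE) != 0xFA:
        raise ValueError("not an MPEG-1 Layer III frame header")
    bitrate_idx = header[2] >> 4
    rate_idx = (header[2] >> 2) & 0x3
    padding = (header[2] >> 1) & 0x1
    if not 1 <= bitrate_idx <= 14 or rate_idx > 2:
        raise ValueError("unsupported bitrate or sample-rate index")
    bitrate = _BITRATES_KBPS[bitrate_idx] * 1000
    return 144 * bitrate // _SAMPLE_RATES[rate_idx] + padding


header = b'\xff\xfb\x90\x00'
frame_len = mpeg1_layer3_frame_length(header)

# Same construction as the generator: header plus zero-filled body, twice.
frames = (header + b'\x00' * (frame_len - 4)) * 2
second_header = frames[frame_len:frame_len + 4]
```

With a 128 kbps bitrate at 44100 Hz and no padding bit, the frame length works out to 417 bytes, which matches the 4 + 413 byte layout in `generate_fixtures.py`.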
|
||||
|
||||
**`_CPR_PREFIX_NOISE` in `.docx` fixtures** — `scan_docx` builds a single string by concatenating all run texts with no separators between paragraphs. If a CPR value run is immediately followed by text from the next paragraph without a word boundary, `\b` in `CPR_PATTERN` fails and the number is silently missed. The fixture generator appends a trailing `" "` to every value run so CPRs are always surrounded by word boundaries after concatenation. Do not remove this trailing space — the detection will silently regress.
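The word-boundary failure is easy to reproduce in isolation. The sketch below uses a hypothetical stand-in for `CPR_PATTERN` (six digits, optional hyphen, four digits, wrapped in `\b`); the real pattern may differ. The number is a made-up placeholder and has not been checked against `is_valid_cpr`.

```python
import re

# Hypothetical stand-in for CPR_PATTERN; the real pattern may differ.
CPR_LIKE = re.compile(r'\b\d{6}-?\d{4}\b')

# Run texts as scan_docx would concatenate them: no separator between paragraphs.
runs_without_space = ["CPR: ", "010190-1234", "Næste afsnit"]
runs_with_space = ["CPR: ", "010190-1234 ", "Næste afsnit"]

joined_bad = "".join(runs_without_space)   # "...1234Næste": digit followed by a letter
joined_good = "".join(runs_with_space)     # "...1234 Næste": boundary restored

missed = CPR_LIKE.findall(joined_bad)
found = CPR_LIKE.findall(joined_good)
```

In the first string the final digit is followed directly by a letter, so `\b` does not match and the number is skipped. The trailing space in the second string restores the boundary.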
|
||||
|
||||
|
||||
10
README.md
@ -617,7 +617,7 @@ The test suite should be run before every release and after any change to `docum
|
||||
|
||||
#### Local-file scan fixtures
|
||||
|
||||
`tests/fixtures/local_files/` provides 13 hand-crafted documents for end-to-end testing of the file scanner via the UI or `file_scanner.py`. Drop the folder as a local source and run a scan — all 10 PII-bearing files should be flagged and all 3 negative-case files should produce zero hits.
|
||||
`tests/fixtures/local_files/` provides 19 files for end-to-end testing of the file scanner via the UI or `file_scanner.py`. Drop the folder as a local source and run a scan: all 13 PII-bearing files should be flagged and all 6 negative-case files should produce zero hits.
|
||||
|
||||
| File | Format | Expected | Scenario |
|
||||
|---|---|---|---|
|
||||
@ -634,8 +634,14 @@ The test suite should be run before every release and after any change to `docum
|
||||
| `11_false_positive_invoice.txt` | TXT | **No flag** | Invoice: CPR-shaped numbers suppressed by `faktura`/`varenr` context |
|
||||
| `12_post2007_no_context.txt` | TXT | **No flag** | Equipment serial that looks like a post-2007 CPR but has no context keyword |
|
||||
| `13_cpr_in_xlsx.xlsx` | XLSX | Flag | Excel workbook with two sheets: students + employees |
|
||||
| `14_audio_artist_pii.mp3` | MP3 | Flag | ID3 artist/title tags with a personal name → `exif_pii` |
|
||||
| `15_audio_artist_pii.flac` | FLAC | Flag | Vorbis comment artist/title tags with a personal name → `exif_pii` |
|
||||
| `16_audio_no_pii.mp3` | MP3 | **No flag** | Empty ID3 header — no metadata tags |
|
||||
| `17_audio_no_pii.flac` | FLAC | **No flag** | FLAC with no Vorbis comment block |
|
||||
| `18_video_gps.mp4` | MP4 | Flag | QuickTime GPS coordinates (Copenhagen) + artist tag → `gps_location` + `exif_pii` |
|
||||
| `19_video_no_pii.mp4` | MP4 | **No flag** | Minimal MP4 container with no metadata |
|
||||
|
||||
All CPR numbers are mathematically valid (verified against `is_valid_cpr`). Run `generate_fixtures.py` inside the venv to regenerate the `.docx` and `.xlsx` binary files after any changes.
|
||||
All CPR numbers are mathematically valid (verified against `is_valid_cpr`). Run `generate_fixtures.py` inside the venv to regenerate all binary files after any changes. Requires `python-docx`, `openpyxl`, and `mutagen` (all included in `requirements.txt`).
|
||||
|
||||
### Roadmap
|
||||
|
||||
|
||||
232
cpr_detector.py
@ -5,12 +5,14 @@ Provides:
|
||||
_scan_bytes(content, filename) — dispatch to correct scanner by file type
|
||||
_scan_text_direct(text) — scan a plain text string
|
||||
_extract_exif(content, filename) — extract PII-bearing EXIF tags from images
|
||||
_extract_video_metadata(content, fn) — extract PII-bearing metadata from video files
|
||||
_extract_audio_metadata(content, fn) — extract PII-bearing tags from audio files
|
||||
_detect_photo_faces(content, fn) — count faces in an image (OpenCV)
|
||||
_get_pii_counts(text) — NER-based PII type counts
|
||||
_make_thumb(content, filename) — JPEG thumbnail as base64 string
|
||||
_placeholder_svg(ext, name) — SVG file-type icon
|
||||
|
||||
Globals SCANNER_OK, PIL_OK, PHOTO_EXTS, SUPPORTED_EXTS, ds, PILImage, LANG,
|
||||
Globals SCANNER_OK, PIL_OK, PHOTO_EXTS, VIDEO_EXTS, AUDIO_EXTS, SUPPORTED_EXTS, ds, PILImage, LANG,
|
||||
and _check_special_category are injected at startup by gdpr_scanner.py via
|
||||
`from cpr_detector import *` AFTER those names are defined. This keeps the
|
||||
module cleanly importable in isolation for unit tests (#26) while preserving
|
||||
@ -47,11 +49,17 @@ except ImportError:
|
||||
PILImage = None # type: ignore[assignment]
|
||||
PIL_OK = False
|
||||
|
||||
VIDEO_EXTS = {
|
||||
".mp4", ".mov", ".m4v", ".avi", ".mkv", ".wmv", ".flv", ".webm",
|
||||
}
|
||||
AUDIO_EXTS = {
|
||||
".mp3", ".flac", ".ogg", ".m4a", ".aac", ".wma", ".wav", ".opus", ".aiff", ".aif",
|
||||
}
|
||||
SUPPORTED_EXTS = {
|
||||
".pdf", ".docx", ".doc", ".xlsx", ".xlsm", ".csv",
|
||||
".txt", ".eml", ".msg",
|
||||
".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp",
|
||||
}
|
||||
} | VIDEO_EXTS | AUDIO_EXTS
|
||||
PHOTO_EXTS = {
|
||||
".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".tif", ".webp", ".heic", ".heif",
|
||||
}
|
||||
@ -190,6 +198,226 @@ def _extract_exif(content: bytes, filename: str) -> dict:
|
||||
return result
|
||||
|
||||
|
||||
def _extract_video_metadata(content: bytes, filename: str) -> dict:
|
||||
"""Extract PII-bearing metadata from a video file.
|
||||
|
||||
Returns the same structure as _extract_exif so callers can treat both
|
||||
identically:
|
||||
gps — {lat, lon, lat_ref, lon_ref, maps_url} or None
|
||||
pii_fields — {label: value} for title/artist/comment/description
|
||||
author — str or None
|
||||
datetime — str or None
|
||||
device — str or None
|
||||
has_pii — bool
|
||||
|
||||
MP4/MOV/M4V: reads QuickTime/MPEG-4 tags via mutagen (no system deps).
|
||||
GPS is extracted from the ©xyz QuickTime atom (ISO 6709 string written by
|
||||
iPhones and Android devices: "+55.6763+012.5681+005.000/").
|
||||
AVI: parses the RIFF INFO list chunk without any external library.
|
||||
All other extensions: returns empty result immediately.
|
||||
"""
|
||||
result: dict = {"gps": None, "pii_fields": {}, "author": None,
|
||||
"datetime": None, "device": None, "has_pii": False}
|
||||
ext = Path(filename).suffix.lower()
|
||||
|
||||
if ext in {".mp4", ".mov", ".m4v"}:
|
||||
_extract_mp4_tags(content, result)
|
||||
elif ext == ".avi":
|
||||
_extract_avi_info(content, result)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _extract_mp4_tags(content: bytes, result: dict) -> None:
|
||||
"""Populate result dict from MPEG-4/QuickTime container tags via mutagen."""
|
||||
try:
|
||||
import mutagen.mp4
|
||||
tags = mutagen.mp4.MP4(io.BytesIO(content)).tags
|
||||
if not tags:
|
||||
return
|
||||
|
||||
# Text fields that may contain personal data
|
||||
_tag_label = {
|
||||
"©nam": "Title",
|
||||
"©cmt": "Comment",
|
||||
"©des": "Description",
|
||||
"desc": "Description",
|
||||
"©lyr": "Lyrics",
|
||||
}
|
||||
for tag, label in _tag_label.items():
|
||||
val = tags.get(tag)
|
||||
if val:
|
||||
text = str(val[0]).strip() if isinstance(val, list) else str(val).strip()
|
||||
if len(text) >= _EXIF_PII_MIN_LEN:
|
||||
result["pii_fields"][label] = text
|
||||
result["has_pii"] = True
|
||||
|
||||
# Author — prefer ©ART (artist), fall back to album artist
|
||||
for tag in ("©ART", "aART"):
|
||||
val = tags.get(tag)
|
||||
if val:
|
||||
author = str(val[0]).strip() if isinstance(val, list) else str(val).strip()
|
||||
if len(author) >= _EXIF_PII_MIN_LEN:
|
||||
result["author"] = author
|
||||
result["pii_fields"]["Artist"] = author
|
||||
result["has_pii"] = True
|
||||
break
|
||||
|
||||
# Recording date
|
||||
val = tags.get("©day")
|
||||
if val:
|
||||
result["datetime"] = str(val[0]).strip() if isinstance(val, list) else str(val).strip()
|
||||
|
||||
# Device (QuickTime-specific tags written by iPhones)
|
||||
make = tags.get("©mak")
|
||||
model = tags.get("©mod")
|
||||
if make or model:
|
||||
result["device"] = " ".join(
|
||||
str(v[0] if isinstance(v, list) else v).strip()
|
||||
for v in (make, model) if v
|
||||
)
|
||||
|
||||
# GPS — QuickTime ©xyz atom: "+55.6763+012.5681+005.000/" (ISO 6709)
|
||||
import re as _re
|
||||
for gps_tag in ("©xyz", "com.apple.quicktime.location.ISO6709"):
|
||||
val = tags.get(gps_tag)
|
||||
if val:
|
||||
gps_str = str(val[0] if isinstance(val, list) else val).strip()
|
||||
m = _re.match(r'([+-]\d+\.?\d*)([+-]\d+\.?\d*)', gps_str)
|
||||
if m:
|
||||
lat = round(float(m.group(1)), 7)
|
||||
lon = round(float(m.group(2)), 7)
|
||||
result["gps"] = {
|
||||
"lat": lat,
|
||||
"lon": lon,
|
||||
"lat_ref": "N" if lat >= 0 else "S",
|
||||
"lon_ref": "E" if lon >= 0 else "W",
|
||||
"maps_url": f"https://www.google.com/maps?q={lat},{lon}",
|
||||
}
|
||||
result["has_pii"] = True
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _extract_avi_info(content: bytes, result: dict) -> None:
|
||||
"""Populate result dict from RIFF INFO list chunk in an AVI file."""
|
||||
try:
|
||||
import struct
|
||||
if len(content) < 12 or content[:4] != b"RIFF":
|
||||
return
|
||||
# Walk top-level RIFF chunks looking for the INFO LIST
|
||||
i = 12
|
||||
while i + 8 <= len(content):
|
||||
chunk_id = content[i:i+4]
|
||||
chunk_size = struct.unpack_from("<I", content, i + 4)[0]
|
||||
if chunk_id == b"LIST" and content[i+8:i+12] == b"INFO":
|
||||
_parse_riff_info(content, i + 12, i + 8 + chunk_size, result)
|
||||
break
|
||||
i += 8 + chunk_size + (chunk_size & 1) # RIFF chunks are word-aligned
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _parse_riff_info(content: bytes, start: int, end: int, result: dict) -> None:
|
||||
import struct
|
||||
_info_labels = {
|
||||
b"INAM": "Title",
|
||||
b"IART": "Artist",
|
||||
b"ICMT": "Comment",
|
||||
b"ISBJ": "Subject",
|
||||
b"ICRD": "Date",
|
||||
}
|
||||
i = start
|
||||
while i + 8 <= end and i + 8 <= len(content):
|
||||
sub_id = content[i:i+4]
|
||||
sub_size = struct.unpack_from("<I", content, i + 4)[0]
|
||||
label = _info_labels.get(sub_id)
|
||||
if label:
|
||||
raw = content[i+8 : i+8+sub_size]
|
||||
val = raw.decode("utf-8", errors="replace").strip("\x00 ")
|
||||
if val and len(val) >= _EXIF_PII_MIN_LEN:
|
||||
result["pii_fields"][label] = val
|
||||
result["has_pii"] = True
|
||||
if label == "Artist" and not result["author"]:
|
||||
result["author"] = val
|
||||
if label == "Date" and not result["datetime"]:
|
||||
result["datetime"] = val
|
||||
i += 8 + sub_size + (sub_size & 1)
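A round-trip sketch may help when reasoning about the RIFF INFO walk above. It builds a tiny RIFF buffer containing only a `LIST`/`INFO` chunk (not a playable AVI) and reads it back using the same chunk-size and word-alignment rules. The helper names `riff_chunk`, `build_riff_with_info` and `read_riff_info` are hypothetical and exist only for this illustration.

```python
import struct


def riff_chunk(chunk_id: bytes, data: bytes) -> bytes:
    """Serialise one RIFF chunk: id, little-endian size, data, pad to even length."""
    pad = b"\x00" if len(data) & 1 else b""
    return chunk_id + struct.pack("<I", len(data)) + data + pad


def build_riff_with_info(fields: dict) -> bytes:
    """Build a minimal RIFF buffer whose only chunk is a LIST/INFO list."""
    info = b"INFO" + b"".join(
        riff_chunk(key, value.encode("utf-8") + b"\x00") for key, value in fields.items()
    )
    body = b"AVI " + riff_chunk(b"LIST", info)
    return b"RIFF" + struct.pack("<I", len(body)) + body


def read_riff_info(content: bytes) -> dict:
    """Return {sub-chunk id: text} for the first LIST/INFO chunk, if any."""
    out: dict = {}
    if len(content) < 12 or content[:4] != b"RIFF":
        return out
    i = 12  # skip "RIFF", size, form type
    while i + 8 <= len(content):
        chunk_id = content[i:i + 4]
        size = struct.unpack_from("<I", content, i + 4)[0]
        if chunk_id == b"LIST" and content[i + 8:i + 12] == b"INFO":
            j, end = i + 12, i + 8 + size
            while j + 8 <= end and j + 8 <= len(content):
                sub_id = content[j:j + 4]
                sub_size = struct.unpack_from("<I", content, j + 4)[0]
                raw = content[j + 8:j + 8 + sub_size]
                out[sub_id] = raw.decode("utf-8", errors="replace").strip("\x00 ")
                j += 8 + sub_size + (sub_size & 1)  # sub-chunks are word-aligned
            break
        i += 8 + size + (size & 1)
    return out


sample = build_riff_with_info({b"INAM": "Memo", b"IART": "Emma Slot Henriksen"})
info = read_riff_info(sample)
```

The `INAM` value has an odd byte length once its NUL terminator is added, so the example also exercises the padding byte that the `(sub_size & 1)` term accounts for.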
|
||||
|
||||
|
||||
def _extract_audio_metadata(content: bytes, filename: str) -> dict:
|
||||
"""Extract PII-bearing tags from an audio file.
|
||||
|
||||
Returns the same structure as _extract_exif / _extract_video_metadata.
|
||||
No GPS extraction — GPS is not embedded in audio containers in practice.
|
||||
|
||||
Uses mutagen.File(easy=True) which normalises tags to lowercase keys for
|
||||
MP3 (ID3), M4A/AAC (MPEG-4), FLAC, OGG Vorbis, and AIFF. WMA/ASF tags
|
||||
use mixed-case keys (e.g. "Title", "Author") — these are lowercased during
|
||||
normalisation so the same extraction logic covers all formats.
|
||||
"""
|
||||
result: dict = {"gps": None, "pii_fields": {}, "author": None,
|
||||
"datetime": None, "device": None, "has_pii": False}
|
||||
try:
|
||||
import mutagen
|
||||
f = mutagen.File(fileobj=io.BytesIO(content), filename=filename, easy=True)
|
||||
if not f or not f.tags:
|
||||
return result
|
||||
|
||||
# Normalise all tags to {lowercase_key: str_value} regardless of format
|
||||
def _strval(v):
|
||||
return str(v[0] if isinstance(v, list) and v else v).strip()
|
||||
|
||||
tags: dict[str, str] = {
|
||||
k.lower(): _strval(v) for k, v in f.tags.items()
|
||||
}
|
||||
|
||||
# Fields that may contain personal names or descriptions
|
||||
_pii_keys = {
|
||||
"title": "Title",
|
||||
"artist": "Artist",
|
||||
"albumartist": "Album Artist",
|
||||
"composer": "Composer",
|
||||
"lyricist": "Lyricist",
|
||||
"conductor": "Conductor",
|
||||
"author": "Author",
|
||||
"copyright": "Copyright",
|
||||
"comment": "Comment",
|
||||
"description": "Description",
|
||||
# WMA/ASF mixed-case keys survive as lowercase after normalisation
|
||||
"wm/albumartist": "Album Artist",
|
||||
"wm/composer": "Composer",
|
||||
"wm/conductor": "Conductor",
|
||||
"wm/lyrics": "Lyrics",
|
||||
}
|
||||
seen: set[str] = set() # avoid duplicate label entries
|
||||
for key, label in _pii_keys.items():
|
||||
val = tags.get(key, "")
|
||||
if val and len(val) >= _EXIF_PII_MIN_LEN and label not in seen:
|
||||
result["pii_fields"][label] = val
|
||||
result["has_pii"] = True
|
||||
seen.add(label)
|
||||
|
||||
# Author — most specific personal name field wins
|
||||
for key in ("artist", "author", "albumartist", "wm/albumartist", "composer"):
|
||||
val = tags.get(key, "")
|
||||
if val and len(val) >= _EXIF_PII_MIN_LEN:
|
||||
result["author"] = val
|
||||
break
|
||||
|
||||
# Recording / release date
|
||||
for key in ("date", "year", "wm/year"):
|
||||
val = tags.get(key, "")
|
||||
if val:
|
||||
result["datetime"] = val
|
||||
break
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
"""Detect faces in an image file using OpenCV Haar cascades.
|
||||
|
||||
|
||||
@ -260,8 +260,8 @@ import sse as _sse_mod # for _current_scan_id access at call time
|
||||
from cpr_detector import (
|
||||
_scan_bytes, _scan_bytes_timeout, _scan_text_direct, _html_esc, _get_pii_counts,
|
||||
_make_thumb, _placeholder_svg,
|
||||
_extract_exif, _detect_photo_faces,
|
||||
SUPPORTED_EXTS, PHOTO_EXTS,
|
||||
_extract_exif, _extract_video_metadata, _extract_audio_metadata, _detect_photo_faces,
|
||||
SUPPORTED_EXTS, PHOTO_EXTS, VIDEO_EXTS, AUDIO_EXTS,
|
||||
_EXIF_PII_TAGS,
|
||||
)
|
||||
# Inject runtime deps into cpr_detector
|
||||
@ -285,12 +285,16 @@ _se.FILE_SCANNER_OK = FILE_SCANNER_OK
|
||||
_se.CONNECTOR_OK = CONNECTOR_OK
|
||||
_se.DB_OK = DB_OK
|
||||
_se.PHOTO_EXTS = PHOTO_EXTS
|
||||
_se.VIDEO_EXTS = VIDEO_EXTS
|
||||
_se.AUDIO_EXTS = AUDIO_EXTS
|
||||
_se.SUPPORTED_EXTS = SUPPORTED_EXTS
|
||||
# cpr helpers
|
||||
_se._scan_bytes = _scan_bytes
|
||||
_se._scan_bytes_timeout = _scan_bytes_timeout
|
||||
_se._detect_photo_faces = _detect_photo_faces
|
||||
_se._extract_exif = _extract_exif
|
||||
_se._extract_video_metadata = _extract_video_metadata
|
||||
_se._extract_audio_metadata = _extract_audio_metadata
|
||||
_se._make_thumb = _make_thumb
|
||||
_se._placeholder_svg = _placeholder_svg
|
||||
_se._check_special_category = _check_special_category
|
||||
|
||||
@ -13,10 +13,11 @@ pdfplumber>=0.11 # PDF text extraction
|
||||
python-docx>=1.1 # Word document scanning
|
||||
openpyxl>=3.1 # Excel scanning + export
|
||||
|
||||
# ── Image processing ──────────────────────────────────────────────────────────
|
||||
# ── Image / video processing ─────────────────────────────────────────────────
|
||||
Pillow>=10.0 # Image thumbnails + EXIF extraction (always-on)
|
||||
opencv-python>=4.9 # Face detection (opt-in — Scan photos for faces)
|
||||
numpy>=1.26 # Required by opencv-python
|
||||
mutagen>=1.47 # Audio/video metadata extraction (MP4/MOV/M4V and audio tags: GPS, author, title)
|
||||
|
||||
# ── NER / PII detection ───────────────────────────────────────────────────────
|
||||
# spaCy 3.7 supports Python 3.8–3.12. Do NOT upgrade past Python 3.12.
|
||||
|
||||
@ -99,6 +99,8 @@ except ImportError:
|
||||
# Stubs for standalone import — overwritten by gdpr_scanner.py injections
|
||||
LANG: dict = {}
|
||||
PHOTO_EXTS: set = set()
|
||||
VIDEO_EXTS: set = set()
|
||||
AUDIO_EXTS: set = set()
|
||||
SUPPORTED_EXTS: set = set()
|
||||
|
||||
# cpr_detector helpers — injected by gdpr_scanner.py
|
||||
@ -106,6 +108,8 @@ def _scan_bytes(content, filename, poppler_path=None): return {"cprs": [], "date
|
||||
def _scan_bytes_timeout(content, filename, timeout=60): return {"cprs": [], "dates": []} # type: ignore[misc]
|
||||
def _detect_photo_faces(content, filename): return 0 # type: ignore[misc]
|
||||
def _extract_exif(content, filename): return {} # type: ignore[misc]
|
||||
def _extract_video_metadata(content, filename): return {} # type: ignore[misc]
|
||||
def _extract_audio_metadata(content, filename): return {} # type: ignore[misc]
|
||||
def _make_thumb(content, filename): return "" # type: ignore[misc]
|
||||
def _placeholder_svg(ext, name): return "" # type: ignore[misc]
|
||||
def _check_special_category(text, cprs): return [] # type: ignore[misc]
|
||||
@ -227,9 +231,9 @@ def run_file_scan(source: dict):
|
||||
|
||||
ext = Path(rel_path).suffix.lower()
|
||||
|
||||
# CPR scan — skip for images (no text layer; EXIF/face detection handles them)
|
||||
# CPR scan — skip for images, video and audio (no text layer)
|
||||
result: dict = {"cprs": [], "dates": []}
|
||||
if ext not in PHOTO_EXTS:
|
||||
if ext not in PHOTO_EXTS and ext not in VIDEO_EXTS and ext not in AUDIO_EXTS:
|
||||
try:
|
||||
result = _scan_bytes_timeout(content, rel_path)
|
||||
except Exception as e:
|
||||
@ -238,13 +242,17 @@ def run_file_scan(source: dict):
|
||||
|
||||
cprs = result.get("cprs", [])
|
||||
|
||||
# Photo / biometric scan + EXIF extraction
|
||||
# Photo / biometric scan + EXIF/video/audio metadata extraction
|
||||
_face_count = 0
|
||||
_exif = {}
|
||||
if ext in PHOTO_EXTS:
|
||||
if scan_photos:
|
||||
_face_count = _detect_photo_faces(content, rel_path)
|
||||
_exif = _extract_exif(content, rel_path)
|
||||
elif ext in VIDEO_EXTS:
|
||||
_exif = _extract_video_metadata(content, rel_path)
|
||||
elif ext in AUDIO_EXTS:
|
||||
_exif = _extract_audio_metadata(content, rel_path)
|
||||
|
||||
# Apply filters: distinct CPR threshold and GPS suppression
|
||||
_distinct_cprs = list(dict.fromkeys(c["formatted"] for c in cprs))
|
||||
@ -1084,16 +1092,23 @@ def run_scan(options: dict):
|
||||
content = conn.download_drive_item_for(uid, item_id)
|
||||
else:
|
||||
content = conn.download_item(meta)
|
||||
result = _scan_bytes(content, name)
|
||||
|
||||
# CPR scan — skip for video and audio (metadata-only; no text layer)
|
||||
_media_only = ext in VIDEO_EXTS or ext in AUDIO_EXTS
|
||||
result = {"cprs": [], "dates": []} if _media_only else _scan_bytes(content, name)
|
||||
cprs = result.get("cprs", [])
|
||||
|
||||
# ── Biometric photo scan (#9) + EXIF (#18) ───────────────
|
||||
# ── Biometric photo scan (#9) + EXIF/video/audio metadata (#18) ─
|
||||
_face_count = 0
|
||||
_exif = {}
|
||||
if ext in PHOTO_EXTS:
|
||||
if scan_photos:
|
||||
_face_count = _detect_photo_faces(content, name)
|
||||
_exif = _extract_exif(content, name)
|
||||
elif ext in VIDEO_EXTS:
|
||||
_exif = _extract_video_metadata(content, name)
|
||||
elif ext in AUDIO_EXTS:
|
||||
_exif = _extract_audio_metadata(content, name)
|
||||
|
||||
# Apply filters: distinct CPR threshold and GPS suppression
|
||||
_distinct_cprs = list(dict.fromkeys(c["formatted"] for c in cprs))
|
||||
|
||||
BIN
tests/fixtures/local_files/09_cpr_in_docx.docx
vendored
Binary file not shown.
BIN
tests/fixtures/local_files/13_cpr_in_xlsx.xlsx
vendored
Binary file not shown.
BIN
tests/fixtures/local_files/14_audio_artist_pii.mp3
vendored
Normal file
Binary file not shown.
BIN
tests/fixtures/local_files/15_audio_artist_pii.flac
vendored
Normal file
Binary file not shown.
BIN
tests/fixtures/local_files/16_audio_no_pii.mp3
vendored
Normal file
Binary file not shown.
BIN
tests/fixtures/local_files/17_audio_no_pii.flac
vendored
Normal file
Binary file not shown.
BIN
tests/fixtures/local_files/18_video_gps.mp4
vendored
Normal file
Binary file not shown.
BIN
tests/fixtures/local_files/19_video_no_pii.mp4
vendored
Normal file
Binary file not shown.
193
tests/fixtures/local_files/generate_fixtures.py
vendored
@ -4,7 +4,26 @@ Generate binary fixture files for the local-file GDPR scan test suite.
|
||||
Run from repo root:
|
||||
source venv/bin/activate
|
||||
python tests/fixtures/local_files/generate_fixtures.py
|
||||
|
||||
Fixtures produced
|
||||
─────────────────
|
||||
Document fixtures (require python-docx + openpyxl):
|
||||
09_cpr_in_docx.docx — Word document with 2 CPR numbers → Flag
|
||||
13_cpr_in_xlsx.xlsx — Excel workbook with CPR numbers → Flag
|
||||
|
||||
Audio fixtures (require mutagen):
|
||||
14_audio_artist_pii.mp3 — MP3 with artist/title tags (personal name) → Flag
|
||||
15_audio_artist_pii.flac — FLAC with artist/title Vorbis comments → Flag
|
||||
16_audio_no_pii.mp3 — MP3 with no metadata tags → No flag
|
||||
17_audio_no_pii.flac — FLAC with no metadata → No flag
|
||||
|
||||
Video fixtures (require mutagen):
|
||||
18_video_gps.mp4 — MP4 with GPS coordinates + artist tag → Flag
|
||||
19_video_no_pii.mp4 — MP4 with no metadata tags → No flag
|
||||
"""
|
||||
import struct
|
||||
import tempfile
|
||||
import os
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
@ -19,6 +38,7 @@ def _require(pkg):
|
||||
|
||||
openpyxl = _require("openpyxl")
|
||||
docx = _require("docx")
|
||||
_require("mutagen")
|
||||
|
||||
from openpyxl import Workbook
|
||||
from openpyxl.styles import Font, PatternFill, Alignment
|
||||
@ -148,7 +168,180 @@ def make_xlsx():
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── Audio / video helpers ─────────────────────────────────────────────────────
|
||||
|
||||
# Two silent MPEG1 Layer3 frames (128 kbps / 44100 Hz / stereo: channel-mode bits are 00).
|
||||
# mutagen needs at least 2 consecutive frame headers to confirm sync.
|
||||
# 4-byte header + 413 bytes frame body = 417 bytes × 2 = 834 bytes total.
|
||||
_MPEG_FRAMES = (b'\xff\xfb\x90\x00' + b'\x00' * 413) * 2
|
||||
|
||||
|
||||
def _flac_block_header(block_type: int, data_len: int, last: bool = False) -> bytes:
|
||||
first = (0x80 if last else 0x00) | block_type
|
||||
return bytes([first, (data_len >> 16) & 0xFF, (data_len >> 8) & 0xFF, data_len & 0xFF])
|
||||
|
||||
|
||||
def _vorbis_comment_block(comments: dict) -> bytes:
|
||||
vendor = b'GDPRScanner fixture'
|
||||
data = struct.pack('<I', len(vendor)) + vendor
|
||||
data += struct.pack('<I', len(comments))
|
||||
for key, value in comments.items():
|
||||
entry = f'{key}={value}'.encode('utf-8')
|
||||
data += struct.pack('<I', len(entry)) + entry
|
||||
return data
|
||||
|
||||
|
||||
def _minimal_flac(comments: dict) -> bytes:
|
||||
"""Return bytes for a valid minimal FLAC file with Vorbis comments."""
|
||||
# STREAMINFO (34 bytes): 44100 Hz, mono, 16-bit, 0 samples, zero MD5.
|
||||
si = bytearray(34)
|
||||
si[0:2] = struct.pack('>H', 4096) # min block size
|
||||
si[2:4] = struct.pack('>H', 4096) # max block size
|
||||
# bytes 4-9: min/max frame sizes = 0 (unknown)
|
||||
# Bits 80-99: sample_rate=44100 (0xAC44 in 20-bit field)
|
||||
# Bits 100-102: channels-1 = 0 (mono)
|
||||
# Bits 103-107: bits_per_sample-1 = 15 (16-bit)
|
||||
# Bits 108-143: total_samples = 0; bytes 14-17 remain zero
|
||||
si[10] = 0x0A # 0000_1010 — top 8 of 44100 in 20-bit field
|
||||
si[11] = 0xC4 # 1100_0100
|
||||
si[12] = 0x40 # bottom 4 of sample_rate | channels(000) | bps_msb(0)
|
||||
si[13] = 0xF0 # bps remaining 4 bits (1111) | top 4 of total_samples (0)
|
||||
|
||||
vc = _vorbis_comment_block(comments)
|
||||
return (
|
||||
b'fLaC'
|
||||
+ _flac_block_header(0, 34, last=not comments) # STREAMINFO
|
||||
+ bytes(si)
|
||||
+ (_flac_block_header(4, len(vc), last=True) + vc if comments else b'')
|
||||
)
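The four hand-packed STREAMINFO bytes can be sanity-checked by decoding them again. The sketch below assumes the standard FLAC STREAMINFO layout (20-bit sample rate, 3-bit channels minus one, 5-bit bits-per-sample minus one, 36-bit total samples, starting at byte 10). The function name `decode_streaminfo_audio_fields` is hypothetical.

```python
def decode_streaminfo_audio_fields(streaminfo: bytes) -> dict:
    """Decode the packed audio fields from a 34-byte FLAC STREAMINFO body."""
    if len(streaminfo) != 34:
        raise ValueError("STREAMINFO must be exactly 34 bytes")
    # Bytes 10-17 hold 64 bits: 20 + 3 + 5 + 36.
    packed = int.from_bytes(streaminfo[10:18], "big")
    return {
        "sample_rate": packed >> 44,
        "channels": ((packed >> 41) & 0x7) + 1,
        "bits_per_sample": ((packed >> 36) & 0x1F) + 1,
        "total_samples": packed & ((1 << 36) - 1),
    }


# Same byte values the fixture generator writes into si[10:14].
si = bytearray(34)
si[10:14] = bytes([0x0A, 0xC4, 0x40, 0xF0])
fields = decode_streaminfo_audio_fields(bytes(si))
```

Decoding these bytes should give 44100 Hz, one channel, 16 bits per sample and zero samples, which is what the comments in `_minimal_flac` describe.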
|
||||
|
||||
|
||||
def _mp4_atom(name: bytes, data: bytes) -> bytes:
|
||||
return struct.pack('>I', 8 + len(data)) + name + data
|
||||
|
||||
|
||||
def _minimal_mp4_base() -> bytes:
    """Return bytes for the smallest valid MPEG-4 container mutagen can tag."""
    # ftyp: identifies the file as M4A
    ftyp = _mp4_atom(
        b'ftyp',
        b'M4A ' + struct.pack('>I', 0) + b'M4A ' + b'mp42' + b'isom',
    )
    # mvhd version 0: 100 bytes of content (ISO 14496-12 §8.2.2)
    mvhd = bytearray(100)
    mvhd[0:4] = b'\x00\x00\x00\x00'                    # version + flags
    # bytes 4-19: creation, modification, timescale, duration
    struct.pack_into('>IIII', mvhd, 4, 0, 0, 1000, 0)
    # rate follows duration at byte 20; writing it at 16 would overwrite duration
    struct.pack_into('>I', mvhd, 20, 0x00010000)       # rate = 1.0
    struct.pack_into('>H', mvhd, 24, 0x0100)           # volume = 1.0
    # bytes 26-35: reserved (10 bytes, already zero)
    struct.pack_into('>9i', mvhd, 36,                  # unity matrix
                     0x00010000, 0, 0, 0, 0x00010000, 0, 0, 0, 0x40000000)
    # bytes 72-95: pre-defined (24 bytes, already zero)
    struct.pack_into('>I', mvhd, 96, 0xFFFFFFFF)       # next_track_ID

    return ftyp + _mp4_atom(b'moov', _mp4_atom(b'mvhd', bytes(mvhd)))
|
||||
|
||||
|
||||
def _mp4_with_tags(tags: dict) -> bytes:
|
||||
"""Return bytes for a minimal MP4 with the given mutagen tag dict."""
|
||||
import mutagen.mp4
|
||||
tmp = tempfile.mktemp(suffix='.mp4')
|
||||
try:
|
||||
with open(tmp, 'wb') as fh:
|
||||
fh.write(_minimal_mp4_base())
|
||||
f = mutagen.mp4.MP4(tmp)
|
||||
f.add_tags()
|
||||
for key, value in tags.items():
|
||||
f.tags[key] = [value]
|
||||
f.save()
|
||||
with open(tmp, 'rb') as fh:
|
||||
return fh.read()
|
||||
finally:
|
||||
if os.path.exists(tmp):
|
||||
os.unlink(tmp)
|
||||
|
||||
|
||||
# ── 14_audio_artist_pii.mp3 ───────────────────────────────────────────────────
|
||||
def make_mp3_pii():
|
||||
from mutagen.easyid3 import EasyID3
|
||||
tmp = tempfile.mktemp(suffix='.mp3')
|
||||
try:
|
||||
t = EasyID3()
|
||||
t['artist'] = ['Emma Slot Henriksen']
|
||||
t['title'] = ['Fortrolig optagelse — personalemøde']
|
||||
t['date'] = ['2026-04-21']
|
||||
t.save(tmp)
|
||||
with open(tmp, 'rb') as fh:
|
||||
id3_bytes = fh.read()
|
||||
finally:
|
||||
if os.path.exists(tmp):
|
||||
os.unlink(tmp)
|
||||
|
||||
out = HERE / '14_audio_artist_pii.mp3'
|
||||
out.write_bytes(id3_bytes + _MPEG_FRAMES)
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── 15_audio_artist_pii.flac ──────────────────────────────────────────────────
|
||||
def make_flac_pii():
|
||||
out = HERE / '15_audio_artist_pii.flac'
|
||||
out.write_bytes(_minimal_flac({
|
||||
'ARTIST': 'Emma Slot Henriksen',
|
||||
'TITLE': 'Fortrolig optagelse — personalemøde',
|
||||
'DATE': '2026-04-21',
|
||||
}))
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── 16_audio_no_pii.mp3 ───────────────────────────────────────────────────────
|
||||
def make_mp3_no_pii():
|
||||
from mutagen.easyid3 import EasyID3
|
||||
tmp = tempfile.mktemp(suffix='.mp3')
|
||||
try:
|
||||
EasyID3().save(tmp) # empty ID3 header, no tags
|
||||
with open(tmp, 'rb') as fh:
|
||||
id3_bytes = fh.read()
|
||||
finally:
|
||||
if os.path.exists(tmp):
|
||||
os.unlink(tmp)
|
||||
|
||||
out = HERE / '16_audio_no_pii.mp3'
|
||||
out.write_bytes(id3_bytes + _MPEG_FRAMES)
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── 17_audio_no_pii.flac ──────────────────────────────────────────────────────
|
||||
def make_flac_no_pii():
|
||||
out = HERE / '17_audio_no_pii.flac'
|
||||
out.write_bytes(_minimal_flac({})) # no Vorbis comment block
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── 18_video_gps.mp4 ─────────────────────────────────────────────────────────
|
||||
def make_mp4_gps():
|
||||
out = HERE / '18_video_gps.mp4'
|
||||
out.write_bytes(_mp4_with_tags({
|
||||
'©xyz': '+55.6761+012.5683+000.000/', # Copenhagen
|
||||
'©ART': 'Emma Slot Henriksen',
|
||||
'©nam': 'Optagelse fra skolegården',
|
||||
}))
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
# ── 19_video_no_pii.mp4 ──────────────────────────────────────────────────────
|
||||
def make_mp4_no_pii():
|
||||
out = HERE / '19_video_no_pii.mp4'
|
||||
out.write_bytes(_minimal_mp4_base()) # no moov/udta/meta/ilst — no tags
|
||||
print(f"Written: {out.name}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
make_docx()
|
||||
make_xlsx()
|
||||
make_mp3_pii()
|
||||
make_flac_pii()
|
||||
make_mp3_no_pii()
|
||||
make_flac_no_pii()
|
||||
make_mp4_gps()
|
||||
make_mp4_no_pii()
|
||||
print("Done.")
|
||||
|
||||