GDPRScanner/routes/sources.py
StyxX65 744813f4ac Add compliance audit log
Immutable audit_log table in the scanner DB records every significant
admin action (profile save/delete, token create/revoke, PIN changes,
source add/update/delete, scheduler job changes, scan start/stop, SMTP
save, dispositions, item delete/redact). GET /api/audit_log exposes
entries newest-first. New Audit Log tab in the Settings modal renders
the table on demand. Settings modal widened 540→640 px and tab labels
set to white-space:nowrap so the six-tab row fits on one line.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-28 10:51:23 +02:00

209 lines
7.6 KiB
Python

"""
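File source management and file scan routes.

Flask blueprint exposing the /api/file_sources endpoints (list, save,
delete, credential storage, SSH key upload) and /api/file_scan/start.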
"""
from __future__ import annotations
import threading
import uuid as _uuid
from pathlib import Path
from flask import Blueprint, jsonify, request
from routes import state
from app_config import _load_file_sources, _save_file_sources, _SFTP_KEYS_DIR
try:
from gdpr_db import log_audit_event as _audit
except ImportError:
def _audit(*a, **kw): pass # type: ignore[misc]
try:
from file_scanner import store_smb_password, SMB_OK as _SMB_OK
_FILE_SCANNER_OK = True
except ImportError:
_FILE_SCANNER_OK = False
_SMB_OK = False
def store_smb_password(*a, **kw): return False # type: ignore[misc]
try:
from sftp_connector import store_sftp_password, SFTP_OK as _SFTP_OK
except ImportError:
_SFTP_OK = False
def store_sftp_password(*a, **kw): return False # type: ignore[misc]
bp = Blueprint("sources", __name__)
@bp.route("/api/file_sources", methods=["GET"])
def file_sources_list():
    """List the saved file sources plus the backend availability flags.

    The JSON response contains the stored source definitions under
    ``sources`` and the module-level flags ``_SMB_OK``, ``_SFTP_OK`` and
    ``_FILE_SCANNER_OK`` under ``smb_available``, ``sftp_available`` and
    ``scanner_ok`` respectively.
    """
    payload = dict(
        sources=_load_file_sources(),
        smb_available=_SMB_OK,
        sftp_available=_SFTP_OK,
        scanner_ok=_FILE_SCANNER_OK,
    )
    return jsonify(payload)
@bp.route("/api/file_sources/save", methods=["POST"])
def file_sources_save():
    """Add a new file source or update an existing one.

    The JSON body is the source definition. SFTP sources require
    ``sftp_host`` and ``sftp_user`` and default ``path`` to ``"/"``; every
    other source type requires a non-blank ``path``. When ``id`` matches a
    stored source, the stored record is overlaid with the posted fields;
    otherwise the definition is appended, receiving a fresh UUID if it has
    no ``id``.

    Returns:
        ``{"ok": True, "source": <record>}`` on success, or
        ``{"error": ...}`` with HTTP 400 when validation fails.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        # A JSON array/scalar body would otherwise crash on .get() below.
        return jsonify({"error": "JSON object required"}), 400

    def _text(field):
        # JSON null or a non-string value must be treated as "missing",
        # not crash on .strip() and surface as a 500.
        value = data.get(field)
        return value.strip() if isinstance(value, str) else ""

    # Validate required fields per source type
    if data.get("source_type", "") == "sftp":
        if not _text("sftp_host"):
            return jsonify({"error": "sftp_host required"}), 400
        if not _text("sftp_user"):
            return jsonify({"error": "sftp_user required"}), 400
        if not _text("path"):
            data["path"] = "/"
    elif not _text("path"):
        return jsonify({"error": "path required"}), 400

    sources = _load_file_sources()
    uid = data.get("id") or ""
    client_ip = request.remote_addr or ""

    for i, existing in enumerate(sources):
        if existing.get("id") != uid:
            continue
        merged = {**existing, **data}
        sources[i] = merged
        _save_file_sources(sources)
        # Log the merged record so a partial update (payload without
        # "name"/"source_type") still identifies the source in the audit log.
        _audit("source_update",
               f"name={merged.get('name','')!r} type={merged.get('source_type','local')!r}",
               ip=client_ip)
        return jsonify({"ok": True, "source": merged})

    data["id"] = uid or str(_uuid.uuid4())
    sources.append(data)
    _save_file_sources(sources)
    _audit("source_add",
           f"name={data.get('name','')!r} type={data.get('source_type','local')!r}",
           ip=client_ip)
    return jsonify({"ok": True, "source": data})
@bp.route("/api/file_sources/delete", methods=["POST"])
def file_sources_delete():
    """Delete the file source whose ``id`` is given in the JSON body.

    The source list is rewritten without the matching entries. When a
    source was actually removed, a ``source_delete`` audit event is
    recorded and, if the source referenced an SFTP key file located
    directly inside ``_SFTP_KEYS_DIR``, that file is removed on a
    best-effort basis.

    Returns:
        ``{"ok": True}`` (also when no source matched), or
        ``{"error": "id required"}`` with HTTP 400 when ``id`` is missing.
    """
    payload = request.get_json() or {}
    uid = payload.get("id", "")
    if not uid:
        return jsonify({"error": "id required"}), 400

    removed = None
    remaining = []
    for entry in _load_file_sources():
        if entry.get("id") != uid:
            remaining.append(entry)
        elif removed is None:
            # Remember only the first match; it drives audit and cleanup.
            removed = entry
    _save_file_sources(remaining)

    if removed is None:
        return jsonify({"ok": True})

    _audit("source_delete",
           f"name={removed.get('name','')!r} type={removed.get('source_type','local')!r}",
           ip=request.remote_addr or "")

    # Clean up key file if this was an SFTP key-auth source
    stored_key = removed.get("sftp_key_path")
    if stored_key:
        key_file = Path(stored_key)
        # Only ever touch files that sit directly in the keys directory.
        if key_file.parent == _SFTP_KEYS_DIR and key_file.exists():
            try:
                key_file.unlink()
            except OSError:
                pass
    return jsonify({"ok": True})
@bp.route("/api/file_sources/store_creds", methods=["POST"])
def file_sources_store_creds():
    """Hand an SMB or SFTP password to the matching credential store helper.

    The JSON body selects the backend through ``source_type`` (anything
    other than ``"sftp"`` is treated as SMB). The keychain key defaults to
    ``sftp:<user>@<host>`` for SFTP and to the SMB user name for SMB
    unless ``keychain_key`` is supplied.

    Returns:
        ``{"ok": True, "keychain_key": <key>}`` when the helper reports
        success; HTTP 503 when the backend is unavailable, 400 when user
        or password is missing, 500 when the helper reports failure.
    """
    data = request.get_json() or {}
    password = data.get("password", "")
    keyring_error = {"error": "keyring not available — install: pip install keyring"}

    if data.get("source_type", "smb") != "sftp":
        if not _FILE_SCANNER_OK:
            return jsonify({"error": "file_scanner not available"}), 503
        smb_user = data.get("smb_user", "")
        if not (smb_user and password):
            return jsonify({"error": "smb_user and password required"}), 400
        key = data.get("keychain_key") or smb_user
        if store_smb_password(data.get("smb_host", ""), smb_user, password, key):
            return jsonify({"ok": True, "keychain_key": key})
        return jsonify(keyring_error), 500

    if not _SFTP_OK:
        return jsonify({"error": "paramiko not installed — run: pip install paramiko"}), 503
    host = data.get("sftp_host", "")
    user = data.get("sftp_user", "")
    if not (user and password):
        return jsonify({"error": "sftp_user and password required"}), 400
    key = data.get("keychain_key") or f"sftp:{user}@{host}"
    if store_sftp_password(host, user, password, key):
        return jsonify({"ok": True, "keychain_key": key})
    return jsonify(keyring_error), 500
@bp.route("/api/file_sources/upload_key", methods=["POST"])
def file_sources_upload_key():
    """Accept an SSH private key upload and store it in the SFTP keys directory.

    The multipart field ``key_file`` is read (at most 64 KB) and probed
    with each Paramiko key class that the installed Paramiko provides.
    A file that no class can load is still accepted when it carries a
    private-key marker, because passphrase-protected keys cannot be loaded
    without the passphrase; those are validated at connect time.

    The key is stored under a fresh UUID file name, created with mode
    0o600 from the start so it is never readable by other users.

    Returns:
        ``{"ok": True, "key_id": uuid, "key_path": absolute_path}`` on
        success; HTTP 503 when Paramiko is unavailable, 400 when the
        upload is missing, oversized, or not recognisable as a private key.
    """
    if not _SFTP_OK:
        return jsonify({"error": "paramiko not installed — run: pip install paramiko"}), 503
    if "key_file" not in request.files:
        return jsonify({"error": "key_file required"}), 400

    import io
    import os
    import paramiko

    max_bytes = 65536  # 64 KB is more than enough for any private key
    # Read one byte past the limit so an oversized upload is rejected
    # instead of being silently truncated into a corrupt key file.
    raw = request.files["key_file"].read(max_bytes + 1)
    if len(raw) > max_bytes:
        return jsonify({"error": "key_file too large"}), 400

    # Paramiko parses key files as text; feeding it bytes makes every
    # probe fail, so decode first (undecodable bytes simply fail to parse).
    text = raw.decode("utf-8", errors="replace")

    # Resolve key classes by name: some Paramiko releases do not provide
    # every class (e.g. DSSKey), and a missing attribute must not turn
    # every upload into a server error.
    candidate_names = ("RSAKey", "Ed25519Key", "ECDSAKey", "DSSKey")
    key_classes = [
        cls
        for cls in (getattr(paramiko, name, None) for name in candidate_names)
        if cls is not None
    ]

    loaded = False
    for cls in key_classes:
        try:
            cls.from_private_key(io.StringIO(text))
        except Exception:
            # This is only a probe: a key of another type, an encrypted
            # key, or malformed input all mean "try the next class".
            continue
        loaded = True
        break

    # Might be passphrase-protected — still accept it; validation will happen at connect time
    if not loaded and b"-----BEGIN" not in raw and b"OPENSSH PRIVATE KEY" not in raw:
        return jsonify({"error": "File does not appear to be a private key"}), 400

    key_id = str(_uuid.uuid4())
    key_path = _SFTP_KEYS_DIR / key_id
    # Create the file with restrictive permissions up front (O_EXCL also
    # refuses to follow or overwrite a pre-existing path) rather than
    # writing with default permissions and tightening them afterwards.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
    fd = os.open(key_path, flags, 0o600)
    with os.fdopen(fd, "wb") as fh:
        fh.write(raw)
    key_path.chmod(0o600)  # enforce the exact mode regardless of umask
    return jsonify({"ok": True, "key_id": key_id, "key_path": str(key_path)})
@bp.route("/api/file_scan/start", methods=["POST"])
def file_scan_start():
    """Start a file system scan for a single file source (local, SMB, or SFTP).

    The JSON body is the source definition passed to
    ``scan_engine.run_file_scan``. The scan runs in a daemon thread while
    ``state._scan_lock`` is held; the lock is released when the worker
    finishes, fails, or cannot be started.

    Returns:
        ``{"status": "started"}`` on success; HTTP 503 when the required
        backend is unavailable, 409 when a scan is already running.
    """
    source = request.get_json() or {}
    source_type = source.get("source_type", "")
    if source_type == "sftp":
        if not _SFTP_OK:
            return jsonify({"error": "paramiko not installed — run: pip install paramiko"}), 503
    elif not _FILE_SCANNER_OK:
        return jsonify({"error": "file_scanner not available"}), 503

    if not state._scan_lock.acquire(blocking=False):
        return jsonify({"error": "scan already running"}), 409
    state._scan_abort.clear()

    def _run():
        try:
            # Import inside the try: if scan_engine fails to import, the
            # lock must still be released or every later scan gets a 409.
            from scan_engine import run_file_scan
            run_file_scan(source)
        finally:
            state._scan_lock.release()

    try:
        threading.Thread(target=_run, daemon=True).start()
    except Exception:
        # The worker never ran, so its finally-clause will not release the lock.
        state._scan_lock.release()
        raise
    return jsonify({"status": "started"})