247 lines
8.5 KiB
Python
247 lines
8.5 KiB
Python
"""
|
|
Google Workspace authentication routes.
|
|
|
|
Endpoints:
|
|
GET /api/google/auth/status — is a service account loaded?
|
|
POST /api/google/auth/connect — save key JSON + optional admin_email
|
|
POST /api/google/auth/disconnect — remove saved key + clear connector
|
|
"""
|
|
from __future__ import annotations
|
|
from flask import Blueprint, jsonify, request
|
|
import json
|
|
import threading
|
|
|
|
from routes import state
|
|
|
|
bp = Blueprint("google_auth", __name__)
|
|
|
|
|
|
def __getattr__(name):
|
|
import gdpr_scanner as _m
|
|
if hasattr(_m, name):
|
|
return getattr(_m, name)
|
|
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
|
|
|
|
|
|
@bp.route("/api/google/auth/status")
|
|
def google_auth_status():
|
|
"""Return current Google connection state."""
|
|
from google_connector import GOOGLE_AUTH_OK, load_saved_key
|
|
if not GOOGLE_AUTH_OK:
|
|
return jsonify({
|
|
"connected": False,
|
|
"error": "google-auth not installed — run: pip install google-auth google-auth-httplib2 google-api-python-client",
|
|
"libs_ok": False,
|
|
})
|
|
|
|
key = load_saved_key()
|
|
if not key:
|
|
return jsonify({"connected": False, "libs_ok": True})
|
|
|
|
sa_email = key.get("client_email", "")
|
|
project_id = key.get("project_id", "")
|
|
admin_email = ""
|
|
|
|
# Read persisted admin_email from config
|
|
cfg = _load_google_config()
|
|
admin_email = cfg.get("admin_email", "")
|
|
|
|
# Rebuild connector in state if not present
|
|
if not state.google_connector:
|
|
try:
|
|
from google_connector import GoogleConnector
|
|
state.google_connector = GoogleConnector(key, admin_email=admin_email)
|
|
except Exception as e:
|
|
return jsonify({"connected": False, "libs_ok": True,
|
|
"error": str(e), "sa_email": sa_email})
|
|
|
|
return jsonify({
|
|
"connected": True,
|
|
"libs_ok": True,
|
|
"sa_email": sa_email,
|
|
"project_id": project_id,
|
|
"admin_email": admin_email,
|
|
})
|
|
|
|
|
|
@bp.route("/api/google/auth/connect", methods=["POST"])
|
|
def google_auth_connect():
|
|
"""
|
|
Accept a service account key JSON + optional admin_email.
|
|
Body: { "key_json": "<raw JSON string or object>", "admin_email": "admin@domain.com" }
|
|
"""
|
|
from google_connector import GOOGLE_AUTH_OK, save_key, GoogleConnector
|
|
if not GOOGLE_AUTH_OK:
|
|
return jsonify({"error": "google-auth not installed"}), 503
|
|
|
|
data = request.get_json() or {}
|
|
raw_key = data.get("key_json", "")
|
|
admin_email = data.get("admin_email", "").strip()
|
|
|
|
# Accept both a JSON string and an already-parsed object
|
|
if isinstance(raw_key, str):
|
|
try:
|
|
key_dict = json.loads(raw_key)
|
|
except json.JSONDecodeError as e:
|
|
return jsonify({"error": f"Invalid JSON: {e}"}), 400
|
|
elif isinstance(raw_key, dict):
|
|
key_dict = raw_key
|
|
else:
|
|
return jsonify({"error": "key_json must be a JSON string or object"}), 400
|
|
|
|
if key_dict.get("type") != "service_account":
|
|
return jsonify({"error": "File must be a service_account JSON key (type != service_account)"}), 400
|
|
|
|
# Validate by building a connector
|
|
try:
|
|
conn = GoogleConnector(key_dict, admin_email=admin_email)
|
|
if not conn.is_authenticated():
|
|
return jsonify({"error": "Credentials did not validate — check the key file"}), 400
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
save_key(key_dict)
|
|
_save_google_config({"admin_email": admin_email})
|
|
|
|
state.google_connector = conn
|
|
|
|
return jsonify({
|
|
"ok": True,
|
|
"sa_email": key_dict.get("client_email", ""),
|
|
"project_id": key_dict.get("project_id", ""),
|
|
})
|
|
|
|
|
|
@bp.route("/api/google/auth/disconnect", methods=["POST"])
|
|
def google_auth_disconnect():
|
|
"""Remove saved service account key and clear the connector."""
|
|
from google_connector import delete_key
|
|
delete_key()
|
|
_save_google_config({})
|
|
state.google_connector = None
|
|
return jsonify({"ok": True})
|
|
|
|
|
|
# ── Personal Google account (device-code OAuth) ───────────────────────────────
|
|
|
|
@bp.route("/api/google/personal/status")
|
|
def google_personal_status():
|
|
"""Check whether a personal Google OAuth token is present and valid."""
|
|
from google_connector import GOOGLE_AUTH_OK, load_personal_token, PersonalGoogleConnector
|
|
if not GOOGLE_AUTH_OK:
|
|
return jsonify({"connected": False, "libs_ok": False, "auth_mode": "personal"})
|
|
|
|
token_data = load_personal_token()
|
|
if not token_data:
|
|
return jsonify({"connected": False, "libs_ok": True, "auth_mode": "personal"})
|
|
|
|
if not isinstance(state.google_connector, PersonalGoogleConnector):
|
|
try:
|
|
conn = PersonalGoogleConnector(token_data)
|
|
if conn.is_authenticated():
|
|
state.google_connector = conn
|
|
else:
|
|
return jsonify({"connected": False, "libs_ok": True, "auth_mode": "personal"})
|
|
except Exception as e:
|
|
return jsonify({"connected": False, "libs_ok": True, "auth_mode": "personal",
|
|
"error": str(e)})
|
|
|
|
try:
|
|
info = state.google_connector.get_user_info()
|
|
return jsonify({
|
|
"connected": True,
|
|
"libs_ok": True,
|
|
"auth_mode": "personal",
|
|
"email": info.get("email", ""),
|
|
"displayName": info.get("displayName", ""),
|
|
})
|
|
except Exception as e:
|
|
return jsonify({"connected": False, "libs_ok": True, "auth_mode": "personal",
|
|
"error": str(e)})
|
|
|
|
|
|
@bp.route("/api/google/personal/start", methods=["POST"])
|
|
def google_personal_start():
|
|
"""Initiate a Google device-code flow for a personal account."""
|
|
from google_connector import GOOGLE_AUTH_OK, PersonalGoogleConnector
|
|
if not GOOGLE_AUTH_OK:
|
|
return jsonify({"error": "google-auth not installed"}), 503
|
|
|
|
data = request.get_json() or {}
|
|
client_id = data.get("client_id", "").strip()
|
|
client_secret = data.get("client_secret", "").strip()
|
|
if not client_id or not client_secret:
|
|
return jsonify({"error": "client_id and client_secret required"}), 400
|
|
|
|
try:
|
|
flow = PersonalGoogleConnector.get_device_code_flow(client_id, client_secret)
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 400
|
|
|
|
state.google_pending_flow = flow
|
|
state.google_poll_result = None
|
|
|
|
def _do_auth():
|
|
try:
|
|
conn = PersonalGoogleConnector.complete_device_code_flow(flow)
|
|
state.google_connector = conn
|
|
state.google_poll_result = "ok"
|
|
except Exception as e:
|
|
state.google_poll_result = str(e)
|
|
|
|
threading.Thread(target=_do_auth, daemon=True).start()
|
|
|
|
return jsonify({
|
|
"user_code": flow["user_code"],
|
|
"verification_url": flow["verification_url"],
|
|
})
|
|
|
|
|
|
@bp.route("/api/google/personal/poll", methods=["POST"])
|
|
def google_personal_poll():
|
|
"""Check whether the device-code sign-in has completed."""
|
|
result = state.google_poll_result
|
|
if result == "ok":
|
|
state.google_poll_result = None
|
|
state.google_pending_flow = None
|
|
return jsonify({"status": "ok"})
|
|
if result and result != "pending":
|
|
state.google_poll_result = None
|
|
state.google_pending_flow = None
|
|
return jsonify({"status": "error", "error": result})
|
|
return jsonify({"status": "pending"})
|
|
|
|
|
|
@bp.route("/api/google/personal/signout", methods=["POST"])
|
|
def google_personal_signout():
|
|
"""Delete the stored personal OAuth token and clear the connector."""
|
|
from google_connector import delete_personal_token, PersonalGoogleConnector
|
|
delete_personal_token()
|
|
if isinstance(state.google_connector, PersonalGoogleConnector):
|
|
state.google_connector = None
|
|
return jsonify({"ok": True})
|
|
|
|
|
|
# ── Config helpers ────────────────────────────────────────────────────────────
|
|
|
|
from pathlib import Path as _Path
|
|
_DATA_DIR = _Path.home() / ".gdprscanner"
|
|
_DATA_DIR.mkdir(exist_ok=True)
|
|
_GOOGLE_CONFIG = _DATA_DIR / "google.json"
|
|
|
|
|
|
def _load_google_config() -> dict:
|
|
if _GOOGLE_CONFIG.exists():
|
|
try:
|
|
return json.loads(_GOOGLE_CONFIG.read_text())
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
|
|
def _save_google_config(cfg: dict) -> None:
|
|
try:
|
|
_GOOGLE_CONFIG.write_text(json.dumps(cfg, indent=2))
|
|
except Exception:
|
|
pass
|