2026-04-11 04:38:11 +02:00

218 lines
8.6 KiB
Python

"""
User listing, role overrides, license debug
"""
from __future__ import annotations
import logging
import traceback
from flask import Blueprint, jsonify, request
from routes import state
from app_config import (
_load_role_overrides, _save_role_overrides, _resolve_display_name,
)
# Flask blueprint named "users"; the route handlers below register on it.
bp = Blueprint("users", __name__)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@bp.route("/api/users")
def get_users():
    """List all tenant users for account selection.

    Returns a JSON object with:

    * ``users`` -- one entry per distinct user id, each carrying ``id``,
      ``displayName``, ``email``, ``isMe``, ``userRole`` and ``roleOverride``.
    * ``sku_map_available`` -- whether any SKU mapping could be built.
    * ``unclassified`` -- number of listed users whose role is neither
      ``"student"`` nor ``"staff"``.

    Responds 401 when no connector is set and 500 (``error`` + ``detail``)
    when anything unexpected fails while building the listing.
    """
    if not state.connector:
        return jsonify({"error": "not authenticated"}), 401
    connector = state.connector
    try:
        users = connector.list_users()

        # Build SKU map for role classification.
        # get_subscribed_skus() tries /subscribedSkus -> /me/licenseDetails.
        # Then always merge per-user licenseDetails on top -- this ensures we
        # have skuPartNumbers for every distinct SKU in the tenant, not just
        # the admin's own license (which is all /me/licenseDetails returns).
        # Both lookups stay best-effort, but failures are now logged instead
        # of being swallowed silently.
        try:
            sku_map = connector.get_subscribed_skus()
        except Exception:
            logger.warning(
                "[skus] get_subscribed_skus failed; continuing with an empty SKU map",
                exc_info=True,
            )
            sku_map = {}
        try:
            per_user = connector.build_sku_map_from_users(users)
            if per_user:
                added = len(set(per_user) - set(sku_map))
                sku_map.update(per_user)
                if added:
                    logger.info("[skus] merged %d additional SKU(s) from per-user licenseDetails", added)
        except Exception:
            logger.warning(
                "[skus] merging per-user licenseDetails failed; keeping existing SKU map",
                exc_info=True,
            )

        # Load any manual role overrides set by the admin.
        role_overrides = _load_role_overrides()

        def _build_user(u: dict, is_me: bool = False) -> dict:
            """Convert one raw user record into the API response shape."""
            email = u.get("mail") or u.get("userPrincipalName", "")
            auto_role = connector.classify_user_role(
                u.get("assignedLicenses", []), sku_map
            )
            user_id = u["id"]
            return {
                "id": user_id,
                "displayName": _resolve_display_name(u.get("displayName", ""), email),
                "email": email,
                "isMe": is_me,
                # Manual override takes precedence over auto-classification.
                "userRole": role_overrides.get(user_id, auto_role),
                "roleOverride": user_id in role_overrides,
            }

        if connector.is_app_mode:
            # App mode: there is no signed-in user, so nobody is flagged isMe.
            me_id = None
            candidates = users
        else:
            # NOTE(review): `me` is placed first, so when the same id also
            # appears in `users` the get_user_info() record wins. Confirm it
            # carries assignedLicenses; otherwise the signed-in user is
            # classified without licence data.
            me = connector.get_user_info()
            me_id = me.get("id")
            candidates = [me] + users

        out = []
        seen = set()
        for u in candidates:
            uid = u.get("id")
            # Skip records without an id and any repeated id.
            if not uid or uid in seen:
                continue
            seen.add(uid)
            out.append(_build_user(u, is_me=(uid == me_id)))

        # Log a warning when no users were classified -- helps diagnose
        # tenants with SKUs not yet in m365_skus.json.
        classified = [u for u in out if u["userRole"] in ("student", "staff")]
        if out and not classified:
            unknown_skus: set = set()
            for u in users[:20]:  # sample first 20 to keep it brief
                for lic in u.get("assignedLicenses", []):
                    sid = lic.get("skuId", "")
                    if sid:
                        unknown_skus.add(sid)
            logger.warning(
                "[role] 0/%d users classified — no SKUs in m365_skus.json matched. "
                "Unrecognised SKU IDs (sample): %s. "
                "Add them to classification/m365_skus.json or use /api/users/license_debug.",
                len(out), sorted(unknown_skus)[:10],
            )
        return jsonify({
            "users": out,
            "sku_map_available": bool(sku_map),
            "unclassified": len(out) - len(classified),
        })
    except Exception as e:
        # Keep a server-side record; the response shape is unchanged.
        # NOTE(review): "detail" exposes a full traceback to the client --
        # confirm that is acceptable for this deployment.
        logger.exception("[users] listing users failed")
        return jsonify({"error": str(e), "detail": traceback.format_exc()}), 500
@bp.route("/api/users/license_debug")
def license_debug():
    """Return a full licence-classification diagnostic as JSON.

    The payload contains the connector's runtime SKU sets and fragments, the
    merged SKU map, and a per-licence trace for the first 100 users, showing
    which id set or name fragment each licence matches.

    Responds 401 when no connector is set and 500 (``error`` + ``detail``)
    on any unexpected failure.
    """
    if not state.connector:
        return jsonify({"error": "not authenticated"}), 401
    connector = state.connector
    try:
        users = connector.list_users()
        sku_map = connector.get_subscribed_skus()
        try:
            sku_map.update(connector.build_sku_map_from_users(users))
        except Exception:
            # Best-effort merge: keep going, but leave a trace in the log.
            logger.warning(
                "[skus] merging per-user licenseDetails failed in license_debug",
                exc_info=True,
            )

        staff_ids = connector._STAFF_SKU_IDS
        student_ids = connector._STUDENT_SKU_IDS
        staff_fragments = connector._STAFF_SKU_FRAGMENTS
        student_fragments = connector._STUDENT_SKU_FRAGMENTS

        # Per-user trace with step-by-step classification walk.
        out = []
        for u in users[:100]:
            lics = u.get("assignedLicenses", [])
            role = connector.classify_user_role(lics, sku_map)
            lic_trace = []
            for lic in lics:
                # A null skuId is treated as empty so one odd record cannot
                # take down the whole diagnostic.
                raw_id = lic.get("skuId") or ""
                low_id = raw_id.lower()
                name = sku_map.get(low_id) or sku_map.get(raw_id) or "?"
                # Uppercase once; both fragment scans compare against it.
                name_upper = name.upper()
                lic_trace.append({
                    "skuId": raw_id,
                    "skuName": name,
                    "in_staff": low_id in staff_ids,
                    "in_student": low_id in student_ids,
                    "frag_staff": next(
                        (f for f in staff_fragments if f in name_upper), None
                    ),
                    "frag_student": next(
                        (f for f in student_fragments if f in name_upper), None
                    ),
                })
            out.append({
                "displayName": u.get("displayName", ""),
                "email": u.get("mail") or u.get("userPrincipalName", ""),
                "role": role,
                "licences": lic_trace,
            })
        return jsonify({
            # Runtime state: sizes and contents of the connector's SKU tables.
            "runtime": {
                "student_ids_count": len(student_ids),
                "staff_ids_count": len(staff_ids),
                "student_fragments": list(student_fragments),
                "staff_fragments": list(staff_fragments),
                "sku_map_entries": len(sku_map),
                "sku_file_path": str(connector._sku_file_path()),
            },
            "student_ids": sorted(student_ids),
            "staff_ids": sorted(staff_ids),
            "sku_map": sku_map,
            "users": out,
        })
    except Exception as e:
        # Keep a server-side record; the response shape is unchanged.
        logger.exception("[users] license_debug failed")
        return jsonify({"error": str(e), "detail": traceback.format_exc()}), 500
@bp.route("/api/users/lookup")
def lookup_user():
    """Look up a single user by UPN or email.

    Reads the ``upn`` query parameter and returns ``id``, ``displayName``,
    ``email`` and ``isMe`` (always ``False``) for the matching user.

    Responds 401 when no connector is set, 400 when ``upn`` is missing or
    blank, and 404 with the error text when the lookup fails for any reason.
    """
    # Function-scope import so this block does not depend on a change to the
    # file's import section.
    from urllib.parse import quote

    if not state.connector:
        return jsonify({"error": "not authenticated"}), 401
    upn = request.args.get("upn", "").strip()
    if not upn:
        return jsonify({"error": "upn required"}), 400
    try:
        # `upn` is untrusted input placed in a URL path: percent-encode it so
        # "/", "?" and "#" cannot alter the request target. Guest UPNs contain
        # "#EXT#", which must reach the API as %23.
        # NOTE(review): assumes _get() does not already encode the path.
        user_path = "/users/" + quote(upn, safe="@")
        data = state.connector._get(
            user_path, {"$select": "id,displayName,mail,userPrincipalName"}
        )
        email = data.get("mail") or data.get("userPrincipalName", upn)
        return jsonify({
            "id": data["id"],
            "displayName": _resolve_display_name(data.get("displayName", ""), email, upn),
            "email": email,
            "isMe": False,
        })
    except Exception as e:
        # Every failure is reported to the caller as 404; log the cause so
        # real outages can be told apart from unknown users.
        logger.info("[users] lookup failed for %r: %s", upn, e)
        return jsonify({"error": str(e)}), 404
@bp.route("/api/users/role_override", methods=["GET"])
def role_override_get():
    """Return every stored manual role override as JSON ``{user_id: role}``."""
    overrides = _load_role_overrides()
    return jsonify(overrides)
@bp.route("/api/users/role_override", methods=["POST"])
def role_override_set():
    """Set or clear a manual role override for one user.

    Body: ``{user_id, role}`` -- role is 'student' | 'staff' | 'other', or
    '' / null to clear the override.

    Responds 400 when ``user_id`` is missing, blank or not a string, or when
    ``role`` is not one of the accepted values. On success returns ``ok``,
    the ``user_id``, the stored ``role`` (``None`` when cleared) and
    ``total_overrides``.
    """
    valid_role_error = {"error": "role must be student | staff | other | '' (clear)"}

    # silent=True returns None for a missing or malformed JSON body instead
    # of aborting the request, so the caller always gets a JSON error below.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_uid = payload.get("user_id")
    uid = raw_uid.strip() if isinstance(raw_uid, str) else ""
    if not uid:
        return jsonify({"error": "user_id required"}), 400

    raw_role = payload.get("role")
    if raw_role is None:
        # A missing or null role means "clear the override".
        role = ""
    elif isinstance(raw_role, str):
        role = raw_role.strip().lower()
    else:
        return jsonify(valid_role_error), 400
    if role and role not in ("student", "staff", "other"):
        return jsonify(valid_role_error), 400

    overrides = _load_role_overrides()
    if role:
        overrides[uid] = role
    else:
        overrides.pop(uid, None)
    _save_role_overrides(overrides)
    return jsonify({"ok": True, "user_id": uid, "role": role or None,
                    "total_overrides": len(overrides)})