218 lines
8.6 KiB
Python
218 lines
8.6 KiB
Python
"""
|
|
User listing, role overrides, license debug
|
|
"""
|
|
from __future__ import annotations
|
|
import logging
|
|
import traceback
|
|
from flask import Blueprint, jsonify, request
|
|
from routes import state
|
|
from app_config import (
|
|
_load_role_overrides, _save_role_overrides, _resolve_display_name,
|
|
)
|
|
|
|
bp = Blueprint("users", __name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@bp.route("/api/users")
|
|
def get_users():
|
|
"""List all tenant users for account selection."""
|
|
if not state.connector:
|
|
return jsonify({"error": "not authenticated"}), 401
|
|
try:
|
|
users = state.connector.list_users()
|
|
out = []
|
|
seen = set()
|
|
|
|
# Build SKU map for role classification.
|
|
# get_subscribed_skus() tries /subscribedSkus → /me/licenseDetails.
|
|
# Then always merge per-user licenseDetails on top — this ensures we
|
|
# have skuPartNumbers for every distinct SKU in the tenant, not just
|
|
# the admin's own license (which is all /me/licenseDetails returns).
|
|
try:
|
|
sku_map = state.connector.get_subscribed_skus()
|
|
except Exception:
|
|
sku_map = {}
|
|
|
|
try:
|
|
per_user = state.connector.build_sku_map_from_users(users)
|
|
if per_user:
|
|
added = len(set(per_user) - set(sku_map))
|
|
sku_map.update(per_user)
|
|
if added:
|
|
logger.info("[skus] merged %d additional SKU(s) from per-user licenseDetails", added)
|
|
except Exception:
|
|
pass
|
|
|
|
# Load any manual role overrides set by the admin
|
|
_role_overrides = _load_role_overrides()
|
|
|
|
def _build_user(u: dict, is_me: bool = False) -> dict:
|
|
_em = u.get("mail") or u.get("userPrincipalName", "")
|
|
_auto = state.connector.classify_user_role(
|
|
u.get("assignedLicenses", []), sku_map
|
|
)
|
|
# Manual override takes precedence over auto-classification
|
|
_role = _role_overrides.get(u["id"], _auto)
|
|
return {
|
|
"id": u["id"],
|
|
"displayName": _resolve_display_name(u.get("displayName", ""), _em),
|
|
"email": _em,
|
|
"isMe": is_me,
|
|
"userRole": _role,
|
|
"roleOverride": u["id"] in _role_overrides,
|
|
}
|
|
|
|
if state.connector.is_app_mode:
|
|
for u in users:
|
|
uid = u.get("id")
|
|
if uid and uid not in seen:
|
|
seen.add(uid)
|
|
out.append(_build_user(u))
|
|
else:
|
|
me = state.connector.get_user_info()
|
|
me_id = me.get("id")
|
|
for u in ([me] + users):
|
|
uid = u.get("id")
|
|
if uid and uid not in seen:
|
|
seen.add(uid)
|
|
out.append(_build_user(u, is_me=(uid == me_id)))
|
|
|
|
# Log a warning when no users were classified — helps diagnose
|
|
# tenants with SKUs not yet in m365_skus.json
|
|
classified = [u for u in out if u["userRole"] in ("student", "staff")]
|
|
if out and not classified:
|
|
unknown_skus: set = set()
|
|
for u in users[:20]: # sample first 20 to keep it brief
|
|
for lic in u.get("assignedLicenses", []):
|
|
sid = lic.get("skuId", "")
|
|
if sid:
|
|
unknown_skus.add(sid)
|
|
logger.warning(
|
|
"[role] 0/%d users classified — no SKUs in m365_skus.json matched. "
|
|
"Unrecognised SKU IDs (sample): %s. "
|
|
"Add them to classification/m365_skus.json or use /api/users/license_debug.",
|
|
len(out), sorted(unknown_skus)[:10],
|
|
)
|
|
|
|
return jsonify({
|
|
"users": out,
|
|
"sku_map_available": bool(sku_map),
|
|
"unclassified": len(out) - len(classified),
|
|
})
|
|
except Exception as e:
|
|
return jsonify({"error": str(e), "detail": traceback.format_exc()}), 500
|
|
|
|
|
|
@bp.route("/api/users/license_debug")
|
|
def license_debug():
|
|
"""Full diagnostic: runtime SKU sets, sku_map, per-user trace, and step-by-step
|
|
classification walk for every user — enough to diagnose any remaining issue."""
|
|
if not state.connector:
|
|
return jsonify({"error": "not authenticated"}), 401
|
|
try:
|
|
users = state.connector.list_users()
|
|
sku_map = state.connector.get_subscribed_skus()
|
|
try:
|
|
sku_map.update(state.connector.build_sku_map_from_users(users))
|
|
except Exception:
|
|
pass
|
|
|
|
# Per-user trace with step-by-step classification walk
|
|
out = []
|
|
for u in users[:100]:
|
|
lics = u.get("assignedLicenses", [])
|
|
role = state.connector.classify_user_role(lics, sku_map)
|
|
|
|
# Walk each licence exactly as classify_user_role does
|
|
lic_trace = []
|
|
for lic in lics:
|
|
raw_id = lic.get("skuId", "")
|
|
low_id = raw_id.lower()
|
|
name = sku_map.get(low_id) or sku_map.get(raw_id) or "?"
|
|
lic_trace.append({
|
|
"skuId": raw_id,
|
|
"skuName": name,
|
|
"in_staff": low_id in state.connector._STAFF_SKU_IDS,
|
|
"in_student": low_id in state.connector._STUDENT_SKU_IDS,
|
|
"frag_staff": next((f for f in state.connector._STAFF_SKU_FRAGMENTS
|
|
if f in name.upper()), None),
|
|
"frag_student": next((f for f in state.connector._STUDENT_SKU_FRAGMENTS
|
|
if f in name.upper()), None),
|
|
})
|
|
|
|
out.append({
|
|
"displayName": u.get("displayName", ""),
|
|
"email": u.get("mail") or u.get("userPrincipalName", ""),
|
|
"role": role,
|
|
"licences": lic_trace,
|
|
})
|
|
|
|
return jsonify({
|
|
# Runtime state — proves whether m365_skus.json loaded correctly
|
|
"runtime": {
|
|
"student_ids_count": len(state.connector._STUDENT_SKU_IDS),
|
|
"staff_ids_count": len(state.connector._STAFF_SKU_IDS),
|
|
"student_fragments": list(state.connector._STUDENT_SKU_FRAGMENTS),
|
|
"staff_fragments": list(state.connector._STAFF_SKU_FRAGMENTS),
|
|
"sku_map_entries": len(sku_map),
|
|
"sku_file_path": str(state.connector._sku_file_path()),
|
|
},
|
|
"student_ids": sorted(state.connector._STUDENT_SKU_IDS),
|
|
"staff_ids": sorted(state.connector._STAFF_SKU_IDS),
|
|
"sku_map": sku_map,
|
|
"users": out,
|
|
})
|
|
except Exception as e:
|
|
return jsonify({"error": str(e), "detail": traceback.format_exc()}), 500
|
|
|
|
|
|
@bp.route("/api/users/lookup")
|
|
def lookup_user():
|
|
"""Look up a single user by UPN or email."""
|
|
if not state.connector:
|
|
return jsonify({"error": "not authenticated"}), 401
|
|
upn = request.args.get("upn", "").strip()
|
|
if not upn:
|
|
return jsonify({"error": "upn required"}), 400
|
|
try:
|
|
data = state.connector._get(f"/users/{upn}", {"$select": "id,displayName,mail,userPrincipalName"})
|
|
_email = data.get("mail") or data.get("userPrincipalName", upn)
|
|
return jsonify({
|
|
"id": data["id"],
|
|
"displayName": _resolve_display_name(data.get("displayName", ""), _email, upn),
|
|
"email": _email,
|
|
"isMe": False,
|
|
})
|
|
except Exception as e:
|
|
return jsonify({"error": str(e)}), 404
|
|
|
|
|
|
@bp.route("/api/users/role_override", methods=["GET"])
|
|
def role_override_get():
|
|
"""Return all manual role overrides as {user_id: role}."""
|
|
return jsonify(_load_role_overrides())
|
|
|
|
|
|
@bp.route("/api/users/role_override", methods=["POST"])
|
|
def role_override_set():
|
|
"""Set or clear a manual role override for one user.
|
|
|
|
Body: {user_id, role} — role is 'student' | 'staff' | 'other' | '' (clear).
|
|
"""
|
|
data = request.get_json() or {}
|
|
uid = data.get("user_id", "").strip()
|
|
role = data.get("role", "").strip().lower()
|
|
if not uid:
|
|
return jsonify({"error": "user_id required"}), 400
|
|
if role and role not in ("student", "staff", "other"):
|
|
return jsonify({"error": "role must be student | staff | other | '' (clear)"}), 400
|
|
overrides = _load_role_overrides()
|
|
if role:
|
|
overrides[uid] = role
|
|
else:
|
|
overrides.pop(uid, None)
|
|
_save_role_overrides(overrides)
|
|
return jsonify({"ok": True, "user_id": uid, "role": role or None,
|
|
"total_overrides": len(overrides)})
|