Source code for mdvtools.auth.shibboleth_provider
from flask import session, redirect, request, jsonify
from mdvtools.dbutils.dbservice import UserService
from mdvtools.auth.authutils import update_cache
from mdvtools.auth.auth_provider import AuthProvider
from typing import Optional
from flask.typing import ResponseReturnValue
import logging
[docs]
logger = logging.getLogger(__name__)
[docs]
class ShibbolethProvider(AuthProvider):
def __init__(self, app):
[docs]
self.logout_url = app.config.get("SHIBBOLETH_LOGOUT_URL")
[docs]
self.login_url = app.config.get("SHIBBOLETH_LOGIN_URL")
[docs]
def login(self) -> ResponseReturnValue:
try:
# Extract headers that Shibboleth sets after login
email = request.headers.get("X-Forwarded-User")
persistent_id = request.headers.get("Shibboleth-Persistent-Id")
session["auth_method"] = "shibboleth"
session.modified = True
if not email or not persistent_id:
session.clear()
if self.login_url:
logger.info("Redirecting to Shibboleth login page...")
return redirect(self.login_url)
else:
logger.error("Shibboleth login URL not configured.")
return jsonify({"error": "Shibboleth login URL not configured."}), 500
# Provision user in DB
user = UserService.add_or_update_user(
email=email,
auth_id=persistent_id,
institution="University of Oxford"
)
# Cache user data
user_data = {
"id": user.id,
"auth_id": user.auth_id,
"email": user.email,
"is_admin": user.is_admin
}
update_cache(user_id=user.id, user_data=user_data)
logger.info(f"SSO login successful: {email}")
return redirect("/")
except Exception as e:
session.clear()
logger.exception(f"Error during Shibboleth login: {e}")
return jsonify({"error": "Shibboleth login failed."}), 500
[docs]
def logout(self):
session.clear()
if self.logout_url:
return redirect(self.logout_url)
else:
return jsonify({"error": "Shibboleth logout URL not configured."}), 500
[docs]
def get_user(self, token: Optional[dict] = None) -> Optional[dict]:
eppn = request.headers.get('Shibboleth-Eppn')
if eppn:
return {
"sub": eppn,
"first_name": "Unknown",
"last_name": "Unknown",
"email": eppn,
"association": "University of Oxford",
"avatarUrl": ""
}
return None
[docs]
def get_token(self):
return None # Shibboleth uses headers
[docs]
def handle_callback(self):
return None # Not applicable
[docs]
def validate_user(self):
"""Validate user using Shibboleth."""
from mdvtools.dbutils.dbmodels import User
try:
# Check if the user information is already cached in the session
if 'user' in session:
return session['user'], None # Return the user from session cache
email = request.headers.get("X-Forwarded-User")
persistent_id = request.headers.get("Shibboleth-Persistent-Id")
if not email or not persistent_id:
return None, (jsonify({"error": "Missing SSO authentication headers"}), 401)
# Look up user from database (or in-memory cache if applicable)
user = User.query.filter_by(auth_id=persistent_id).first()
if not user:
return None, (jsonify({"error": "SSO user not found"}), 403)
# Add the user to the in-memory cache
user_data = {"id": user.id, "auth_id": user.auth_id, "email": user.email, "is_admin": user.is_admin}
# Cache the user data in session for future use
session['user'] = user_data
session.modified = True
return user_data, None
except Exception as e:
logger.exception(f"validate_sso_user error: {e}")
return None, (jsonify({"error": "Internal server error - user not validated"}), 500)
[docs]
def sync_users_to_db(self) -> None:
"""
Shibboleth doesn't have a centralized user store to sync from like Auth0.
Users are provisioned individually upon login.
"""
logger.info(
"Shibboleth users are provisioned individually upon login, "
"no batch sync performed."
)