Source code for mdvtools.auth.authutils

from flask import session, request,redirect, current_app, has_request_context
from mdvtools.logging_config import get_logger

# Setup logging
[docs] logger = get_logger(__name__)
# in_memory_cache.py
[docs] user_cache = {} # key: auth_id -> user details
[docs] user_project_cache = {} # key: user_id -> project permissions
[docs] all_users_cache = [] # list of all user summaries
[docs] active_projects_cache = []
# Cache metadata
[docs] cache_last_updated = None
[docs] CACHE_REFRESH_INTERVAL = 300 # 5 minutes
[docs] def create_auth_provider(auth_method: str, app): """Factory function to create an authentication provider.""" auth_method = auth_method.lower() if auth_method == "auth0": from mdvtools.auth.auth0_provider import Auth0Provider from mdvtools.dbutils.mdv_server_app import oauth return Auth0Provider( app, oauth=oauth, client_id=app.config['AUTH0_CLIENT_ID'], client_secret=app.config['AUTH0_CLIENT_SECRET'], domain=app.config['AUTH0_DOMAIN'] ) elif auth_method == "dummy": from mdvtools.auth.dummy_provider import DummyAuthProvider return DummyAuthProvider(app) elif auth_method == "shibboleth": from mdvtools.auth.shibboleth_provider import ShibbolethProvider return ShibbolethProvider(app) else: raise ValueError(f"Unsupported auth method: {auth_method}")
[docs] def get_auth_provider(): """ Determines the correct authentication method and returns an instance of the corresponding auth provider. The resolution order is: 1. If DEFAULT_AUTH_METHOD in the app config is explicitly set to 'dummy', the DummyAuthProvider is ALWAYS used. This is a developer override for safe local testing. 2. If 'auth_method' is present in the session, that method is used. 3. Otherwise, the value from the required DEFAULT_AUTH_METHOD app configuration is used. Raises: ValueError: If DEFAULT_AUTH_METHOD is not configured in the application. """ # Fail loudly if the default auth method is not explicitly configured. # This prevents silently falling back to an insecure 'dummy' mode. try: default_method = current_app.config["DEFAULT_AUTH_METHOD"] except KeyError: raise ValueError("Security risk: DEFAULT_AUTH_METHOD must be explicitly configured.") # 1. Check for the developer override. if default_method.lower() == 'dummy': return create_auth_provider('dummy', current_app) # 2. Check for a method specified in the session. auth_method = None if has_request_context(): auth_method = session.get("auth_method") # 3. Fall back to the configured default method. if not auth_method: auth_method = default_method return create_auth_provider(auth_method, current_app)
[docs] def is_authenticated(): """Validate the current user via Auth0 or Shibboleth.""" ENABLE_AUTH = current_app.config.get("ENABLE_AUTH", False) if not ENABLE_AUTH: return True # Quick check: if user is already in session, they're authenticated if 'user' in session: return True try: provider = get_auth_provider() except Exception as e: current_app.logger.error(f"Failed to get auth provider: {e}") return False user, error_response = provider.validate_user() return user is not None and error_response is None
[docs] def register_before_request_auth(app): """Attach the before_request auth logic to the Flask app.""" whitelist_routes = [ '/login_dev', '/login_sso', '/login', '/callback', '/favicon.ico', '/flask/js/', '/static', '/flask/assets', '/flask/img' ] @app.before_request def enforce_authentication(): ENABLE_AUTH = app.config.get("ENABLE_AUTH", False) if not ENABLE_AUTH: return None requested_path = request.path if any(requested_path.startswith(route) for route in whitelist_routes): return None if not is_authenticated(): redirect_uri = app.config.get("LOGIN_REDIRECT_URL", "/login_dev") logger.info(f"Unauthorized access to {requested_path}. Redirecting.") return redirect(redirect_uri) return None
[docs] def needs_cache_refresh(): """Check if cache needs to be refreshed based on time interval.""" import time global cache_last_updated if cache_last_updated is None: return True return time.time() - cache_last_updated > CACHE_REFRESH_INTERVAL
[docs] def cache_user_projects(): """ Caches user details and their associated project permissions in memory. """ global cache_last_updated import time from mdvtools.dbutils.dbmodels import User, UserProject from mdvtools.dbutils.dbservice import ProjectService try: logger.info("Caching user details and project permissions...") users = User.query.all() all_users_cache.clear() user_cache.clear() user_project_cache.clear() active_projects_cache.clear() for user in users: user_data = { "id": user.id, "auth_id": user.auth_id, "email": user.email, "is_admin": user.is_admin } user_cache[user.auth_id] = user_data all_users_cache.append(user_data) user_projects = UserProject.query.filter_by(user_id=user.id).all() project_permissions = { up.project_id: { "can_read": up.can_read, "can_write": up.can_write, "is_owner": up.is_owner } for up in user_projects } user_project_cache[user.id] = project_permissions active_projects = ProjectService.get_active_projects() active_projects_cache[:] = active_projects # Update cache timestamp cache_last_updated = time.time() logger.info("Cached users and their project permissions in memory.") return True except Exception as e: logger.exception(f"Error caching user projects: {e}") return False
[docs] def update_cache(user_id=None, project_id=None, user_data=None, project_data=None, permissions=None): """ Updates the in-memory caches (user_cache, user_project_cache, all_users_cache, active_projects_cache) for the provided user and project details, including any changes in permissions. :param user_id: The ID of the user whose cache needs to be updated. :param project_id: The ID of the project to be added or updated in the cache. :param user_data: A dictionary containing the user's details (if updating the user cache). :param project_data: A dictionary containing the project's details (if updating the project cache). :param permissions: A dictionary containing the user's permissions for a project (optional). """ try: # Step 1: Update user_cache (user details) if user_data and user_id: user_cache[user_id] = user_data # Update or add the user in the cache # Ensure the user is in all_users_cache if they are not already if user_data not in all_users_cache: all_users_cache.append(user_data) # Step 2: Update user_project_cache (user-project relationship) if user_id and project_id and permissions is not None: # Create or update user project permissions in cache if user_id not in user_project_cache: user_project_cache[user_id] = {} user_project_cache[user_id][project_id] = permissions # Update or add project permissions # Step 3: Update active_projects_cache (active project details) if project_data and project_id: # Search for the project in the cache existing_project = next((p for p in active_projects_cache if p["id"] == project_id), None) if existing_project: # Project exists in cache, update only the changed fields existing_project["name"] = project_data.get("name", existing_project["name"]) existing_project["lastModified"] = project_data.get("lastModified", existing_project["lastModified"]) existing_project["thumbnail"] = project_data.get("thumbnail", existing_project["thumbnail"]) logger.info(f"Updated project {project_id} in active projects cache.") else: # Project does not exist in cache, append a new entry project_entry = { "id": project_data["id"], "name": project_data["name"], "lastModified": project_data["lastModified"], "thumbnail": project_data["thumbnail"] } active_projects_cache.append(project_entry) logger.info(f"Added new project {project_id} to active projects cache.") logger.info("Cache successfully updated.") except Exception as e: logger.exception(f"Error updating cache: {e}")