Source code for app.routes.auth

import os
from authlib.integrations.flask_client import OAuth

from flask import Blueprint, render_template, redirect, url_for, request, flash, session, current_app
from flask_login import login_user,login_required, logout_user, current_user

from app import mongo, login_manager
from app.models.User import User
from app.models.Registration import RegistrationForm
from app.models.LogBookForm import LogbookForm, AddUsersToLogbookForm, RemoveUsersFromLogbookForm, DeleteUserForm, ManageUserLogbooksForm
from bson.objectid import ObjectId
from app.routes.main import UPLOAD_FOLDER

auth = Blueprint('auth', __name__)

# Initialize OAuth
oauth = OAuth()

[docs] def init_oauth(app): """Initialize OAuth with the Flask app""" oauth.init_app(app) if app.config.get('OIDC_ENABLED'): oauth.register( name='nikhef', client_id=app.config['OIDC_CLIENT_ID'], client_secret=app.config['OIDC_CLIENT_SECRET'], server_metadata_url=app.config['OIDC_DISCOVERY_URL'], client_kwargs={ 'scope': app.config['OIDC_SCOPES'] } )
[docs] @auth.route('/login', methods=['GET', 'POST']) def login(): """For GET requests, display the login form. For POSTS, login the current user by processing the form. """ print("login ..... ") if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') chosen_logbook = request.form.get('logbook') user = User.find_by_username(username) # check if the user is in the list of users for the logbook logbook = mongo.db.logbooks.find_one({"_id": ObjectId(chosen_logbook)}) if user and user.check_password(password) and user._id in logbook['users']: # Log in the user login_user(user) # session variable to store the logbook id session['logbook'] = str(chosen_logbook) session['logbook_name'] = logbook['name'] flash('You are successfully logged in!', 'success') return redirect(url_for('main.show_entries')) elif user and user.check_password(password) and user._id not in logbook['users']: flash('User not in logbook!', 'danger') elif user and not user.check_password(password): flash('Invalid password!', 'danger') else: flash('Invalid username!', 'danger') logbooks = list(mongo.db.logbooks.find()) oidc_enabled = current_app.config.get('OIDC_ENABLED', False) return render_template('login.html', logbooks=logbooks, oidc_enabled=oidc_enabled)
[docs] @auth.route('/login/sso') def login_sso(): """Initiate SSO login""" if not current_app.config.get('OIDC_ENABLED'): flash('SSO is not enabled!', 'danger') return redirect(url_for('auth.login')) # Store the selected logbook in session before redirecting to SSO chosen_logbook = request.args.get('logbook') if chosen_logbook: session['pending_logbook'] = chosen_logbook redirect_uri = url_for('auth.callback', _external=True) return oauth.nikhef.authorize_redirect(redirect_uri)
[docs] @auth.route('/auth/callback') def callback(): """Handle SSO callback""" if not current_app.config.get('OIDC_ENABLED'): flash('SSO is not enabled!', 'danger') return redirect(url_for('auth.login')) try: token = oauth.nikhef.authorize_access_token() # Explicitly fetch userinfo from userinfo endpoint (not from token) userinfo = oauth.nikhef.userinfo(token=token) # Extract user information sso_id = userinfo.get('sub') # Extract email - prefer direct email claim email = userinfo.get('email') if not email: # Try eduperson_principal_name (might be a list) eppn = userinfo.get('eduperson_principal_name') if eppn: email = eppn[0] if isinstance(eppn, list) else eppn else: # Fallback to preferred_username preferred = userinfo.get('preferred_username') if preferred: email = preferred[0] if isinstance(preferred, list) else preferred else: email = sso_id name = userinfo.get('name', userinfo.get('displayName', userinfo.get('cn', userinfo.get('givenName', email)))) # Debug: log what we received print(f"SSO Login - userinfo: {userinfo}") print(f"SSO Login - sso_id: {sso_id}, email: {email}, name: {name}") print(f"SSO Login - Available claims: {list(userinfo.keys())}") # Check group membership group_claim = current_app.config.get('OIDC_GROUP_CLAIM', 'groups') user_groups = userinfo.get(group_claim, []) allowed_groups = current_app.config.get('OIDC_ALLOWED_GROUPS', []) # Debug: log group information print(f"SSO Login - Group claim config: '{group_claim}'") print(f"SSO Login - User groups from '{group_claim}': {user_groups}") print(f"SSO Login - Allowed groups config: {allowed_groups}") # Check if there are any group-related claims in userinfo group_related_claims = {k: v for k, v in userinfo.items() if 'group' in k.lower() or 'entitlement' in k.lower() or 'affiliation' in k.lower()} print(f"SSO Login - Group-related claims found: {group_related_claims}") # If allowed_groups is configured, check membership if allowed_groups: has_access = any(group in allowed_groups for group in user_groups) if not has_access: flash(f'Access denied! You must be a member of one of these groups: {", ".join(allowed_groups)}', 'danger') return redirect(url_for('auth.login')) # Find or create user user = User.find_by_sso_id(sso_id) if not user: # Create new SSO user with default logbooks default_logbooks = current_app.config.get('DEFAULT_LOGBOOKS', ['xams']) user = User( username=email, email=email, auth_method='sso', sso_id=sso_id, sso_name=name, is_admin=False, allowed_logbooks=default_logbooks ) user.save() flash(f'Welcome! Your account has been created with access to: {", ".join(default_logbooks)}', 'success') else: # Update user info user.sso_name = name user.email = email user.update() # Handle logbook selection pending_logbook = session.pop('pending_logbook', None) if pending_logbook: logbook = mongo.db.logbooks.find_one({"_id": ObjectId(pending_logbook)}) if logbook: # Check if user has access to this logbook if user.has_logbook_access(logbook['name']): session['logbook'] = str(pending_logbook) session['logbook_name'] = logbook['name'] login_user(user) flash('You are successfully logged in via SSO!', 'success') return redirect(url_for('main.show_entries')) else: flash(f'You do not have access to the {logbook["name"]} logbook. Please contact an administrator.', 'danger') return redirect(url_for('auth.login')) # If no logbook selected, try to use first allowed logbook if user.allowed_logbooks: first_logbook_name = user.allowed_logbooks[0] first_logbook = mongo.db.logbooks.find_one({"name": first_logbook_name}) if first_logbook: session['logbook'] = str(first_logbook['_id']) session['logbook_name'] = first_logbook['name'] login_user(user) flash('You are successfully logged in via SSO!', 'success') return redirect(url_for('main.show_entries')) # No logbook access flash('You do not have access to any logbooks. Please contact an administrator.', 'danger') return redirect(url_for('auth.login')) except Exception as e: print(f"SSO login error: {str(e)}") flash(f'SSO login failed: {str(e)}', 'danger') return redirect(url_for('auth.login'))
[docs] @auth.route('/logout') @login_required def logout(): """Logout the current user.""" session.clear() logout_user() return redirect(url_for('auth.login'))
[docs] @login_manager.user_loader def load_user(user_id): """Check if user is logged-in on every page load.""" if not user_id or user_id == 'None': return None try: user_data = mongo.db.users.find_one({"_id": ObjectId(user_id)}) if user_data: user_data['password'] = user_data.pop('password', None) # Rename the key return User(**user_data) except Exception as e: print(f"Error loading user {user_id}: {e}") return None return None
[docs] @auth.route('/') def index(): """Redirect to login page.""" return redirect(url_for('main.show_entries'))
[docs] @auth.route('/admin', methods=['GET', 'POST']) @login_required def admin_page(): """Admin page. - Create logbooks - Add users to logbooks - Register new users - Manage SSO user logbook permissions Accessible only to admin users. """ if not current_user.is_admin: flash('Access denied!', 'danger') return redirect(url_for('main.show_entries')) logbook_form = LogbookForm() user_form = AddUsersToLogbookForm() delete_user_form = DeleteUserForm() registration_form = RegistrationForm() manage_logbooks_form = ManageUserLogbooksForm() # Dynamically set choices for the logbook dropdown logbooks = mongo.db.logbooks.find() # New code to populate users users = mongo.db.users.find() # i want to select the users from the database that are currently not in the list of users for the logbook # users = mongo.db.users.find({"_id": {"$nin": logbook['users']}}) # and user_form.logbook_select.choices = [(str(logbook['_id']), logbook['name']) for logbook in logbooks] user_form.user_select.choices = [(str(user['_id']), user['username']) for user in users] # user_form.logbook_select.choices = [(str(logbook['_id']), logbook['name']) for logbook in logbooks] # Set choices for delete user form - exclude admin users all_users = mongo.db.users.find({"is_admin": {"$ne": True}}) delete_user_form.user_select.choices = [('', 'Select User')] + [(str(user['_id']), user['username']) for user in all_users] # Set choices for manage logbooks form all_logbooks = list(mongo.db.logbooks.find()) all_users_for_mgmt = list(mongo.db.users.find()) manage_logbooks_form.user_select.choices = [('', 'Select User')] + [(str(user['_id']), f"{user['username']} ({'SSO' if user.get('auth_method') == 'sso' else 'Local'})") for user in all_users_for_mgmt] manage_logbooks_form.logbook_access.choices = [(logbook['name'], logbook['name']) for logbook in all_logbooks] # Handle logbook creation if 'create_logbook' in request.form: if logbook_form.validate_on_submit(): # Split the allowed keywords by comma, but if there are no elements in the list, set it to an empty list keywords = [keyword.strip() for keyword in logbook_form.allowed_keywords.data.split(',')] if logbook_form.allowed_keywords.data else [] #keywords = [keyword.strip() for keyword in logbook_form.allowed_keywords.data.split(',')] mongo.db.logbooks.insert_one({"name": logbook_form.logbook_name.data, "allowed_keywords": keywords, "users": []}) # create subdirectory in the UPLOAD_FOLDER for the logbook. but firs check if the directpry already exists if not os.path.exists(os.path.join(UPLOAD_FOLDER, logbook_form.logbook_name.data)): os.mkdir(os.path.join(UPLOAD_FOLDER, logbook_form.logbook_name.data)) flash('Logbook created successfully!', 'success') return redirect(url_for('auth.admin_page')) # Handle adding user to logbook if 'add_user_to_logbook' in request.form: if user_form.validate_on_submit(): # user = mongo.db.users.find_one({"username": user_form.username.data}) user = mongo.db.users.find_one({"_id": ObjectId(user_form.user_select.data)}) if user: # Fetch the logbook based on the selected logbook's ID logbook = mongo.db.logbooks.find_one({"_id": ObjectId(user_form.logbook_select.data)}) if logbook: # Check if the user's ID exists within the users array of the logbook if user['_id'] in logbook['users']: flash('User already in logbook!', 'danger') return redirect(url_for('auth.admin_page')) else: mongo.db.logbooks.update_one({"_id": ObjectId(user_form.logbook_select.data)}, {"$push": {"users": user['_id']}}) flash('User added to logbook!', 'success') else: flash('Logbook not found!', 'danger') else: flash('User not found!', 'danger') # Handle user registration if 'register_user' in request.form: if registration_form.validate_on_submit(): hashed_password = User.set_password(registration_form.password.data) new_user = User(username=registration_form.username.data, email=registration_form.email.data, password=hashed_password) new_user.save() flash('New user registered successfully!', 'success') return redirect(url_for('auth.admin_page')) elif request.method == 'POST': # Check if the form was submitted for field, errors in registration_form.errors.items(): for error in errors: flash(f"Error in the {getattr(registration_form, field).label.text} field - {error}", 'danger') # Handle deleting user completely if 'delete_user' in request.form: if delete_user_form.validate_on_submit(): user_id = ObjectId(delete_user_form.user_select.data) user = mongo.db.users.find_one({"_id": user_id}) if user: # Prevent deletion of admin users if user.get('is_admin', False): flash('Cannot delete admin users!', 'danger') return redirect(url_for('auth.admin_page')) # Remove user from all logbooks mongo.db.logbooks.update_many( {"users": user_id}, {"$pull": {"users": user_id}} ) # Delete the user mongo.db.users.delete_one({"_id": user_id}) flash(f"User '{user['username']}' deleted successfully from all logbooks!", 'success') return redirect(url_for('auth.admin_page')) else: flash('User not found!', 'danger') # Handle managing user logbook permissions if 'manage_user_logbooks' in request.form: if manage_logbooks_form.validate_on_submit(): user_id = ObjectId(manage_logbooks_form.user_select.data) user = mongo.db.users.find_one({"_id": user_id}) if user: # Get selected logbooks selected_logbooks = manage_logbooks_form.logbook_access.data # Update user's allowed_logbooks mongo.db.users.update_one( {"_id": user_id}, {"$set": {"allowed_logbooks": selected_logbooks}} ) flash(f"Logbook permissions updated for user '{user['username']}'!", 'success') return redirect(url_for('auth.admin_page')) else: flash('User not found!', 'danger') return render_template('admin.html', logbook_form=logbook_form, user_form=user_form, delete_user_form=delete_user_form, registration_form=registration_form, manage_logbooks_form=manage_logbooks_form)
from flask import jsonify
[docs] @auth.route('/get-available-users/<logbook_id>') def get_available_users(logbook_id): # Get the current users in the logbook logbook = mongo.db.logbooks.find_one({"_id": ObjectId(logbook_id)}) current_users = logbook['users'] # Fetch users not in the logbook available_users = mongo.db.users.find({'_id': {'$nin': current_users}}) # Convert users to list of dicts to send as JSON user_list = [{'id': str(user['_id']), 'name': user['username']} for user in available_users] return jsonify(users=user_list)
[docs] @auth.route('/get-logbook-users/<logbook_id>') def get_logbook_users(logbook_id): # Get the current users in the logbook logbook = mongo.db.logbooks.find_one({"_id": ObjectId(logbook_id)}) if not logbook: return jsonify(users=[]) current_user_ids = logbook.get('users', []) # Fetch users in the logbook logbook_users = mongo.db.users.find({'_id': {'$in': current_user_ids}}) # Convert users to list of dicts to send as JSON user_list = [{'id': str(user['_id']), 'name': user['username']} for user in logbook_users] return jsonify(users=user_list)
[docs] @auth.route('/get-user-logbooks/<user_id>') def get_user_logbooks(user_id): """Get the logbooks a user has access to""" user = mongo.db.users.find_one({"_id": ObjectId(user_id)}) if not user: return jsonify(logbooks=[]) allowed_logbooks = user.get('allowed_logbooks', []) return jsonify(logbooks=allowed_logbooks)