Source code for app.routes.main

import os
import uuid
from functools import wraps

from flask import Blueprint, render_template, request, redirect, url_for, current_app, jsonify, session, flash
from flask_login import login_required, current_user
from werkzeug.utils import secure_filename

from datetime import datetime, timedelta
from collections import defaultdict
import pytz

from app import mongo
import html, re

from pathlib import Path

main = Blueprint('main', __name__)

# Define the project home 
project_home = Path(__file__).parent.parent
UPLOAD_FOLDER = project_home / 'static' / 'upload'
print(f"Upload folder ....: {UPLOAD_FOLDER}")  # Debug: print the upload folder path
# Your file upload and logbook entry handling goes here:
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'pdf', 'xls', 'xlsx'}

[docs] def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
[docs] def require_logbook_access(f): """Decorator to check if user has access to the logbook in session""" @wraps(f) def decorated_function(*args, **kwargs): if 'logbook_name' not in session: flash('No logbook selected!', 'danger') return redirect(url_for('auth.login')) logbook_name = session['logbook_name'] # Check if user has access if not current_user.has_logbook_access(logbook_name): flash(f'You do not have access to the {logbook_name} logbook!', 'danger') return redirect(url_for('auth.logout')) return f(*args, **kwargs) return decorated_function
[docs] @main.route('/add-entry', methods=['GET']) @login_required @require_logbook_access def add_entry_form(): """Show the add entry form. Returns: HTML page -- The add entry form. """ print("add_entry_form") return render_template('add_entry.html')
[docs] @main.route('/dbactivity.html') @login_required @require_logbook_access def db_activity(): """ Renders the 'dbactivity.html' template. Returns: Response: A Flask response object that renders the 'dbactivity.html' template. """ return render_template('dbactivity.html')
[docs] @main.route('/timeline') @login_required def timeline(): """Get the timeline data. Returns: JSON -- The timeline data. """ entries = mongo.db.entries.find() data = defaultdict(int) for entry in entries: timestamp = entry['timestamp'] date = timestamp.date() data[date] += 1 dates = sorted(data.keys()) counts = [data[date] for date in dates] return jsonify(dates=dates, counts=counts)
[docs] @main.route('/save_image') @login_required def save_image(image): """Save an image to the filesystem. Args: image (FileStorage): The image to save. Returns: str: The filename of the saved image. """ # get the name of the logbook from the session logbook_id = ObjectId(session['logbook']) logbook = mongo.db.logbooks.find_one({"_id": logbook_id}) if image and allowed_file(image.filename): # Generate a unique filename using UUID and a timestamp unique_filename = f"{uuid.uuid4()}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{secure_filename(image.filename)}" print('filename =',unique_filename) print('logbook name = ', logbook['name']) full_path = os.path.join(UPLOAD_FOLDER, logbook['name'], unique_filename) find_path = os.path.join(logbook['name'], unique_filename) # full_path = os.path.join(logbook['name'], unique_filename) try: image.save(full_path) return find_path except Exception as e: print(f"Error saving image: {e}") return None else: print("Image is not allowed or no image received") return None
[docs] @main.route('/add-entry', methods=['POST']) @login_required @require_logbook_access def handle_entry(): """Handle the add entry form. Returns: HTML page -- The add entry form. """ # Extract data from form print("handle_entry") text = request.form['text'] keywords = request.form.getlist('keywordSelect[]') # Handle image upload print('static folder = ', current_app.static_folder ) image_filenames = [] if 'image' in request.files: images = request.files.getlist('image') # Get list of uploaded images image_filenames = [save_image(image) for image in images if image] image_filenames = [filename for filename in image_filenames if filename] # Filter out None values # Store the data in MongoDB entry = { "timestamp": datetime.utcnow(), "text": text, "keywords": keywords, "images": image_filenames, # store the path or filename to the uploaded image "user": current_user.username, # this should come from your user management system "logbook": ObjectId(session['logbook']) # retrieve logbook id/name from the session, default to None if not set } # Insert the entry into the database mongo.db.entries.insert_one(entry) # return redirect(url_for('main.add_entry_form')) return redirect(url_for('main.show_entries'))
[docs] @main.route('/add_images', methods=['POST']) @login_required def add_images(): """ Handles the addition of images to an existing entry in the database. This function performs the following steps: 1. Retrieves the entry ID from the form data. 2. Checks if the entry exists in the database. 3. Fetches the associated logbook document using the entry's logbook ID. 4. Handles the image upload process, saving the images and collecting their filenames. 5. Updates the entry in the database with the new image filenames. 6. Redirects to the entries display page, with appropriate error messages if necessary. Returns: A redirect response to the entries display page, with error messages if the entry or logbook is not found. """ # Retrieve entry_id from form entry_id = request.form['entry_id'] # Check if entry exists entry = mongo.db.entries.find_one({"_id": ObjectId(entry_id)}) if not entry: # Handle the case where the entry does not exist. Maybe redirect with an error message. return redirect(url_for('main.show_entries', error="Entry not found")) # Fetch the logbook document using the ObjectId from the entry logbook = mongo.db.logbooks.find_one({"_id": ObjectId(entry['logbook'])}) # If for some reason the logbook is not found, handle it appropriately. if not logbook: return redirect(url_for('main.show_entries', error="Logbook not found")) # Handle image upload image_filenames = [] if 'newImage' in request.files: images = request.files.getlist('newImage') # Get list of uploaded images image_filenames = [save_image(image) for image in images if image] image_filenames = [filename for filename in image_filenames if filename] # Filter out None values # Update the entry in MongoDB with new images if image_filenames: mongo.db.entries.update_one( {"_id": ObjectId(entry_id)}, {"$push": {"images": {"$each": image_filenames}}} ) return redirect(url_for('main.show_entries'))
from math import ceil
[docs] @main.route('/entries') @login_required @require_logbook_access def show_entries(): """Show the logbook entries. Returns: HTML page -- The logbook entries page. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session, default to None if not set # Extract query parameters search_term = request.args.get('search_term', '') keyword_filter = request.args.get('keyword_filter', None) start_date_str = request.args.get('start_date', None) end_date_str = request.args.get('end_date', None) query = {} # always filter by logbook query["logbook"] = logbook_id if search_term: query["text"] = {"$regex": search_term, "$options": "i"} if keyword_filter: query["keywords"] = keyword_filter if start_date_str and end_date_str: start_date = datetime.strptime(start_date_str, '%Y-%m-%d') end_date = datetime.strptime(end_date_str, '%Y-%m-%d') + timedelta(days=1) # Adding 1 day to include the end date query["timestamp"] = {"$gte": start_date, "$lte": end_date} per_page = 10 page_number = int(request.args.get('page', 1)) skip_entries = (page_number - 1) * per_page entries_cursor = mongo.db.entries.find(query).sort("timestamp", -1).skip(skip_entries).limit(per_page) # Convert UTC timestamp to Amsterdam local time amsterdam_tz = pytz.timezone('Europe/Amsterdam') entries = [] for entry in entries_cursor: entry['timestamp'] = entry['timestamp'].replace(tzinfo=pytz.utc).astimezone(amsterdam_tz) entries.append(entry) # Calculate the total number of entries matching the query if query: total_entries = mongo.db.entries.count_documents(query) else: total_entries = mongo.db.entries.count_documents({}) total_pages = ceil(total_entries / per_page) return render_template('show_entries.html', entries=entries, page_number=page_number, total_pages=total_pages, search_term=search_term, keyword_filter=keyword_filter)
[docs] @main.route('/test-mongo') @login_required def test_mongo(): """Test the connection to MongoDB. Returns: str: Success or failure message. """ print(current_app.config['MONGO_URI']) print("MongoDB:", mongo.db) return "Check the logs!"
[docs] @main.route('/add-keyword', methods=['POST']) @login_required def add_keyword(): """Add a new keyword to the allowed list in MongoDB. Returns: str: Success or failure message. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session logbook = mongo.db.logbooks.find_one({"_id": logbook_id}) keyword = request.form.get('keyword') # Get the keyword from the form data if not keyword: return "Keyword not provided", 400 # Add the new keyword to the list of allowed keywords of the specific logbook in the database result = mongo.db.logbooks.update_one({"_id": logbook_id}, {"$addToSet": {"allowed_keywords": keyword}}) if result.modified_count: return f"Keyword '{keyword}' added successfully to {logbook['name']} logbook!", 200 else: return f"Keyword '{keyword}' already exists in {logbook['name']} logbook or there was an error!", 400
[docs] @main.route('/remove-keyword', methods=['POST']) @login_required def remove_keyword(): """Remove a keyword from the allowed list in MongoDB. Returns: str: Success or failure message. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session logbook = mongo.db.logbooks.find_one({"_id": logbook_id}) keyword = request.form.get('keyword') # Get the keyword from the form data if not keyword: return "Keyword not provided", 400 # Remove the keyword from the list of allowed keywords of the specific logbook in the database result = mongo.db.logbooks.update_one({"_id": logbook_id}, {"$pull": {"allowed_keywords": keyword}}) if result.modified_count: return f"Keyword '{keyword}' removed successfully from {logbook['name']} logbook!", 200 else: return f"Keyword '{keyword}' doesn't exist in {logbook['name']} logbook or there was an error!", 400
[docs] @main.route('/keywords') @login_required @require_logbook_access def show_keywords(): """Show the list of allowed keywords. Returns: HTML page -- The keywords page. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session logbook = mongo.db.logbooks.find_one({"_id": logbook_id}) # Fetch keywords from logbook or default to an empty list if not present keywords = logbook.get('allowed_keywords', []) if logbook else [] # Sort the keywords alphabetically keywords = sorted(keywords, key=lambda s: s.lower()) # Render the keywords page return render_template('keywords.html', keywords=keywords)
[docs] @main.route('/get-keywords') @login_required def get_keywords(): """Get the list of allowed keywords. Returns: JSON -- The list of allowed keywords. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session, default to None if not set logbook = mongo.db.logbooks.find_one({"_id": logbook_id}) if 'allowed_keywords' in logbook and logbook['allowed_keywords']: keyword_data = logbook['allowed_keywords'] return jsonify(keywords=sorted(keyword_data)) else: return jsonify(keywords=[])
[docs] @main.route('/calendar') @login_required def calendar_view(): """ Renders the calendar view template with the current date. This function retrieves the current date, formats it as 'YYYY-MM-DD', and passes it to the 'show_calendar.html' template for rendering. Returns: str: The rendered HTML content for the calendar view. """ today = datetime.today().strftime('%Y-%m-%d') return render_template('show_calendar.html', today=today)
[docs] @main.route('/get_calendar_events') @login_required def get_calendar_events(): """Get the calendar events. Returns: JSON -- The calendar events. """ logbook_id = ObjectId(session['logbook']) # retrieve logbook id/name from the session, default to None if not set start_date_str = request.args.get('start', None) end_date_str = request.args.get('end', None) datetime_format = '%Y-%m-%dT%H:%M:%S%z' start_date = datetime.strptime(start_date_str, datetime_format) if start_date_str else None end_date = datetime.strptime(end_date_str, datetime_format) if end_date_str else None # Fetching entries from MongoDB based on start and end dates if provided # shall we also include the logbook id in the query? query = {"logbook": logbook_id} if start_date and end_date: end_date += timedelta(days=1) query["timestamp"] = {"$gte": start_date, "$lt": end_date} entries_cursor = mongo.db.entries.find(query) # entries_cursor = mongo.db.entries.find({"timestamp": {"$gte": start_date, "$lt": end_date}}) else: entries_cursor = mongo.db.entries.find(query) # Counting entries per day count_per_day = defaultdict(int) events_data_per_day = defaultdict(list) for entry in entries_cursor: date_str = entry['timestamp'].strftime('%Y-%m-%d') count_per_day[date_str] += 1 event_data = { 'title': entry.get('text', 'No Title'), 'description': entry.get('description', ''), } events_data_per_day[date_str].append(event_data) # Creating events with title and description events = [] for date, count in count_per_day.items(): title = "+{}".format(count) if count > 3 else "\n".join(event['title'] for event in events_data_per_day[date]) title = html.unescape(title) # Convert HTML entities to their actual characters title = re.sub('<[^<]+?>', '', title) # Remove HTML tags title = title.split('\n')[0] title = title[:20] + '...' if len(title) > 20 else title # Truncate the title to 20 characters event = { 'title': title, 'start': date, # Since the events are all-day, we only need the date part. 'allDay': True } events.append(event) return jsonify(events)
from bson import ObjectId # Importing ObjectId from bson
[docs] @main.route('/update_entry/<string:entry_id>', methods=['POST']) @login_required @require_logbook_access def update_entry(entry_id): """ Update the text of an existing entry in the database. Args: entry_id (str): The ID of the entry to be updated. Returns: Response: A JSON response indicating the success or failure of the update operation. - If the entry ID is invalid, returns a 400 status with an error message. - If the entry is not found, returns a 404 status with an error message. - If the update is successful, returns a 200 status with a success message. - If the update fails, returns a 400 status with an error message. """ data = request.json updated_text = data.get('updated_text') if not ObjectId.is_valid(entry_id): return jsonify(success=False, error="Invalid Entry ID"), 400 entry = mongo.db.entries.find_one({"_id": ObjectId(entry_id)}) if not entry: return jsonify(success=False, error="Entry not found"), 404 print(f"Updating entry with ID: {entry_id}" ) result = mongo.db.entries.update_one({"_id": ObjectId(entry_id)}, {"$set": {"text": updated_text}}) if result.modified_count > 0: return jsonify(success=True), 200 else: return jsonify(success=False, error="Update Failed"), 400
# below is the code for updating keywords
[docs] @main.route('/update-entry-keywords/<string:entry_id>', methods=['POST']) @login_required def update_entry_keywords(entry_id): """ Update the keywords of a specific entry in the database. Args: entry_id (str): The ID of the entry to update. Returns: Response: A JSON response indicating success or failure. - If the entry ID is invalid, returns a JSON response with success=False and an error message, with a 400 status code. - If the update is successful, returns a JSON response with success=True and a 200 status code. - If the update fails, returns a JSON response with success=False and an error message, with a 400 status code. """ if not ObjectId.is_valid(entry_id): return jsonify(success=False, error="Invalid Entry ID"), 400 data = request.json keywords = data.get('keywords', []) result = mongo.db.entries.update_one({"_id": ObjectId(entry_id)}, {"$set": {"keywords": keywords}}) if result.modified_count > 0: return jsonify(success=True), 200 else: return jsonify(success=False, error="Update Failed"), 400
[docs] @main.route('/get-entry-keywords/<string:entry_id>', methods=['GET']) @login_required def get_entry_keywords(entry_id): """ Retrieve the keywords associated with a specific entry. Args: entry_id (str): The ID of the entry to retrieve keywords for. Returns: Response: A JSON response containing the keywords if the entry is found, or an error message if the entry ID is invalid or the entry is not found. The response status code is 200 on success, 400 if the entry ID is invalid, and 404 if the entry is not found. """ if not ObjectId.is_valid(entry_id): return jsonify(success=False, error="Invalid Entry ID"), 400 entry = mongo.db.entries.find_one({"_id": ObjectId(entry_id)}) if not entry: return jsonify(success=False, error="Entry not found"), 404 print(entry.get('keywords', [])) return jsonify(keywords=entry.get('keywords', []))