Files
dev-manufaktur/Dashboard_V2/app.py
2025-02-19 14:23:44 +01:00

349 lines
13 KiB
Python

import os
import secrets
import sqlite3
from datetime import datetime

from flask import Flask, render_template, request, redirect, url_for, session, g, flash
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
# The session-signing key is taken from the FLASK_SECRET_KEY environment
# variable so that deployments do not have to keep the key in source control.
# The literal fallback only exists to keep existing local setups (and their
# already-issued session cookies) working; it MUST be overridden in production.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "SUPER_SECRET_KEY")
# File name of the SQLite database opened by get_db().
DATABASE = 'clickcandit.db'
def get_db():
    """Return the SQLite connection for the current application context.

    The connection is opened on first use, cached on ``flask.g`` under
    ``_database`` and reused afterwards. Rows come back as ``sqlite3.Row`` so
    columns can be read by name.

    Returns:
        sqlite3.Connection: the cached (or freshly opened) connection.
    """
    db = getattr(g, '_database', None)
    if db is None:
        db = g._database = sqlite3.connect(DATABASE)
        db.row_factory = sqlite3.Row
        # SQLite does not enforce FOREIGN KEY clauses unless this pragma is
        # set on every new connection. Without it the ON DELETE CASCADE rules
        # declared in init_db() never fire and deleting a user leaves that
        # user's bookmarks and time entries behind.
        db.execute("PRAGMA foreign_keys = ON")
    return db
@app.teardown_appcontext
def close_connection(exception):
    """Close the database connection cached on ``flask.g``, if there is one.

    Args:
        exception: Supplied by Flask's teardown hook; not used here.
    """
    connection = getattr(g, '_database', None)
    if connection is None:
        # No connection was opened during this context.
        return
    connection.close()
def init_db():
    """Create the application's tables if they do not exist yet.

    Executes one ``CREATE TABLE IF NOT EXISTS`` statement per table inside an
    application context and commits once after all of them have run.
    """
    schema_statements = (
        # Users
        """
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password TEXT NOT NULL,
        email TEXT,
        is_admin INTEGER DEFAULT 0
        );
        """,
        # Bookmarks
        """
        CREATE TABLE IF NOT EXISTS bookmarks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        url TEXT NOT NULL,
        icon_class TEXT NOT NULL DEFAULT 'fas fa-bookmark',
        FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
        );
        """,
        # Notifications (a NULL user_id addresses every user)
        """
        CREATE TABLE IF NOT EXISTS notifications (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER, -- NULL = für alle
        message TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        is_read INTEGER DEFAULT 0
        );
        """,
        # Time tracking (end_time stays NULL while an entry is running)
        """
        CREATE TABLE IF NOT EXISTS time_entries (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id INTEGER NOT NULL,
        activity TEXT NOT NULL,
        start_time DATETIME NOT NULL,
        end_time DATETIME,
        FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
        );
        """,
    )
    with app.app_context():
        connection = get_db()
        for statement in schema_statements:
            connection.execute(statement)
        connection.commit()
def get_user_by_username_or_email(user_input):
    """Find a user whose username or e-mail equals ``user_input``.

    Args:
        user_input: Value compared against both the ``username`` and the
            ``email`` column.

    Returns:
        The first matching ``users`` row, or ``None`` if nothing matches.
    """
    lookup_sql = """
        SELECT * FROM users
        WHERE username = :val OR email = :val
    """
    cursor = get_db().execute(lookup_sql, {"val": user_input})
    return cursor.fetchone()
def get_user_by_username(username):
    """Return the ``users`` row with the given username, or ``None``."""
    cursor = get_db().execute(
        "SELECT * FROM users WHERE username = ?",
        (username,),
    )
    return cursor.fetchone()
def get_user_by_id(user_id):
    """Return the ``users`` row with the given primary key, or ``None``."""
    cursor = get_db().execute(
        "SELECT * FROM users WHERE id = ?",
        (user_id,),
    )
    return cursor.fetchone()
def is_admin():
    """Report whether the current session carries the admin flag.

    Returns:
        bool: ``True`` only when ``session['is_admin']`` equals 1; a missing
        key is treated as 0.
    """
    admin_flag = session.get('is_admin', 0)
    return admin_flag == 1
@app.route('/')
def index():
    """Redirect logged-in visitors to the dashboard, everyone else to login."""
    target_endpoint = 'dashboard' if 'user_id' in session else 'login'
    return redirect(url_for(target_endpoint))
# --------------------- REGISTRATION -------------------------
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the registration form (GET) or create a new account (POST).

    On POST all three fields (username, e-mail, password) are required and
    neither the username nor the e-mail may already be in use. The very first
    account that is created becomes an administrator. The password is stored
    as a Werkzeug hash.

    Returns:
        The rendered ``register.html`` for GET, otherwise a redirect: back to
        the form on validation failure, to the login page on success.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username')
    email = request.form.get('email')
    password = request.form.get('password')
    if not username or not email or not password:
        flash('Bitte alle Felder ausfüllen!', 'red')
        return redirect(url_for('register'))

    db = get_db()
    # Check whether the username or e-mail is already taken.
    existing = db.execute(
        "SELECT * FROM users WHERE username=? OR email=?",
        (username, email),
    ).fetchone()
    if existing:
        flash('Benutzername oder E-Mail ist bereits vergeben!', 'red')
        return redirect(url_for('register'))

    # The first registered user becomes the administrator.
    count = db.execute("SELECT COUNT(*) as cnt FROM users").fetchone()['cnt']
    is_admin_val = 1 if count == 0 else 0
    hashed_pw = generate_password_hash(password)
    try:
        db.execute("""
            INSERT INTO users (username, password, email, is_admin)
            VALUES (?, ?, ?, ?)
        """, (username, hashed_pw, email, is_admin_val))
        db.commit()
    except sqlite3.IntegrityError:
        # A concurrent request can claim the same username between the
        # existence check above and this insert; the UNIQUE constraint then
        # rejects the row. Report it like any other duplicate instead of
        # letting the request fail with a server error.
        db.rollback()
        flash('Benutzername oder E-Mail ist bereits vergeben!', 'red')
        return redirect(url_for('register'))

    flash('Registrierung erfolgreich! Bitte melde dich an.', 'green')
    return redirect(url_for('login'))
# --------------------- LOGIN/LOGOUT -------------------------
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate the user (POST).

    On a successful POST the user's id, username and admin flag are stored in
    the session and the client is redirected to the dashboard. On failure an
    error is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        # Default to empty strings: a POST that omits the password field would
        # otherwise hand None to check_password_hash() for an existing user,
        # which is not a valid password value and fails the request instead of
        # producing the normal "wrong credentials" message.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        user = get_user_by_username(username)
        if user and check_password_hash(user['password'], password):
            session['user_id'] = user['id']
            session['username'] = user['username']
            session['is_admin'] = user['is_admin']
            flash("Login erfolgreich!", "green")
            return redirect(url_for('dashboard'))
        flash("Benutzername oder Passwort falsch!", "red")
    return render_template('login.html')
@app.route('/logout', methods=['POST'])
def logout():
    """Clear the session, flash a confirmation and redirect to the login page."""
    session.clear()
    flash("Erfolgreich abgemeldet!", "green")
    login_url = url_for('login')
    return redirect(login_url)
# --------------------- FORGOT PASSWORD ----------------------
@app.route('/forgot_password', methods=['GET', 'POST'])
def forgot_password():
    """Render the reset form (GET) or reset a user's password (POST).

    The account is looked up by username or e-mail. If it exists, its password
    is replaced by a freshly generated random one, which is shown to the
    requester in a flash message.

    SECURITY NOTE: this is still a demo flow. Whoever submits the form learns
    the new password, so anyone who knows a username or e-mail can take over
    that account. A real implementation must send a single-use, expiring
    token to the account's e-mail address instead. The change made here only
    removes the fixed, publicly known password that every reset used to set.
    """
    if request.method != 'POST':
        return render_template('forgot_password.html')

    user_input = request.form.get('username_or_email')
    if not user_input:
        flash("Bitte einen Benutzernamen oder eine E-Mail angeben!", "red")
        return redirect(url_for('forgot_password'))

    user = get_user_by_username_or_email(user_input)
    if not user:
        flash("Kein passender Benutzer gefunden. Versuche es erneut!", "red")
        return redirect(url_for('forgot_password'))

    # One-off random password instead of a constant shared by all accounts.
    temporary_pw = secrets.token_urlsafe(12)
    db = get_db()
    db.execute(
        "UPDATE users SET password=? WHERE id=?",
        (generate_password_hash(temporary_pw), user['id']),
    )
    db.commit()
    flash(
        f"Dein Passwort wurde zurückgesetzt auf '{temporary_pw}'. "
        "Bitte ändere es nach dem Login!",
        "green",
    )
    return redirect(url_for('login'))
# --------------------- ADMIN AREA ---------------------------
@app.route('/admin', methods=['GET', 'POST'])
def admin_panel():
    """Show the user list and, on POST, create a new user.

    Non-admin sessions are redirected to the dashboard. A POST creates a user
    only when both a username and a password were submitted; a duplicate
    username is reported through a flash message. The user list is rendered
    in every case.
    """
    if not is_admin():
        flash("Zugriff verweigert!", "red")
        return redirect(url_for('dashboard'))

    db = get_db()
    if request.method == 'POST':
        form = request.form
        new_username = form.get('new_username')
        new_password = form.get('new_password')
        # A ticked checkbox is submitted as the string 'on'.
        new_is_admin = int(form.get('new_is_admin') == 'on')
        if new_username and new_password:
            hashed_pw = generate_password_hash(new_password)
            try:
                db.execute(
                    "INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?)",
                    (new_username, hashed_pw, new_is_admin),
                )
                db.commit()
            except sqlite3.IntegrityError:
                flash("Benutzername bereits vorhanden!", "red")
            else:
                flash("Neuer Benutzer erstellt!", "green")

    all_users = db.execute("SELECT * FROM users").fetchall()
    return render_template('admin.html', users=all_users)
@app.route('/admin/delete_user/<int:user_id>', methods=['POST'])
def delete_user(user_id):
    """Delete the user with the given id (admin only).

    Admins cannot delete the account they are currently logged in with.

    Args:
        user_id: Primary key of the user to delete, taken from the URL.
    """
    if not is_admin():
        flash("Zugriff verweigert!", "red")
        return redirect(url_for('dashboard'))

    deleting_self = session.get('user_id') == user_id
    if deleting_self:
        flash("Du kannst dich nicht selbst löschen!", "red")
        return redirect(url_for('admin_panel'))

    connection = get_db()
    connection.execute("DELETE FROM users WHERE id=?", (user_id,))
    connection.commit()
    flash("Benutzer gelöscht!", "green")
    return redirect(url_for('admin_panel'))
# --------------------- NOTIFICATIONS ------------------------
@app.route('/admin/notifications', methods=['POST'])
def add_notification():
    """Create a notification for one user or for everyone (admin only).

    Form fields:
        message: Notification text; an empty message creates nothing.
        user_id: ``"all"`` for a broadcast (stored as NULL), otherwise the
            numeric id of the recipient.

    Returns:
        A redirect to the admin panel, or to the dashboard for non-admins.
    """
    if not is_admin():
        flash("Zugriff verweigert!", "red")
        return redirect(url_for('dashboard'))

    message = request.form.get('message')
    user_id = request.form.get('user_id')
    if not message:
        return redirect(url_for('admin_panel'))

    if user_id == "all":
        recipient = None  # NULL user_id = visible to every user
    else:
        # A missing field would otherwise be stored as NULL and silently turn
        # into a broadcast, and a non-numeric value would be stored as text
        # that matches no user. Reject both explicitly.
        try:
            recipient = int(user_id)
        except (TypeError, ValueError):
            flash("Ungültiger Empfänger!", "red")
            return redirect(url_for('admin_panel'))

    db = get_db()
    db.execute(
        "INSERT INTO notifications (user_id, message) VALUES (?, ?)",
        (recipient, message),
    )
    db.commit()
    flash("Benachrichtigung erstellt!", "green")
    return redirect(url_for('admin_panel'))
# --------------------- BOOKMARKS (admin-managed only) -------
@app.route('/admin/bookmarks/<int:user_id>', methods=['GET', 'POST'])
def manage_bookmarks(user_id):
    """List a user's bookmarks and, on POST, add a new one (admin only).

    Args:
        user_id: Primary key of the user whose bookmarks are managed.

    Returns:
        ``admin.html`` rendered with ``single_user`` and ``bookmarks``, or a
        redirect when access is denied or the user does not exist.
    """
    if not is_admin():
        flash("Zugriff verweigert!", "red")
        return redirect(url_for('dashboard'))

    db = get_db()
    target_user = get_user_by_id(user_id)
    if not target_user:
        flash("Benutzer nicht gefunden!", "red")
        return redirect(url_for('admin_panel'))

    if request.method == 'POST':
        form = request.form
        title = form.get('title')
        url = form.get('url')
        icon = form.get('icon_class', 'fas fa-bookmark')
        # Title and URL are mandatory; the icon falls back to a default class.
        if title and url:
            db.execute(
                "INSERT INTO bookmarks (user_id, title, url, icon_class) VALUES (?, ?, ?, ?)",
                (user_id, title, url, icon),
            )
            db.commit()
            flash("Neues Lesezeichen hinzugefügt!", "green")

    user_bookmarks = db.execute(
        "SELECT * FROM bookmarks WHERE user_id=?",
        (user_id,),
    ).fetchall()
    return render_template('admin.html', single_user=target_user, bookmarks=user_bookmarks)
@app.route('/admin/delete_bookmark/<int:bookmark_id>/<int:user_id>', methods=['POST'])
def delete_bookmark(bookmark_id, user_id):
    """Delete a bookmark by id (admin only).

    Args:
        bookmark_id: Primary key of the bookmark to delete.
        user_id: Only used to redirect back to that user's bookmark page; it
            is not part of the DELETE condition.
    """
    if not is_admin():
        flash("Zugriff verweigert!", "red")
        return redirect(url_for('dashboard'))

    connection = get_db()
    connection.execute("DELETE FROM bookmarks WHERE id=?", (bookmark_id,))
    connection.commit()
    flash("Lesezeichen gelöscht!", "green")
    back_to_user = url_for('manage_bookmarks', user_id=user_id)
    return redirect(back_to_user)
# --------------------- TIME TRACKING ------------------------
@app.route('/time_tracking', methods=['POST'])
def time_tracking():
    """Start or stop a time entry for the logged-in user.

    Form fields:
        action: ``'start'`` creates a new entry with the current time as
            start; ``'stop'`` sets the end time of the user's most recently
            started entry that has no end time yet. Any other value does
            nothing.
        activity: Name of the activity; required for ``'start'``.

    Returns:
        A redirect to the dashboard, or to the login page when no user is
        logged in.
    """
    if 'user_id' not in session:
        flash("Bitte erst einloggen!", "red")
        return redirect(url_for('login'))

    action = request.form.get('action')
    activity = request.form.get('activity')
    db = get_db()
    # Bind the timestamp as ISO text rather than as a datetime object.
    # sqlite3's implicit datetime adapter is deprecated since Python 3.12;
    # isoformat(" ") yields the same text that adapter wrote, so existing rows
    # and the ORDER BY on start_time stay consistent.
    now = datetime.now().isoformat(" ")

    if action == 'start':
        if activity:
            db.execute("""
                INSERT INTO time_entries (user_id, activity, start_time)
                VALUES (?, ?, ?)
            """, (session['user_id'], activity, now))
            db.commit()
            flash("Zeiterfassung gestartet!", "green")
        else:
            flash("Bitte einen Aktivitätsnamen angeben!", "red")
    elif action == 'stop':
        # Latest entry of this user that is still running.
        open_entry = db.execute("""
            SELECT * FROM time_entries
            WHERE user_id = ? AND end_time IS NULL
            ORDER BY start_time DESC
            LIMIT 1
        """, (session['user_id'],)).fetchone()
        if open_entry:
            db.execute(
                "UPDATE time_entries SET end_time=? WHERE id=?",
                (now, open_entry['id']),
            )
            db.commit()
            flash("Zeiterfassung gestoppt!", "green")
        else:
            flash("Keine laufende Zeiterfassung gefunden!", "red")
    return redirect(url_for('dashboard'))
# --------------------- DASHBOARD ----------------------------
@app.route('/dashboard')
def dashboard():
    """Render the dashboard of the logged-in user.

    Loads the user's notifications (personal ones plus broadcasts with a NULL
    ``user_id``), bookmarks and time entries and passes them to
    ``dashboard.html``.

    Returns:
        The rendered dashboard, or a redirect to the login page when nobody
        is logged in or the session refers to a user that no longer exists.
    """
    if 'user_id' not in session:
        flash("Bitte erst einloggen!", "red")
        return redirect(url_for('login'))

    db = get_db()
    user_id = session['user_id']
    user = get_user_by_id(user_id)
    if user is None:
        # The account was removed while its session cookie was still valid.
        # Drop the stale session instead of failing on user['username'].
        session.clear()
        flash("Bitte erst einloggen!", "red")
        return redirect(url_for('login'))

    # Notifications: global ones or those addressed to this user.
    notifications = db.execute("""
        SELECT * FROM notifications
        WHERE user_id = ? OR user_id IS NULL
        ORDER BY created_at DESC
    """, (user_id,)).fetchall()

    # Bookmarks of this user.
    user_bookmarks = db.execute(
        "SELECT * FROM bookmarks WHERE user_id = ?",
        (user_id,),
    ).fetchall()

    # Time entries, newest first.
    time_entries = db.execute("""
        SELECT * FROM time_entries
        WHERE user_id = ?
        ORDER BY start_time DESC
    """, (user_id,)).fetchall()

    return render_template('dashboard.html',
                           user=user['username'],
                           notifications=notifications,
                           user_bookmarks=user_bookmarks,
                           time_entries=time_entries,
                           domain="meinedomain.de",
                           logo_path="static/logo.png")
# --------------------- STARTUP ------------------------------
if __name__ == '__main__':
    init_db()
    # Debug mode turns on the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute code. It is therefore opt-in via the
    # FLASK_DEBUG environment variable (1/true/yes/on) instead of always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled)