Files
dev-manufaktur/Dashboard/app.py
2025-02-16 21:47:35 +01:00

630 lines
21 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_file
import sqlite3
import bcrypt
import secrets
import smtplib
import json
from datetime import datetime, timedelta
import os
import requests
from email.mime.text import MIMEText
app = Flask(__name__, static_folder="static")

# Secrets are taken from the environment when available so they no longer
# have to live in the source tree. The literal fallbacks keep existing setups
# running unchanged.
# NOTE(review): the fallback values are committed in plain text and should be
# rotated and removed once the environment variables are provisioned.
app.secret_key = os.environ.get("CLICKCANDIT_SECRET_KEY", 'supersecretkey')

# SQLite database file used by get_db_connection().
DATABASE = "clickcandit.db"

# Outgoing mail settings (used with smtplib.SMTP_SSL).
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 465
EMAIL_SENDER = "clickcandit@gmail.com"
EMAIL_PASSWORD = os.environ.get("CLICKCANDIT_EMAIL_PASSWORD", 'iuxexntistlwilhl')

# Weather forecast API. The key is sent as a query parameter, so the endpoint
# is addressed via HTTPS to keep it out of clear-text traffic.
WEATHER_API_KEY = os.environ.get("CLICKCANDIT_WEATHER_API_KEY", '640935f82530489f8c7105323241409')
WEATHER_API_URL = 'https://api.openweathermap.org/data/2.5/forecast'

# Directory that receives the per-user data export files.
EXPORT_FOLDER = "exports"
os.makedirs(EXPORT_FOLDER, exist_ok=True)
def get_db_connection():
    """Open a new SQLite connection to ``DATABASE``.

    Rows are produced as ``sqlite3.Row`` objects so columns can be read by
    name. The connection is returned open; closing it is up to the caller.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``users``, ``settings`` and ``download_links`` tables.

    Every statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this on an
    already initialised database leaves existing tables untouched. The
    connection is closed even when one of the statements fails.
    """
    schema_statements = (
        # Users (regular users and admins)
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password TEXT NOT NULL,
            role TEXT DEFAULT 'user',
            reset_token TEXT DEFAULT NULL,
            reset_token_expiry TEXT DEFAULT NULL
        )
        """,
        # Per-user settings (at most one row per user)
        """
        CREATE TABLE IF NOT EXISTS settings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL UNIQUE,
            wallpaper TEXT DEFAULT '7.png',
            city TEXT DEFAULT 'Berlin',
            bookmarks TEXT DEFAULT '[]',
            show_forecast BOOLEAN DEFAULT 1,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )
        """,
        # Download links (user data export)
        """
        CREATE TABLE IF NOT EXISTS download_links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            token TEXT NOT NULL,
            expiration_time TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES users(id)
        )
        """,
    )
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Release the connection even if a statement raised.
        conn.close()
# Create any missing tables as soon as the module is loaded.
init_db()
######################################
# Hilfsfunktionen
######################################
def admin_exists():
    """Return ``True`` when at least one user with role ``'admin'`` exists."""
    connection = get_db_connection()
    cursor = connection.execute("SELECT 1 FROM users WHERE role = 'admin' LIMIT 1")
    admin_row = cursor.fetchone()
    connection.close()
    if admin_row is None:
        return False
    return True
def get_weather(city, timeout=10):
    """Fetch the current temperature, icon and a coarse forecast for ``city``.

    Args:
        city: City name, sent to the weather API as the ``q`` parameter.
        timeout: Seconds to wait for the API before giving up, so a slow or
            unreachable service cannot block the calling request forever.

    Returns:
        A tuple ``(current_temp, weather_icon, forecast)``. ``forecast`` is a
        list of dicts with the keys ``date``, ``day`` (holding ``avgtemp_c``)
        and ``weather_icon``, built from every 8th entry of the response.
        When the request fails, the API answers with a non-200 status, or the
        payload does not have the expected shape, ``(None, None, [])`` is
        returned instead of raising.
    """
    params = {
        'q': city,
        'appid': WEATHER_API_KEY,
        'units': 'metric',
        'lang': 'de'
    }
    try:
        response = requests.get(WEATHER_API_URL, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Network problems must not take the whole dashboard down.
        print(f"Fehler beim Abrufen der Wetterdaten: {exc}")
        return None, None, []
    if response.status_code != 200:
        return None, None, []

    try:
        entries = response.json()['list']
        current = entries[0]
        current_temp = current['main']['temp']
        weather_icon = map_weather_icon(current['weather'][0]['icon'])
        # Simple forecast: keep every 8th entry (presumably one per day, as
        # the endpoint is documented to report in 3-hour steps — TODO confirm).
        forecast = [
            {
                'date': entry['dt_txt'].split()[0],
                'day': {
                    'avgtemp_c': entry['main']['temp']
                },
                'weather_icon': map_weather_icon(entry['weather'][0]['icon'])
            }
            for entry in entries[::8]
        ]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        # Invalid JSON, an empty list or missing fields: treat like an outage.
        print(f"Unerwartetes Format der Wetterdaten: {exc}")
        return None, None, []
    return current_temp, weather_icon, forecast
def map_weather_icon(icon_code):
    """Translate a weather API icon code into an ``fa-*`` icon class name.

    Codes that are not listed fall back to ``'fa-cloud'``.
    """
    icon_classes = {
        # clear sky
        '01d': 'fa-sun',
        '01n': 'fa-moon',
        # few clouds
        '02d': 'fa-cloud-sun',
        '02n': 'fa-cloud-moon',
        # clouds
        '03d': 'fa-cloud',
        '03n': 'fa-cloud',
        '04d': 'fa-cloud-meatball',
        '04n': 'fa-cloud-meatball',
        # rain
        '09d': 'fa-cloud-showers-heavy',
        '09n': 'fa-cloud-showers-heavy',
        '10d': 'fa-cloud-sun-rain',
        '10n': 'fa-cloud-moon-rain',
        # thunderstorm
        '11d': 'fa-poo-storm',
        '11n': 'fa-poo-storm',
        # snow
        '13d': 'fa-snowflake',
        '13n': 'fa-snowflake',
        # mist
        '50d': 'fa-smog',
        '50n': 'fa-smog',
    }
    try:
        return icon_classes[icon_code]
    except KeyError:
        return 'fa-cloud'
def send_reset_email(to_email, reset_url):
    """Mail the password-reset link ``reset_url`` to ``to_email``.

    Sending is best effort: any error is printed and swallowed so the calling
    request keeps working.

    Args:
        to_email: Recipient address.
        reset_url: Absolute URL of the reset page, embedded in the mail body.

    Returns:
        ``True`` when the message was handed to the SMTP server, ``False``
        when sending failed.
    """
    subject = "Passwort zurücksetzen"
    body = f"""
Hallo,
Sie haben eine Anfrage zum Zurücksetzen Ihres Passworts erhalten. Klicken Sie auf den folgenden Link, um Ihr Passwort zurückzusetzen:
{reset_url}
Der Link ist 48 Stunden gültig. Falls Sie diese Anfrage nicht gestellt haben, ignorieren Sie bitte diese E-Mail.
Mit freundlichen Grüßen,
Ihr ClickCandit Team
"""
    msg = MIMEText(body, "plain")
    msg["Subject"] = subject
    msg["From"] = EMAIL_SENDER
    msg["To"] = to_email
    try:
        # The timeout keeps an unreachable mail server from blocking the
        # request indefinitely.
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=15) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, to_email, msg.as_string())
    except Exception as e:
        # Deliberately broad: a mail problem must never crash the request.
        print(f"Fehler beim Senden der E-Mail: {e}")
        return False
    print(f"Passwort-Reset-E-Mail erfolgreich an {to_email} gesendet.")
    return True
def send_export_email(to_email, download_link):
    """Mail the data-export download link to ``to_email``.

    Sending is best effort: any error is printed and swallowed so the calling
    request keeps working.

    Args:
        to_email: Recipient address; also used in the greeting line.
        download_link: Absolute URL of the export download.

    Returns:
        ``True`` when the message was handed to the SMTP server, ``False``
        when sending failed.
    """
    subject = "Ihre Nutzerdaten zum Download"
    body = f"""
Hallo {to_email},
Sie haben den Export Ihrer Nutzerdaten angefordert. Klicken Sie auf den folgenden Link, um Ihre Daten herunterzuladen:
{download_link}
Dieser Link ist 48 Stunden gültig.
Mit freundlichen Grüßen,
Ihr ClickCandit Team
"""
    msg = MIMEText(body, "plain")
    msg["Subject"] = subject
    msg["From"] = EMAIL_SENDER
    msg["To"] = to_email
    try:
        # The timeout keeps an unreachable mail server from blocking the
        # request indefinitely.
        with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, timeout=15) as server:
            server.login(EMAIL_SENDER, EMAIL_PASSWORD)
            server.sendmail(EMAIL_SENDER, to_email, msg.as_string())
    except Exception as e:
        # Deliberately broad: a mail problem must never crash the request.
        print(f"Fehler beim Senden der E-Mail: {e}")
        return False
    print(f"Download-Link an {to_email} gesendet.")
    return True
######################################
# Routen
######################################
@app.route('/')
def index():
    """Redirect the visitor to the page that matches the current app state."""
    if admin_exists():
        # An admin exists: logged-in users see the dashboard, everybody else
        # has to log in first.
        target = 'dashboard' if 'user_id' in session else 'login'
    else:
        # No admin yet: go to the registration page (first admin).
        target = 'register'
    return redirect(url_for(target))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a user (POST).

    While no admin exists the route is open and the first registered user
    becomes the admin. As soon as an admin exists, both viewing the form and
    submitting it require an admin session; the role of the new user is then
    taken from the form and limited to ``'user'`` or ``'admin'``.
    """
    already_admin = admin_exists()

    # The authorisation check has to cover POST as well as GET; otherwise
    # anybody could create accounts (including admins) by posting directly.
    if already_admin and session.get('role') != 'admin':
        flash("Nur Admins können weitere Benutzer hinzufügen!", "danger")
        return redirect(url_for('login'))

    if request.method == 'GET':
        return render_template('register.html')

    # POST: register. Missing fields are treated like empty ones.
    name = request.form.get('name', '').strip()
    email = request.form.get('email', '').strip()
    password = request.form.get('password', '').strip()
    if not name or not email or not password:
        flash("Bitte alle Felder ausfüllen.", "danger")
        return redirect(url_for('register'))

    if already_admin:
        role = request.form.get('role', 'user')
        if role not in ('user', 'admin'):
            # Unknown values fall back to the least privileged role.
            role = 'user'
    else:
        # No admin yet -> the first user becomes admin.
        role = 'admin'

    hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
    conn = get_db_connection()
    try:
        conn.execute("INSERT INTO users (name, email, password, role) VALUES (?, ?, ?, ?)",
                     (name, email, hashed_password, role))
        conn.commit()
        flash("Benutzer erfolgreich registriert!", "success")
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on users.email rejected the insert.
        flash("Diese E-Mail existiert bereits!", "danger")
    finally:
        conn.close()

    if not already_admin:
        # First admin created -> log in next.
        return redirect(url_for('login'))
    return redirect(url_for('admin'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form, or check submitted credentials on POST.

    On success the user's id, name and role are stored in the session and the
    browser is sent to the dashboard; otherwise it returns to the login page
    with an error message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    email = request.form['email'].strip()
    password = request.form['password'].strip()
    if not (email and password):
        flash("Bitte E-Mail und Passwort angeben!", "danger")
        return redirect(url_for('login'))

    connection = get_db_connection()
    account = connection.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
    connection.close()

    # Unknown address and wrong password share one message.
    if not account or not bcrypt.checkpw(password.encode('utf-8'), account['password']):
        flash("Falsche E-Mail oder Passwort!", "danger")
        return redirect(url_for('login'))

    for session_key, column in (('user_id', 'id'), ('user_name', 'name'), ('role', 'role')):
        session[session_key] = account[column]
    flash("Login erfolgreich!", "success")
    return redirect(url_for('dashboard'))
@app.route('/logout')
def logout():
    """Clear the whole session and send the user to the login page."""
    session.clear()
    # Flash after clearing, so the message is not wiped with the session.
    flash("Du wurdest ausgeloggt.", "info")
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/dashboard')
def dashboard():
    """Render the dashboard of the logged-in user.

    Loads the user row and its settings, falls back to defaults when no
    settings row exists, fetches the weather for the configured city and
    passes everything to ``dashboard.html``.
    """
    if 'user_id' not in session:
        flash("Bitte melde dich an!", "danger")
        return redirect(url_for('login'))

    user_id = session['user_id']
    connection = get_db_connection()
    user = connection.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
    settings = connection.execute("SELECT * FROM settings WHERE user_id = ?", (user_id,)).fetchone()
    connection.close()

    if not user:
        flash("Benutzer nicht gefunden.", "danger")
        return redirect(url_for('logout'))

    if settings is not None:
        wallpaper = settings['wallpaper']
        city = settings['city']
        show_forecast = bool(settings['show_forecast'])
        bookmarks = json.loads(settings['bookmarks'])
    else:
        # Default values for users without a settings row.
        # NOTE(review): '19.png' differs from the '7.png' default used by the
        # settings table and /get_settings — confirm which one is intended.
        wallpaper, city, show_forecast, bookmarks = '19.png', 'Berlin', True, []

    current_temp, weather_icon, forecast = get_weather(city)
    if current_temp is None:
        # Weather lookup failed: show placeholders instead.
        current_temp, weather_icon, forecast = "N/A", "fa-question", []

    # Example apps, delivered to the template as a list of chunks.
    app_fields = ('name', 'subdomain', 'appkey', 'icon_class', 'bg_color')
    example_apps = (
        ('Mail', 'mail', 'mailapp', 'fa fa-envelope', 'bg-blue-500'),
        ('Calendar', 'calendar', 'calendarapp', 'fa fa-calendar', 'bg-green-500'),
    )
    user_app_chunks = [[dict(zip(app_fields, values)) for values in example_apps]]

    context = {
        'user': user['name'],
        'role': user['role'],  # used for the admin check in the template
        'wallpaper': wallpaper,
        'city': city,
        'show_forecast': show_forecast,
        'bookmarks': bookmarks,
        'current_temp': current_temp,
        'weather_icon': weather_icon,
        'forecast': forecast,
        'domain': "example.com",
        'logo_path': url_for('static', filename='clickcandit.png'),
        'user_app_chunks': user_app_chunks,
    }
    return render_template('dashboard.html', **context)
######################################
# Admin-Bereich
######################################
@app.route('/admin')
def admin():
    """Show the user list; requires a session whose role is ``'admin'``."""
    if session.get('role') != 'admin':
        flash("Kein Zugriff!", "danger")
        return redirect(url_for('login'))

    connection = get_db_connection()
    cursor = connection.execute("SELECT id, name, email, role FROM users")
    all_users = cursor.fetchall()
    connection.close()
    return render_template('admin.html', users=all_users)
@app.route('/admin/delete/<int:user_id>', methods=['POST'])
def delete_user(user_id):
    """Delete a user together with all rows that reference it (admin only).

    Args:
        user_id: Primary key of the user to remove, taken from the URL.
    """
    if session.get('role') != 'admin':
        flash("Kein Zugriff!", "danger")
        return redirect(url_for('login'))

    # NOTE(review): nothing prevents deleting the only admin; admin_exists()
    # then turns false and /register accepts a new first admin from anyone.
    conn = get_db_connection()
    try:
        # Dependent rows first, then the user itself. download_links is
        # included so no export tokens of a deleted user stay behind, matching
        # what delete_account() removes.
        conn.execute("DELETE FROM settings WHERE user_id = ?", (user_id,))
        conn.execute("DELETE FROM download_links WHERE user_id = ?", (user_id,))
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
        conn.commit()
    finally:
        # Release the connection even if a statement raised.
        conn.close()
    flash("Benutzer gelöscht!", "success")
    return redirect(url_for('admin'))
@app.route('/admin/edit_bookmarks/<int:user_id>', methods=['GET', 'POST'])
def edit_bookmarks(user_id):
    """Let an admin view (GET) or replace (POST) a user's bookmarks.

    The POST body carries the bookmarks as one comma-separated string; empty
    entries are dropped and the rest is stored as a JSON list.
    """
    if session.get('role') != 'admin':
        flash("Kein Zugriff", "danger")
        return redirect(url_for('login'))

    connection = get_db_connection()
    settings = connection.execute("SELECT * FROM settings WHERE user_id = ?", (user_id,)).fetchone()

    if request.method != 'POST':
        if settings:
            current_bookmarks = json.loads(settings['bookmarks'])
        else:
            current_bookmarks = []
        connection.close()
        return render_template('edit_bookmarks.html', user_id=user_id, bookmarks=current_bookmarks)

    submitted = request.form.get('bookmarks', '')
    cleaned_entries = []
    for piece in submitted.split(','):
        entry = piece.strip()
        if entry:
            cleaned_entries.append(entry)
    bookmarks_json = json.dumps(cleaned_entries)

    # Update the existing settings row, or create one holding only bookmarks.
    if settings:
        statement = "UPDATE settings SET bookmarks = ? WHERE user_id = ?"
        arguments = (bookmarks_json, user_id)
    else:
        statement = "INSERT INTO settings (user_id, bookmarks) VALUES (?, ?)"
        arguments = (user_id, bookmarks_json)
    connection.execute(statement, arguments)
    connection.commit()
    connection.close()
    flash("Lesezeichen aktualisiert", "success")
    return redirect(url_for('admin'))
######################################
# Settings / API
######################################
@app.route('/save_settings', methods=['POST'])
def save_settings():
    """Create or update the logged-in user's settings from a JSON object.

    Recognised keys are ``wallpaper``, ``city``, ``bookmarks`` and
    ``show_forecast``; missing or ``null`` values fall back to the defaults
    ``'7.png'``, ``'Berlin'``, ``[]`` and ``True``.

    Returns:
        ``{'success': True}`` on success; a JSON error with status 401 when
        not logged in, or 400 when the body is missing, not a JSON object, or
        contains values of the wrong type.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Nicht autorisiert'}), 401

    # silent=True turns an unparsable or non-JSON body into None, so the
    # client gets the JSON error below instead of a framework error page.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'success': False, 'message': 'Keine Daten übergeben'}), 400
    if not isinstance(data, dict):
        # e.g. a JSON list: it has no .get() and would otherwise crash below.
        return jsonify({'success': False, 'message': 'Ungültiges Datenformat'}), 400

    wallpaper = data.get('wallpaper')
    if wallpaper is None:
        wallpaper = '7.png'
    city = data.get('city')
    if city is None:
        city = 'Berlin'
    bookmarks = data.get('bookmarks')
    if bookmarks is None:
        bookmarks = []
    if not isinstance(wallpaper, str) or not isinstance(city, str) or not isinstance(bookmarks, list):
        return jsonify({'success': False, 'message': 'Ungültiges Datenformat'}), 400
    # Readers convert the stored value with bool(), so normalise it here too.
    show_forecast = bool(data.get('show_forecast', True))

    conn = get_db_connection()
    try:
        # Upsert keyed on the UNIQUE user_id column.
        conn.execute("""
            INSERT INTO settings (user_id, wallpaper, city, bookmarks, show_forecast)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(user_id) DO UPDATE SET
                wallpaper=excluded.wallpaper,
                city=excluded.city,
                bookmarks=excluded.bookmarks,
                show_forecast=excluded.show_forecast
        """, (session['user_id'], wallpaper, city, json.dumps(bookmarks), show_forecast))
        conn.commit()
    finally:
        conn.close()
    return jsonify({'success': True})
@app.route('/get_settings', methods=['GET'])
def get_settings():
    """Return the logged-in user's settings as JSON.

    Answers with status 401 when nobody is logged in. Users without a
    settings row receive the defaults.
    """
    if 'user_id' not in session:
        return jsonify({'success': False, 'message': 'Nicht autorisiert'}), 401

    connection = get_db_connection()
    row = connection.execute("SELECT * FROM settings WHERE user_id = ?", (session['user_id'],)).fetchone()
    connection.close()

    if row:
        wallpaper_file = row['wallpaper']
        city = row['city']
        bookmarks = json.loads(row['bookmarks'])
        show_forecast = bool(row['show_forecast'])
    else:
        # No settings stored yet.
        wallpaper_file = '7.png'
        city = 'Berlin'
        bookmarks = []
        show_forecast = True

    payload = {
        'wallpaper_url': url_for('static', filename=wallpaper_file),
        'city': city,
        'bookmarks': bookmarks,
        'show_forecast': show_forecast
    }
    return jsonify(payload)
######################################
# Passwort / Account
######################################
@app.route('/forgot-password', methods=['GET', 'POST'])
def forgot_password():
    """Show the "forgot password" form, or issue a reset link on POST.

    For a registered address a random token with a 48-hour expiry is stored
    on the user row and mailed as part of the reset URL.
    """
    if request.method != 'POST':
        return render_template('forgot_password.html')

    email = request.form['email'].strip()
    connection = get_db_connection()
    account = connection.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
    if not account:
        # NOTE(review): this distinct message reveals whether an address is
        # registered.
        flash("Diese E-Mail ist nicht registriert.", "danger")
        connection.close()
        return redirect(url_for('forgot_password'))

    reset_token = secrets.token_hex(16)
    valid_until = datetime.utcnow() + timedelta(hours=48)
    connection.execute("UPDATE users SET reset_token=?, reset_token_expiry=? WHERE email=?",
                       (reset_token, valid_until.isoformat(), email))
    connection.commit()
    connection.close()

    send_reset_email(email, url_for('reset_password', token=reset_token, _external=True))
    flash("Eine E-Mail zum Zurücksetzen des Passworts wurde gesendet.", "success")
    return redirect(url_for('login'))
@app.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Show the reset form (GET) or set a new password (POST) for ``token``.

    The token must belong to a user and must not be expired; otherwise the
    browser is sent back to the "forgot password" page. A successful reset
    stores the new bcrypt hash and clears the token so it cannot be reused.

    Args:
        token: Reset token taken from the URL.
    """
    conn = get_db_connection()
    try:
        user = conn.execute("SELECT * FROM users WHERE reset_token = ?", (token,)).fetchone()
        if not user:
            flash("Ungültiger Reset-Token", "danger")
            return redirect(url_for('forgot_password'))

        # Expiry is stored as a naive UTC ISO timestamp.
        expiry = datetime.fromisoformat(user['reset_token_expiry']) if user['reset_token_expiry'] else None
        if not expiry or expiry < datetime.utcnow():
            flash("Reset-Link ist abgelaufen.", "danger")
            return redirect(url_for('forgot_password'))

        if request.method == 'POST':
            pw = request.form.get('password', '').strip()
            cpw = request.form.get('confirm_password', '').strip()
            if not pw:
                # Two empty fields would "match" and set an empty password.
                flash("Bitte ein Passwort eingeben.", "danger")
                return render_template('reset_password.html', token=token)
            if pw != cpw:
                flash("Passwörter stimmen nicht überein.", "danger")
                return render_template('reset_password.html', token=token)

            hashed_pw = bcrypt.hashpw(pw.encode('utf-8'), bcrypt.gensalt())
            conn.execute("UPDATE users SET password=?, reset_token=NULL, reset_token_expiry=NULL WHERE id=?",
                         (hashed_pw, user['id']))
            conn.commit()
            flash("Ihr Passwort wurde zurückgesetzt.", "success")
            return redirect(url_for('login'))

        return render_template('reset_password.html', token=token)
    finally:
        # Single exit point for the connection; the mismatch branch used to
        # return without closing it.
        conn.close()
@app.route('/delete_account', methods=['POST'])
def delete_account():
    """Delete the logged-in user's account and everything stored for it.

    Removes the settings, download links and user row in one transaction,
    then deletes a previously generated export file and clears the session.
    If the database part fails, nothing is deleted and the user is sent back
    to the dashboard with an error message.
    """
    if 'user_id' not in session:
        flash("Bitte melde dich an!", "danger")
        return redirect(url_for('login'))

    user_id = session['user_id']
    conn = None
    try:
        conn = get_db_connection()
        user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if not user:
            flash("Benutzerkonto nicht gefunden.", "danger")
            return redirect(url_for('dashboard'))
        email = user["email"]
        conn.execute("DELETE FROM settings WHERE user_id=?", (user_id,))
        conn.execute("DELETE FROM download_links WHERE user_id=?", (user_id,))
        conn.execute("DELETE FROM users WHERE id=?", (user_id,))
        conn.commit()
    except Exception as e:
        # Deliberately broad: any failure here is reported to the user
        # instead of producing an error page.
        print("Fehler beim Löschen des Kontos:", e)
        flash("Fehler beim Löschen des Kontos.", "danger")
        return redirect(url_for('dashboard'))
    finally:
        if conn is not None:
            conn.close()

    # The account is gone at this point. A problem while cleaning up the
    # export file must not be reported as a failed deletion, and the session
    # has to be cleared in any case.
    export_file = os.path.join(EXPORT_FOLDER, f"{email}_data.json")
    try:
        if os.path.exists(export_file):
            os.remove(export_file)
    except OSError as e:
        print("Fehler beim Entfernen der Exportdatei:", e)

    session.clear()
    flash("Dein Konto wurde erfolgreich gelöscht.", "success")
    return redirect(url_for('login'))
######################################
# Nutzerdaten-Export
######################################
@app.route('/request_data_export', methods=['GET'])
def request_data_export():
    """Write the logged-in user's data to a JSON file and mail a download link.

    The export holds the user's name, email and role. The download token is
    valid for 48 hours and is stored only after the file has been written, so
    a failed write cannot leave a link that points at nothing.
    """
    if 'user_id' not in session:
        flash("Bitte melde dich an!", "danger")
        return redirect(url_for('login'))

    user_id = session['user_id']
    conn = get_db_connection()
    try:
        user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        if not user:
            flash("Benutzerkonto nicht gefunden.", "danger")
            return redirect(url_for('dashboard'))

        # NOTE(review): the file name is derived from the email address, which
        # is not validated anywhere in this file — confirm it cannot contain
        # path separators.
        export_file = os.path.join(EXPORT_FOLDER, f"{user['email']}_data.json")
        user_data = {
            "name": user["name"],
            "email": user["email"],
            "role": user["role"]
        }
        try:
            with open(export_file, "w", encoding="utf-8") as f:
                json.dump(user_data, f, indent=4)
        except OSError as e:
            print("Fehler beim Erstellen der Exportdatei:", e)
            flash("Fehler beim Erstellen des Datenexports.", "danger")
            return redirect(url_for('dashboard'))

        download_token = secrets.token_hex(16)
        expiry = (datetime.utcnow() + timedelta(hours=48)).isoformat()
        conn.execute("INSERT INTO download_links (user_id, token, expiration_time) VALUES (?, ?, ?)",
                     (user_id, download_token, expiry))
        conn.commit()
    finally:
        # Release the connection on every path, including errors.
        conn.close()

    download_url = url_for('download_user_data', token=download_token, _external=True)
    send_export_email(user["email"], download_url)
    flash("Eine E-Mail mit dem Download-Link wurde gesendet.", "success")
    return redirect(url_for('dashboard'))
@app.route('/download_user_data/<token>', methods=['GET'])
def download_user_data(token):
    """Send the export file that belongs to the download ``token``.

    No session is checked here; the token from the mailed link is what
    grants access. Unknown or expired tokens, a missing user or a missing
    file all lead back to the dashboard with an error message.

    Args:
        token: Download token taken from the URL.
    """
    conn = get_db_connection()
    try:
        link_data = conn.execute("SELECT * FROM download_links WHERE token=?", (token,)).fetchone()
        if not link_data:
            flash("Ungültiger oder abgelaufener Download-Link.", "danger")
            return redirect(url_for('dashboard'))

        expiry_time = datetime.fromisoformat(link_data["expiration_time"])
        if expiry_time < datetime.utcnow():
            flash("Der Download-Link ist abgelaufen.", "danger")
            return redirect(url_for('dashboard'))

        user = conn.execute("SELECT * FROM users WHERE id=?", (link_data["user_id"],)).fetchone()
    finally:
        # Release the connection on every path, including errors.
        conn.close()

    if not user:
        flash("Benutzerdaten nicht gefunden.", "danger")
        return redirect(url_for('dashboard'))

    # Use an absolute path: the export is written and checked relative to the
    # working directory, whereas Flask's send_file resolves relative paths
    # against the application root. The two only coincide when the app is
    # started from its own directory.
    export_file = os.path.abspath(os.path.join(EXPORT_FOLDER, f"{user['email']}_data.json"))
    if not os.path.exists(export_file):
        flash("Exportdatei nicht gefunden.", "danger")
        return redirect(url_for('dashboard'))
    return send_file(export_file, as_attachment=True)
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which lets anyone who can
    # reach the server run code on it. It is therefore opt-in: start with
    # FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled)