60 lines
2 KiB
Python
Executable file
60 lines
2 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import psycopg
|
|
import re
|
|
from flask import Flask, request, render_template, redirect, url_for
|
|
|
|
# Flask application object; the route handlers in this file register on it.
app = Flask(__name__)

# Connection string handed to psycopg.connect(); empty when the
# DB_CONNECTION environment variable is not set.
db_con = os.getenv("DB_CONNECTION", "")

# get_headers() keeps only the group names that begin with this prefix and
# returns them with the prefix removed.
ldap_groups_start_with = "Allowed "
|
|
|
|
def get_headers(headers):
    """Return ``(username, groups)`` read from the request headers.

    ``username`` is the value of the ``HTTP_AUTH_USER`` header, or ``""``
    when it is absent.

    ``groups`` is built from the ``HTTP_AUTH_GROUPS`` header (``""`` when
    absent), which is split on ``"; "``. Only entries starting with
    ``ldap_groups_start_with`` are kept, and that prefix is removed from
    each kept entry. The handlers in this file treat the result as the
    list of domains the user may use.
    """
    prefix = ldap_groups_start_with
    user = headers.get("HTTP_AUTH_USER", "")
    raw_groups = headers.get("HTTP_AUTH_GROUPS", "").split("; ")
    stripped = [group[len(prefix):] for group in raw_groups if group.startswith(prefix)]
    return user, stripped
|
|
|
|
@app.route("/")
def show_aliases():
    """Render ``index.html`` with the aliases owned by the requesting user.

    The user name and permitted domains come from get_headers(); the
    aliases are every ``alias`` value in the ``aliases`` table whose
    ``username`` column equals that user name.
    """
    username, domains = get_headers(request.headers)
    query = "SELECT alias FROM aliases WHERE username = %s"

    with psycopg.connect(db_con) as conn, conn.cursor() as cur:
        rows = cur.execute(query, (username,)).fetchall()

    # Each row is a one-column tuple; keep just the alias string.
    aliases = [row[0] for row in rows]
    return render_template(
        "index.html",
        username=username,
        aliases=aliases,
        domains=domains,
    )
|
|
|
|
# Local part of an alias: first character from [a-z0-9_-], followed by one or
# more characters from [a-z0-9_.-] (so at least two characters, no leading
# dot). It is applied with fullmatch(): a "$" anchor with match() would also
# accept a name that ends in a newline.
_ALIAS_NAME_RE = re.compile(r"[a-z0-9_-][a-z0-9_.-]+")


@app.route("/new")
def new_aliases():
    """Create the alias ``<name>@<domain>`` for the requesting user.

    Query parameters:
        name: local part of the alias; lower-cased before validation.
        domain: must be one of the domains returned by get_headers().

    The alias is inserted only when both parameters are non-empty, the
    domain is permitted, the name matches ``_ALIAS_NAME_RE`` in full, and
    no row with the same alias exists yet. In every case the response is a
    redirect to the alias list.
    """
    # NOTE(review): this route changes state and is registered without a
    # ``methods`` argument; confirm that handling it on GET is intended.
    username, domains = get_headers(request.headers)
    name = request.args.get("name", "").lower()
    domain = request.args.get("domain", "")

    is_valid = (
        bool(name)
        and bool(domain)
        and domain in domains
        and _ALIAS_NAME_RE.fullmatch(name) is not None
    )
    if not is_valid:
        return redirect(url_for("show_aliases"))

    alias = name + "@" + domain
    with psycopg.connect(db_con) as conn:
        with conn.cursor() as cur:
            existing_alias = cur.execute(
                "SELECT alias FROM aliases WHERE alias = %s", (alias,)
            ).fetchone()

            # Only insert when nobody holds this alias yet.
            if existing_alias is None:
                cur.execute(
                    "INSERT INTO aliases VALUES (%s, %s)", (alias, username)
                )
                conn.commit()

    return redirect(url_for("show_aliases"))
|
|
|
|
@app.route("/delete/<alias>")
def del_aliases(alias):
    """Delete *alias* if it belongs to the requesting user.

    Args:
        alias: full alias string taken from the URL path.

    Rows are removed only where both the alias and the owning user name
    (from get_headers()) match, so an alias owned by someone else, or one
    that does not exist, is left untouched. The response is always a
    redirect to the alias list.
    """
    # NOTE(review): this route changes state and is registered without a
    # ``methods`` argument; confirm that handling it on GET is intended.
    username, _domains = get_headers(request.headers)

    with psycopg.connect(db_con) as conn:
        with conn.cursor() as cur:
            # The ownership check is part of the DELETE itself. A separate
            # SELECT followed by an unscoped DELETE leaves a window in which
            # the alias could change owner, and would remove rows that
            # belong to other users.
            cur.execute(
                "DELETE FROM aliases WHERE alias = %s AND username = %s",
                (alias, username),
            )
        conn.commit()

    return redirect(url_for("show_aliases"))
|