authentik-listmonk-bridge/main.py

236 lines
9.4 KiB
Python
Executable file

#!/usr/bin/env python3
import requests
import json
import re
import yaml
from yaml.loader import SafeLoader
# config parser
# Read the bridge settings once, at start-up, from the system-wide YAML file.
with open('/etc/authentik-listmonk-bridge/config.yml', 'r') as file:
    config = yaml.safe_load(file)

# Verbose-output switch consulted by the sync loops further down.
debug = config['debug']

# Authentik: bearer token and API base URL.
_authentik_settings = config['authentik']
authentik_api_key = _authentik_settings['api_key']
authentik_api_url = _authentik_settings['api_url']

# listmonk: API key, API base URL and the API user the key belongs to
# (user and key are passed together as the `auth` pair of each request).
_listmonk_settings = config['listmonk']
listmonk_api_key = _listmonk_settings['api_key']
listmonk_api_url = _listmonk_settings['api_url']
listmonk_api_usr = _listmonk_settings['api_usr']
# Seconds to wait for Authentik before giving up; without a timeout
# `requests` can block forever on a stalled connection.
_AUTHENTIK_TIMEOUT = 30


def _authentik_get_all(path, params=None):
    """Return the combined ``results`` of every page of an Authentik list endpoint.

    ``path`` is appended to ``authentik_api_url``; ``params`` is an optional
    mapping of query parameters (list values are sent as repeated keys).

    Raises ``requests.HTTPError`` on a non-2xx response and ``KeyError`` if
    the body has no ``results`` member.
    """
    url = authentik_api_url + path
    headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
    query = dict(params or {})
    collected = []
    while True:
        resp = requests.get(url, headers=headers, params=query, timeout=_AUTHENTIK_TIMEOUT)
        resp.raise_for_status()
        body = resp.json()
        collected.extend(body['results'])
        # NOTE(review): assumes Authentik's envelope reports the next page
        # *number* in `pagination.next` (0 or missing on the last page) --
        # confirm against the API reference for the deployed version.
        next_page = (body.get('pagination') or {}).get('next')
        # Stop on the last page, and never request the same page twice.
        if not next_page or next_page == query.get('page'):
            return collected
        query['page'] = next_page


def authentik_get_users():
    """Return all Authentik users of type ``internal`` or ``external``."""
    return _authentik_get_all('/core/users/', {'type': ['internal', 'external']})


def authentik_get_user_consent(id):
    """Return the consent records of the Authentik user with primary key ``id``.

    The parameter keeps its original name (which shadows the builtin) so
    existing callers are unaffected.
    """
    return _authentik_get_all('/core/user_consent/', {'user': str(id)})


def authentik_get_groups():
    """Return all Authentik groups."""
    return _authentik_get_all('/core/groups/')


def authentik_get_apps():
    """Return all Authentik applications."""
    return _authentik_get_all('/core/applications/')
def listmonk_create_subscriber(email, username, status):
    """Create a listmonk subscriber and return the decoded JSON response.

    ``username`` is stored as the subscriber's name; ``status`` is sent
    as-is. The HTTP status is deliberately not checked: callers print the
    returned body, which is also how listmonk's error payload reaches the log.

    Raises a ``requests`` exception on connection problems, timeouts or a
    non-JSON body.
    """
    url = listmonk_api_url + '/subscribers'
    payload = {"email": email, "name": username, "status": status, "preconfirm_subscriptions": True}
    # `json=` serialises the payload and sets the JSON Content-Type header;
    # the timeout keeps a stalled server from hanging the whole sync.
    resp = requests.post(url, json=payload, auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    return resp.json()
def listmonk_set_subscriber(subscriber_id, email, username, status):
    """Overwrite email, name and status of a subscriber; return the decoded JSON response.

    The HTTP status is deliberately not checked so callers can print the
    returned body, including listmonk's error payload.

    NOTE(review): the listmonk API reference states that fields omitted from
    this PUT (notably ``lists``) are reset, i.e. the subscriber may lose its
    list memberships -- confirm and consider sending ``lists`` as well.

    Raises a ``requests`` exception on connection problems, timeouts or a
    non-JSON body.
    """
    url = listmonk_api_url + '/subscribers/' + str(subscriber_id)
    payload = {"email": email, "name": username, "status": status}
    # `json=` serialises the payload and sets the JSON Content-Type header;
    # the timeout keeps a stalled server from hanging the whole sync.
    resp = requests.put(url, json=payload, auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    return resp.json()
def listmonk_get_subscriber(username):
    """Return the listmonk subscriber whose name is exactly ``username``.

    The previous prefix match (``LIKE 'name%'``) could return a different
    subscriber (``bob`` matching ``bobby``), whose email and lists were then
    overwritten. The lookup is now an exact comparison and the name is
    escaped before being embedded in the SQL expression.

    Raises ``IndexError`` when no subscriber matches (callers treat any
    exception as "not found"), ``requests.HTTPError`` on a non-2xx response.
    """
    url = listmonk_api_url + '/subscribers'
    # Double embedded single quotes so a name such as O'Brien cannot
    # terminate the SQL string literal early.
    sql_name = username.replace("'", "''")
    params = {
        'page': '1',
        'per_page': '500',
        'query': "subscribers.name = '" + sql_name + "'",
    }
    resp = requests.get(url, params=params, auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    resp.raise_for_status()
    # Tolerate a null `results` so "no match" is always an IndexError.
    matches = resp.json()['data']['results'] or []
    return matches[0]
def listmonk_set_list(ids, action, target_list_ids, status):
    """Apply a list-membership ``action`` to subscribers; return the decoded JSON response.

    ``ids`` are subscriber ids and ``target_list_ids`` the affected list ids;
    ``action`` and ``status`` are forwarded unchanged (this file uses the
    actions "add" and "remove"). The HTTP status is deliberately not checked
    so callers can print the returned body.

    Raises a ``requests`` exception on connection problems, timeouts or a
    non-JSON body.
    """
    url = listmonk_api_url + '/subscribers/lists'
    payload = {"ids": ids, "action": action, "target_list_ids": target_list_ids, "status": status}
    # `json=` serialises the payload and sets the JSON Content-Type header;
    # the timeout keeps a stalled server from hanging the whole sync.
    resp = requests.put(url, json=payload, auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    return resp.json()
def listmonk_create_list(group):
    """Create a private, single opt-in listmonk list named ``group``.

    Returns the decoded JSON response. The HTTP status is deliberately not
    checked so callers can print the returned body, including listmonk's
    error payload.

    Raises a ``requests`` exception on connection problems, timeouts or a
    non-JSON body.
    """
    url = listmonk_api_url + '/lists'
    payload = {"name": group, "type": "private", "optin": "single"}
    # `json=` serialises the payload and sets the JSON Content-Type header;
    # the timeout keeps a stalled server from hanging the whole sync.
    resp = requests.post(url, json=payload, auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    return resp.json()
def listmonk_get_list(list):
    """Return the listmonk list whose name is exactly ``list``.

    The parameter keeps its original name (which shadows the builtin inside
    this function) so existing callers are unaffected.

    Fixes two problems of the previous version: the URL literal contained a
    stray ``+ "`` (so ``per_page`` was sent as ``100 + "``) and the name was
    not URL-encoded; and the first search hit was returned even when its name
    differed from the requested one.

    Raises ``IndexError`` when no list has that exact name (callers treat any
    exception as "not found"), ``requests.HTTPError`` on a non-2xx response.
    """
    url = listmonk_api_url + '/lists'
    # Let requests build and percent-encode the query string.
    params = {'page': '1', 'per_page': '100', 'query': list}
    headers = {'accept': 'application/json'}
    resp = requests.get(url, params=params, headers=headers,
                        auth=(listmonk_api_usr, listmonk_api_key), timeout=30)
    resp.raise_for_status()
    candidates = resp.json()['data']['results'] or []
    # NOTE(review): listmonk's `query` filter appears to be a name *search*,
    # so e.g. "group-dev" may also return "group-devops" -- keep only the
    # exact name.
    exact = [entry for entry in candidates if entry['name'] == list]
    return exact[0]
# ensure that groups exist as lists
for group_info in authentik_get_groups():
    group_list_name = 'group-' + group_info['name']
    try:
        listmonk_get_list(group_list_name)
    except Exception:
        # Any lookup failure is treated as "list missing", as before.
        print('> Failed to find group as list, creating ' + group_list_name)
    else:
        # List already exists, nothing to do for this group.
        continue
    try:
        create_resp = listmonk_create_list(group_list_name)
    except Exception as err:
        # The response is unbound when the call itself raised, so report the
        # exception instead of printing a missing/stale `create_resp`.
        print('> Failed to create ' + group_list_name)
        print(err)
    else:
        if debug:
            print(create_resp)
# ensure that apps exist as lists
for app_info in authentik_get_apps():
    app_list_name = 'app-' + app_info['slug']
    try:
        listmonk_get_list(app_list_name)
    except Exception:
        # Any lookup failure is treated as "list missing", as before.
        print('> Failed to find app as list, creating ' + app_list_name)
    else:
        # List already exists, nothing to do for this application.
        continue
    try:
        create_resp = listmonk_create_list(app_list_name)
    except Exception as err:
        # The response is unbound when the call itself raised, so report the
        # exception instead of printing a missing/stale `create_resp`.
        print('> Failed to create ' + app_list_name)
        print(err)
    else:
        if debug:
            print(create_resp)
# ensure that users are synced

# listmonk list ids by list name; avoids one lookup per user per group.
_list_ids_by_name = {}


def _list_id(list_name):
    """Return the listmonk id of ``list_name``, caching successful lookups."""
    if list_name not in _list_ids_by_name:
        _list_ids_by_name[list_name] = listmonk_get_list(list_name)['id']
    return _list_ids_by_name[list_name]


def _find_or_create_subscriber(user_info, status):
    """Return the subscriber for an Authentik user, creating it when missing.

    Returns ``None`` (after printing the reason) when the subscriber can be
    neither found nor created, so the caller can skip the user instead of
    aborting the whole run.
    """
    username = user_info['username']
    try:
        return listmonk_get_subscriber(username)
    except Exception:
        # if the lookup fails, assume that we have to create a new one
        # TODO: only create if the error is indeed user-existence related
        print('> Failed to find username, creating subscriber')
    create_resp = None
    try:
        create_resp = listmonk_create_subscriber(user_info['email'], username, status=status)
        # Re-read the subscriber to obtain its id and list memberships; this
        # also detects a create call that returned an error payload.
        subscriber_info = listmonk_get_subscriber(username)
    except Exception as err:
        print('> Subscriber ' + username + ' failed to be created')
        print(err)
        if create_resp is not None:
            print(create_resp)
        return None
    print('> Subscriber ' + username + ' successfully created')
    if debug:
        print(create_resp)
    return subscriber_info


def _update_subscriber(subscriber_info, user_info, status, failure_msg, success_msg):
    """Push email/name/status to listmonk and return the subscriber to continue with.

    On failure the unchanged ``subscriber_info`` is returned. On success the
    subscriber is re-read so later steps work on current data.
    """
    try:
        set_resp = listmonk_set_subscriber(
            subscriber_info['id'], user_info['email'], user_info['username'], status)
    except Exception as err:
        print(failure_msg)
        print(err)
        return subscriber_info
    print(success_msg)
    if debug:
        print(set_resp)
    # NOTE(review): listmonk documents that a subscriber PUT resets omitted
    # fields such as `lists`; re-reading keeps the list sync below from
    # working on stale memberships -- confirm against the deployed version.
    try:
        return listmonk_get_subscriber(user_info['username'])
    except Exception as err:
        print('> Failed to re-read subscriber ' + user_info['username'])
        print(err)
        return subscriber_info


def _sync_lists(subscriber_info, user_info):
    """Make the subscriber's lists match the user's groups and consented apps."""
    username = user_info['username']
    current_ids = [entry['id'] for entry in subscriber_info['lists'] or []]
    try:
        # matches groups to list ids
        wanted_ids = [_list_id('group-' + group['name']) for group in user_info['groups_obj']]
        # matches apps to list ids
        wanted_ids += [
            _list_id('app-' + consent['application']['slug'])
            for consent in authentik_get_user_consent(user_info['pk'])
        ]
    except Exception as err:
        # With an incomplete target set the removal step below would drop
        # valid memberships, so leave this user's lists untouched.
        print('> Failed to resolve lists of ' + username + ', skipping list sync')
        print(err)
        return
    # Drop duplicates while keeping the order.
    wanted_ids = list(dict.fromkeys(wanted_ids))

    # user is in a group/app, but subscriber is not in the respective list
    if any(list_id not in current_ids for list_id in wanted_ids):
        print('> User lists check failed, updating')
        try:
            set_resp = listmonk_set_list([subscriber_info['id']], "add", wanted_ids, status="confirmed")
        except Exception as err:
            print('> Adding to lists of ' + username + ' failed')
            print(err)
        else:
            print('> List additions of ' + username + ' successfully set')
            if debug:
                print(set_resp)

    # subscriber is in a list, but user is not in the respective group/app
    # NOTE(review): this also removes memberships of lists that are not
    # managed by this bridge -- unchanged from the previous behaviour.
    stale_ids = [list_id for list_id in current_ids if list_id not in wanted_ids]
    if stale_ids:
        print('> User groups check failed, updating')
        try:
            set_resp = listmonk_set_list([subscriber_info['id']], "remove", stale_ids, status="")
        except Exception as err:
            print('> Removing from lists of ' + username + ' failed')
            print(err)
        else:
            print('> List removals of ' + username + ' successfully set')
            if debug:
                print(set_resp)


for user_info in authentik_get_users():
    print('>>> Checking user ' + user_info['username'])
    user_active = "enabled" if user_info['is_active'] else "blocklisted"

    subscriber_info = _find_or_create_subscriber(user_info, user_active)
    if subscriber_info is None:
        # Nothing to sync against; carry on with the next user.
        continue
    if debug:
        print(subscriber_info)
        print(user_info)

    # checks if user email matches subscriber email
    if user_info['email'] != subscriber_info['email']:
        print('> Email check failed, updating')
        subscriber_info = _update_subscriber(
            subscriber_info, user_info, subscriber_info['status'],
            '> Failed to set email address of ' + user_info['username'],
            '> Email of ' + user_info['username'] + ' succesfully set as ' + user_info['email'],
        )

    # check if user and subscriber info match as it relates to status;
    # a subscriber that is already blocklisted is never re-enabled here
    if user_active != subscriber_info['status'] and subscriber_info['status'] != "blocklisted":
        print('> User status check failed, updating')
        subscriber_info = _update_subscriber(
            subscriber_info, user_info, user_active,
            '> Failed to set status of ' + user_info['username'] + ' as ' + user_active,
            '> Status of ' + user_info['username'] + ' succesfully set as ' + user_active,
        )

    _sync_lists(subscriber_info, user_info)