190 lines
7.6 KiB
Python
Executable file
190 lines
7.6 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import requests
|
|
import json
|
|
import re
|
|
import yaml
|
|
|
|
|
|
from yaml.loader import SafeLoader
|
|
with open ('/etc/authentik-listmonk-bridge/config.yml', 'r') as file:
|
|
config = yaml.safe_load(file)
|
|
|
|
# config parser
|
|
debug=config['debug']
|
|
authentik_api_key=config['authentik']['api_key']
|
|
authentik_api_url=config['authentik']['api_url']
|
|
listmonk_api_key=config['listmonk']['api_key']
|
|
listmonk_api_url=config['listmonk']['api_url']
|
|
listmonk_api_usr=config['listmonk']['api_usr']
|
|
|
|
def authentik_get_users():
|
|
url = authentik_api_url + '/core/users/' + '?type=internal&type=external'
|
|
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
|
|
resp = requests.get(url, headers=headers)
|
|
json_object = json.loads(resp._content)
|
|
return(json_object['results'])
|
|
|
|
def authentik_get_user_consent(id):
|
|
url = authentik_api_url + '/core/user_consent/?user=' + str(id)
|
|
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
|
|
resp = requests.get(url, headers=headers)
|
|
json_object = json.loads(resp._content)
|
|
return(json_object['results'])
|
|
|
|
def authentik_get_groups():
|
|
url = authentik_api_url + '/core/groups/'
|
|
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
|
|
resp = requests.get(url, headers=headers)
|
|
json_object = json.loads(resp._content)
|
|
return(json_object['results'])
|
|
|
|
def authentik_get_apps():
|
|
url = authentik_api_url + '/core/applications/'
|
|
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
|
|
resp = requests.get(url, headers=headers)
|
|
json_object = json.loads(resp._content)
|
|
return(json_object['results'])
|
|
|
|
def listmonk_create_subscriber(email, username, status):
|
|
url = listmonk_api_url + '/subscribers'
|
|
payload = {"email": email, "name": username, "status": status, "preconfirm_subscriptions": True}
|
|
headers = {'Content-Type': 'application/json'}
|
|
resp = requests.post(url, data=json.dumps(payload), headers=headers, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
return json_object
|
|
|
|
def listmonk_set_subscriber(subscriber_id, email, username, status):
|
|
url = listmonk_api_url + '/subscribers/' + str(subscriber_id)
|
|
payload = {"email": email, "name": username, "status": status}
|
|
headers = {'Content-Type': 'application/json'}
|
|
resp = requests.put(url, data=json.dumps(payload), headers=headers, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
return json_object
|
|
|
|
def listmonk_get_subscriber(username):
|
|
url = listmonk_api_url + '/subscribers'
|
|
params = {
|
|
'page': '1',
|
|
'per_page': '500',
|
|
'query': "subscribers.name LIKE '" + username + "%'",
|
|
}
|
|
resp = requests.get(url, params=params, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
json_object = json_object['data']['results']
|
|
return json_object[0]
|
|
|
|
def listmonk_set_list(ids, action, target_list_ids, status):
|
|
url = listmonk_api_url + '/subscribers/lists'
|
|
payload = {"ids": ids, "action": action, "target_list_ids": target_list_ids, "status": status}
|
|
headers = {'Content-Type': 'application/json'}
|
|
resp = requests.put(url, data=json.dumps(payload), headers=headers, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
return json_object
|
|
|
|
def listmonk_create_list(group):
|
|
url = listmonk_api_url + '/lists'
|
|
payload = {"name": group, "type": "private", "optin": "single"}
|
|
headers = {'Content-Type': 'application/json'}
|
|
resp = requests.post(url, data=json.dumps(payload), headers=headers, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
return json_object
|
|
|
|
def listmonk_get_list(list):
|
|
url = listmonk_api_url + '/lists?page=1&per_page=100 + "&query=' + list
|
|
headers = {'accept': 'application/json'}
|
|
resp = requests.get(url, headers=headers, auth=(listmonk_api_usr, listmonk_api_key))
|
|
json_object = json.loads(resp._content)
|
|
json_object = json_object['data']['results']
|
|
return json_object[0]
|
|
|
|
# ensure that groups exist as lists
|
|
for group_info in authentik_get_groups():
|
|
try:
|
|
list_info = listmonk_get_list('group-' + group_info['name'])
|
|
|
|
except Exception as e:
|
|
print('> Failed to find group as list, creating group-' + group_info['name'])
|
|
create_resp = listmonk_create_list('group-' + group_info['name'])
|
|
print(create_resp)
|
|
continue
|
|
|
|
# sync that apps exists as lists
|
|
for app_info in authentik_get_apps():
|
|
try:
|
|
list_info = listmonk_get_list('app-' + app_info['slug'])
|
|
|
|
except Exception as e:
|
|
print('> Failed to find app as list, creating app-' + app_info['slug'])
|
|
create_resp = listmonk_create_list('app-' + app_info['slug'])
|
|
print(create_resp)
|
|
continue
|
|
|
|
# ensure that users are synced
|
|
for user_info in authentik_get_users():
|
|
print('>>> Checking user ' + user_info['username'])
|
|
|
|
if user_info['is_active'] == True:
|
|
user_active = "enabled"
|
|
else:
|
|
user_active = "blocklisted"
|
|
|
|
try:
|
|
subscriber_info = listmonk_get_subscriber(user_info['username'])
|
|
if debug:
|
|
print(subscriber_info)
|
|
print(user_info)
|
|
|
|
# if get user info fails, assume that we have to create a new one
|
|
# TOOD: only create if the error is indeed user-existence related
|
|
except Exception as e:
|
|
print('> Failed to find username, creating subscriber')
|
|
create_resp = listmonk_create_subscriber(user_info['email'], user_info['username'], status=user_active)
|
|
print(create_resp)
|
|
|
|
# checks if user email matches subscriber email
|
|
if user_info['email'] != subscriber_info['email']:
|
|
print('> Email check failed, updating')
|
|
set_resp = listmonk_set_subscriber(subscriber_info['id'], user_info['email'], user_info['username'], subscriber_info['status'])
|
|
print(set_resp)
|
|
|
|
# check if user and subscriber info match as it relates to status
|
|
if user_active != subscriber_info['status'] and subscriber_info['status'] != "blocklisted":
|
|
print('> User status check failed, updating')
|
|
set_resp = listmonk_set_subscriber(subscriber_info['id'], user_info['email'], user_info['username'], status=user_active)
|
|
print(set_resp)
|
|
|
|
# creates list of lists_id
|
|
lists_id = []
|
|
for list_info in subscriber_info['lists']:
|
|
lists_id.append(list_info['id'])
|
|
|
|
# matches groups to list_id
|
|
groups_id = []
|
|
for group_info in user_info['groups_obj']:
|
|
list_info = listmonk_get_list('group-' + group_info['name'])
|
|
groups_id.append(list_info['id'])
|
|
|
|
# matches apps to list_id
|
|
for consent_info in authentik_get_user_consent(user_info['pk']):
|
|
print(consent_info['application']['slug'])
|
|
list_info = listmonk_get_list('app-' + consent_info['application']['slug'])
|
|
groups_id.append(list_info['id'])
|
|
|
|
# checks if groups_id is in list of actual lists_id
|
|
# this will fail if user is in a group, but subscriber is not in the respective list
|
|
for id in groups_id:
|
|
if id not in lists_id:
|
|
print('> User lists check failed, updating')
|
|
set_resp = listmonk_set_list([subscriber_info['id']], "add", groups_id, status="confirmed")
|
|
print(set_resp)
|
|
break
|
|
|
|
# checks if lists_id is in list of group id
|
|
# this will fail is user is in a subcriber list, but not in respective group
|
|
for id in lists_id:
|
|
if id not in groups_id:
|
|
print('> User groups check failed, updating')
|
|
set_resp = listmonk_set_list([subscriber_info['id']], "remove", [id], status="")
|
|
|
|
|