#!/usr/bin/env python3
"""Mirror Authentik users, groups and applications into Listmonk.

Running this script:

1. makes sure a Listmonk list named ``group-<name>`` exists for every
   Authentik group and ``app-<slug>`` for every Authentik application;
2. makes sure every Authentik user has a Listmonk subscriber whose e-mail
   and status follow the Authentik account;
3. adds/removes the subscriber to/from lists so that its memberships equal
   the user's groups plus the applications the user has consented to.

Settings are read from ``/etc/authentik-listmonk-bridge/config.yml``.
"""
import json
import re

import requests
import yaml
from yaml.loader import SafeLoader

CONFIG_PATH = '/etc/authentik-listmonk-bridge/config.yml'

# Seconds to wait for either API before giving up; without a timeout
# ``requests`` would block forever on an unresponsive server.
REQUEST_TIMEOUT = 30

with open(CONFIG_PATH, 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

# config parser
debug = config['debug']
authentik_api_key = config['authentik']['api_key']
authentik_api_url = config['authentik']['api_url']
listmonk_api_key = config['listmonk']['api_key']
listmonk_api_url = config['listmonk']['api_url']
listmonk_api_usr = config['listmonk']['api_usr']


def _authentik_get_results(path, params=None):
    """Return the ``results`` of an Authentik list endpoint, across all pages.

    ``path`` is appended to ``authentik_api_url``; ``params`` is an optional
    list of ``(key, value)`` query pairs.

    NOTE(review): assumes the response envelope carries
    ``{"pagination": {"next": <page number, 0 when done>}, "results": [...]}``.
    If ``pagination.next`` is absent or not a larger page number, only the
    first page is returned, which is what the script did before.
    """
    url = authentik_api_url + path
    headers = {
        'accept': 'application/json',
        'Authorization': "Bearer " + authentik_api_key,
    }
    base_params = list(params or [])
    collected = []
    page = 1
    while True:
        # The first request is sent without an explicit page number.
        query = base_params + ([('page', page)] if page > 1 else [])
        resp = requests.get(
            url, headers=headers, params=query, timeout=REQUEST_TIMEOUT
        )
        body = resp.json()
        collected.extend(body['results'])
        next_page = (body.get('pagination') or {}).get('next')
        # Stop unless the server names a strictly later page; this also
        # protects against looping forever on an unexpected value.
        if not isinstance(next_page, int) or next_page <= page:
            break
        page = next_page
    return collected


def authentik_get_users():
    """Return the Authentik users of type ``internal`` or ``external``."""
    return _authentik_get_results(
        '/core/users/', [('type', 'internal'), ('type', 'external')]
    )


def authentik_get_user_consent(id):
    """Return the consent records of the Authentik user with primary key ``id``."""
    # The parameter keeps its original name for caller compatibility even
    # though it shadows the builtin inside this function.
    return _authentik_get_results('/core/user_consent/', [('user', str(id))])


def authentik_get_groups():
    """Return the Authentik groups."""
    return _authentik_get_results('/core/groups/')


def authentik_get_apps():
    """Return the Authentik applications."""
    return _authentik_get_results('/core/applications/')


def _listmonk_call(method, path, *, params=None, payload=None, headers=None):
    """Send one authenticated request to Listmonk and return the decoded JSON.

    ``payload``, when given, is sent as a JSON body with the matching
    ``Content-Type`` header. HTTP error statuses are not raised: the decoded
    error body is returned so callers can print it.
    """
    request_headers = dict(headers or {})
    data = None
    if payload is not None:
        request_headers['Content-Type'] = 'application/json'
        data = json.dumps(payload)
    resp = requests.request(
        method,
        listmonk_api_url + path,
        params=params,
        data=data,
        headers=request_headers or None,
        auth=(listmonk_api_usr, listmonk_api_key),
        timeout=REQUEST_TIMEOUT,
    )
    return resp.json()


def listmonk_create_subscriber(email, username, status):
    """Create a Listmonk subscriber and return the API response."""
    payload = {
        "email": email,
        "name": username,
        "status": status,
        "preconfirm_subscriptions": True,
    }
    return _listmonk_call('POST', '/subscribers', payload=payload)


def listmonk_set_subscriber(subscriber_id, email, username, status):
    """Overwrite e-mail, name and status of a subscriber; return the API response."""
    payload = {"email": email, "name": username, "status": status}
    return _listmonk_call(
        'PUT', '/subscribers/' + str(subscriber_id), payload=payload
    )


def listmonk_get_subscriber(username):
    """Return the Listmonk subscriber whose name is exactly ``username``.

    Raises ``IndexError`` when no such subscriber exists and ``KeyError``
    when the response has no ``data.results`` (for example an error body).
    """
    # The value is spliced into an SQL expression evaluated by Listmonk, so
    # single quotes are doubled to keep it one string literal. Equality is
    # used instead of a ``LIKE 'name%'`` prefix match, which could return a
    # different subscriber whose name merely starts with ``username``.
    quoted = username.replace("'", "''")
    params = {
        'page': '1',
        'per_page': '500',
        'query': "subscribers.name = '" + quoted + "'",
    }
    body = _listmonk_call('GET', '/subscribers', params=params)
    matches = body['data']['results'] or []
    return matches[0]


def listmonk_set_list(ids, action, target_list_ids, status):
    """Apply ``action`` for subscribers ``ids`` on ``target_list_ids``."""
    payload = {
        "ids": ids,
        "action": action,
        "target_list_ids": target_list_ids,
        "status": status,
    }
    return _listmonk_call('PUT', '/subscribers/lists', payload=payload)


def listmonk_create_list(group):
    """Create a private, single opt-in list named ``group``; return the response."""
    payload = {"name": group, "type": "private", "optin": "single"}
    return _listmonk_call('POST', '/lists', payload=payload)


def listmonk_get_list(list):
    """Return the Listmonk list whose name is exactly ``list``.

    Raises ``IndexError`` when no list has that name and ``KeyError`` when
    the response has no ``data.results``.
    """
    # The parameter keeps its original name for caller compatibility even
    # though it shadows the builtin inside this function.
    params = {'page': '1', 'per_page': '100', 'query': list}
    body = _listmonk_call(
        'GET', '/lists', params=params, headers={'accept': 'application/json'}
    )
    # The server-side query is a search, so the first hit is not necessarily
    # the list asked for; only an exact name match is accepted.
    for candidate in body['data']['results'] or []:
        if candidate['name'] == list:
            return candidate
    raise IndexError('no Listmonk list named ' + repr(list))


# ensure that groups exist as lists
for group_info in authentik_get_groups():
    try:
        list_info = listmonk_get_list('group-' + group_info['name'])
    except Exception:
        print('> Failed to find group as list, creating group-' + group_info['name'])
        create_resp = listmonk_create_list('group-' + group_info['name'])
        print(create_resp)
        continue

# ensure that apps exist as lists
for app_info in authentik_get_apps():
    try:
        list_info = listmonk_get_list('app-' + app_info['slug'])
    except Exception:
        print('> Failed to find app as list, creating app-' + app_info['slug'])
        create_resp = listmonk_create_list('app-' + app_info['slug'])
        print(create_resp)
        continue

# ensure that users are synced
for user_info in authentik_get_users():
    print('>>> Checking user ' + user_info['username'])
    user_active = "enabled" if user_info['is_active'] else "blocklisted"

    try:
        subscriber_info = listmonk_get_subscriber(user_info['username'])
    # if get user info fails, assume that we have to create a new one
    # TODO: only create if the error is indeed user-existence related
    except Exception:
        print('> Failed to find username, creating subscriber')
        create_resp = listmonk_create_subscriber(
            user_info['email'], user_info['username'], status=user_active
        )
        # Print before re-fetching so that a failed creation is visible even
        # if the lookup below raises.
        print(create_resp)
        subscriber_info = listmonk_get_subscriber(user_info['username'])
    else:
        if debug:
            print(subscriber_info)
            print(user_info)

    # checks if user email matches subscriber email
    if user_info['email'] != subscriber_info['email']:
        print('> Email check failed, updating')
        set_resp = listmonk_set_subscriber(
            subscriber_info['id'],
            user_info['email'],
            user_info['username'],
            subscriber_info['status'],
        )
        print(set_resp)

    # check if user and subscriber info match as it relates to status;
    # a subscriber that is already blocklisted is never re-enabled here
    if user_active != subscriber_info['status'] and subscriber_info['status'] != "blocklisted":
        print('> User status check failed, updating')
        set_resp = listmonk_set_subscriber(
            subscriber_info['id'],
            user_info['email'],
            user_info['username'],
            status=user_active,
        )
        print(set_resp)

    # ids of the lists the subscriber currently belongs to
    lists_id = [list_info['id'] for list_info in subscriber_info['lists'] or []]

    # ids of the lists the subscriber should belong to: one per group ...
    groups_id = []
    for group_info in user_info['groups_obj']:
        list_info = listmonk_get_list('group-' + group_info['name'])
        groups_id.append(list_info['id'])

    # ... and one per application the user has consented to
    for consent_info in authentik_get_user_consent(user_info['pk']):
        list_info = listmonk_get_list('app-' + consent_info['application']['slug'])
        groups_id.append(list_info['id'])

    # user is in a group/app whose list the subscriber is missing from:
    # (re-)add the subscriber to every expected list in a single call
    if any(list_id not in lists_id for list_id in groups_id):
        print('> User lists check failed, updating')
        set_resp = listmonk_set_list(
            [subscriber_info['id']], "add", groups_id, status="confirmed"
        )
        print(set_resp)

    # subscriber is in a list with no matching group/app: remove it, one
    # list per call
    for list_id in lists_id:
        if list_id not in groups_id:
            print('> User groups check failed, updating')
            set_resp = listmonk_set_list(
                [subscriber_info['id']], "remove", [list_id], status=""
            )