main.py: create app lists without user syncing

This commit is contained in:
Antoine Martin 2024-03-05 11:16:08 -05:00
parent cc9d07344c
commit 3bd5f39d9c
Signed by: forge
GPG key ID: D62A472A4AA7D541

38
main.py
View file

@ -25,6 +25,13 @@ def authentik_get_users():
json_object = json.loads(resp._content) json_object = json.loads(resp._content)
return(json_object['results']) return(json_object['results'])
def authentik_get_app_used_by(slug):
url = authentik_api_url + '/core/applications/' + slug + '/used_by/'
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
resp = requests.get(url, headers=headers)
json_object = json.loads(resp._content)
return(json_object)
def authentik_get_groups(): def authentik_get_groups():
url = authentik_api_url + '/core/groups/' url = authentik_api_url + '/core/groups/'
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key} headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
@ -32,6 +39,13 @@ def authentik_get_groups():
json_object = json.loads(resp._content) json_object = json.loads(resp._content)
return(json_object['results']) return(json_object['results'])
def authentik_get_apps():
url = authentik_api_url + '/core/applications/'
headers = {'accept': 'application/json', 'Authorization': "Bearer " + authentik_api_key}
resp = requests.get(url, headers=headers)
json_object = json.loads(resp._content)
return(json_object['results'])
def listmonk_create_subscriber(email, username, status): def listmonk_create_subscriber(email, username, status):
url = listmonk_api_url + '/subscribers' url = listmonk_api_url + '/subscribers'
payload = {"email": email, "name": username, "status": status, "preconfirm_subscriptions": True} payload = {"email": email, "name": username, "status": status, "preconfirm_subscriptions": True}
@ -87,15 +101,14 @@ def listmonk_get_list(list):
# ensure that groups exist as lists # ensure that groups exist as lists
for group_info in authentik_get_groups(): for group_info in authentik_get_groups():
try: try:
list_info = listmonk_get_list(group_info['name']) list_info = listmonk_get_list('group-' + group_info['name'])
except Exception as e: except Exception as e:
print('> Failed to find group as list, creating ' + group_info['name']) print('> Failed to find group as list, creating group-' + group_info['name'])
create_resp = listmonk_create_list(group_info['name']) create_resp = listmonk_create_list('group-' + group_info['name'])
print(create_resp) print(create_resp)
continue continue
# ensure that users are synced # ensure that users are synced
for user_info in authentik_get_users(): for user_info in authentik_get_users():
print('>>> Checking user ' + user_info['username']) print('>>> Checking user ' + user_info['username'])
@ -109,6 +122,7 @@ for user_info in authentik_get_users():
subscriber_info = listmonk_get_subscriber(user_info['username']) subscriber_info = listmonk_get_subscriber(user_info['username'])
if debug: if debug:
print(subscriber_info) print(subscriber_info)
print(user_info)
# if get user info fails, assume that we have to create a new one # if get user info fails, assume that we have to create a new one
# TOOD: only create if the error is indeed user-existence related # TOOD: only create if the error is indeed user-existence related
@ -137,7 +151,7 @@ for user_info in authentik_get_users():
# matches groups to list_id # matches groups to list_id
groups_id = [] groups_id = []
for group_info in user_info['groups_obj']: for group_info in user_info['groups_obj']:
list_info = listmonk_get_list(group_info['name']) list_info = listmonk_get_list('group-' + group_info['name'])
groups_id.append(list_info['id']) groups_id.append(list_info['id'])
# checks if groups_id is in list of actual lists_id # checks if groups_id is in list of actual lists_id
@ -155,3 +169,17 @@ for user_info in authentik_get_users():
if id not in groups_id: if id not in groups_id:
print('> User groups check failed, updating') print('> User groups check failed, updating')
set_resp = listmonk_set_list([subscriber_info['id']], "remove", [id], status="") set_resp = listmonk_set_list([subscriber_info['id']], "remove", [id], status="")
# sync used_by with users
for app_info in authentik_get_apps():
try:
list_info = listmonk_get_list('app-' + app_info['slug'])
except Exception as e:
print('> Failed to find app as list, creating app-' + app_info['slug'])
create_resp = listmonk_create_list('app-' + app_info['slug'])
print(create_resp)
continue
for used_by in authentik_get_app_used_by(app_info['slug']):
print(used_by)