enh/refac: ldap handle multiple usernames

This commit is contained in:
Timothy Jaeryang Baek 2025-12-11 14:26:35 -05:00
parent 4c4b9d19a1
commit 379f888c9d

View file

@ -288,13 +288,11 @@ async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
f"{LDAP_ATTRIBUTE_FOR_MAIL}", f"{LDAP_ATTRIBUTE_FOR_MAIL}",
"cn", "cn",
] ]
if ENABLE_LDAP_GROUP_MANAGEMENT: if ENABLE_LDAP_GROUP_MANAGEMENT:
search_attributes.append(f"{LDAP_ATTRIBUTE_FOR_GROUPS}") search_attributes.append(f"{LDAP_ATTRIBUTE_FOR_GROUPS}")
log.info( log.info(
f"LDAP Group Management enabled. Adding {LDAP_ATTRIBUTE_FOR_GROUPS} to search attributes" f"LDAP Group Management enabled. Adding {LDAP_ATTRIBUTE_FOR_GROUPS} to search attributes"
) )
log.info(f"LDAP search attributes: {search_attributes}") log.info(f"LDAP search attributes: {search_attributes}")
search_success = connection_app.search( search_success = connection_app.search(
@ -302,15 +300,22 @@ async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
search_filter=f"(&({LDAP_ATTRIBUTE_FOR_USERNAME}={escape_filter_chars(form_data.user.lower())}){LDAP_SEARCH_FILTERS})", search_filter=f"(&({LDAP_ATTRIBUTE_FOR_USERNAME}={escape_filter_chars(form_data.user.lower())}){LDAP_SEARCH_FILTERS})",
attributes=search_attributes, attributes=search_attributes,
) )
if not search_success or not connection_app.entries: if not search_success or not connection_app.entries:
raise HTTPException(400, detail="User not found in the LDAP server") raise HTTPException(400, detail="User not found in the LDAP server")
entry = connection_app.entries[0] entry = connection_app.entries[0]
username = str(entry[f"{LDAP_ATTRIBUTE_FOR_USERNAME}"]).lower() entry_username = entry[f"{LDAP_ATTRIBUTE_FOR_USERNAME}"].value
email = entry[ email = entry[
f"{LDAP_ATTRIBUTE_FOR_MAIL}" f"{LDAP_ATTRIBUTE_FOR_MAIL}"
].value # retrieve the Attribute value ].value # retrieve the Attribute value
username_list = [] # list of usernames from LDAP attribute
if isinstance(entry_username, list):
username_list = [str(name).lower() for name in entry_username]
else:
username_list = [str(entry_username).lower()]
# TODO: support multiple emails if LDAP returns a list
if not email: if not email:
raise HTTPException(400, "User does not have a valid email address.") raise HTTPException(400, "User does not have a valid email address.")
elif isinstance(email, str): elif isinstance(email, str):
@ -320,13 +325,13 @@ async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
else: else:
email = str(email).lower() email = str(email).lower()
cn = str(entry["cn"]) cn = str(entry["cn"]) # common name
user_dn = entry.entry_dn user_dn = entry.entry_dn # user distinguished name
user_groups = [] user_groups = []
if ENABLE_LDAP_GROUP_MANAGEMENT and LDAP_ATTRIBUTE_FOR_GROUPS in entry: if ENABLE_LDAP_GROUP_MANAGEMENT and LDAP_ATTRIBUTE_FOR_GROUPS in entry:
group_dns = entry[LDAP_ATTRIBUTE_FOR_GROUPS] group_dns = entry[LDAP_ATTRIBUTE_FOR_GROUPS]
log.info(f"LDAP raw group DNs for user {username}: {group_dns}") log.info(f"LDAP raw group DNs for user {username_list}: {group_dns}")
if group_dns: if group_dns:
log.info(f"LDAP group_dns original: {group_dns}") log.info(f"LDAP group_dns original: {group_dns}")
@ -377,16 +382,16 @@ async def ldap_auth(request: Request, response: Response, form_data: LdapForm):
) )
log.info( log.info(
f"LDAP groups for user {username}: {user_groups} (total: {len(user_groups)})" f"LDAP groups for user {username_list}: {user_groups} (total: {len(user_groups)})"
) )
else: else:
log.info(f"No groups found for user {username}") log.info(f"No groups found for user {username_list}")
elif ENABLE_LDAP_GROUP_MANAGEMENT: elif ENABLE_LDAP_GROUP_MANAGEMENT:
log.warning( log.warning(
f"LDAP Group Management enabled but {LDAP_ATTRIBUTE_FOR_GROUPS} attribute not found in user entry" f"LDAP Group Management enabled but {LDAP_ATTRIBUTE_FOR_GROUPS} attribute not found in user entry"
) )
if username == form_data.user.lower(): if username_list and form_data.user.lower() in username_list:
connection_user = Connection( connection_user = Connection(
server, server,
user_dn, user_dn,