diff --git a/backend/open_webui/models/models.py b/backend/open_webui/models/models.py index e902a978d1..329b87a91f 100755 --- a/backend/open_webui/models/models.py +++ b/backend/open_webui/models/models.py @@ -220,6 +220,34 @@ class ModelsTable: or has_access(user_id, permission, model.access_control, user_group_ids) ] + def _has_write_permission(self, query, filter: dict): + if filter.get("group_ids") or filter.get("user_id"): + conditions = [] + + # --- ANY group_ids match ("write".group_ids) --- + if filter.get("group_ids"): + group_ids = filter["group_ids"] + like_clauses = [] + + for gid in group_ids: + like_clauses.append( + cast(Model.access_control, String).like( + f'%"write"%"group_ids"%"{gid}"%' + ) + ) + + # ANY → OR + conditions.append(or_(*like_clauses)) + + # --- user_id match (owner) --- + if filter.get("user_id"): + conditions.append(Model.user_id == filter["user_id"]) + + # Apply OR across the two groups of conditions + query = query.filter(or_(*conditions)) + + return query + def search_models( self, user_id: str, filter: dict = {}, skip: int = 0, limit: int = 30 ) -> ModelListResponse: @@ -238,11 +266,10 @@ class ModelsTable: ) ) - if filter.get("user_id"): - query = query.filter(Model.user_id == filter.get("user_id")) + # Apply access control filtering + query = self._has_write_permission(query, filter) view_option = filter.get("view_option") - if view_option == "created": query = query.filter(Model.user_id == user_id) elif view_option == "shared": diff --git a/backend/open_webui/routers/models.py b/backend/open_webui/routers/models.py index 93d8cb8bf7..df5a7377dc 100644 --- a/backend/open_webui/routers/models.py +++ b/backend/open_webui/routers/models.py @@ -5,6 +5,7 @@ import json import asyncio import logging +from open_webui.models.groups import Groups from open_webui.models.models import ( ModelForm, ModelModel, @@ -78,6 +79,10 @@ async def get_models( filter["direction"] = direction if not user.role == "admin" or not BYPASS_ADMIN_ACCESS_CONTROL: + groups = Groups.get_groups_by_member_id(user.id) + if groups: + filter["group_ids"] = [group.id for group in groups] + filter["user_id"] = user.id return Models.search_models(user.id, filter=filter, skip=skip, limit=limit)