mirror of
https://github.com/open-webui/open-webui.git
synced 2025-12-12 04:15:25 +00:00
131 lines
4.6 KiB
Python
131 lines
4.6 KiB
Python
|
|
from pydantic import BaseModel, ConfigDict
|
||
|
|
from sqlalchemy import BigInteger, Boolean, Column, String, Text, JSON
|
||
|
|
from sqlalchemy.dialects.postgresql import JSONB
|
||
|
|
|
||
|
|
|
||
|
|
from sqlalchemy import or_, func, select, and_, text, cast, or_, and_, func
|
||
|
|
|
||
|
|
|
||
|
|
def has_permission(db, DocumentModel, query, filter: dict, permission: str = "read"):
|
||
|
|
group_ids = filter.get("group_ids", [])
|
||
|
|
user_id = filter.get("user_id")
|
||
|
|
dialect_name = db.bind.dialect.name
|
||
|
|
|
||
|
|
conditions = []
|
||
|
|
|
||
|
|
# Handle read_only permission separately
|
||
|
|
if permission == "read_only":
|
||
|
|
# For read_only, we want items where:
|
||
|
|
# 1. User has explicit read permission (via groups or user-level)
|
||
|
|
# 2. BUT does NOT have write permission
|
||
|
|
# 3. Public items are NOT considered read_only
|
||
|
|
|
||
|
|
read_conditions = []
|
||
|
|
|
||
|
|
# Group-level read permission
|
||
|
|
if group_ids:
|
||
|
|
group_read_conditions = []
|
||
|
|
for gid in group_ids:
|
||
|
|
if dialect_name == "sqlite":
|
||
|
|
group_read_conditions.append(
|
||
|
|
DocumentModel.access_control["read"]["group_ids"].contains(
|
||
|
|
[gid]
|
||
|
|
)
|
||
|
|
)
|
||
|
|
elif dialect_name == "postgresql":
|
||
|
|
group_read_conditions.append(
|
||
|
|
cast(
|
||
|
|
DocumentModel.access_control["read"]["group_ids"],
|
||
|
|
JSONB,
|
||
|
|
).contains([gid])
|
||
|
|
)
|
||
|
|
|
||
|
|
if group_read_conditions:
|
||
|
|
read_conditions.append(or_(*group_read_conditions))
|
||
|
|
|
||
|
|
# Combine read conditions
|
||
|
|
if read_conditions:
|
||
|
|
has_read = or_(*read_conditions)
|
||
|
|
else:
|
||
|
|
# If no read conditions, return empty result
|
||
|
|
return query.filter(False)
|
||
|
|
|
||
|
|
# Now exclude items where user has write permission
|
||
|
|
write_exclusions = []
|
||
|
|
|
||
|
|
# Exclude items owned by user (they have implicit write)
|
||
|
|
if user_id:
|
||
|
|
write_exclusions.append(DocumentModel.user_id != user_id)
|
||
|
|
|
||
|
|
# Exclude items where user has explicit write permission via groups
|
||
|
|
if group_ids:
|
||
|
|
group_write_conditions = []
|
||
|
|
for gid in group_ids:
|
||
|
|
if dialect_name == "sqlite":
|
||
|
|
group_write_conditions.append(
|
||
|
|
DocumentModel.access_control["write"]["group_ids"].contains(
|
||
|
|
[gid]
|
||
|
|
)
|
||
|
|
)
|
||
|
|
elif dialect_name == "postgresql":
|
||
|
|
group_write_conditions.append(
|
||
|
|
cast(
|
||
|
|
DocumentModel.access_control["write"]["group_ids"],
|
||
|
|
JSONB,
|
||
|
|
).contains([gid])
|
||
|
|
)
|
||
|
|
|
||
|
|
if group_write_conditions:
|
||
|
|
# User should NOT have write permission
|
||
|
|
write_exclusions.append(~or_(*group_write_conditions))
|
||
|
|
|
||
|
|
# Exclude public items (items without access_control)
|
||
|
|
write_exclusions.append(DocumentModel.access_control.isnot(None))
|
||
|
|
write_exclusions.append(cast(DocumentModel.access_control, String) != "null")
|
||
|
|
|
||
|
|
# Combine: has read AND does not have write AND not public
|
||
|
|
if write_exclusions:
|
||
|
|
query = query.filter(and_(has_read, *write_exclusions))
|
||
|
|
else:
|
||
|
|
query = query.filter(has_read)
|
||
|
|
|
||
|
|
return query
|
||
|
|
|
||
|
|
# Original logic for other permissions (read, write, etc.)
|
||
|
|
# Public access conditions
|
||
|
|
if group_ids or user_id:
|
||
|
|
conditions.extend(
|
||
|
|
[
|
||
|
|
DocumentModel.access_control.is_(None),
|
||
|
|
cast(DocumentModel.access_control, String) == "null",
|
||
|
|
]
|
||
|
|
)
|
||
|
|
|
||
|
|
# User-level permission (owner has all permissions)
|
||
|
|
if user_id:
|
||
|
|
conditions.append(DocumentModel.user_id == user_id)
|
||
|
|
|
||
|
|
# Group-level permission
|
||
|
|
if group_ids:
|
||
|
|
group_conditions = []
|
||
|
|
for gid in group_ids:
|
||
|
|
if dialect_name == "sqlite":
|
||
|
|
group_conditions.append(
|
||
|
|
DocumentModel.access_control[permission]["group_ids"].contains(
|
||
|
|
[gid]
|
||
|
|
)
|
||
|
|
)
|
||
|
|
elif dialect_name == "postgresql":
|
||
|
|
group_conditions.append(
|
||
|
|
cast(
|
||
|
|
DocumentModel.access_control[permission]["group_ids"],
|
||
|
|
JSONB,
|
||
|
|
).contains([gid])
|
||
|
|
)
|
||
|
|
conditions.append(or_(*group_conditions))
|
||
|
|
|
||
|
|
if conditions:
|
||
|
|
query = query.filter(or_(*conditions))
|
||
|
|
|
||
|
|
return query
|