import logging from typing import Optional import base64 import io from fastapi import APIRouter, Depends, HTTPException, Request, status from fastapi.responses import Response, StreamingResponse, FileResponse from pydantic import BaseModel, ConfigDict from open_webui.models.auths import Auths from open_webui.models.oauth_sessions import OAuthSessions from open_webui.models.groups import Groups from open_webui.models.chats import Chats from open_webui.models.users import ( UserModel, UserGroupIdsModel, UserGroupIdsListResponse, UserInfoListResponse, UserInfoListResponse, UserRoleUpdateForm, Users, UserSettings, UserUpdateForm, ) from open_webui.constants import ERROR_MESSAGES from open_webui.env import SRC_LOG_LEVELS, STATIC_DIR from open_webui.utils.auth import ( get_admin_user, get_password_hash, get_verified_user, validate_password, ) from open_webui.utils.access_control import get_permissions, has_permission log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["MODELS"]) router = APIRouter() ############################ # GetUsers ############################ PAGE_ITEM_COUNT = 30 @router.get("/", response_model=UserGroupIdsListResponse) async def get_users( query: Optional[str] = None, order_by: Optional[str] = None, direction: Optional[str] = None, page: Optional[int] = 1, user=Depends(get_admin_user), ): limit = PAGE_ITEM_COUNT page = max(1, page) skip = (page - 1) * limit filter = {} if query: filter["query"] = query if order_by: filter["order_by"] = order_by if direction: filter["direction"] = direction result = Users.get_users(filter=filter, skip=skip, limit=limit) users = result["users"] total = result["total"] return { "users": [ UserGroupIdsModel( **{ **user.model_dump(), "group_ids": [ group.id for group in Groups.get_groups_by_member_id(user.id) ], } ) for user in users ], "total": total, } @router.get("/all", response_model=UserInfoListResponse) async def get_all_users( user=Depends(get_admin_user), ): return Users.get_users() @router.get("/search", response_model=UserInfoListResponse) async def search_users( query: Optional[str] = None, order_by: Optional[str] = None, direction: Optional[str] = None, page: Optional[int] = 1, user=Depends(get_verified_user), ): limit = PAGE_ITEM_COUNT page = max(1, page) skip = (page - 1) * limit filter = {} if query: filter["query"] = query filter = {} if query: filter["query"] = query if order_by: filter["order_by"] = order_by if direction: filter["direction"] = direction return Users.get_users(filter=filter, skip=skip, limit=limit) ############################ # User Groups ############################ @router.get("/groups") async def get_user_groups(user=Depends(get_verified_user)): return Groups.get_groups_by_member_id(user.id) ############################ # User Permissions ############################ @router.get("/permissions") async def get_user_permissisions(request: Request, user=Depends(get_verified_user)): user_permissions = get_permissions( user.id, request.app.state.config.USER_PERMISSIONS ) return user_permissions ############################ # User Default Permissions ############################ class WorkspacePermissions(BaseModel): models: bool = False knowledge: bool = False prompts: bool = False tools: bool = False models_import: bool = False models_export: bool = False prompts_import: bool = False prompts_export: bool = False tools_import: bool = False tools_export: bool = False class SharingPermissions(BaseModel): models: bool = False public_models: bool = False knowledge: bool = False public_knowledge: bool = False prompts: bool = False public_prompts: bool = False tools: bool = False public_tools: bool = True notes: bool = False public_notes: bool = True class ChatPermissions(BaseModel): controls: bool = True valves: bool = True system_prompt: bool = True params: bool = True file_upload: bool = True delete: bool = True delete_message: bool = True continue_response: bool = True regenerate_response: bool = True rate_response: bool = True edit: bool = True share: bool = True export: bool = True stt: bool = True tts: bool = True call: bool = True multiple_models: bool = True temporary: bool = True temporary_enforced: bool = False class FeaturesPermissions(BaseModel): api_keys: bool = False notes: bool = True channels: bool = True folders: bool = True direct_tool_servers: bool = False web_search: bool = True image_generation: bool = True code_interpreter: bool = True class UserPermissions(BaseModel): workspace: WorkspacePermissions sharing: SharingPermissions chat: ChatPermissions features: FeaturesPermissions @router.get("/default/permissions", response_model=UserPermissions) async def get_default_user_permissions(request: Request, user=Depends(get_admin_user)): return { "workspace": WorkspacePermissions( **request.app.state.config.USER_PERMISSIONS.get("workspace", {}) ), "sharing": SharingPermissions( **request.app.state.config.USER_PERMISSIONS.get("sharing", {}) ), "chat": ChatPermissions( **request.app.state.config.USER_PERMISSIONS.get("chat", {}) ), "features": FeaturesPermissions( **request.app.state.config.USER_PERMISSIONS.get("features", {}) ), } @router.post("/default/permissions") async def update_default_user_permissions( request: Request, form_data: UserPermissions, user=Depends(get_admin_user) ): request.app.state.config.USER_PERMISSIONS = form_data.model_dump() return request.app.state.config.USER_PERMISSIONS ############################ # GetUserSettingsBySessionUser ############################ @router.get("/user/settings", response_model=Optional[UserSettings]) async def get_user_settings_by_session_user(user=Depends(get_verified_user)): user = Users.get_user_by_id(user.id) if user: return user.settings else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # UpdateUserSettingsBySessionUser ############################ @router.post("/user/settings/update", response_model=UserSettings) async def update_user_settings_by_session_user( request: Request, form_data: UserSettings, user=Depends(get_verified_user) ): updated_user_settings = form_data.model_dump() if ( user.role != "admin" and "toolServers" in updated_user_settings.get("ui").keys() and not has_permission( user.id, "features.direct_tool_servers", request.app.state.config.USER_PERMISSIONS, ) ): # If the user is not an admin and does not have permission to use tool servers, remove the key updated_user_settings["ui"].pop("toolServers", None) user = Users.update_user_settings_by_id(user.id, updated_user_settings) if user: return user.settings else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # GetUserInfoBySessionUser ############################ @router.get("/user/info", response_model=Optional[dict]) async def get_user_info_by_session_user(user=Depends(get_verified_user)): user = Users.get_user_by_id(user.id) if user: return user.info else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # UpdateUserInfoBySessionUser ############################ @router.post("/user/info/update", response_model=Optional[dict]) async def update_user_info_by_session_user( form_data: dict, user=Depends(get_verified_user) ): user = Users.get_user_by_id(user.id) if user: if user.info is None: user.info = {} user = Users.update_user_by_id(user.id, {"info": {**user.info, **form_data}}) if user: return user.info else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # GetUserById ############################ class UserActiveResponse(BaseModel): name: str profile_image_url: Optional[str] = None is_active: bool model_config = ConfigDict(extra="allow") @router.get("/{user_id}", response_model=UserActiveResponse) async def get_user_by_id(user_id: str, user=Depends(get_verified_user)): # Check if user_id is a shared chat # If it is, get the user_id from the chat if user_id.startswith("shared-"): chat_id = user_id.replace("shared-", "") chat = Chats.get_chat_by_id(chat_id) if chat: user_id = chat.user_id else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) user = Users.get_user_by_id(user_id) if user: return UserActiveResponse( **{ "id": user.id, "name": user.name, "is_active": Users.is_user_active(user_id), } ) else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) @router.get("/{user_id}/oauth/sessions") async def get_user_oauth_sessions_by_id(user_id: str, user=Depends(get_admin_user)): sessions = OAuthSessions.get_sessions_by_user_id(user_id) if sessions and len(sessions) > 0: return sessions else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # GetUserProfileImageById ############################ @router.get("/{user_id}/profile/image") async def get_user_profile_image_by_id(user_id: str, user=Depends(get_verified_user)): user = Users.get_user_by_id(user_id) if user: if user.profile_image_url: # check if it's url or base64 if user.profile_image_url.startswith("http"): return Response( status_code=status.HTTP_302_FOUND, headers={"Location": user.profile_image_url}, ) elif user.profile_image_url.startswith("data:image"): try: header, base64_data = user.profile_image_url.split(",", 1) image_data = base64.b64decode(base64_data) image_buffer = io.BytesIO(image_data) return StreamingResponse( image_buffer, media_type="image/png", headers={"Content-Disposition": "inline; filename=image.png"}, ) except Exception as e: pass return FileResponse(f"{STATIC_DIR}/user.png") else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # GetUserActiveStatusById ############################ @router.get("/{user_id}/active", response_model=dict) async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)): return { "active": Users.is_user_active(user_id), } ############################ # UpdateUserById ############################ @router.post("/{user_id}/update", response_model=Optional[UserModel]) async def update_user_by_id( user_id: str, form_data: UserUpdateForm, session_user=Depends(get_admin_user), ): # Prevent modification of the primary admin user by other admins try: first_user = Users.get_first_user() if first_user: if user_id == first_user.id: if session_user.id != user_id: # If the user trying to update is the primary admin, and they are not the primary admin themselves raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) if form_data.role != "admin": # If the primary admin is trying to change their own role, prevent it raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) except Exception as e: log.error(f"Error checking primary admin status: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not verify primary admin status.", ) user = Users.get_user_by_id(user_id) if user: if form_data.email.lower() != user.email: email_user = Users.get_user_by_email(form_data.email.lower()) if email_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.EMAIL_TAKEN, ) if form_data.password: try: validate_password(form_data.password) except Exception as e: raise HTTPException(400, detail=str(e)) hashed = get_password_hash(form_data.password) Auths.update_user_password_by_id(user_id, hashed) Auths.update_email_by_id(user_id, form_data.email.lower()) updated_user = Users.update_user_by_id( user_id, { "role": form_data.role, "name": form_data.name, "email": form_data.email.lower(), "profile_image_url": form_data.profile_image_url, }, ) if updated_user: return updated_user raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT(), ) raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.USER_NOT_FOUND, ) ############################ # DeleteUserById ############################ @router.delete("/{user_id}", response_model=bool) async def delete_user_by_id(user_id: str, user=Depends(get_admin_user)): # Prevent deletion of the primary admin user try: first_user = Users.get_first_user() if first_user and user_id == first_user.id: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) except Exception as e: log.error(f"Error checking primary admin status: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Could not verify primary admin status.", ) if user.id != user_id: result = Auths.delete_auth_by_id(user_id) if result: return True raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=ERROR_MESSAGES.DELETE_USER_ERROR, ) # Prevent self-deletion raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.ACTION_PROHIBITED, ) ############################ # GetUserGroupsById ############################ @router.get("/{user_id}/groups") async def get_user_groups_by_id(user_id: str, user=Depends(get_admin_user)): return Groups.get_groups_by_member_id(user_id)