From 423a35782ba09267e2e8dfe2edfca808d0504e99 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 16 Jun 2025 10:42:34 +0400 Subject: [PATCH] refac: usage event handling --- backend/open_webui/config.py | 1 - backend/open_webui/main.py | 15 ++++ backend/open_webui/routers/users.py | 36 +++++++- backend/open_webui/socket/main.py | 88 ++++++++----------- src/lib/apis/index.ts | 27 ++++++ src/lib/apis/users/index.ts | 27 ++++++ .../Messages/Message/ProfilePreview.svelte | 27 ++++-- .../components/layout/Sidebar/UserMenu.svelte | 83 +++++++++++------ .../components/notes/Notes/NoteMenu.svelte | 2 +- src/lib/components/notes/RecordMenu.svelte | 2 +- src/lib/utils/websocket.ts | 51 ----------- src/routes/+layout.svelte | 12 --- 12 files changed, 219 insertions(+), 152 deletions(-) delete mode 100644 src/lib/utils/websocket.ts diff --git a/backend/open_webui/config.py b/backend/open_webui/config.py index 6a9030432b..fd2f34070a 100644 --- a/backend/open_webui/config.py +++ b/backend/open_webui/config.py @@ -3094,4 +3094,3 @@ LDAP_ATTRIBUTE_FOR_GROUPS = PersistentConfig( "ldap.server.attribute_for_groups", os.environ.get("LDAP_ATTRIBUTE_FOR_GROUPS", "memberOf"), ) - diff --git a/backend/open_webui/main.py b/backend/open_webui/main.py index 84c6b6caa1..b9324efa44 100644 --- a/backend/open_webui/main.py +++ b/backend/open_webui/main.py @@ -57,6 +57,8 @@ from open_webui.utils.logger import start_logger from open_webui.socket.main import ( app as socket_app, periodic_usage_pool_cleanup, + get_models_in_use, + get_active_user_ids, ) from open_webui.routers import ( audio, @@ -1627,6 +1629,19 @@ async def get_app_changelog(): return {key: CHANGELOG[key] for idx, key in enumerate(CHANGELOG) if idx < 5} +@app.get("/api/usage") +async def get_current_usage(user=Depends(get_verified_user)): + """ + Get current usage statistics for Open WebUI. + This is an experimental endpoint and subject to change. + """ + try: + return {"model_ids": get_models_in_use(), "user_ids": get_active_user_ids()} + except Exception as e: + log.error(f"Error getting usage statistics: {e}") + raise HTTPException(status_code=500, detail="Internal Server Error") + + ############################ # OAuth Login & Callback ############################ diff --git a/backend/open_webui/routers/users.py b/backend/open_webui/routers/users.py index 4046dc72d8..3edd9b7dcf 100644 --- a/backend/open_webui/routers/users.py +++ b/backend/open_webui/routers/users.py @@ -14,7 +14,11 @@ from open_webui.models.users import ( ) -from open_webui.socket.main import get_active_status_by_user_id +from open_webui.socket.main import ( + get_active_status_by_user_id, + get_active_user_ids, + get_user_active_status, +) from open_webui.constants import ERROR_MESSAGES from open_webui.env import SRC_LOG_LEVELS from fastapi import APIRouter, Depends, HTTPException, Request, status @@ -29,6 +33,24 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"]) router = APIRouter() + +############################ +# GetActiveUsers +############################ + + +@router.get("/active") +async def get_active_users( + user=Depends(get_verified_user), +): + """ + Get a list of active users. + """ + return { + "user_ids": get_active_user_ids(), + } + + ############################ # GetUsers ############################ @@ -303,6 +325,18 @@ async def get_user_by_id(user_id: str, user=Depends(get_verified_user)): ) +############################ +# GetUserActiveStatusById +############################ + + +@router.get("/{user_id}/active", response_model=dict) +async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)): + return { + "active": get_user_active_status(user_id), + } + + ############################ # UpdateUserById ############################ diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 09eccd8267..35e40dccb2 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -135,11 +135,6 @@ async def periodic_usage_pool_cleanup(): USAGE_POOL[model_id] = connections send_usage = True - - if send_usage: - # Emit updated usage information after cleaning - await sio.emit("usage", {"models": get_models_in_use()}) - await asyncio.sleep(TIMEOUT_DURATION) finally: release_func() @@ -157,6 +152,43 @@ def get_models_in_use(): return models_in_use +def get_active_user_ids(): + """Get the list of active user IDs.""" + return list(USER_POOL.keys()) + + +def get_user_active_status(user_id): + """Check if a user is currently active.""" + return user_id in USER_POOL + + +def get_user_id_from_session_pool(sid): + user = SESSION_POOL.get(sid) + if user: + return user["id"] + return None + + +def get_user_ids_from_room(room): + active_session_ids = sio.manager.get_participants( + namespace="/", + room=room, + ) + + active_user_ids = list( + set( + [SESSION_POOL.get(session_id[0])["id"] for session_id in active_session_ids] + ) + ) + return active_user_ids + + +def get_active_status_by_user_id(user_id): + if user_id in USER_POOL: + return True + return False + + @sio.on("usage") async def usage(sid, data): if sid in SESSION_POOL: @@ -170,9 +202,6 @@ async def usage(sid, data): sid: {"updated_at": current_time}, } - # Broadcast the usage data to all clients - await sio.emit("usage", {"models": get_models_in_use()}) - @sio.event async def connect(sid, environ, auth): @@ -190,10 +219,6 @@ async def connect(sid, environ, auth): else: USER_POOL[user.id] = [sid] - # print(f"user {user.name}({user.id}) connected with session ID {sid}") - await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())}) - await sio.emit("usage", {"models": get_models_in_use()}) - @sio.on("user-join") async def user_join(sid, data): @@ -221,10 +246,6 @@ async def user_join(sid, data): log.debug(f"{channels=}") for channel in channels: await sio.enter_room(sid, f"channel:{channel.id}") - - # print(f"user {user.name}({user.id}) connected with session ID {sid}") - - await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())}) return {"id": user.id, "name": user.name} @@ -277,12 +298,6 @@ async def channel_events(sid, data): ) -@sio.on("user-list") -async def user_list(sid): - if sid in SESSION_POOL: - await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())}) - - @sio.event async def disconnect(sid): if sid in SESSION_POOL: @@ -294,8 +309,6 @@ async def disconnect(sid): if len(USER_POOL[user_id]) == 0: del USER_POOL[user_id] - - await sio.emit("user-list", {"user_ids": list(USER_POOL.keys())}) else: pass # print(f"Unknown session ID {sid} disconnected") @@ -388,30 +401,3 @@ def get_event_call(request_info): get_event_caller = get_event_call - - -def get_user_id_from_session_pool(sid): - user = SESSION_POOL.get(sid) - if user: - return user["id"] - return None - - -def get_user_ids_from_room(room): - active_session_ids = sio.manager.get_participants( - namespace="/", - room=room, - ) - - active_user_ids = list( - set( - [SESSION_POOL.get(session_id[0])["id"] for session_id in active_session_ids] - ) - ) - return active_user_ids - - -def get_active_status_by_user_id(user_id): - if user_id in USER_POOL: - return True - return False diff --git a/src/lib/apis/index.ts b/src/lib/apis/index.ts index 8e4c78aec3..99b95b0629 100644 --- a/src/lib/apis/index.ts +++ b/src/lib/apis/index.ts @@ -1271,6 +1271,33 @@ export const updatePipelineValves = async ( return res; }; +export const getUsage = async (token: string = '') => { + let error = null; + + const res = await fetch(`${WEBUI_BASE_URL}/api/usage`, { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + ...(token && { Authorization: `Bearer ${token}` }) + } + }) + .then(async (res) => { + if (!res.ok) throw await res.json(); + return res.json(); + }) + .catch((err) => { + console.error(err); + error = err; + return null; + }); + + if (error) { + throw error; + } + + return res; +}; + export const getBackendConfig = async () => { let error = null; diff --git a/src/lib/apis/users/index.ts b/src/lib/apis/users/index.ts index 391bdca56d..f2449ff3e0 100644 --- a/src/lib/apis/users/index.ts +++ b/src/lib/apis/users/index.ts @@ -348,6 +348,33 @@ export const getAndUpdateUserLocation = async (token: string) => { } }; +export const getUserActiveStatusById = async (token: string, userId: string) => { + let error = null; + + const res = await fetch(`${WEBUI_API_BASE_URL}/users/${userId}/active`, { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${token}` + } + }) + .then(async (res) => { + if (!res.ok) throw await res.json(); + return res.json(); + }) + .catch((err) => { + console.error(err); + error = err.detail; + return null; + }); + + if (error) { + throw error; + } + + return res; +}; + export const deleteUserById = async (token: string, userId: string) => { let error = null; diff --git a/src/lib/components/channel/Messages/Message/ProfilePreview.svelte b/src/lib/components/channel/Messages/Message/ProfilePreview.svelte index a9db05b1e8..a0f3e1c322 100644 --- a/src/lib/components/channel/Messages/Message/ProfilePreview.svelte +++ b/src/lib/components/channel/Messages/Message/ProfilePreview.svelte @@ -1,10 +1,9 @@ { - dispatch('change', state); - }} + onOpenChange={(state) => {}} typeahead={false} > @@ -52,7 +65,7 @@
- {#if $activeUserIds.includes(user.id)} + {#if active}
{ + const res = await getUsage(localStorage.token).catch((error) => { + console.error('Error fetching usage info:', error); + }); + + if (res) { + usage = res; + } else { + usage = null; + } + }; + + $: if (show) { + getUsageInfo(); + } + { @@ -181,34 +203,41 @@
{$i18n.t('Sign Out')}
- {#if $activeUserIds?.length > 0} -
+ {#if usage} + {#if usage?.user_ids?.length > 0} +
- 0 - ? `${$i18n.t('Running')}: ${$USAGE_POOL.join(', ')} ✨` - : ''} - > -
-
- - - - -
+ 0 + ? `${$i18n.t('Running')}: ${usage.model_ids.join(', ')} ✨` + : ''} + > +
{ + getUsageInfo(); + }} + > +
+ + + + +
-
- - {$i18n.t('Active Users')}: - - - {$activeUserIds?.length} - +
+ + {$i18n.t('Active Users')}: + + + {usage?.user_ids?.length} + +
-
-
+ + {/if} {/if}