enh/refac: deprecate USER_POOL

This commit is contained in:
Timothy Jaeryang Baek 2025-11-28 07:39:02 -05:00
parent c2634d45ad
commit 70948f8803
10 changed files with 50 additions and 88 deletions

View file

@ -66,7 +66,6 @@ from open_webui.socket.main import (
periodic_usage_pool_cleanup, periodic_usage_pool_cleanup,
get_event_emitter, get_event_emitter,
get_models_in_use, get_models_in_use,
get_active_user_ids,
) )
from open_webui.routers import ( from open_webui.routers import (
audio, audio,
@ -2021,7 +2020,10 @@ async def get_current_usage(user=Depends(get_verified_user)):
This is an experimental endpoint and subject to change. This is an experimental endpoint and subject to change.
""" """
try: try:
return {"model_ids": get_models_in_use(), "user_ids": get_active_user_ids()} return {
"model_ids": get_models_in_use(),
"user_count": Users.get_active_user_count(),
}
except Exception as e: except Exception as e:
log.error(f"Error getting usage statistics: {e}") log.error(f"Error getting usage statistics: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error") raise HTTPException(status_code=500, detail="Internal Server Error")

View file

@ -489,7 +489,7 @@ class UsersTable:
return None return None
@throttle(DATABASE_USER_ACTIVE_STATUS_UPDATE_INTERVAL) @throttle(DATABASE_USER_ACTIVE_STATUS_UPDATE_INTERVAL)
def update_user_last_active_by_id(self, id: str) -> Optional[UserModel]: def update_last_active_by_id(self, id: str) -> Optional[UserModel]:
try: try:
with get_db() as db: with get_db() as db:
db.query(User).filter_by(id=id).update( db.query(User).filter_by(id=id).update(

View file

@ -10,7 +10,6 @@ from pydantic import BaseModel
from open_webui.socket.main import ( from open_webui.socket.main import (
sio, sio,
get_user_ids_from_room, get_user_ids_from_room,
get_active_status_by_user_id,
) )
from open_webui.models.users import ( from open_webui.models.users import (
UserIdNameResponse, UserIdNameResponse,
@ -99,10 +98,7 @@ async def get_channels(user=Depends(get_verified_user)):
] ]
users = [ users = [
UserIdNameStatusResponse( UserIdNameStatusResponse(
**{ **{**user.model_dump(), "is_active": Users.is_user_active(user.id)}
**user.model_dump(),
"is_active": get_active_status_by_user_id(user.id),
}
) )
for user in Users.get_users_by_user_ids(user_ids) for user in Users.get_users_by_user_ids(user_ids)
] ]
@ -284,7 +280,7 @@ async def get_channel_members_by_id(
return { return {
"users": [ "users": [
UserModelResponse( UserModelResponse(
**user.model_dump(), is_active=get_active_status_by_user_id(user.id) **user.model_dump(), is_active=Users.is_user_active(user.id)
) )
for user in users for user in users
], ],
@ -316,7 +312,7 @@ async def get_channel_members_by_id(
return { return {
"users": [ "users": [
UserModelResponse( UserModelResponse(
**user.model_dump(), is_active=get_active_status_by_user_id(user.id) **user.model_dump(), is_active=Users.is_user_active(user.id)
) )
for user in users for user in users
], ],

View file

@ -26,12 +26,6 @@ from open_webui.models.users import (
UserUpdateForm, UserUpdateForm,
) )
from open_webui.socket.main import (
get_active_status_by_user_id,
get_active_user_ids,
get_user_active_status,
)
from open_webui.constants import ERROR_MESSAGES from open_webui.constants import ERROR_MESSAGES
from open_webui.env import SRC_LOG_LEVELS, STATIC_DIR from open_webui.env import SRC_LOG_LEVELS, STATIC_DIR
@ -51,23 +45,6 @@ log.setLevel(SRC_LOG_LEVELS["MODELS"])
router = APIRouter() router = APIRouter()
############################
# GetActiveUsers
############################
@router.get("/active")
async def get_active_users(
user=Depends(get_verified_user),
):
"""
Get a list of active users.
"""
return {
"user_ids": get_active_user_ids(),
}
############################ ############################
# GetUsers # GetUsers
############################ ############################
@ -364,7 +341,7 @@ async def update_user_info_by_session_user(
class UserActiveResponse(BaseModel): class UserActiveResponse(BaseModel):
name: str name: str
profile_image_url: Optional[str] = None profile_image_url: Optional[str] = None
active: Optional[bool] = None is_active: bool
model_config = ConfigDict(extra="allow") model_config = ConfigDict(extra="allow")
@ -390,7 +367,7 @@ async def get_user_by_id(user_id: str, user=Depends(get_verified_user)):
**{ **{
"id": user.id, "id": user.id,
"name": user.name, "name": user.name,
"active": get_active_status_by_user_id(user_id), "is_active": Users.is_user_active(user_id),
} }
) )
else: else:
@ -457,7 +434,7 @@ async def get_user_profile_image_by_id(user_id: str, user=Depends(get_verified_u
@router.get("/{user_id}/active", response_model=dict) @router.get("/{user_id}/active", response_model=dict)
async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)): async def get_user_active_status_by_id(user_id: str, user=Depends(get_verified_user)):
return { return {
"active": get_user_active_status(user_id), "active": Users.is_user_active(user_id),
} }

View file

@ -132,12 +132,6 @@ if WEBSOCKET_MANAGER == "redis":
redis_sentinels=redis_sentinels, redis_sentinels=redis_sentinels,
redis_cluster=WEBSOCKET_REDIS_CLUSTER, redis_cluster=WEBSOCKET_REDIS_CLUSTER,
) )
USER_POOL = RedisDict(
f"{REDIS_KEY_PREFIX}:user_pool",
redis_url=WEBSOCKET_REDIS_URL,
redis_sentinels=redis_sentinels,
redis_cluster=WEBSOCKET_REDIS_CLUSTER,
)
USAGE_POOL = RedisDict( USAGE_POOL = RedisDict(
f"{REDIS_KEY_PREFIX}:usage_pool", f"{REDIS_KEY_PREFIX}:usage_pool",
redis_url=WEBSOCKET_REDIS_URL, redis_url=WEBSOCKET_REDIS_URL,
@ -159,7 +153,6 @@ else:
MODELS = {} MODELS = {}
SESSION_POOL = {} SESSION_POOL = {}
USER_POOL = {}
USAGE_POOL = {} USAGE_POOL = {}
aquire_func = release_func = renew_func = lambda: True aquire_func = release_func = renew_func = lambda: True
@ -235,16 +228,6 @@ def get_models_in_use():
return models_in_use return models_in_use
def get_active_user_ids():
"""Get the list of active user IDs."""
return list(USER_POOL.keys())
def get_user_active_status(user_id):
"""Check if a user is currently active."""
return user_id in USER_POOL
def get_user_id_from_session_pool(sid): def get_user_id_from_session_pool(sid):
user = SESSION_POOL.get(sid) user = SESSION_POOL.get(sid)
if user: if user:
@ -270,12 +253,6 @@ def get_user_ids_from_room(room):
return active_user_ids return active_user_ids
def get_active_status_by_user_id(user_id):
if user_id in USER_POOL:
return True
return False
@sio.on("usage") @sio.on("usage")
async def usage(sid, data): async def usage(sid, data):
if sid in SESSION_POOL: if sid in SESSION_POOL:
@ -303,11 +280,6 @@ async def connect(sid, environ, auth):
SESSION_POOL[sid] = user.model_dump( SESSION_POOL[sid] = user.model_dump(
exclude=["date_of_birth", "bio", "gender"] exclude=["date_of_birth", "bio", "gender"]
) )
if user.id in USER_POOL:
USER_POOL[user.id] = USER_POOL[user.id] + [sid]
else:
USER_POOL[user.id] = [sid]
await sio.enter_room(sid, f"user:{user.id}") await sio.enter_room(sid, f"user:{user.id}")
@ -326,11 +298,15 @@ async def user_join(sid, data):
if not user: if not user:
return return
SESSION_POOL[sid] = user.model_dump(exclude=["date_of_birth", "bio", "gender"]) SESSION_POOL[sid] = user.model_dump(
if user.id in USER_POOL: exclude=[
USER_POOL[user.id] = USER_POOL[user.id] + [sid] "profile_image_url",
else: "profile_banner_image_url",
USER_POOL[user.id] = [sid] "date_of_birth",
"bio",
"gender",
]
)
await sio.enter_room(sid, f"user:{user.id}") await sio.enter_room(sid, f"user:{user.id}")
# Join all the channels # Join all the channels
@ -341,6 +317,13 @@ async def user_join(sid, data):
return {"id": user.id, "name": user.name} return {"id": user.id, "name": user.name}
@sio.on("heartbeat")
async def heartbeat(sid, data):
user = SESSION_POOL.get(sid)
if user:
Users.update_last_active_by_id(user["id"])
@sio.on("join-channels") @sio.on("join-channels")
async def join_channel(sid, data): async def join_channel(sid, data):
auth = data["auth"] if "auth" in data else None auth = data["auth"] if "auth" in data else None
@ -669,13 +652,6 @@ async def disconnect(sid):
if sid in SESSION_POOL: if sid in SESSION_POOL:
user = SESSION_POOL[sid] user = SESSION_POOL[sid]
del SESSION_POOL[sid] del SESSION_POOL[sid]
user_id = user["id"]
USER_POOL[user_id] = [_sid for _sid in USER_POOL[user_id] if _sid != sid]
if len(USER_POOL[user_id]) == 0:
del USER_POOL[user_id]
await YDOC_MANAGER.remove_user_from_all_documents(sid) await YDOC_MANAGER.remove_user_from_all_documents(sid)
else: else:
pass pass

View file

@ -344,9 +344,7 @@ async def get_current_user(
# Refresh the user's last active timestamp asynchronously # Refresh the user's last active timestamp asynchronously
# to prevent blocking the request # to prevent blocking the request
if background_tasks: if background_tasks:
background_tasks.add_task( background_tasks.add_task(Users.update_last_active_by_id, user.id)
Users.update_user_last_active_by_id, user.id
)
return user return user
else: else:
raise HTTPException( raise HTTPException(
@ -397,8 +395,7 @@ def get_current_user_by_api_key(request, api_key: str):
current_span.set_attribute("client.user.role", user.role) current_span.set_attribute("client.user.role", user.role)
current_span.set_attribute("client.auth.type", "api_key") current_span.set_attribute("client.auth.type", "api_key")
Users.update_user_last_active_by_id(user.id) Users.update_last_active_by_id(user.id)
return user return user

View file

@ -32,7 +32,6 @@ from open_webui.models.users import Users
from open_webui.socket.main import ( from open_webui.socket.main import (
get_event_call, get_event_call,
get_event_emitter, get_event_emitter,
get_active_status_by_user_id,
) )
from open_webui.routers.tasks import ( from open_webui.routers.tasks import (
generate_queries, generate_queries,
@ -1915,7 +1914,7 @@ async def process_chat_response(
) )
# Send a webhook notification if the user is not active # Send a webhook notification if the user is not active
if not get_active_status_by_user_id(user.id): if not Users.is_user_active(user.id):
webhook_url = Users.get_user_webhook_url_by_id(user.id) webhook_url = Users.get_user_webhook_url_by_id(user.id)
if webhook_url: if webhook_url:
await post_webhook( await post_webhook(
@ -3210,7 +3209,7 @@ async def process_chat_response(
) )
# Send a webhook notification if the user is not active # Send a webhook notification if the user is not active
if not get_active_status_by_user_id(user.id): if not Users.is_user_active(user.id):
webhook_url = Users.get_user_webhook_url_by_id(user.id) webhook_url = Users.get_user_webhook_url_by_id(user.id)
if webhook_url: if webhook_url:
await post_webhook( await post_webhook(

View file

@ -45,7 +45,6 @@ from open_webui.env import (
OTEL_METRICS_OTLP_SPAN_EXPORTER, OTEL_METRICS_OTLP_SPAN_EXPORTER,
OTEL_METRICS_EXPORTER_OTLP_INSECURE, OTEL_METRICS_EXPORTER_OTLP_INSECURE,
) )
from open_webui.socket.main import get_active_user_ids
from open_webui.models.users import Users from open_webui.models.users import Users
_EXPORT_INTERVAL_MILLIS = 10_000 # 10 seconds _EXPORT_INTERVAL_MILLIS = 10_000 # 10 seconds
@ -135,7 +134,7 @@ def setup_metrics(app: FastAPI, resource: Resource) -> None:
) -> Sequence[metrics.Observation]: ) -> Sequence[metrics.Observation]:
return [ return [
metrics.Observation( metrics.Observation(
value=len(get_active_user_ids()), value=Users.get_active_user_count(),
) )
] ]

View file

@ -222,7 +222,7 @@
</DropdownMenu.Item> </DropdownMenu.Item>
{#if showActiveUsers && usage} {#if showActiveUsers && usage}
{#if usage?.user_ids?.length > 0} {#if usage?.user_count}
<hr class=" border-gray-50 dark:border-gray-800 my-1 p-0" /> <hr class=" border-gray-50 dark:border-gray-800 my-1 p-0" />
<Tooltip <Tooltip
@ -250,7 +250,7 @@
{$i18n.t('Active Users')}: {$i18n.t('Active Users')}:
</span> </span>
<span class=" font-semibold"> <span class=" font-semibold">
{usage?.user_ids?.length} {usage?.user_count}
</span> </span>
</div> </div>
</div> </div>

View file

@ -90,6 +90,8 @@
let showRefresh = false; let showRefresh = false;
let heartbeatInterval = null;
const BREAKPOINT = 768; const BREAKPOINT = 768;
const setupSocket = async (enableWebsocket) => { const setupSocket = async (enableWebsocket) => {
@ -126,6 +128,14 @@
} }
} }
// Send heartbeat every 30 seconds
heartbeatInterval = setInterval(() => {
if (_socket.connected) {
console.log('Sending heartbeat');
_socket.emit('heartbeat', {});
}
}, 30000);
if (deploymentId !== null) { if (deploymentId !== null) {
WEBUI_DEPLOYMENT_ID.set(deploymentId); WEBUI_DEPLOYMENT_ID.set(deploymentId);
} }
@ -154,6 +164,12 @@
_socket.on('disconnect', (reason, details) => { _socket.on('disconnect', (reason, details) => {
console.log(`Socket ${_socket.id} disconnected due to ${reason}`); console.log(`Socket ${_socket.id} disconnected due to ${reason}`);
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
if (details) { if (details) {
console.log('Additional details:', details); console.log('Additional details:', details);
} }