From d76a3d5ce2ef0bf43f53f1816c0b07668d9480b2 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 14 Jul 2025 16:29:42 +0400 Subject: [PATCH] refac: ydoc --- backend/open_webui/socket/main.py | 63 +++++++++---------- .../components/common/RichTextInput.svelte | 28 ++++----- 2 files changed, 45 insertions(+), 46 deletions(-) diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 861d9ae52c..5b0eda2da9 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -44,13 +44,7 @@ log = logging.getLogger(__name__) log.setLevel(SRC_LOG_LEVELS["SOCKET"]) -REDIS = get_redis_connection( - redis_url=WEBSOCKET_REDIS_URL, - redis_sentinels=get_sentinels_from_env( - WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT - ), - async_mode=True, -) +REDIS = None if WEBSOCKET_MANAGER == "redis": if WEBSOCKET_SENTINEL_HOSTS: @@ -86,6 +80,14 @@ TIMEOUT_DURATION = 3 if WEBSOCKET_MANAGER == "redis": log.debug("Using Redis to manage websockets.") + REDIS = get_redis_connection( + redis_url=WEBSOCKET_REDIS_URL, + redis_sentinels=get_sentinels_from_env( + WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT + ), + async_mode=True, + ) + redis_sentinels = get_sentinels_from_env( WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT ) @@ -105,9 +107,6 @@ if WEBSOCKET_MANAGER == "redis": redis_sentinels=redis_sentinels, ) - # TODO: Implement Yjs document management with Redis - DOCUMENTS = {} - clean_up_lock = RedisLock( redis_url=WEBSOCKET_REDIS_URL, lock_name="usage_cleanup_lock", @@ -122,10 +121,13 @@ else: USER_POOL = {} USAGE_POOL = {} - DOCUMENTS = {} # document_id -> Y.YDoc instance aquire_func = release_func = renew_func = lambda: True +# TODO: Implement Yjs document management with Redis +DOCUMENTS = {} # document_id -> Y.YDoc instance + + async def periodic_usage_pool_cleanup(): max_retries = 2 retry_delay = random.uniform( @@ -336,8 +338,8 @@ async def channel_events(sid, data): ) -@sio.on("yjs:document:join") -async def yjs_document_join(sid, data): +@sio.on("ydoc:document:join") +async def ydoc_document_join(sid, data): """Handle user joining a document""" user = SESSION_POOL.get(sid) @@ -372,7 +374,7 @@ async def yjs_document_join(sid, data): # Initialize document if it doesn't exist if document_id not in DOCUMENTS: DOCUMENTS[document_id] = { - "ydoc": Y.Doc(), # Create actual Yjs document + "updates": [], # Store updates for the document "users": set(), } @@ -383,12 +385,16 @@ async def yjs_document_join(sid, data): await sio.enter_room(sid, f"doc_{document_id}") # Send current document state as a proper Yjs update - ydoc = DOCUMENTS[document_id]["ydoc"] + ydoc = Y.Doc() + if document_id in DOCUMENTS: + # If the document already exists, apply its updates + for update in DOCUMENTS[document_id]["updates"]: + ydoc.apply_update(bytes(update)) # Encode the entire document state as an update state_update = ydoc.get_update() await sio.emit( - "yjs:document:state", + "ydoc:document:state", { "document_id": document_id, "state": list(state_update), # Convert bytes to list for JSON @@ -398,7 +404,7 @@ async def yjs_document_join(sid, data): # Notify other users about the new user await sio.emit( - "yjs:user:joined", + "ydoc:user:joined", { "document_id": document_id, "user_id": user_id, @@ -437,7 +443,7 @@ async def document_save_handler(document_id, data, user): Notes.update_note_by_id(note_id, NoteUpdateForm(data=data)) -@sio.on("yjs:document:update") +@sio.on("ydoc:document:update") async def yjs_document_update(sid, data): """Handle Yjs document updates""" try: @@ -455,19 +461,12 @@ async def yjs_document_update(sid, data): log.warning(f"Document {document_id} not found") return - # Apply the update to the server's Yjs document - ydoc = DOCUMENTS[document_id]["ydoc"] - update_bytes = bytes(update) - - try: - ydoc.apply_update(update_bytes) - except Exception as e: - log.error(f"Failed to apply Yjs update: {e}") - return + updates = DOCUMENTS[document_id]["updates"] + updates.append(update) # Broadcast update to all other users in the document await sio.emit( - "yjs:document:update", + "ydoc:document:update", { "document_id": document_id, "user_id": user_id, @@ -490,7 +489,7 @@ async def yjs_document_update(sid, data): log.error(f"Error in yjs_document_update: {e}") -@sio.on("yjs:document:leave") +@sio.on("ydoc:document:leave") async def yjs_document_leave(sid, data): """Handle user leaving a document""" try: @@ -507,7 +506,7 @@ async def yjs_document_leave(sid, data): # Notify other users await sio.emit( - "yjs:user:left", + "ydoc:user:left", {"document_id": document_id, "user_id": user_id}, room=f"doc_{document_id}", ) @@ -521,7 +520,7 @@ async def yjs_document_leave(sid, data): log.error(f"Error in yjs_document_leave: {e}") -@sio.on("yjs:awareness:update") +@sio.on("ydoc:awareness:update") async def yjs_awareness_update(sid, data): """Handle awareness updates (cursors, selections, etc.)""" try: @@ -531,7 +530,7 @@ async def yjs_awareness_update(sid, data): # Broadcast awareness update to all other users in the document await sio.emit( - "yjs:awareness:update", + "ydoc:awareness:update", {"document_id": document_id, "user_id": user_id, "update": update}, room=f"doc_{document_id}", skip_sid=sid, diff --git a/src/lib/components/common/RichTextInput.svelte b/src/lib/components/common/RichTextInput.svelte index 324eef1cf1..41a2426acd 100644 --- a/src/lib/components/common/RichTextInput.svelte +++ b/src/lib/components/common/RichTextInput.svelte @@ -177,7 +177,7 @@ joinDocument() { const userColor = this.generateUserColor(); - this.socket.emit('yjs:document:join', { + this.socket.emit('ydoc:document:join', { document_id: this.documentId, user_id: this.user?.id, user_name: this.user?.name, @@ -196,7 +196,7 @@ setupEventListeners() { // Listen for document updates from server - this.socket.on('yjs:document:update', (data) => { + this.socket.on('ydoc:document:update', (data) => { if (data.document_id === this.documentId && data.socket_id !== this.socket.id) { try { const update = new Uint8Array(data.update); @@ -208,7 +208,7 @@ }); // Listen for document state from server - this.socket.on('yjs:document:state', async (data) => { + this.socket.on('ydoc:document:state', async (data) => { if (data.document_id === this.documentId) { try { if (data.state) { @@ -219,13 +219,13 @@ // check if editor empty as well const isEmptyEditor = !editor || editor.getText().trim() === ''; if (content && isEmptyEditor) { - const pydoc = prosemirrorJSONToYDoc(editor.schema, content); - if (pydoc) { - Y.applyUpdate(this.doc, Y.encodeStateAsUpdate(pydoc)); + const editorYdoc = prosemirrorJSONToYDoc(editor.schema, content); + if (editorYdoc) { + Y.applyUpdate(this.doc, Y.encodeStateAsUpdate(editorYdoc)); } } } else { - Y.applyUpdate(this.doc, state); + Y.applyUpdate(this.doc, state, 'server'); } } this.synced = true; @@ -236,7 +236,7 @@ }); // Listen for awareness updates - this.socket.on('yjs:awareness:update', (data) => { + this.socket.on('ydoc:awareness:update', (data) => { if (data.document_id === this.documentId && awareness) { try { const awarenessUpdate = new Uint8Array(data.update); @@ -255,7 +255,7 @@ this.doc.on('update', async (update, origin) => { if (origin !== 'server' && this.isConnected) { await tick(); // Ensure the DOM is updated before sending - this.socket.emit('yjs:document:update', { + this.socket.emit('ydoc:document:update', { document_id: this.documentId, user_id: this.user?.id, socket_id: this.socket.id, @@ -277,7 +277,7 @@ if (origin !== 'server' && this.isConnected) { const changedClients = added.concat(updated).concat(removed); const awarenessUpdate = awareness.encodeUpdate(changedClients); - this.socket.emit('yjs:awareness:update', { + this.socket.emit('ydoc:awareness:update', { document_id: this.documentId, user_id: this.socket.id, update: Array.from(awarenessUpdate) @@ -303,14 +303,14 @@ }; destroy() { - this.socket.off('yjs:document:update'); - this.socket.off('yjs:document:state'); - this.socket.off('yjs:awareness:update'); + this.socket.off('ydoc:document:update'); + this.socket.off('ydoc:document:state'); + this.socket.off('ydoc:awareness:update'); this.socket.off('connect', this.onConnect); this.socket.off('disconnect', this.onDisconnect); if (this.isConnected) { - this.socket.emit('yjs:document:leave', { + this.socket.emit('ydoc:document:leave', { document_id: this.documentId, user_id: this.user?.id });