refac: ydoc

This commit is contained in:
Timothy Jaeryang Baek 2025-07-14 16:29:42 +04:00
parent 203c5b1956
commit d76a3d5ce2
2 changed files with 45 additions and 46 deletions

View file

@ -44,13 +44,7 @@ log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["SOCKET"]) log.setLevel(SRC_LOG_LEVELS["SOCKET"])
REDIS = get_redis_connection( REDIS = None
redis_url=WEBSOCKET_REDIS_URL,
redis_sentinels=get_sentinels_from_env(
WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
),
async_mode=True,
)
if WEBSOCKET_MANAGER == "redis": if WEBSOCKET_MANAGER == "redis":
if WEBSOCKET_SENTINEL_HOSTS: if WEBSOCKET_SENTINEL_HOSTS:
@ -86,6 +80,14 @@ TIMEOUT_DURATION = 3
if WEBSOCKET_MANAGER == "redis": if WEBSOCKET_MANAGER == "redis":
log.debug("Using Redis to manage websockets.") log.debug("Using Redis to manage websockets.")
REDIS = get_redis_connection(
redis_url=WEBSOCKET_REDIS_URL,
redis_sentinels=get_sentinels_from_env(
WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
),
async_mode=True,
)
redis_sentinels = get_sentinels_from_env( redis_sentinels = get_sentinels_from_env(
WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT
) )
@ -105,9 +107,6 @@ if WEBSOCKET_MANAGER == "redis":
redis_sentinels=redis_sentinels, redis_sentinels=redis_sentinels,
) )
# TODO: Implement Yjs document management with Redis
DOCUMENTS = {}
clean_up_lock = RedisLock( clean_up_lock = RedisLock(
redis_url=WEBSOCKET_REDIS_URL, redis_url=WEBSOCKET_REDIS_URL,
lock_name="usage_cleanup_lock", lock_name="usage_cleanup_lock",
@ -122,10 +121,13 @@ else:
USER_POOL = {} USER_POOL = {}
USAGE_POOL = {} USAGE_POOL = {}
DOCUMENTS = {} # document_id -> Y.YDoc instance
aquire_func = release_func = renew_func = lambda: True aquire_func = release_func = renew_func = lambda: True
# TODO: Implement Yjs document management with Redis
DOCUMENTS = {} # document_id -> Y.YDoc instance
async def periodic_usage_pool_cleanup(): async def periodic_usage_pool_cleanup():
max_retries = 2 max_retries = 2
retry_delay = random.uniform( retry_delay = random.uniform(
@ -336,8 +338,8 @@ async def channel_events(sid, data):
) )
@sio.on("yjs:document:join") @sio.on("ydoc:document:join")
async def yjs_document_join(sid, data): async def ydoc_document_join(sid, data):
"""Handle user joining a document""" """Handle user joining a document"""
user = SESSION_POOL.get(sid) user = SESSION_POOL.get(sid)
@ -372,7 +374,7 @@ async def yjs_document_join(sid, data):
# Initialize document if it doesn't exist # Initialize document if it doesn't exist
if document_id not in DOCUMENTS: if document_id not in DOCUMENTS:
DOCUMENTS[document_id] = { DOCUMENTS[document_id] = {
"ydoc": Y.Doc(), # Create actual Yjs document "updates": [], # Store updates for the document
"users": set(), "users": set(),
} }
@ -383,12 +385,16 @@ async def yjs_document_join(sid, data):
await sio.enter_room(sid, f"doc_{document_id}") await sio.enter_room(sid, f"doc_{document_id}")
# Send current document state as a proper Yjs update # Send current document state as a proper Yjs update
ydoc = DOCUMENTS[document_id]["ydoc"] ydoc = Y.Doc()
if document_id in DOCUMENTS:
# If the document already exists, apply its updates
for update in DOCUMENTS[document_id]["updates"]:
ydoc.apply_update(bytes(update))
# Encode the entire document state as an update # Encode the entire document state as an update
state_update = ydoc.get_update() state_update = ydoc.get_update()
await sio.emit( await sio.emit(
"yjs:document:state", "ydoc:document:state",
{ {
"document_id": document_id, "document_id": document_id,
"state": list(state_update), # Convert bytes to list for JSON "state": list(state_update), # Convert bytes to list for JSON
@ -398,7 +404,7 @@ async def yjs_document_join(sid, data):
# Notify other users about the new user # Notify other users about the new user
await sio.emit( await sio.emit(
"yjs:user:joined", "ydoc:user:joined",
{ {
"document_id": document_id, "document_id": document_id,
"user_id": user_id, "user_id": user_id,
@ -437,7 +443,7 @@ async def document_save_handler(document_id, data, user):
Notes.update_note_by_id(note_id, NoteUpdateForm(data=data)) Notes.update_note_by_id(note_id, NoteUpdateForm(data=data))
@sio.on("yjs:document:update") @sio.on("ydoc:document:update")
async def yjs_document_update(sid, data): async def yjs_document_update(sid, data):
"""Handle Yjs document updates""" """Handle Yjs document updates"""
try: try:
@ -455,19 +461,12 @@ async def yjs_document_update(sid, data):
log.warning(f"Document {document_id} not found") log.warning(f"Document {document_id} not found")
return return
# Apply the update to the server's Yjs document updates = DOCUMENTS[document_id]["updates"]
ydoc = DOCUMENTS[document_id]["ydoc"] updates.append(update)
update_bytes = bytes(update)
try:
ydoc.apply_update(update_bytes)
except Exception as e:
log.error(f"Failed to apply Yjs update: {e}")
return
# Broadcast update to all other users in the document # Broadcast update to all other users in the document
await sio.emit( await sio.emit(
"yjs:document:update", "ydoc:document:update",
{ {
"document_id": document_id, "document_id": document_id,
"user_id": user_id, "user_id": user_id,
@ -490,7 +489,7 @@ async def yjs_document_update(sid, data):
log.error(f"Error in yjs_document_update: {e}") log.error(f"Error in yjs_document_update: {e}")
@sio.on("yjs:document:leave") @sio.on("ydoc:document:leave")
async def yjs_document_leave(sid, data): async def yjs_document_leave(sid, data):
"""Handle user leaving a document""" """Handle user leaving a document"""
try: try:
@ -507,7 +506,7 @@ async def yjs_document_leave(sid, data):
# Notify other users # Notify other users
await sio.emit( await sio.emit(
"yjs:user:left", "ydoc:user:left",
{"document_id": document_id, "user_id": user_id}, {"document_id": document_id, "user_id": user_id},
room=f"doc_{document_id}", room=f"doc_{document_id}",
) )
@ -521,7 +520,7 @@ async def yjs_document_leave(sid, data):
log.error(f"Error in yjs_document_leave: {e}") log.error(f"Error in yjs_document_leave: {e}")
@sio.on("yjs:awareness:update") @sio.on("ydoc:awareness:update")
async def yjs_awareness_update(sid, data): async def yjs_awareness_update(sid, data):
"""Handle awareness updates (cursors, selections, etc.)""" """Handle awareness updates (cursors, selections, etc.)"""
try: try:
@ -531,7 +530,7 @@ async def yjs_awareness_update(sid, data):
# Broadcast awareness update to all other users in the document # Broadcast awareness update to all other users in the document
await sio.emit( await sio.emit(
"yjs:awareness:update", "ydoc:awareness:update",
{"document_id": document_id, "user_id": user_id, "update": update}, {"document_id": document_id, "user_id": user_id, "update": update},
room=f"doc_{document_id}", room=f"doc_{document_id}",
skip_sid=sid, skip_sid=sid,

View file

@ -177,7 +177,7 @@
joinDocument() { joinDocument() {
const userColor = this.generateUserColor(); const userColor = this.generateUserColor();
this.socket.emit('yjs:document:join', { this.socket.emit('ydoc:document:join', {
document_id: this.documentId, document_id: this.documentId,
user_id: this.user?.id, user_id: this.user?.id,
user_name: this.user?.name, user_name: this.user?.name,
@ -196,7 +196,7 @@
setupEventListeners() { setupEventListeners() {
// Listen for document updates from server // Listen for document updates from server
this.socket.on('yjs:document:update', (data) => { this.socket.on('ydoc:document:update', (data) => {
if (data.document_id === this.documentId && data.socket_id !== this.socket.id) { if (data.document_id === this.documentId && data.socket_id !== this.socket.id) {
try { try {
const update = new Uint8Array(data.update); const update = new Uint8Array(data.update);
@ -208,7 +208,7 @@
}); });
// Listen for document state from server // Listen for document state from server
this.socket.on('yjs:document:state', async (data) => { this.socket.on('ydoc:document:state', async (data) => {
if (data.document_id === this.documentId) { if (data.document_id === this.documentId) {
try { try {
if (data.state) { if (data.state) {
@ -219,13 +219,13 @@
// check if editor empty as well // check if editor empty as well
const isEmptyEditor = !editor || editor.getText().trim() === ''; const isEmptyEditor = !editor || editor.getText().trim() === '';
if (content && isEmptyEditor) { if (content && isEmptyEditor) {
const pydoc = prosemirrorJSONToYDoc(editor.schema, content); const editorYdoc = prosemirrorJSONToYDoc(editor.schema, content);
if (pydoc) { if (editorYdoc) {
Y.applyUpdate(this.doc, Y.encodeStateAsUpdate(pydoc)); Y.applyUpdate(this.doc, Y.encodeStateAsUpdate(editorYdoc));
} }
} }
} else { } else {
Y.applyUpdate(this.doc, state); Y.applyUpdate(this.doc, state, 'server');
} }
} }
this.synced = true; this.synced = true;
@ -236,7 +236,7 @@
}); });
// Listen for awareness updates // Listen for awareness updates
this.socket.on('yjs:awareness:update', (data) => { this.socket.on('ydoc:awareness:update', (data) => {
if (data.document_id === this.documentId && awareness) { if (data.document_id === this.documentId && awareness) {
try { try {
const awarenessUpdate = new Uint8Array(data.update); const awarenessUpdate = new Uint8Array(data.update);
@ -255,7 +255,7 @@
this.doc.on('update', async (update, origin) => { this.doc.on('update', async (update, origin) => {
if (origin !== 'server' && this.isConnected) { if (origin !== 'server' && this.isConnected) {
await tick(); // Ensure the DOM is updated before sending await tick(); // Ensure the DOM is updated before sending
this.socket.emit('yjs:document:update', { this.socket.emit('ydoc:document:update', {
document_id: this.documentId, document_id: this.documentId,
user_id: this.user?.id, user_id: this.user?.id,
socket_id: this.socket.id, socket_id: this.socket.id,
@ -277,7 +277,7 @@
if (origin !== 'server' && this.isConnected) { if (origin !== 'server' && this.isConnected) {
const changedClients = added.concat(updated).concat(removed); const changedClients = added.concat(updated).concat(removed);
const awarenessUpdate = awareness.encodeUpdate(changedClients); const awarenessUpdate = awareness.encodeUpdate(changedClients);
this.socket.emit('yjs:awareness:update', { this.socket.emit('ydoc:awareness:update', {
document_id: this.documentId, document_id: this.documentId,
user_id: this.socket.id, user_id: this.socket.id,
update: Array.from(awarenessUpdate) update: Array.from(awarenessUpdate)
@ -303,14 +303,14 @@
}; };
destroy() { destroy() {
this.socket.off('yjs:document:update'); this.socket.off('ydoc:document:update');
this.socket.off('yjs:document:state'); this.socket.off('ydoc:document:state');
this.socket.off('yjs:awareness:update'); this.socket.off('ydoc:awareness:update');
this.socket.off('connect', this.onConnect); this.socket.off('connect', this.onConnect);
this.socket.off('disconnect', this.onDisconnect); this.socket.off('disconnect', this.onDisconnect);
if (this.isConnected) { if (this.isConnected) {
this.socket.emit('yjs:document:leave', { this.socket.emit('ydoc:document:leave', {
document_id: this.documentId, document_id: this.documentId,
user_id: this.user?.id user_id: this.user?.id
}); });