diff --git a/backend/open_webui/migrations/versions/2f1211949ecc_update_message_and_channel_member_table.py b/backend/open_webui/migrations/versions/2f1211949ecc_update_message_and_channel_member_table.py new file mode 100644 index 0000000000..2d72583ebe --- /dev/null +++ b/backend/open_webui/migrations/versions/2f1211949ecc_update_message_and_channel_member_table.py @@ -0,0 +1,103 @@ +"""Update messages and channel member table + +Revision ID: 2f1211949ecc +Revises: 37f288994c47 +Create Date: 2025-11-27 03:07:56.200231 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import open_webui.internal.db + + +# revision identifiers, used by Alembic. +revision: str = "2f1211949ecc" +down_revision: Union[str, None] = "37f288994c47" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # New columns to be added to channel_member table + op.add_column("channel_member", sa.Column("status", sa.Text(), nullable=True)) + op.add_column( + "channel_member", + sa.Column( + "is_active", + sa.Boolean(), + nullable=False, + default=True, + server_default=sa.sql.expression.true(), + ), + ) + + op.add_column( + "channel_member", + sa.Column( + "is_channel_muted", + sa.Boolean(), + nullable=False, + default=False, + server_default=sa.sql.expression.false(), + ), + ) + op.add_column( + "channel_member", + sa.Column( + "is_channel_pinned", + sa.Boolean(), + nullable=False, + default=False, + server_default=sa.sql.expression.false(), + ), + ) + + op.add_column("channel_member", sa.Column("data", sa.JSON(), nullable=True)) + op.add_column("channel_member", sa.Column("meta", sa.JSON(), nullable=True)) + + op.add_column( + "channel_member", sa.Column("joined_at", sa.BigInteger(), nullable=False) + ) + op.add_column( + "channel_member", sa.Column("left_at", sa.BigInteger(), nullable=True) + ) + + op.add_column( + "channel_member", sa.Column("last_read_at", sa.BigInteger(), nullable=True) + ) + + op.add_column( + "channel_member", sa.Column("updated_at", sa.BigInteger(), nullable=True) + ) + + # New columns to be added to message table + op.add_column( + "message", + sa.Column( + "is_pinned", + sa.Boolean(), + nullable=False, + default=False, + server_default=sa.sql.expression.false(), + ), + ) + op.add_column("message", sa.Column("pinned_at", sa.BigInteger(), nullable=True)) + op.add_column("message", sa.Column("pinned_by", sa.Text(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("channel_member", "updated_at") + op.drop_column("channel_member", "last_read_at") + + op.drop_column("channel_member", "meta") + op.drop_column("channel_member", "data") + + op.drop_column("channel_member", "is_channel_pinned") + op.drop_column("channel_member", "is_channel_muted") + + op.drop_column("message", "pinned_by") + op.drop_column("message", "pinned_at") + op.drop_column("message", "is_pinned") diff --git a/backend/open_webui/models/channels.py b/backend/open_webui/models/channels.py index 5f4d1436d9..325ce10143 100644 --- a/backend/open_webui/models/channels.py +++ b/backend/open_webui/models/channels.py @@ -48,8 +48,58 @@ class ChannelModel(BaseModel): meta: Optional[dict] = None access_control: Optional[dict] = None - created_at: int # timestamp in epoch - updated_at: int # timestamp in epoch + created_at: int # timestamp in epoch (time_ns) + updated_at: int # timestamp in epoch (time_ns) + + +class ChannelMember(Base): + __tablename__ = "channel_member" + + id = Column(Text, primary_key=True, unique=True) + channel_id = Column(Text, nullable=False) + user_id = Column(Text, nullable=False) + + status = Column(Text, nullable=True) + is_active = Column(Boolean, nullable=False, default=True) + + is_channel_muted = Column(Boolean, nullable=False, default=False) + is_channel_pinned = Column(Boolean, nullable=False, default=False) + + data = Column(JSON, nullable=True) + meta = Column(JSON, nullable=True) + + joined_at = Column(BigInteger) + left_at = Column(BigInteger, nullable=True) + + last_read_at = Column(BigInteger, nullable=True) + + created_at = Column(BigInteger) + updated_at = Column(BigInteger) + + +class ChannelMemberModel(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: str + channel_id: str + user_id: str + + status: Optional[str] = None + is_active: bool = True + + is_channel_muted: bool = False + is_channel_pinned: bool = False + + data: Optional[dict] = None + meta: Optional[dict] = None + + joined_at: Optional[int] = None # timestamp in epoch (time_ns) + left_at: Optional[int] = None # timestamp in epoch (time_ns) + + last_read_at: Optional[int] = None # timestamp in epoch (time_ns) + + created_at: Optional[int] = None # timestamp in epoch (time_ns) + updated_at: Optional[int] = None # timestamp in epoch (time_ns) #################### @@ -109,6 +159,131 @@ class ChannelTable: or has_access(user_id, permission, channel.access_control) ] + def join_channel( + self, channel_id: str, user_id: str + ) -> Optional[ChannelMemberModel]: + with get_db() as db: + # Check if the membership already exists + existing_membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ) + .first() + ) + if existing_membership: + return ChannelMemberModel.model_validate(existing_membership) + + # Create new membership + channel_member = ChannelMemberModel( + **{ + "id": str(uuid.uuid4()), + "channel_id": channel_id, + "user_id": user_id, + "status": "joined", + "is_active": True, + "is_channel_muted": False, + "is_channel_pinned": False, + "joined_at": int(time.time_ns()), + "left_at": None, + "last_read_at": int(time.time_ns()), + "created_at": int(time.time_ns()), + "updated_at": int(time.time_ns()), + } + ) + new_membership = ChannelMember(**channel_member.model_dump()) + + db.add(new_membership) + db.commit() + return channel_member + + def leave_channel(self, channel_id: str, user_id: str) -> bool: + with get_db() as db: + membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ) + .first() + ) + if not membership: + return False + + membership.status = "left" + membership.is_active = False + membership.left_at = int(time.time_ns()) + membership.updated_at = int(time.time_ns()) + + db.commit() + return True + + def get_member_by_channel_and_user_id( + self, channel_id: str, user_id: str + ) -> Optional[ChannelMemberModel]: + with get_db() as db: + membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ) + .first() + ) + return ChannelMemberModel.model_validate(membership) if membership else None + + def pin_channel(self, channel_id: str, user_id: str, is_pinned: bool) -> bool: + with get_db() as db: + membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ) + .first() + ) + if not membership: + return False + + membership.is_channel_pinned = is_pinned + membership.updated_at = int(time.time_ns()) + + db.commit() + return True + + def update_member_last_read_at(self, channel_id: str, user_id: str) -> bool: + with get_db() as db: + membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ) + .first() + ) + if not membership: + return False + + membership.last_read_at = int(time.time_ns()) + membership.updated_at = int(time.time_ns()) + + db.commit() + return True + + def is_user_channel_member(self, channel_id: str, user_id: str) -> bool: + with get_db() as db: + membership = ( + db.query(ChannelMember) + .filter( + ChannelMember.channel_id == channel_id, + ChannelMember.user_id == user_id, + ChannelMember.is_active == True, + ) + .first() + ) + return membership is not None + def get_channel_by_id(self, id: str) -> Optional[ChannelModel]: with get_db() as db: channel = db.query(Channel).filter(Channel.id == id).first() diff --git a/backend/open_webui/models/messages.py b/backend/open_webui/models/messages.py index 6aaf09ca46..1094035fd5 100644 --- a/backend/open_webui/models/messages.py +++ b/backend/open_webui/models/messages.py @@ -6,6 +6,7 @@ from typing import Optional from open_webui.internal.db import Base, get_db from open_webui.models.tags import TagModel, Tag, Tags from open_webui.models.users import Users, UserNameResponse +from open_webui.models.channels import Channels, ChannelMember from pydantic import BaseModel, ConfigDict @@ -47,6 +48,11 @@ class Message(Base): reply_to_id = Column(Text, nullable=True) parent_id = Column(Text, nullable=True) + # Pins + is_pinned = Column(Boolean, nullable=False, default=False) + pinned_at = Column(BigInteger, nullable=True) + pinned_by = Column(Text, nullable=True) + content = Column(Text) data = Column(JSON, nullable=True) meta = Column(JSON, nullable=True) @@ -65,12 +71,17 @@ class MessageModel(BaseModel): reply_to_id: Optional[str] = None parent_id: Optional[str] = None + # Pins + is_pinned: bool = False + pinned_by: Optional[str] = None + pinned_at: Optional[int] = None # timestamp in epoch (time_ns) + content: str data: Optional[dict] = None meta: Optional[dict] = None - created_at: int # timestamp in epoch - updated_at: int # timestamp in epoch + created_at: int # timestamp in epoch (time_ns) + updated_at: int # timestamp in epoch (time_ns) #################### @@ -111,9 +122,11 @@ class MessageTable: self, form_data: MessageForm, channel_id: str, user_id: str ) -> Optional[MessageModel]: with get_db() as db: - id = str(uuid.uuid4()) + channel_member = Channels.join_channel(channel_id, user_id) + id = str(uuid.uuid4()) ts = int(time.time_ns()) + message = MessageModel( **{ "id": id, @@ -121,6 +134,9 @@ class MessageTable: "channel_id": channel_id, "reply_to_id": form_data.reply_to_id, "parent_id": form_data.parent_id, + "is_pinned": False, + "pinned_at": None, + "pinned_by": None, "content": form_data.content, "data": form_data.data, "meta": form_data.meta, @@ -128,8 +144,8 @@ class MessageTable: "updated_at": ts, } ) - result = Message(**message.model_dump()) + db.add(result) db.commit() db.refresh(result) @@ -280,6 +296,16 @@ class MessageTable: ) return messages + def get_last_message_by_channel_id(self, channel_id: str) -> Optional[MessageModel]: + with get_db() as db: + message = ( + db.query(Message) + .filter_by(channel_id=channel_id) + .order_by(Message.created_at.desc()) + .first() + ) + return MessageModel.model_validate(message) if message else None + def update_message_by_id( self, id: str, form_data: MessageForm ) -> Optional[MessageModel]: @@ -299,6 +325,32 @@ class MessageTable: db.refresh(message) return MessageModel.model_validate(message) if message else None + def update_message_pin_by_id( + self, id: str, is_pinned: bool, pinned_by: Optional[str] = None + ) -> Optional[MessageModel]: + with get_db() as db: + message = db.get(Message, id) + message.is_pinned = is_pinned + message.pinned_at = int(time.time_ns()) if is_pinned else None + message.pinned_by = pinned_by if is_pinned else None + message.updated_at = int(time.time_ns()) + db.commit() + db.refresh(message) + return MessageModel.model_validate(message) if message else None + + def get_unread_message_count( + self, channel_id: str, user_id: str, last_read_at: Optional[int] = None + ) -> int: + with get_db() as db: + query = db.query(Message).filter( + Message.channel_id == channel_id, + Message.parent_id == None, # only count top-level messages + Message.created_at > (last_read_at if last_read_at else 0), + ) + if user_id: + query = query.filter(Message.user_id != user_id) + return query.count() + def add_reaction_to_message( self, id: str, user_id: str, name: str ) -> Optional[MessageReactionModel]: diff --git a/backend/open_webui/routers/channels.py b/backend/open_webui/routers/channels.py index e47c98554e..e720648a7d 100644 --- a/backend/open_webui/routers/channels.py +++ b/backend/open_webui/routers/channels.py @@ -65,9 +65,35 @@ router = APIRouter() ############################ -@router.get("/", response_model=list[ChannelModel]) +class ChannelListItemResponse(ChannelModel): + last_message_at: Optional[int] = None # timestamp in epoch (time_ns) + unread_count: int = 0 + + +@router.get("/", response_model=list[ChannelListItemResponse]) async def get_channels(user=Depends(get_verified_user)): - return Channels.get_channels_by_user_id(user.id) + + channels = Channels.get_channels_by_user_id(user.id) + + channel_list = [] + for channel in channels: + last_message = Messages.get_last_message_by_channel_id(channel.id) + last_message_at = last_message.created_at if last_message else None + + channel_member = Channels.get_member_by_channel_and_user_id(channel.id, user.id) + unread_count = Messages.get_unread_message_count( + channel.id, user.id, channel_member.last_read_at if channel_member else None + ) + + channel_list.append( + ChannelListItemResponse( + **channel.model_dump(), + last_message_at=last_message_at, + unread_count=unread_count, + ) + ) + + return channel_list @router.get("/list", response_model=list[ChannelModel]) @@ -259,6 +285,10 @@ async def get_channel_messages( status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() ) + channel_member = Channels.join_channel( + id, user.id + ) # Ensure user is a member of the channel + message_list = Messages.get_messages_by_channel_id(id, skip, limit) users = {} @@ -297,7 +327,9 @@ async def send_notification(name, webui_url, channel, message, active_user_ids): users = get_users_with_access("read", channel.access_control) for user in users: - if user.id not in active_user_ids: + if (user.id not in active_user_ids) and Channels.is_user_channel_member( + channel.id, user.id + ): if user.settings: webhook_url = user.settings.ui.get("notifications", {}).get( "webhook_url", None diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index bbfbfa2703..04b67dd786 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -408,6 +408,11 @@ async def channel_events(sid, data): event_data = data["data"] event_type = event_data["type"] + user = SESSION_POOL.get(sid) + + if not user: + return + if event_type == "typing": await sio.emit( "events:channel", @@ -415,10 +420,12 @@ async def channel_events(sid, data): "channel_id": data["channel_id"], "message_id": data.get("message_id", None), "data": event_data, - "user": UserNameResponse(**SESSION_POOL[sid]).model_dump(), + "user": UserNameResponse(**user).model_dump(), }, room=room, ) + elif event_type == "last_read_at": + Channels.update_member_last_read_at(data["channel_id"], user["id"]) @sio.on("ydoc:document:join") diff --git a/src/lib/components/channel/Channel.svelte b/src/lib/components/channel/Channel.svelte index 40ce02f3fe..419870937a 100644 --- a/src/lib/components/channel/Channel.svelte +++ b/src/lib/components/channel/Channel.svelte @@ -18,6 +18,8 @@ export let id = ''; + let currentId = null; + let scrollEnd = true; let messagesContainerElement = null; let chatInputElement = null; @@ -43,7 +45,24 @@ } }; + const updateLastReadAt = async (channelId) => { + $socket?.emit('events:channel', { + channel_id: channelId, + message_id: null, + data: { + type: 'last_read_at' + } + }); + }; + const initHandler = async () => { + if (currentId) { + updateLastReadAt(currentId); + } + + currentId = id; + updateLastReadAt(id); + top = false; messages = null; channel = null; @@ -170,6 +189,8 @@ } } }); + + updateLastReadAt(id); }; let mediaQuery; @@ -197,6 +218,8 @@ }); onDestroy(() => { + // last read at + updateLastReadAt(id); $socket?.off('events:channel', channelEventHandler); }); diff --git a/src/lib/components/layout/Sidebar/ChannelItem.svelte b/src/lib/components/layout/Sidebar/ChannelItem.svelte index 4ea884cf9d..09b8fcfaf6 100644 --- a/src/lib/components/layout/Sidebar/ChannelItem.svelte +++ b/src/lib/components/layout/Sidebar/ChannelItem.svelte @@ -4,7 +4,7 @@ const i18n = getContext('i18n'); import { page } from '$app/stores'; - import { mobile, showSidebar, user } from '$lib/stores'; + import { channels, mobile, showSidebar, user } from '$lib/stores'; import { updateChannelById } from '$lib/apis/channels'; import Cog6 from '$lib/components/icons/Cog6.svelte'; @@ -49,13 +49,27 @@ class=" w-full {className} rounded-xl flex relative group hover:bg-gray-100 dark:hover:bg-gray-900 {$page .url.pathname === `/channels/${channel.id}` ? 'bg-gray-100 dark:bg-gray-900 selected' - : ''} px-2.5 py-1" + : ''} px-2.5 py-1 {channel?.unread_count > 0 + ? 'font-medium dark:text-white text-black' + : ' dark:text-gray-400 text-gray-600'} cursor-pointer select-none" > { console.log(channel); + + if ($channels) { + channels.set( + $channels.map((ch) => { + if (ch.id === channel.id) { + ch.unread_count = 0; + } + return ch; + }) + ); + } + if ($mobile) { showSidebar.set(false); } @@ -75,19 +89,32 @@ {channel.name} - - {#if $user?.role === 'admin'} - diff --git a/src/routes/+layout.svelte b/src/routes/+layout.svelte index 9408604da6..5c1080cfea 100644 --- a/src/routes/+layout.svelte +++ b/src/routes/+layout.svelte @@ -28,7 +28,8 @@ isApp, appInfo, toolServers, - playingNotificationSound + playingNotificationSound, + channels } from '$lib/stores'; import { goto } from '$app/navigation'; import { page } from '$app/stores'; @@ -483,6 +484,23 @@ const type = event?.data?.type ?? null; const data = event?.data?.data ?? null; + if ($channels) { + channels.set( + $channels.map((ch) => { + if (ch.id === event.channel_id) { + if (type === 'message') { + return { + ...ch, + unread_count: (ch.unread_count ?? 0) + 1, + last_message_at: event.created_at + }; + } + } + return ch; + }) + ); + } + if (type === 'message') { if ($isLastActiveTab) { if ($settings?.notificationEnabled ?? false) {