from __future__ import annotations import json import re from dataclasses import dataclass from datetime import datetime from typing import Any, Iterable, Mapping, cast from html.parser import HTMLParser @dataclass(slots=True) class ChatThread: id: str title: str messages: list[dict[str, Any]] class ChatImporterError(Exception): pass def detect_threads(data: Any) -> list[ChatThread]: """ 尝试使用多种解析器解析导入的 JSON 数据。 支持: 1. Google AI Studio (Single / Backup) 2. OpenAI Conversation (ChatGPT export) 3. Chatbox 4. Cherry Studio 5. DeepSeek """ for parser in ( _try_google_single, _try_google_backup, _try_openai_conversation, _try_chatbox_export, _try_cherry_studio, _try_deepseek_export, ): try: threads = parser(data) if threads: return threads except Exception: # 忽略解析错误,尝试下一个解析器 continue return [] # ============================================================================= # 1. Google AI Studio # ============================================================================= def _try_google_single(data: Any) -> list[ChatThread]: if not isinstance(data, Mapping) or "chunkedPrompt" not in data: return [] chunks = data.get("chunkedPrompt", {}).get("chunks", []) or [] if not chunks: return [] title = "Google AI Studio 对话" if chunks and chunks[0].get("branchChildren"): # 尝试提取标题 display_name = chunks[0]["branchChildren"][0].get("displayName", "") match = re.search(r"Branch of (.*)", display_name or "") if match: title = match.group(1).strip() messages: list[dict[str, Any]] = [] for idx, chunk in enumerate(chunks, start=1): if chunk.get("isThought"): continue text = chunk.get("text", "") if not text and "parts" in chunk: text = "".join(p.get("text", "") for p in chunk.get("parts", []) if "text" in p) if not text.strip(): continue messages.append( { "id": f"google-single-{idx}", "role": chunk.get("role", "assistant"), "content": text.strip(), "timestamp": chunk.get("timestamp"), "metadata": {}, } ) if not messages: return [] return [ChatThread(id="google-single", title=title, 
messages=messages)] def _try_google_backup(data: Any) -> list[ChatThread]: if not isinstance(data, Mapping) or "conversations" not in data: return [] conversations = data.get("conversations", []) or [] threads: list[ChatThread] = [] for idx, conversation in enumerate(conversations, start=1): title = conversation.get("name") or f"备份对话_{idx}" messages_data = conversation.get("messages", []) or [] messages: list[dict[str, Any]] = [] for midx, msg in enumerate(messages_data, start=1): role = msg.get("author", {}).get("role", "user") content = msg.get("content", "") if not content: continue messages.append( { "id": f"google-backup-{idx}-{midx}", "role": role, "content": content.strip(), "timestamp": msg.get("create_time"), "metadata": {}, } ) if messages: conv_id = conversation.get("id") or f"google-backup-{idx}" threads.append(ChatThread(id=str(conv_id), title=title, messages=messages)) return threads # ============================================================================= # 2. OpenAI Conversation # ============================================================================= def _try_openai_conversation(data: Any) -> list[ChatThread]: # 简单的特征检测:list 且第一项有 mapping if not isinstance(data, list) or not data: return [] first = data[0] if not isinstance(first, Mapping) or "mapping" not in first: return [] # 区分 DeepSeek: DeepSeek 的 mapping node message fragments # OpenAI 的 mapping node message content parts # 我们可以通过检查 fragment 关键字来排除 DeepSeek,或者让 DeepSeek 解析器先跑 # 但这里我们可以通过结构特征更细致地判断 threads: list[ChatThread] = [] for idx, conversation in enumerate(data, start=1): mapping = conversation.get("mapping") or {} if not mapping: continue # 检查是否包含 fragments (DeepSeek 特征),如果包含则跳过,交给 DeepSeek 解析器 # 取一个节点样本 sample_node = next(iter(mapping.values()), {}) if sample_node.get("message", {}).get("fragments"): return [] # 这是一个 DeepSeek 文件,不是 OpenAI title = conversation.get("title") or f"对话_{idx}" conv_id = conversation.get("id") or conversation.get("conversation_id") or f"conv_{idx}" 
# 这里的逻辑简化为:取最长分支(main branch) branch = _reconstruct_longest_branch(mapping, conversation.get("current_node")) if not branch: continue messages: list[dict[str, Any]] = [] for node_id in branch: node = mapping.get(node_id) or {} message = node.get("message") if not message: continue metadata = message.get("metadata") or {} if metadata.get("is_visually_hidden_from_conversation"): continue text = _extract_openai_text(message.get("content")) if not text.strip(): continue messages.append( { "id": message.get("id", node_id), "role": message.get("author", {}).get("role", "user"), "content": text.strip(), "timestamp": _normalize_timestamp(message.get("create_time")), "metadata": metadata, } ) if messages: threads.append(ChatThread(id=str(conv_id), title=title, messages=messages)) return threads def _extract_openai_text(content: Mapping[str, Any] | None) -> str: if not content: return "" ctype = content.get("content_type") if ctype == "text": parts = content.get("parts") or [] return "\n".join(str(part) for part in parts if part) if ctype == "multimodal_text": texts: list[str] = [] for part in content.get("parts") or []: if isinstance(part, str): texts.append(part) elif isinstance(part, Mapping) and part.get("type") == "text": texts.append(part.get("text", "")) return "\n".join(texts) if ctype == "user_editable_context": return content.get("user_instructions", "") if isinstance(content, str): return content return json.dumps(content, ensure_ascii=False) # ============================================================================= # 3. 
# =============================================================================
# 3. Chatbox
# =============================================================================


def _try_chatbox_export(data: Any) -> list[ChatThread]:
    """Parse a Chatbox export: session metadata plus ``session:<id>`` payloads."""
    if not isinstance(data, Mapping):
        return []
    # Signature key of Chatbox exports.
    if "chat-sessions-list" not in data:
        return []

    sessions_meta = {
        item.get("id"): item
        for item in data.get("chat-sessions-list", []) or []
        if isinstance(item, Mapping)
    }

    threads: list[ChatThread] = []
    for key, value in data.items():
        if not isinstance(key, str) or not key.startswith("session:"):
            continue
        session_id = key.split("session:", 1)[-1]
        session_payload = value if isinstance(value, Mapping) else {}
        messages_data = session_payload.get("messages", []) or []
        if not messages_data:
            continue

        # Prefer the payload's own name, then the metadata list, then a counter.
        title = session_payload.get("name") or sessions_meta.get(session_id, {}).get("name")
        if not title:
            title = f"Chatbox 对话_{len(threads) + 1}"

        messages: list[dict[str, Any]] = []
        for midx, message in enumerate(messages_data, start=1):
            role = message.get("role") or "user"
            text = _extract_chatbox_content(message.get("contentParts") or [])
            if not text.strip():
                continue
            messages.append(
                {
                    "id": message.get("id") or f"{session_id}-{midx}",
                    "role": role,
                    "content": text.strip(),
                    "timestamp": _normalize_timestamp(message.get("timestamp")),
                    "metadata": {
                        "session": title,
                        "session_id": session_id,
                        "source": "chatbox",
                    },
                }
            )
        if messages:
            threads.append(
                ChatThread(
                    id=session_id or f"chatbox-{len(threads) + 1}",
                    title=title,
                    messages=messages,
                )
            )
    return threads


def _extract_chatbox_content(parts: Iterable[Mapping[str, Any]] | None) -> str:
    """Join Chatbox ``contentParts`` into one text blob (parts separated by blank lines)."""
    fragments: list[str] = []
    for part in parts or []:
        if not isinstance(part, Mapping):
            fragments.append(str(part))
            continue
        ptype = part.get("type")
        if ptype == "text":
            fragments.append(str(part.get("text", "")))
        elif ptype in {"image_url", "image"} and part.get("url"):
            fragments.append(f"[image: {part['url']}]")
        elif "text" in part:
            fragments.append(str(part["text"]))
        else:
            # Unknown part type: keep a JSON dump so no information is lost.
            fragments.append(json.dumps(part, ensure_ascii=False))
    return "\n\n".join(fragment for fragment in fragments if fragment)


# =============================================================================
# 4. Cherry Studio
# =============================================================================


def _try_cherry_studio(data: Any) -> list[ChatThread]:
    """Parse a Cherry Studio export (localStorage dump containing a redux-persist blob)."""
    if not isinstance(data, Mapping):
        return []
    local_storage = data.get("localStorage")
    if not isinstance(local_storage, Mapping):
        return []
    cherry_str = local_storage.get("persist:cherry-studio")
    if not cherry_str:
        return []
    try:
        cherry = json.loads(cherry_str)
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(cherry, dict):
        return []

    assistants = cherry.get("assistants", [])
    # redux-persist serializes each top-level slice as a JSON string, so the
    # value may need a second decoding pass.
    if isinstance(assistants, str):
        try:
            assistants = json.loads(assistants)
        except json.JSONDecodeError:
            return []
    if not isinstance(assistants, list):
        return []

    threads: list[ChatThread] = []
    for assistant in assistants:
        topics = assistant.get("topics", [])
        if not topics:
            continue
        model_info = assistant.get("model", {})
        model_name = model_info.get("name") if isinstance(model_info, dict) else "Unknown"
        for topic in topics:
            topic_name = topic.get("name", "未命名话题")
            topic_msgs = topic.get("messages", [])
            if not topic_msgs:
                continue
            messages: list[dict[str, Any]] = []
            for idx, msg in enumerate(topic_msgs, start=1):
                content = msg.get("content", "")
                # Newer exports may store structured (non-string) content;
                # skip those messages instead of crashing the whole parser.
                if not content or not isinstance(content, str):
                    continue
                messages.append(
                    {
                        "id": msg.get("id") or f"cherry-{idx}",
                        "role": msg.get("role", "user"),
                        "content": content.strip(),
                        "timestamp": _normalize_timestamp(msg.get("createdAt")),
                        "metadata": {"model": model_name},
                    }
                )
            if messages:
                tid = topic.get("id") or f"cherry-{len(threads)}"
                # Prefix makes the origin obvious in a mixed thread list.
                threads.append(
                    ChatThread(id=str(tid), title=f"[Cherry] {topic_name}", messages=messages)
                )
    return threads
# =============================================================================
# 5. DeepSeek
# =============================================================================


def _try_deepseek_export(data: Any) -> list[ChatThread]:
    """Parse a DeepSeek export (ChatGPT-like mapping tree, content in ``fragments``)."""
    if not isinstance(data, list) or not data:
        return []
    first = data[0]
    if not isinstance(first, Mapping) or "mapping" not in first:
        return []

    # Heuristic: DeepSeek nodes carry content in ``message.fragments``.
    # ``message`` may be null on synthetic nodes, hence the ``or {}``.
    mapping_sample = first.get("mapping", {})
    if not any(
        ((node or {}).get("message") or {}).get("fragments")
        for node in mapping_sample.values()
    ):
        return []

    threads: list[ChatThread] = []
    for idx, conversation in enumerate(data, start=1):
        title = conversation.get("title", f"DeepSeek对话_{idx}")
        mapping = conversation.get("mapping", {})
        if not mapping:
            continue

        # Keep only the main branch (longest root-to-leaf path) so regenerated
        # answers are not duplicated in the UI. Reuses the shared, cycle-safe
        # helper instead of a local re-implementation.
        branch = _reconstruct_longest_branch(mapping, None)
        if not branch:
            continue

        messages: list[dict[str, Any]] = []
        for node_id in branch:
            node = mapping.get(node_id) or {}
            msg_obj = node.get("message") or {}
            if not msg_obj:
                continue

            fragments = msg_obj.get("fragments", [])
            role = "user"  # default when no fragment declares a type
            content_buf: list[str] = []
            for frag in fragments:
                frag_type = frag.get("type")
                if frag_type == "REQUEST":
                    role = "user"
                elif frag_type == "RESPONSE":
                    role = "assistant"
                text = frag.get("content", "")
                if text:
                    content_buf.append(text)
            full_text = "".join(content_buf)

            # Fragment-less messages (rare): fall back to author/content fields.
            # This must happen BEFORE the empty-text check -- the original
            # checked emptiness first, which made this fallback unreachable.
            if not fragments:
                role = (msg_obj.get("author") or {}).get("role", "user")
                full_text = msg_obj.get("content", "")
            if not full_text.strip():
                continue

            messages.append(
                {
                    "id": node_id,
                    "role": role,
                    "content": full_text.strip(),
                    # DeepSeek appears to store a single timestamp per
                    # conversation rather than per message.
                    "timestamp": _normalize_timestamp(conversation.get("inserted_at")),
                    "metadata": {},
                }
            )
        if messages:
            conv_id = conversation.get("id") or f"deepseek-{idx}"
            threads.append(
                ChatThread(id=str(conv_id), title=f"[DeepSeek] {title}", messages=messages)
            )
    return threads


# =============================================================================
# Common Helpers
# =============================================================================


def _reconstruct_longest_branch(mapping: Mapping[str, Any], leaf_id: str | None) -> list[str]:
    """Return root→leaf node ids for ``leaf_id``, or the longest branch when None.

    The walk is cycle-safe: a visited set stops it if parent links ever loop.
    The sentinel id ``"root"`` is never included in the returned path.
    """

    def _path_from(leaf: str) -> list[str]:
        # Walk parent links upward, then reverse into root-first order.
        path: list[str] = []
        seen: set[str] = set()
        cur: str | None = leaf
        while cur and cur not in seen and cur != "root":
            node = mapping.get(cur)
            if not node:
                break
            path.append(cur)
            seen.add(cur)
            cur = node.get("parent")
        path.reverse()
        return path

    if leaf_id:
        return _path_from(leaf_id)

    # No explicit leaf: consider every leaf (a node that is nobody's parent)
    # and keep the longest path.
    nodes = set(mapping.keys())
    parents = {node.get("parent") for node in mapping.values() if node}
    longest: list[str] = []
    for leaf in nodes - parents - {"root"}:
        candidate = _path_from(leaf)
        if len(candidate) > len(longest):
            longest = candidate
    return longest


def _normalize_timestamp(value: Any) -> str:
    """Best-effort conversion of epoch numbers / ISO strings to ``YYYY-MM-DD HH:MM``.

    Returns "" for None and falls back to ``str(value)`` whenever parsing
    fails. Naive local time is intentional -- timestamps are display-only.
    """
    if value is None:
        return ""
    if isinstance(value, (int, float)):
        try:
            # Heuristic: values above 1e12 are epoch milliseconds.
            seconds = value / 1000 if value > 1e12 else value
            return datetime.fromtimestamp(seconds).isoformat(sep=" ", timespec="minutes")
        except Exception:
            # Out-of-range epochs raise OSError/OverflowError.
            return str(value)
    if isinstance(value, str):
        try:
            dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
            return dt.isoformat(sep=" ", timespec="minutes")
        except Exception:
            return str(value)
    return str(value)


# =============================================================================
# Text Import
# =============================================================================


def parse_text_transcript(text: str) -> list[ChatThread]:
    """Minimal plain-text parser.

    Splits only on the user-specified strong delimiters:
    ``[你说]`` / ``你说:`` / ``ChatGPT说:`` / ``AI说:`` (plus line-anchored
    ``User:`` / ``AI:``). Lines without a marker continue the previous message.
    """
    if not text.strip():
        return []

    lines = text.strip().split('\n')
    messages: list[dict[str, Any]] = []

    markers = [
        (r"\[你说\]", "user"),
        (r"你说[::]", "user"),
        (r"ChatGPT\s*说[::]?", "assistant"),  # matches "ChatGPT说" or "ChatGPT说:"
        (r"AI\s*说[::]?", "assistant"),       # defensive: "AI 说"
    ]
    # Basic "User:" / "AI:" forms, but only at the start of a line.
    markers.extend([
        (r"^User[::]", "user"),
        (r"^AI[::]", "assistant"),
    ])

    for line in lines:
        line_str = line.strip()
        if not line_str:
            continue

        matched_role = None
        for pattern, role in markers:
            if re.search(pattern, line_str, re.IGNORECASE):
                matched_role = role
                break

        if matched_role:
            # Header line: take any text after the first colon as content.
            content = ""
            parts = re.split(r"[::]", line_str, maxsplit=1)
            if len(parts) > 1:
                content = parts[1].strip()
            messages.append({
                "id": f"text-{len(messages) + 1}",
                "role": matched_role,
                "content": content,
                "timestamp": None,
            })
        elif messages:
            # Continuation line: append (keep original, unstripped text).
            if messages[-1]["content"]:
                messages[-1]["content"] += "\n" + line
            else:
                messages[-1]["content"] = line
        else:
            # Content before any marker: attribute it to the user.
            messages.append({
                "id": "text-1",
                "role": "user",
                "content": line,
                "timestamp": None,
            })

    if not messages:
        return []
    preview = messages[0]["content"][:10].replace("\n", " ")
    title = f"文本导入 ({len(messages)}条) - {preview}..."
    return [ChatThread(id="pasted-text", title=title, messages=messages)]


# =============================================================================
# HTML Import
# =============================================================================


class ChatGPTHTMLParser(HTMLParser):
    """Extract conversation turns from a saved ChatGPT HTML page.

    Relies on the ``data-testid="conversation-turn-N"`` containers and the
    ``data-message-author-role`` attribute emitted by the ChatGPT frontend.
    """

    VOID_ELEMENTS = {
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr'
    }
    # ``meta``/``link`` are deliberately NOT listed here: as void elements they
    # never fire handle_endtag, so listing them would leave ignore_level stuck
    # above zero and silently drop all subsequent content.
    IGNORE_TAGS = {'button', 'svg', 'style', 'script', 'head', 'title'}

    def __init__(self):
        super().__init__()
        self.messages = []          # finished {"role", "content"} dicts
        self.current_msg = None     # turn currently being collected
        self.recording = False      # inside a conversation-turn container?
        self.depth = 0              # nesting depth of non-void elements
        self.turn_start_depth = 0   # depth at which the current turn opened
        self.ignore_level = 0       # nesting count of IGNORE_TAGS

    def handle_starttag(self, tag, attrs):
        if tag in self.IGNORE_TAGS:
            self.ignore_level += 1
        # Void elements never get an end tag, so they must not affect depth.
        if tag not in self.VOID_ELEMENTS:
            self.depth += 1

        attrs_dict = dict(attrs)
        # A new turn container (data-testid="conversation-turn-X").
        if 'data-testid' in attrs_dict and attrs_dict['data-testid'].startswith('conversation-turn-'):
            # Force-save the previous turn in case depth tracking missed its close.
            if self.current_msg:
                self._save_current()
            # Default to "user"; corrected below if a role attribute appears.
            self.current_msg = {"role": "user", "content": []}
            self.recording = True
            self.turn_start_depth = self.depth
            return

        # While recording, pick up the author role when it appears.
        if self.recording and 'data-message-author-role' in attrs_dict:
            self.current_msg["role"] = attrs_dict['data-message-author-role']

    def handle_endtag(self, tag):
        if tag in self.IGNORE_TAGS:
            if self.ignore_level > 0:
                self.ignore_level -= 1

        # Emit a newline when a block-level element closes.
        if self.recording and self.ignore_level == 0:
            if tag in ('p', 'div', 'br', 'li', 'tr', 'pre', 'blockquote',
                       'h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
                self.current_msg["content"].append("\n")

        if tag not in self.VOID_ELEMENTS:
            # Closing the turn container itself?
            if self.recording and self.depth == self.turn_start_depth:
                self._save_current()
            self.depth -= 1

    def handle_data(self, data):
        if self.recording and self.ignore_level == 0:
            # Keep raw text (no strip) so inline elements don't get glued together.
            self.current_msg["content"].append(data)

    def close(self):
        super().close()
        # Flush the trailing message, if any.
        if self.current_msg:
            self._save_current()

    def _save_current(self):
        if self.current_msg:
            full_text = "".join(self.current_msg["content"]).strip()
            # Collapse runs of 3+ newlines into a single blank line.
            full_text = re.sub(r'\n{3,}', '\n\n', full_text)
            if full_text:
                self.messages.append({
                    "role": self.current_msg["role"],
                    "content": full_text
                })
        self.current_msg = None
        self.recording = False


def parse_html_transcript(html_content: str) -> list[ChatThread]:
    """Parse a saved ChatGPT HTML page into a single thread.

    NOTE(review): the tail of this function was truncated in the reviewed
    source; the title extraction and message assembly below are a best-effort
    reconstruction -- confirm against the original file.
    """
    parser = ChatGPTHTMLParser()
    parser.feed(html_content)
    # close() flushes the last open turn; without it the final message is lost.
    parser.close()
    if not parser.messages:
        return []

    title = "HTML 导入对话"
    title_match = re.search(r"<title[^>]*>(.*?)</title>", html_content,
                            re.IGNORECASE | re.DOTALL)
    if title_match and title_match.group(1).strip():
        title = title_match.group(1).strip()

    messages = [
        {
            "id": f"html-{idx}",
            "role": msg["role"],
            "content": msg["content"],
            "timestamp": None,
            "metadata": {},
        }
        for idx, msg in enumerate(parser.messages, start=1)
    ]
    return [ChatThread(id="html-import", title=title, messages=messages)]