Replace vector search with rapidfuzz fuzzy matching for similar issues

MaxData 2025-11-15 11:23:58 -05:00
parent 6d22618756
commit d9ecd2a8a5
6 changed files with 890 additions and 672 deletions

.gitignore vendored
View file

@@ -13,3 +13,5 @@ build/
 docs/.cache/
 .qodo
 poetry.lock
+.pr_agent.toml
+.claude/skills/SETUP_COMPLETE.md

View file

@@ -1,29 +1,12 @@
 from starlette_context import context

 from pr_agent.config_loader import get_settings
-from pr_agent.git_providers.azuredevops_provider import AzureDevopsProvider
-from pr_agent.git_providers.bitbucket_provider import BitbucketProvider
-from pr_agent.git_providers.bitbucket_server_provider import \
-    BitbucketServerProvider
-from pr_agent.git_providers.codecommit_provider import CodeCommitProvider
-from pr_agent.git_providers.gerrit_provider import GerritProvider
 from pr_agent.git_providers.git_provider import GitProvider
-from pr_agent.git_providers.gitea_provider import GiteaProvider
 from pr_agent.git_providers.github_provider import GithubProvider
-from pr_agent.git_providers.gitlab_provider import GitLabProvider
-from pr_agent.git_providers.local_git_provider import LocalGitProvider
-from pr_agent.git_providers.gitea_provider import GiteaProvider

+# Only GitHub provider - other providers removed
 _GIT_PROVIDERS = {
     'github': GithubProvider,
-    'gitlab': GitLabProvider,
-    'bitbucket': BitbucketProvider,
-    'bitbucket_server': BitbucketServerProvider,
-    'azure': AzureDevopsProvider,
-    'codecommit': CodeCommitProvider,
-    'local': LocalGitProvider,
-    'gerrit': GerritProvider,
-    'gitea': GiteaProvider
 }
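With the registry reduced to GitHub only, any other configured git_provider value should now fail at lookup time instead of loading a provider class. A minimal standalone sketch of that behavior, using illustrative names rather than the project's actual accessor:

# Sketch only: a provider registry trimmed to GitHub (names here are illustrative).
class GithubProvider:  # stand-in for pr_agent.git_providers.github_provider.GithubProvider
    pass

_GIT_PROVIDERS = {
    'github': GithubProvider,
}

def get_git_provider_class(provider_id: str):
    """Return the provider class for provider_id, or raise if it was removed."""
    if provider_id not in _GIT_PROVIDERS:
        raise ValueError(f"Unsupported git provider: {provider_id!r} (only 'github' remains)")
    return _GIT_PROVIDERS[provider_id]

print(get_git_provider_class('github'))   # resolves to the GitHub provider class
# get_git_provider_class('gitlab')        # would now raise ValueError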

View file

@@ -343,9 +343,9 @@ service_callback = []

 [pr_similar_issue]
 skip_comments = false
-force_update_dataset = false
 max_issues_to_scan = 500
-vectordb = "pinecone" # options: "pinecone", "lancedb", "qdrant"
+number_of_similar_issues = 5 # Number of similar issues to return
+min_similarity_score = 60 # Minimum fuzzy match score (0-100) to consider an issue similar

 [pr_find_similar_component]
 class_name = ""

@@ -355,18 +355,8 @@ allow_fallback_less_words = true
 number_of_keywords = 5
 number_of_results = 5

-[pinecone]
-# fill and place in .secrets.toml
-#api_key = ...
-# environment = "gcp-starter"
-
-[lancedb]
-uri = "./lancedb"
-
-[qdrant]
-# fill and place credentials in .secrets.toml
-# url = "https://YOUR-QDRANT-URL"
-# api_key = "..."
+# Vector database configuration removed - now using rapidfuzz fuzzy matching
+# See [pr_similar_issue] section for fuzzy matching configuration

 [best_practices]
 content = ""

View file

@@ -1,692 +1,247 @@
"""
PR Similar Issue Finder - Simplified with Fuzzy Matching

Uses rapidfuzz for fast, local fuzzy text matching instead of vector embeddings.
No external APIs or databases required.
"""
import time
from typing import List, Tuple, Dict

from rapidfuzz import fuzz, process

from pr_agent.config_loader import get_settings
from pr_agent.git_providers import get_git_provider
from pr_agent.log import get_logger


class PRSimilarIssue:
    """
    Find similar issues using fuzzy text matching.

    Replaces vector-based search (Pinecone/LanceDB/Qdrant + OpenAI embeddings)
    with simple, fast fuzzy matching using rapidfuzz.
    """

    def __init__(self, issue_url: str, ai_handler=None, args: list = None):
        """Initialize the similar issue finder."""
        if get_settings().config.git_provider != "github":
            raise Exception("Only github is supported for similar issue tool")

        self.cli_mode = get_settings().CONFIG.CLI_MODE
        self.max_issues_to_scan = get_settings().pr_similar_issue.max_issues_to_scan
        self.number_of_similar_issues = get_settings().pr_similar_issue.get(
            'number_of_similar_issues', 5
        )
        self.min_similarity_score = get_settings().pr_similar_issue.get(
            'min_similarity_score', 60
        )
        self.skip_comments = get_settings().pr_similar_issue.get(
            'skip_comments', False
        )
        self.issue_url = issue_url
        self.git_provider = get_git_provider()()

        # Parse issue URL
        repo_name, issue_number = self.git_provider._parse_issue_url(
            issue_url.split('=')[-1]
        )
        self.git_provider.repo = repo_name
        self.git_provider.repo_obj = self.git_provider.github_client.get_repo(repo_name)
        self.query_issue_number = issue_number

        # In-memory cache for issues
        self.issues_cache: Dict[int, Dict[str, str]] = {}

        get_logger().info(f"Initialized PRSimilarIssue for {repo_name} issue #{issue_number}")

    async def run(self):
        """Main execution method - find and post similar issues."""
        try:
            get_logger().info("Starting similar issue search...")

            # 1. Fetch all issues from GitHub
            get_logger().info("Fetching issues from GitHub...")
            repo_obj = self.git_provider.repo_obj
            issues_list = list(repo_obj.get_issues(state='all'))
            get_logger().info(f"Found {len(issues_list)} total issues")

            # 2. Index issues in memory
            get_logger().info("Indexing issues...")
            self._index_issues(issues_list)

            # 3. Get query issue details
            query_issue = repo_obj.get_issue(self.query_issue_number)
            query_title = query_issue.title
            query_body = query_issue.body or ""
            get_logger().info(f"Query issue: {query_title}")

            # 4. Find similar issues using fuzzy matching
            get_logger().info("Finding similar issues...")
            similar_issues = self._find_similar(
                query_title=query_title,
                query_body=query_body,
                skip_issue_number=self.query_issue_number,
                top_k=self.number_of_similar_issues
            )

            # 5. Post results
            if similar_issues:
                get_logger().info(f"Found {len(similar_issues)} similar issues")
                self._post_results(query_issue, similar_issues)
            else:
                get_logger().info("No similar issues found above threshold")
                if not get_settings().pr_similar_issue.get('skip_comments', False):
                    query_issue.create_comment("No similar issues found.")

            return similar_issues

        except Exception as e:
            get_logger().error(f"Error in PRSimilarIssue.run(): {e}")
            raise

    def _index_issues(self, issues_list: List) -> None:
        """
        Index issues in memory for fast searching.

        Args:
            issues_list: List of GitHub issue objects
        """
        counter = 0
        for issue in issues_list:
            # Skip pull requests
            if issue.pull_request:
                continue

            counter += 1
            if counter >= self.max_issues_to_scan:
                get_logger().info(f"Reached max issues to scan: {self.max_issues_to_scan}")
                break

            # Extract issue content
            title = issue.title
            body = issue.body or ""

            # Optionally include comments
            comments_text = ""
            if not self.skip_comments:
                try:
                    comments = list(issue.get_comments())
                    comments_text = " ".join([c.body for c in comments if c.body])
                except:
                    pass  # Comments not critical

            # Store in cache
            self.issues_cache[issue.number] = {
                'title': title,
                'body': body,
                'comments': comments_text,
                'url': issue.html_url,
                'state': issue.state,
            }

        get_logger().info(f"Indexed {len(self.issues_cache)} issues")

    def _find_similar(
        self,
        query_title: str,
        query_body: str,
        skip_issue_number: int = None,
        top_k: int = 5
    ) -> List[Tuple[float, int, str, str]]:
        """
        Find similar issues using fuzzy text matching.

        Args:
            query_title: Title of query issue
            query_body: Body of query issue
            skip_issue_number: Issue number to skip (the query issue itself)
            top_k: Number of similar issues to return

        Returns:
            List of tuples: (score, issue_number, title, url)
        """
        # Build query string (weight title more by repeating it)
        query_text = f"{query_title} {query_title} {query_body}"

        # Prepare choices for fuzzy matching
        choices = {}
        for issue_num, issue_data in self.issues_cache.items():
            # Skip the query issue itself
            if skip_issue_number and issue_num == skip_issue_number:
                continue

            # Build issue text (weight title 2x)
            issue_text = (
                f"{issue_data['title']} {issue_data['title']} "
                f"{issue_data['body']} {issue_data['comments']}"
            )
            choices[issue_num] = issue_text

        if not choices:
            get_logger().warning("No issues available for comparison")
            return []

        # Use rapidfuzz for fuzzy matching
        # token_sort_ratio: handles word order differences well
        results = process.extract(
            query_text,
            choices,
            scorer=fuzz.token_sort_ratio,
            limit=top_k * 2,  # Get extra in case we need to filter
        )

        # Filter by minimum score and format results
        similar_issues = []
        for matched_text, score, issue_num in results:
            if score >= self.min_similarity_score:
                issue_data = self.issues_cache[issue_num]
                similar_issues.append((
                    score,
                    issue_num,
                    issue_data['title'],
                    issue_data['url']
                ))

            # Stop once we have enough results
            if len(similar_issues) >= top_k:
                break

        return similar_issues

    def _post_results(
        self,
        query_issue,
        similar_issues: List[Tuple[float, int, str, str]]
    ) -> None:
        """
        Post similar issues as a comment.

        Args:
            query_issue: GitHub issue object to comment on
            similar_issues: List of (score, number, title, url) tuples
        """
        # Build comment
        comment_lines = ["### Similar Issues\n___\n"]
        for i, (score, number, title, url) in enumerate(similar_issues, 1):
            # Format score as percentage
            score_pct = f"{score:.1f}%"
            comment_lines.append(
                f"{i}. **[{title}]({url})** (similarity: {score_pct})\n"
            )

        similar_issues_str = "\n".join(comment_lines)

        # Post comment (unless skip_comments is True)
        if not get_settings().pr_similar_issue.get('skip_comments', False):
            try:
                query_issue.create_comment(similar_issues_str)
                get_logger().info("Posted similar issues comment")
            except Exception as e:
                get_logger().error(f"Failed to post comment: {e}")

        # Always log results
        get_logger().info(f"\n{similar_issues_str}")
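For reference, a small standalone sketch of the process.extract call pattern that _find_similar relies on: when the choices argument is a dict, rapidfuzz returns (choice_text, score, key) tuples sorted by score, which is why the code unpacks matched_text, score, issue_num. The sample issue texts below are invented:

# Sketch: process.extract with a dict of choices returns (choice_text, score, key) tuples.
from rapidfuzz import fuzz, process

choices = {
    101: "login fails with 500 error after password reset",
    102: "dark mode colors are wrong in settings page",
    103: "password reset email never arrives, login broken",
}
query = "login broken after resetting password"

for text, score, issue_num in process.extract(
    query, choices, scorer=fuzz.token_sort_ratio, limit=2
):
    print(issue_num, round(score, 1), text[:40])  # highest-scoring issues first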

View file

@@ -0,0 +1,692 @@
import time
from enum import Enum
from typing import List
import openai
from pydantic import BaseModel, Field
from pr_agent.algo import MAX_TOKENS
from pr_agent.algo.token_handler import TokenHandler
from pr_agent.algo.utils import get_max_tokens
from pr_agent.config_loader import get_settings
from pr_agent.git_providers import get_git_provider
from pr_agent.log import get_logger
MODEL = "text-embedding-ada-002"
class PRSimilarIssue:
def __init__(self, issue_url: str, ai_handler, args: list = None):
if get_settings().config.git_provider != "github":
raise Exception("Only github is supported for similar issue tool")
self.cli_mode = get_settings().CONFIG.CLI_MODE
self.max_issues_to_scan = get_settings().pr_similar_issue.max_issues_to_scan
self.issue_url = issue_url
self.git_provider = get_git_provider()()
repo_name, issue_number = self.git_provider._parse_issue_url(issue_url.split('=')[-1])
self.git_provider.repo = repo_name
self.git_provider.repo_obj = self.git_provider.github_client.get_repo(repo_name)
self.token_handler = TokenHandler()
repo_obj = self.git_provider.repo_obj
repo_name_for_index = self.repo_name_for_index = repo_obj.full_name.lower().replace('/', '-').replace('_/', '-')
index_name = self.index_name = "codium-ai-pr-agent-issues"
if get_settings().pr_similar_issue.vectordb == "pinecone":
try:
import pandas as pd
import pinecone
from pinecone_datasets import Dataset, DatasetMetadata
except:
raise Exception("Please install 'pinecone' and 'pinecone_datasets' to use pinecone as vectordb")
# assuming pinecone api key and environment are set in secrets file
try:
api_key = get_settings().pinecone.api_key
environment = get_settings().pinecone.environment
except Exception:
if not self.cli_mode:
repo_name, original_issue_number = self.git_provider._parse_issue_url(self.issue_url.split('=')[-1])
issue_main = self.git_provider.repo_obj.get_issue(original_issue_number)
issue_main.create_comment("Please set pinecone api key and environment in secrets file")
raise Exception("Please set pinecone api key and environment in secrets file")
# check if index exists, and if repo is already indexed
run_from_scratch = False
if run_from_scratch: # for debugging
pinecone.init(api_key=api_key, environment=environment)
if index_name in pinecone.list_indexes():
get_logger().info('Removing index...')
pinecone.delete_index(index_name)
get_logger().info('Done')
upsert = True
pinecone.init(api_key=api_key, environment=environment)
if not index_name in pinecone.list_indexes():
run_from_scratch = True
upsert = False
else:
if get_settings().pr_similar_issue.force_update_dataset:
upsert = True
else:
pinecone_index = pinecone.Index(index_name=index_name)
res = pinecone_index.fetch([f"example_issue_{repo_name_for_index}"]).to_dict()
if res["vectors"]:
upsert = False
if run_from_scratch or upsert: # index the entire repo
get_logger().info('Indexing the entire repo...')
get_logger().info('Getting issues...')
issues = list(repo_obj.get_issues(state='all'))
get_logger().info('Done')
self._update_index_with_issues(issues, repo_name_for_index, upsert=upsert)
else: # update index if needed
pinecone_index = pinecone.Index(index_name=index_name)
issues_to_update = []
issues_paginated_list = repo_obj.get_issues(state='all')
counter = 1
for issue in issues_paginated_list:
if issue.pull_request:
continue
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
id = issue_key + "." + "issue"
res = pinecone_index.fetch([id]).to_dict()
is_new_issue = True
for vector in res["vectors"].values():
if vector['metadata']['repo'] == repo_name_for_index:
is_new_issue = False
break
if is_new_issue:
counter += 1
issues_to_update.append(issue)
else:
break
if issues_to_update:
get_logger().info(f'Updating index with {counter} new issues...')
self._update_index_with_issues(issues_to_update, repo_name_for_index, upsert=True)
else:
get_logger().info('No new issues to update')
elif get_settings().pr_similar_issue.vectordb == "lancedb":
try:
import lancedb # import lancedb only if needed
except:
raise Exception("Please install lancedb to use lancedb as vectordb")
self.db = lancedb.connect(get_settings().lancedb.uri)
self.table = None
run_from_scratch = False
if run_from_scratch: # for debugging
if index_name in self.db.table_names():
get_logger().info('Removing Table...')
self.db.drop_table(index_name)
get_logger().info('Done')
ingest = True
if index_name not in self.db.table_names():
run_from_scratch = True
ingest = False
else:
if get_settings().pr_similar_issue.force_update_dataset:
ingest = True
else:
self.table = self.db[index_name]
res = self.table.search().limit(len(self.table)).where(f"id='example_issue_{repo_name_for_index}'").to_list()
get_logger().info("result: ", res)
if res[0].get("vector"):
ingest = False
if run_from_scratch or ingest: # indexing the entire repo
get_logger().info('Indexing the entire repo...')
get_logger().info('Getting issues...')
issues = list(repo_obj.get_issues(state='all'))
get_logger().info('Done')
self._update_table_with_issues(issues, repo_name_for_index, ingest=ingest)
else: # update table if needed
issues_to_update = []
issues_paginated_list = repo_obj.get_issues(state='all')
counter = 1
for issue in issues_paginated_list:
if issue.pull_request:
continue
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
issue_id = issue_key + "." + "issue"
res = self.table.search().limit(len(self.table)).where(f"id='{issue_id}'").to_list()
is_new_issue = True
for r in res:
if r['metadata']['repo'] == repo_name_for_index:
is_new_issue = False
break
if is_new_issue:
counter += 1
issues_to_update.append(issue)
else:
break
if issues_to_update:
get_logger().info(f'Updating index with {counter} new issues...')
self._update_table_with_issues(issues_to_update, repo_name_for_index, ingest=True)
else:
get_logger().info('No new issues to update')
elif get_settings().pr_similar_issue.vectordb == "qdrant":
try:
import qdrant_client
from qdrant_client.models import (Distance, FieldCondition,
Filter, MatchValue,
PointStruct, VectorParams)
except Exception:
raise Exception("Please install qdrant-client to use qdrant as vectordb")
api_key = None
url = None
try:
api_key = get_settings().qdrant.api_key
url = get_settings().qdrant.url
except Exception:
if not self.cli_mode:
repo_name, original_issue_number = self.git_provider._parse_issue_url(self.issue_url.split('=')[-1])
issue_main = self.git_provider.repo_obj.get_issue(original_issue_number)
issue_main.create_comment("Please set qdrant url and api key in secrets file")
raise Exception("Please set qdrant url and api key in secrets file")
self.qdrant = qdrant_client.QdrantClient(url=url, api_key=api_key)
run_from_scratch = False
ingest = True
if not self.qdrant.collection_exists(collection_name=self.index_name):
run_from_scratch = True
ingest = False
self.qdrant.create_collection(
collection_name=self.index_name,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)
else:
if get_settings().pr_similar_issue.force_update_dataset:
ingest = True
else:
response = self.qdrant.count(
collection_name=self.index_name,
count_filter=Filter(must=[
FieldCondition(key="metadata.repo", match=MatchValue(value=repo_name_for_index)),
FieldCondition(key="id", match=MatchValue(value=f"example_issue_{repo_name_for_index}")),
]),
)
ingest = True if response.count == 0 else False
if run_from_scratch or ingest:
get_logger().info('Indexing the entire repo...')
get_logger().info('Getting issues...')
issues = list(repo_obj.get_issues(state='all'))
get_logger().info('Done')
self._update_qdrant_with_issues(issues, repo_name_for_index, ingest=ingest)
else:
issues_to_update = []
issues_paginated_list = repo_obj.get_issues(state='all')
counter = 1
for issue in issues_paginated_list:
if issue.pull_request:
continue
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
point_id = issue_key + "." + "issue"
response = self.qdrant.count(
collection_name=self.index_name,
count_filter=Filter(must=[
FieldCondition(key="id", match=MatchValue(value=point_id)),
FieldCondition(key="metadata.repo", match=MatchValue(value=repo_name_for_index)),
]),
)
if response.count == 0:
counter += 1
issues_to_update.append(issue)
else:
break
if issues_to_update:
get_logger().info(f'Updating index with {counter} new issues...')
self._update_qdrant_with_issues(issues_to_update, repo_name_for_index, ingest=True)
else:
get_logger().info('No new issues to update')
async def run(self):
get_logger().info('Getting issue...')
repo_name, original_issue_number = self.git_provider._parse_issue_url(self.issue_url.split('=')[-1])
issue_main = self.git_provider.repo_obj.get_issue(original_issue_number)
issue_str, comments, number = self._process_issue(issue_main)
openai.api_key = get_settings().openai.key
get_logger().info('Done')
get_logger().info('Querying...')
res = openai.Embedding.create(input=[issue_str], engine=MODEL)
embeds = [record['embedding'] for record in res['data']]
relevant_issues_number_list = []
relevant_comment_number_list = []
score_list = []
if get_settings().pr_similar_issue.vectordb == "pinecone":
pinecone_index = pinecone.Index(index_name=self.index_name)
res = pinecone_index.query(embeds[0],
top_k=5,
filter={"repo": self.repo_name_for_index},
include_metadata=True).to_dict()
for r in res['matches']:
# skip example issue
if 'example_issue_' in r["id"]:
continue
try:
issue_number = int(r["id"].split('.')[0].split('_')[-1])
except:
get_logger().debug(f"Failed to parse issue number from {r['id']}")
continue
if original_issue_number == issue_number:
continue
if issue_number not in relevant_issues_number_list:
relevant_issues_number_list.append(issue_number)
if 'comment' in r["id"]:
relevant_comment_number_list.append(int(r["id"].split('.')[1].split('_')[-1]))
else:
relevant_comment_number_list.append(-1)
score_list.append(str("{:.2f}".format(r['score'])))
get_logger().info('Done')
elif get_settings().pr_similar_issue.vectordb == "lancedb":
res = self.table.search(embeds[0]).where(f"metadata.repo='{self.repo_name_for_index}'", prefilter=True).to_list()
for r in res:
# skip example issue
if 'example_issue_' in r["id"]:
continue
try:
issue_number = int(r["id"].split('.')[0].split('_')[-1])
except:
get_logger().debug(f"Failed to parse issue number from {r['id']}")
continue
if original_issue_number == issue_number:
continue
if issue_number not in relevant_issues_number_list:
relevant_issues_number_list.append(issue_number)
if 'comment' in r["id"]:
relevant_comment_number_list.append(int(r["id"].split('.')[1].split('_')[-1]))
else:
relevant_comment_number_list.append(-1)
score_list.append(str("{:.2f}".format(1-r['_distance'])))
get_logger().info('Done')
elif get_settings().pr_similar_issue.vectordb == "qdrant":
from qdrant_client.models import FieldCondition, Filter, MatchValue
res = self.qdrant.search(
collection_name=self.index_name,
query_vector=embeds[0],
limit=5,
query_filter=Filter(must=[FieldCondition(key="metadata.repo", match=MatchValue(value=self.repo_name_for_index))]),
with_payload=True,
)
for r in res:
rid = r.payload.get("id", "")
if 'example_issue_' in rid:
continue
try:
issue_number = int(rid.split('.')[0].split('_')[-1])
except Exception:
get_logger().debug(f"Failed to parse issue number from {rid}")
continue
if original_issue_number == issue_number:
continue
if issue_number not in relevant_issues_number_list:
relevant_issues_number_list.append(issue_number)
if 'comment' in rid:
relevant_comment_number_list.append(int(rid.split('.')[1].split('_')[-1]))
else:
relevant_comment_number_list.append(-1)
score_list.append(str("{:.2f}".format(r.score)))
get_logger().info('Done')
get_logger().info('Publishing response...')
similar_issues_str = "### Similar Issues\n___\n\n"
for i, issue_number_similar in enumerate(relevant_issues_number_list):
issue = self.git_provider.repo_obj.get_issue(issue_number_similar)
title = issue.title
url = issue.html_url
if relevant_comment_number_list[i] != -1:
url = list(issue.get_comments())[relevant_comment_number_list[i]].html_url
similar_issues_str += f"{i + 1}. **[{title}]({url})** (score={score_list[i]})\n\n"
if get_settings().config.publish_output:
response = issue_main.create_comment(similar_issues_str)
get_logger().info(similar_issues_str)
get_logger().info('Done')
def _process_issue(self, issue):
header = issue.title
body = issue.body
number = issue.number
if get_settings().pr_similar_issue.skip_comments:
comments = []
else:
comments = list(issue.get_comments())
issue_str = f"Issue Header: \"{header}\"\n\nIssue Body:\n{body}"
return issue_str, comments, number
def _update_index_with_issues(self, issues_list, repo_name_for_index, upsert=False):
get_logger().info('Processing issues...')
corpus = Corpus()
example_issue_record = Record(
id=f"example_issue_{repo_name_for_index}",
text="example_issue",
metadata=Metadata(repo=repo_name_for_index)
)
corpus.append(example_issue_record)
counter = 0
for issue in issues_list:
if issue.pull_request:
continue
counter += 1
if counter % 100 == 0:
get_logger().info(f"Scanned {counter} issues")
if counter >= self.max_issues_to_scan:
get_logger().info(f"Scanned {self.max_issues_to_scan} issues, stopping")
break
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
username = issue.user.login
created_at = str(issue.created_at)
if len(issue_str) < 8000 or \
self.token_handler.count_tokens(issue_str) < get_max_tokens(MODEL): # fast reject first
issue_record = Record(
id=issue_key + "." + "issue",
text=issue_str,
metadata=Metadata(repo=repo_name_for_index,
username=username,
created_at=created_at,
level=IssueLevel.ISSUE)
)
corpus.append(issue_record)
if comments:
for j, comment in enumerate(comments):
comment_body = comment.body
num_words_comment = len(comment_body.split())
if num_words_comment < 10 or not isinstance(comment_body, str):
continue
if len(comment_body) < 8000 or \
self.token_handler.count_tokens(comment_body) < MAX_TOKENS[MODEL]:
comment_record = Record(
id=issue_key + ".comment_" + str(j + 1),
text=comment_body,
metadata=Metadata(repo=repo_name_for_index,
username=username, # use issue username for all comments
created_at=created_at,
level=IssueLevel.COMMENT)
)
corpus.append(comment_record)
df = pd.DataFrame(corpus.dict()["documents"])
get_logger().info('Done')
get_logger().info('Embedding...')
openai.api_key = get_settings().openai.key
list_to_encode = list(df["text"].values)
try:
res = openai.Embedding.create(input=list_to_encode, engine=MODEL)
embeds = [record['embedding'] for record in res['data']]
except:
embeds = []
get_logger().error('Failed to embed entire list, embedding one by one...')
for i, text in enumerate(list_to_encode):
try:
res = openai.Embedding.create(input=[text], engine=MODEL)
embeds.append(res['data'][0]['embedding'])
except:
embeds.append([0] * 1536)
df["values"] = embeds
meta = DatasetMetadata.empty()
meta.dense_model.dimension = len(embeds[0])
ds = Dataset.from_pandas(df, meta)
get_logger().info('Done')
api_key = get_settings().pinecone.api_key
environment = get_settings().pinecone.environment
if not upsert:
get_logger().info('Creating index from scratch...')
ds.to_pinecone_index(self.index_name, api_key=api_key, environment=environment)
time.sleep(15) # wait for pinecone to finalize indexing before querying
else:
get_logger().info('Upserting index...')
namespace = ""
batch_size: int = 100
concurrency: int = 10
pinecone.init(api_key=api_key, environment=environment)
ds._upsert_to_index(self.index_name, namespace, batch_size, concurrency)
time.sleep(5) # wait for pinecone to finalize upserting before querying
get_logger().info('Done')
def _update_table_with_issues(self, issues_list, repo_name_for_index, ingest=False):
get_logger().info('Processing issues...')
corpus = Corpus()
example_issue_record = Record(
id=f"example_issue_{repo_name_for_index}",
text="example_issue",
metadata=Metadata(repo=repo_name_for_index)
)
corpus.append(example_issue_record)
counter = 0
for issue in issues_list:
if issue.pull_request:
continue
counter += 1
if counter % 100 == 0:
get_logger().info(f"Scanned {counter} issues")
if counter >= self.max_issues_to_scan:
get_logger().info(f"Scanned {self.max_issues_to_scan} issues, stopping")
break
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
username = issue.user.login
created_at = str(issue.created_at)
if len(issue_str) < 8000 or \
self.token_handler.count_tokens(issue_str) < get_max_tokens(MODEL): # fast reject first
issue_record = Record(
id=issue_key + "." + "issue",
text=issue_str,
metadata=Metadata(repo=repo_name_for_index,
username=username,
created_at=created_at,
level=IssueLevel.ISSUE)
)
corpus.append(issue_record)
if comments:
for j, comment in enumerate(comments):
comment_body = comment.body
num_words_comment = len(comment_body.split())
if num_words_comment < 10 or not isinstance(comment_body, str):
continue
if len(comment_body) < 8000 or \
self.token_handler.count_tokens(comment_body) < MAX_TOKENS[MODEL]:
comment_record = Record(
id=issue_key + ".comment_" + str(j + 1),
text=comment_body,
metadata=Metadata(repo=repo_name_for_index,
username=username, # use issue username for all comments
created_at=created_at,
level=IssueLevel.COMMENT)
)
corpus.append(comment_record)
df = pd.DataFrame(corpus.dict()["documents"])
get_logger().info('Done')
get_logger().info('Embedding...')
openai.api_key = get_settings().openai.key
list_to_encode = list(df["text"].values)
try:
res = openai.Embedding.create(input=list_to_encode, engine=MODEL)
embeds = [record['embedding'] for record in res['data']]
except:
embeds = []
get_logger().error('Failed to embed entire list, embedding one by one...')
for i, text in enumerate(list_to_encode):
try:
res = openai.Embedding.create(input=[text], engine=MODEL)
embeds.append(res['data'][0]['embedding'])
except:
embeds.append([0] * 1536)
df["vector"] = embeds
get_logger().info('Done')
if not ingest:
get_logger().info('Creating table from scratch...')
self.table = self.db.create_table(self.index_name, data=df, mode="overwrite")
time.sleep(15)
else:
get_logger().info('Ingesting in Table...')
if self.index_name not in self.db.table_names():
self.table.add(df)
else:
get_logger().info(f"Table {self.index_name} doesn't exists!")
time.sleep(5)
get_logger().info('Done')
def _update_qdrant_with_issues(self, issues_list, repo_name_for_index, ingest=False):
try:
import uuid
import pandas as pd
from qdrant_client.models import PointStruct
except Exception:
raise
get_logger().info('Processing issues...')
corpus = Corpus()
example_issue_record = Record(
id=f"example_issue_{repo_name_for_index}",
text="example_issue",
metadata=Metadata(repo=repo_name_for_index)
)
corpus.append(example_issue_record)
counter = 0
for issue in issues_list:
if issue.pull_request:
continue
counter += 1
if counter % 100 == 0:
get_logger().info(f"Scanned {counter} issues")
if counter >= self.max_issues_to_scan:
get_logger().info(f"Scanned {self.max_issues_to_scan} issues, stopping")
break
issue_str, comments, number = self._process_issue(issue)
issue_key = f"issue_{number}"
username = issue.user.login
created_at = str(issue.created_at)
if len(issue_str) < 8000 or \
self.token_handler.count_tokens(issue_str) < get_max_tokens(MODEL):
issue_record = Record(
id=issue_key + "." + "issue",
text=issue_str,
metadata=Metadata(repo=repo_name_for_index,
username=username,
created_at=created_at,
level=IssueLevel.ISSUE)
)
corpus.append(issue_record)
if comments:
for j, comment in enumerate(comments):
comment_body = comment.body
num_words_comment = len(comment_body.split())
if num_words_comment < 10 or not isinstance(comment_body, str):
continue
if len(comment_body) < 8000 or \
self.token_handler.count_tokens(comment_body) < MAX_TOKENS[MODEL]:
comment_record = Record(
id=issue_key + ".comment_" + str(j + 1),
text=comment_body,
metadata=Metadata(repo=repo_name_for_index,
username=username,
created_at=created_at,
level=IssueLevel.COMMENT)
)
corpus.append(comment_record)
df = pd.DataFrame(corpus.dict()["documents"])
get_logger().info('Done')
get_logger().info('Embedding...')
openai.api_key = get_settings().openai.key
list_to_encode = list(df["text"].values)
try:
res = openai.Embedding.create(input=list_to_encode, engine=MODEL)
embeds = [record['embedding'] for record in res['data']]
except Exception:
embeds = []
get_logger().error('Failed to embed entire list, embedding one by one...')
for i, text in enumerate(list_to_encode):
try:
res = openai.Embedding.create(input=[text], engine=MODEL)
embeds.append(res['data'][0]['embedding'])
except Exception:
embeds.append([0] * 1536)
df["vector"] = embeds
get_logger().info('Done')
get_logger().info('Upserting into Qdrant...')
points = []
for row in df.to_dict(orient="records"):
points.append(
PointStruct(id=uuid.uuid5(uuid.NAMESPACE_DNS, row["id"]).hex, vector=row["vector"], payload={"id": row["id"], "text": row["text"], "metadata": row["metadata"]})
)
self.qdrant.upsert(collection_name=self.index_name, points=points)
get_logger().info('Done')
class IssueLevel(str, Enum):
ISSUE = "issue"
COMMENT = "comment"
class Metadata(BaseModel):
repo: str
username: str = Field(default="@codium")
created_at: str = Field(default="01-01-1970 00:00:00.00000")
level: IssueLevel = Field(default=IssueLevel.ISSUE)
class Config:
use_enum_values = True
class Record(BaseModel):
id: str
text: str
metadata: Metadata
class Corpus(BaseModel):
documents: List[Record] = Field(default=[])
def append(self, r: Record):
self.documents.append(r)

View file

@@ -32,11 +32,7 @@ pytest-cov==5.0.0
 pydantic==2.8.2
 html2text==2024.2.26
 giteapy==1.0.8
-# Uncomment the following lines to enable the 'similar issue' tool
-# pinecone-client
-# pinecone-datasets @ git+https://github.com/mrT23/pinecone-datasets.git@main
-# lancedb==0.5.1
-# qdrant-client==1.15.1
+rapidfuzz>=3.0.0 # For fuzzy text matching in similar_issue tool
 # uncomment this to support language LangChainOpenAIHandler
 # langchain==0.2.0
 # langchain-core==0.2.28
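An optional sanity check that the installed rapidfuzz satisfies the >=3.0.0 floor pinned above; the version parsing here is a rough sketch using only the standard library, not a full version comparison:

# Sketch: confirm the rapidfuzz dependency from requirements.txt is installed at >= 3.0.0.
from importlib.metadata import version

v = version("rapidfuzz")
print("rapidfuzz", v)
assert tuple(int(x) for x in v.split(".")[:2]) >= (3, 0), "rapidfuzz >= 3.0.0 required"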