Sweep: Update readme #311

Open
lukejagg opened this issue Jul 6, 2023 · 12 comments · May be fixed by #322, #323, #324, #325 or #326
Labels: bug, documentation, sweep

Comments

@lukejagg
Contributor

lukejagg commented Jul 6, 2023

Description

Add header to readme

Relevant files

No response

@lukejagg lukejagg added the sweep Assigns Sweep to an issue or pull request. label Jul 6, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 6, 2023

0% 🚫


None


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!

@lukejagg lukejagg added bug Something isn't working documentation Improvements or additions to documentation duplicate This issue or pull request already exists enhancement New feature or request good first issue Good for newcomers help wanted Extra attention is needed invalid This doesn't seem right question Further information is requested sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. bug Something isn't working documentation Improvements or additions to documentation duplicate This issue or pull request already exists enhancement New feature or request good first issue Good for newcomers help wanted Extra attention is needed invalid This doesn't seem right question Further information is requested labels Jul 6, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 6, 2023

0%


Error: 🚫 Unable to Complete PR

If you would like to report this bug, please join our Discord.


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!
Join Our Discord

@lukejagg lukejagg changed the title from "Sweep: do nothing" to "Sweep: Update readme" Jul 7, 2023
@lukejagg lukejagg self-assigned this Jul 7, 2023
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

0%
⭐ In the meantime, consider starring our repo so more people can hear about us!


I am currently looking into this ticket! I will update the progress of the ticket in this comment. I am currently searching through your code, looking for relevant snippets.


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!
Join Our Discord

@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

100%
⭐ In the meantime, consider starring our repo so more people can hear about us!


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
    content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
    discord_log_error(content)
def fetch_file_contents_with_retry():
    retries = 3
    error = None
    for i in range(retries):
        try:
            logger.info(f"Fetching relevant files for the {i}th time...")
            return search_snippets(
                repo,
                f"{title}\n{summary}\n{replies_text}",
                num_files=num_of_snippets_to_query,
                branch=None,
                installation_id=installation_id,
            )
        except Exception as e:
            error = e
            continue
    posthog.capture(
        username, "fetching_failed", properties={"error": error, **metadata}
    )
    raise error
logger.info("Fetching relevant files...")
try:
snippets, tree = fetch_file_contents_with_retry()
assert len(snippets) > 0
except Exception as e:
logger.error(e)
comment_reply(
"It looks like an issue has occurred around fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists contact team@sweep.dev.",
-1
)
log_error("File Fetch", str(e))
raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[:-num_full_files]
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
current_snippet = snippet
_chunks, metadatas, _ids = chunker.call(
current_snippet.content,
current_snippet.file_path
)
segmented_snippets = [
Snippet(
content=current_snippet.content,
start=metadata["start"],
end=metadata["end"],
file_path=metadata["file_path"],
) for metadata in metadatas
]
index = 0
while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
index += 1
index -= 1
for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
current_snippet += segmented_snippets[i]
for i in range(index - 1, max(index - num_extended_snippets - 1, 0), -1):
current_snippet = segmented_snippets[i] + current_snippet
snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
    j = i + 1
    while j < len(snippets):
        if snippets[i] ^ snippets[j]:  # this checks for overlap
            snippets[i] = snippets[i] | snippets[j]  # merging
            snippets.pop(j)
        else:
            j += 1
    i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less, but one way to mitigate this is to code smaller files. If this error persists contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},

except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def change_file(self, file_change_request: FileChangeRequest):
if file_change_request.change_type == "create":
return self.create_file(file_change_request)
elif file_change_request.change_type == "modify":
return self.create_file(file_change_request)
else:
raise Exception("Not a valid file type")
def change_files_in_github(
self,
file_change_requests: list[FileChangeRequest],
branch: str,
):
# should check if branch exists, if not, create it
logger.debug(file_change_requests)
for file_change_request in file_change_requests:
file_markdown = is_markdown(file_change_request.filename)
if file_change_request.change_type == "create":
try: # Try to create
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
except github.GithubException as e:
logger.info(e)
try: # Try to modify
contents = self.get_file(file_change_request.filename, branch=branch)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.update_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
contents.sha,
branch=branch,
)
except:
pass
elif file_change_request.change_type == "modify":
# TODO(sweep): Cleanup this
try:
contents = self.get_file(file_change_request.filename, branch=branch)
except github.UnknownObjectException as e:
logger.warning(f"Received error {e}, trying creating file...")
file_change_request.change_type = "create"
self.create_file(file_change_request)
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
else:
new_file_contents, file_name = self.modify_file(
file_change_request, contents.decoded_content.decode("utf-8")
)
new_file_contents = format_contents(new_file_contents, file_markdown)
if contents.decoded_content.decode("utf-8").endswith("\n"):
new_file_contents += "\n"
logger.debug(
f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
)
self.repo.update_file(
file_name,
f'Update {file_name}',
new_file_contents,
contents.sha,
branch=branch,
)
else:
raise Exception("Invalid change type")

...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variable to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
print("new file")
</new_file>
Example: Insert at end:
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}

from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
FileChange,
FileChangeRequest,
FilesToChange,
PullRequest,
RegexMatchError,
Function,
Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
files_to_change_prompt,
pull_request_prompt,
create_file_prompt,
modify_file_prompt,
modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
def get_files_to_change(self):
file_change_requests: list[FileChangeRequest] = []
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change") # Dedup files to change here
files_to_change = FilesToChange.from_string(files_to_change_response)
files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
for file_change_request, change_type in zip(
files_to_create + files_to_modify,
["create"] * len(files_to_create)
+ ["modify"] * len(files_to_modify),
):
file_change_request = file_change_request.strip()
if not file_change_request or file_change_request == "* None":
continue
logger.debug(file_change_request)
logger.debug(change_type)
file_change_requests.append(
FileChangeRequest.from_string(
file_change_request, change_type=change_type
)
)
# Create a dictionary to hold file names and their corresponding instructions
file_instructions_dict = {}
for file_change_request in file_change_requests:
# If the file name is already in the dictionary, append the new instructions
if file_change_request.filename in file_instructions_dict:
instructions, change_type = file_instructions_dict[file_change_request.filename]
file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
else:
file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
if file_change_requests:
return file_change_requests
except RegexMatchError:
logger.warning("Failed to parse! Retrying...")
self.delete_messages_from_chat("files_to_change")
continue
raise Exception("Could not generate files to change")
def generate_pull_request(self) -> PullRequest:
pull_request = None
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
except Exception as e:
logger.warning(f"Exception {e}. Failed to parse! Retrying...")
self.undo()
continue
pull_request = PullRequest.from_string(pr_text_response)
pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
return pull_request
raise Exception("Could not generate PR text")
class GithubBot(BaseModel):
class Config:
arbitrary_types_allowed = True # for repo: Repository
repo: Repository
def get_contents(self, path: str, branch: str = ""):
if not branch:
branch = self.repo.default_branch
try:
return self.repo.get_contents(path, ref=branch)
except Exception as e:
logger.warning(path)
raise e
def get_file(self, file_path: str, branch: str = "") -> ContentFile:
content = self.get_contents(file_path, branch)
assert not isinstance(content, list)
return content
def check_path_exists(self, path: str, branch: str = ""):
try:
self.get_contents(path, branch)
return True
except Exception:
return False
def create_branch(self, branch: str) -> str:
# Generate PR if nothing is supplied maybe
base_branch = self.repo.get_branch(self.repo.default_branch)
try:
self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
return branch
except GithubException as e:
logger.error(f"Error: {e}, trying with other branch names...")
for i in range(1, 100):
try:
logger.warning(f"Retrying {branch}_{i}...")
self.repo.create_git_ref(
f"refs/heads/{branch}_{i}", base_branch.commit.sha
)
return f"{branch}_{i}"
except GithubException:
pass
raise e
def populate_snippets(self, snippets: list[Snippet]):
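
The "snippet fusing" loop in the listing above relies on `^` to test two snippets for overlap and on `|` to merge them. The real `Snippet` class is not shown in this thread, so the following is only a minimal sketch of how such operators could behave, assuming a snippet is an inclusive line range within one file. The `Snippet` fields and the `fuse_snippets` helper here are hypothetical stand-ins, not sweepai's implementation.

```python
from dataclasses import dataclass


@dataclass
class Snippet:
    """Hypothetical stand-in: an inclusive line range inside one file."""
    file_path: str
    start: int
    end: int

    def __xor__(self, other: "Snippet") -> bool:
        # Overlap check: same file and intersecting line ranges.
        return (
            self.file_path == other.file_path
            and self.start <= other.end
            and other.start <= self.end
        )

    def __or__(self, other: "Snippet") -> "Snippet":
        # Merge: the smallest range that covers both snippets.
        return Snippet(
            self.file_path,
            min(self.start, other.start),
            max(self.end, other.end),
        )


def fuse_snippets(snippets: list[Snippet]) -> list[Snippet]:
    """Merge overlapping snippets using the same loop shape as the listing."""
    snippets = list(snippets)  # work on a copy
    i = 0
    while i < len(snippets):
        j = i + 1
        while j < len(snippets):
            if snippets[i] ^ snippets[j]:
                snippets[i] = snippets[i] | snippets[j]
                snippets.pop(j)  # stay on the same j after removal
            else:
                j += 1
        i += 1
    return snippets
```

With this sketch, two ranges from the same file that touch or intersect collapse into one entry, while snippets from other files keep their position in the list.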


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
|---|---|
| `Non` | * None |
| `README.md` | Add a header at the beginning of the file with the text "Welcome to Sweep!". |
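
The table above is built from the file change requests, and `get_files_to_change` in the Step 1 listing first combines requests that target the same file by concatenating their instructions. The sketch below illustrates that grouping and a pipe-style table rendering using only the standard library. The `FileChangeRequest` dataclass is a simplified, hypothetical version of the entity, and `to_pipe_table` is an illustrative helper rather than the `tabulate` call the bot uses.

```python
from dataclasses import dataclass


@dataclass
class FileChangeRequest:
    """Simplified, hypothetical version of the entity in the listing."""
    filename: str
    instructions: str
    change_type: str


def merge_requests(requests: list[FileChangeRequest]) -> list[FileChangeRequest]:
    """Combine requests per file; the first change_type seen for a file wins."""
    merged: dict[str, tuple[str, str]] = {}
    for request in requests:
        if request.filename in merged:
            instructions, change_type = merged[request.filename]
            merged[request.filename] = (
                instructions + " " + request.instructions,
                change_type,
            )
        else:
            merged[request.filename] = (request.instructions, request.change_type)
    return [
        FileChangeRequest(name, instructions, change_type)
        for name, (instructions, change_type) in merged.items()
    ]


def to_pipe_table(requests: list[FileChangeRequest]) -> str:
    """Render the requests as a GitHub-style pipe table."""
    rows = ["| File Path | Proposed Changes |", "|---|---|"]
    rows += [f"| `{r.filename}` | {r.instructions} |" for r in requests]
    return "\n".join(rows)
```

Because Python dictionaries preserve insertion order, files appear in the table in the order they were first requested.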

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Update readme with header
sweep/update-readme-header

I have added a header to the README.md file with the following content:

# Welcome to Sweep!

This header provides a warm welcome to users visiting the repository. Please review the changes and let me know if any further modifications are needed. Thank you!
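
The branch name shown in this plan follows from two pieces of the Step 1 listing: `generate_pull_request` prefixes the model's branch name with `sweep/` and truncates it to 250 characters, and `create_branch` falls back to `_1`, `_2`, and so on when the name is already taken. The sketch below reproduces that naming logic against an in-memory set of existing branches instead of the GitHub API. The function names `sweep_branch_name` and `pick_free_branch` are hypothetical.

```python
def sweep_branch_name(raw_name: str) -> str:
    """Prefix the proposed name and cap the proposed part at 250 characters."""
    return "sweep/" + raw_name[:250]


def pick_free_branch(branch: str, existing: set[str], max_suffix: int = 99) -> str:
    """Return `branch`, or the first `branch_<i>` that is not in `existing`.

    `existing` stands in for the remote's branch list (an assumption made
    for this sketch; the listing instead retries the ref-creation call).
    """
    if branch not in existing:
        return branch
    for i in range(1, max_suffix + 1):
        candidate = f"{branch}_{i}"
        if candidate not in existing:
            return candidate
    raise RuntimeError(f"No free branch name found for {branch!r}")
```

Several pull requests are linked to this issue, and a suffix fallback of this kind is one way repeated runs could avoid colliding on the same branch name.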


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!
Join Our Discord

@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

Here's the PR! #323


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
    content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
    discord_log_error(content)

def fetch_file_contents_with_retry():
    retries = 3
    error = None
    for i in range(retries):
        try:
            logger.info(f"Fetching relevant files for the {i}th time...")
            return search_snippets(
                repo,
                f"{title}\n{summary}\n{replies_text}",
                num_files=num_of_snippets_to_query,
                branch=None,
                installation_id=installation_id,
            )
        except Exception as e:
            error = e
            continue
    posthog.capture(
        username, "fetching_failed", properties={"error": error, **metadata}
    )
    raise error

logger.info("Fetching relevant files...")
try:
    snippets, tree = fetch_file_contents_with_retry()
    assert len(snippets) > 0
except Exception as e:
    logger.error(e)
    comment_reply(
        "It looks like an issue has occurred around fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists contact team@sweep.dev.",
        -1
    )
    log_error("File Fetch", str(e))
    raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[:-num_full_files]
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
current_snippet = snippet
_chunks, metadatas, _ids = chunker.call(
current_snippet.content,
current_snippet.file_path
)
segmented_snippets = [
Snippet(
content=current_snippet.content,
start=metadata["start"],
end=metadata["end"],
file_path=metadata["file_path"],
) for metadata in metadatas
]
index = 0
while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
index += 1
index -= 1
for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
current_snippet += segmented_snippets[i]
for i in range(index - 1, max(index - num_extended_snippets - 1, 0), -1):
current_snippet = segmented_snippets[i] + current_snippet
snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
    j = i + 1
    while j < len(snippets):
        if snippets[i] ^ snippets[j]:  # this checks for overlap
            snippets[i] = snippets[i] | snippets[j]  # merging
            snippets.pop(j)
        else:
            j += 1
    i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less, but one way to mitigate this is to code smaller files. If this error persists contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},

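The "snippet fusing" loop in the snippet above relies on `^` to test two snippets for overlap and `|` to merge them. The sketch below shows one way such operators could behave. It is an assumption for illustration only: `Span` is a hypothetical stand-in for `Snippet`, and the real operator definitions are not shown in this thread.

```python
from dataclasses import dataclass


@dataclass
class Span:
    """Hypothetical stand-in for Snippet: a line range within one file."""

    file_path: str
    start: int
    end: int

    def __xor__(self, other: "Span") -> bool:
        # Assumed meaning of `^`: same file and intersecting line ranges.
        return (
            self.file_path == other.file_path
            and self.start <= other.end
            and other.start <= self.end
        )

    def __or__(self, other: "Span") -> "Span":
        # Assumed meaning of `|`: the smallest range covering both spans.
        return Span(
            self.file_path, min(self.start, other.start), max(self.end, other.end)
        )


def fuse(spans: list[Span]) -> list[Span]:
    """Merge overlapping spans using the same nested-loop shape as the fusing code."""
    spans = list(spans)  # work on a copy
    i = 0
    while i < len(spans):
        j = i + 1
        while j < len(spans):
            if spans[i] ^ spans[j]:
                spans[i] = spans[i] | spans[j]
                spans.pop(j)
            else:
                j += 1
        i += 1
    return spans


print(fuse([Span("a.py", 1, 10), Span("a.py", 5, 20), Span("b.py", 1, 3)]))
```

With these assumed operators, a span that grows late in the inner loop is not re-checked against entries the loop already passed, so a single pass can leave some transitive overlaps unmerged.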
except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def change_file(self, file_change_request: FileChangeRequest):
if file_change_request.change_type == "create":
return self.create_file(file_change_request)
elif file_change_request.change_type == "modify":
return self.create_file(file_change_request)
else:
raise Exception("Not a valid file type")
def change_files_in_github(
self,
file_change_requests: list[FileChangeRequest],
branch: str,
):
# should check if branch exists, if not, create it
logger.debug(file_change_requests)
for file_change_request in file_change_requests:
file_markdown = is_markdown(file_change_request.filename)
if file_change_request.change_type == "create":
try: # Try to create
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
except github.GithubException as e:
logger.info(e)
try: # Try to modify
contents = self.get_file(file_change_request.filename, branch=branch)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.update_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
contents.sha,
branch=branch,
)
except:
pass
elif file_change_request.change_type == "modify":
# TODO(sweep): Cleanup this
try:
contents = self.get_file(file_change_request.filename, branch=branch)
except github.UnknownObjectException as e:
logger.warning(f"Received error {e}, trying creating file...")
file_change_request.change_type = "create"
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
else:
new_file_contents, file_name = self.modify_file(
file_change_request, contents.decoded_content.decode("utf-8")
)
new_file_contents = format_contents(new_file_contents, file_markdown)
if contents.decoded_content.decode("utf-8").endswith("\n"):
new_file_contents += "\n"
logger.debug(
f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
)
self.repo.update_file(
file_name,
f'Update {file_name}',
new_file_contents,
contents.sha,
branch=branch,
)
else:
raise Exception("Invalid change type")

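`modify_file` in the snippet above prefixes every line of the old file with its index before placing it in the prompt. A minimal standalone sketch of that step, where `number_lines` is a hypothetical name and the body mirrors the `f"{i}:{line}"` expression from the quoted code:

```python
def number_lines(contents: str) -> str:
    """Prefix each line with its zero-based index, e.g. '0:first line'."""
    return "\n".join(
        f"{i}:{line}" for i, line in enumerate(contents.split("\n"))
    )


print(number_lines("import os\nprint(os.getcwd())"))
# prints:
# 0:import os
# 1:print(os.getcwd())
```

The indices start at 0, which matches the `<copied>0-25</copied>` ranges used in the prompt examples.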
...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variables to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
<new_file>
print("new file")
</new_file>
Example: Insert at end:
<new_file>
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}

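The `modify_file_prompt` above asks the model to reference unchanged regions with tags such as `<copied>0-50</copied>` (zero-based, inclusive) instead of repeating them. Below is a rough sketch of how a response in that format could be expanded back into full text. `expand_copied` is a hypothetical helper written for this illustration; the repository's own `generate_new_file` is not shown in this thread and may work differently. The sketch does not handle the surrounding `<new_file>` tags.

```python
import re

# Matches tags like <copied>0-50</copied>.
COPIED = re.compile(r"<copied>(\d+)-(\d+)</copied>")


def expand_copied(response: str, old_file: str) -> str:
    """Replace each <copied>a-b</copied> tag with old-file lines a..b (inclusive)."""
    old_lines = old_file.split("\n")

    def substitute(match: re.Match) -> str:
        start, end = int(match.group(1)), int(match.group(2))
        return "\n".join(old_lines[start : end + 1])

    return COPIED.sub(substitute, response)


old = "a\nb\nc\nd"
response = "<copied>0-1</copied>\nNEW\n<copied>2-3</copied>"
print(expand_copied(response, old))
# prints the five lines: a, b, NEW, c, d
```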

from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
FileChange,
FileChangeRequest,
FilesToChange,
PullRequest,
RegexMatchError,
Function,
Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
files_to_change_prompt,
pull_request_prompt,
create_file_prompt,
modify_file_prompt,
modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
    def get_files_to_change(self):
        file_change_requests: list[FileChangeRequest] = []
        for count in range(5):
            try:
                logger.info(f"Generating for the {count}th time...")
                files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change")  # Dedup files to change here
                files_to_change = FilesToChange.from_string(files_to_change_response)
                files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
                files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
                for file_change_request, change_type in zip(
                    files_to_create + files_to_modify,
                    ["create"] * len(files_to_create)
                    + ["modify"] * len(files_to_modify),
                ):
                    file_change_request = file_change_request.strip()
                    if not file_change_request or file_change_request == "* None":
                        continue
                    logger.debug(file_change_request)
                    logger.debug(change_type)
                    file_change_requests.append(
                        FileChangeRequest.from_string(
                            file_change_request, change_type=change_type
                        )
                    )
                # Create a dictionary to hold file names and their corresponding instructions
                file_instructions_dict = {}
                for file_change_request in file_change_requests:
                    # If the file name is already in the dictionary, append the new instructions
                    if file_change_request.filename in file_instructions_dict:
                        instructions, change_type = file_instructions_dict[file_change_request.filename]
                        file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
                    else:
                        file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
                file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
                if file_change_requests:
                    return file_change_requests
            except RegexMatchError:
                logger.warning("Failed to parse! Retrying...")
                self.delete_messages_from_chat("files_to_change")
                continue
        raise Exception("Could not generate files to change")

    def generate_pull_request(self) -> PullRequest:
        pull_request = None
        for count in range(5):
            try:
                logger.info(f"Generating for the {count}th time...")
                pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
            except Exception as e:
                logger.warning(f"Exception {e}. Failed to parse! Retrying...")
                self.undo()
                continue
            pull_request = PullRequest.from_string(pr_text_response)
            pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
            return pull_request
        raise Exception("Could not generate PR text")


class GithubBot(BaseModel):
    class Config:
        arbitrary_types_allowed = True  # for repo: Repository

    repo: Repository

    def get_contents(self, path: str, branch: str = ""):
        if not branch:
            branch = self.repo.default_branch
        try:
            return self.repo.get_contents(path, ref=branch)
        except Exception as e:
            logger.warning(path)
            raise e

    def get_file(self, file_path: str, branch: str = "") -> ContentFile:
        content = self.get_contents(file_path, branch)
        assert not isinstance(content, list)
        return content

    def check_path_exists(self, path: str, branch: str = ""):
        try:
            self.get_contents(path, branch)
            return True
        except Exception:
            return False

    def create_branch(self, branch: str) -> str:
        # Generate PR if nothing is supplied maybe
        base_branch = self.repo.get_branch(self.repo.default_branch)
        try:
            self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
            return branch
        except GithubException as e:
            logger.error(f"Error: {e}, trying with other branch names...")
            for i in range(1, 100):
                try:
                    logger.warning(f"Retrying {branch}_{i}...")
                    self.repo.create_git_ref(
                        f"refs/heads/{branch}_{i}", base_branch.commit.sha
                    )
                    return f"{branch}_{i}"
                except GithubException:
                    pass
            raise e

    def populate_snippets(self, snippets: list[Snippet]):


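`get_files_to_change` in the snippet above collapses several requests for the same file into one entry by concatenating their instructions and keeping the first change type. A small sketch of that merge on plain tuples follows. `merge_instructions` and the tuple layout are assumptions made for illustration; the quoted code operates on `FileChangeRequest` objects.

```python
def merge_instructions(
    requests: list[tuple[str, str, str]],
) -> list[tuple[str, str, str]]:
    """Combine (filename, instructions, change_type) entries that share a filename."""
    merged: dict[str, tuple[str, str]] = {}
    for filename, instructions, change_type in requests:
        if filename in merged:
            # Append to the earlier instructions; the first change type wins.
            previous, first_type = merged[filename]
            merged[filename] = (previous + " " + instructions, first_type)
        else:
            merged[filename] = (instructions, change_type)
    return [(name, text, kind) for name, (text, kind) in merged.items()]


print(
    merge_instructions(
        [
            ("README.md", "Add a header.", "modify"),
            ("README.md", "Fix the install section.", "modify"),
        ]
    )
)
# prints [('README.md', 'Add a header. Fix the install section.', 'modify')]
```

Because Python dictionaries preserve insertion order, the merged list keeps files in the order they were first requested.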
Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
|:----------|:-----------------|
| README.md | Add a header "Welcome to Sweep!" at the top of the file. |

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Add Header to README
sweep/update-readme-header

This PR adds a new header "Welcome to Sweep!" to the README.md file as per the user's request in issue #311. This change will make the README more welcoming to new visitors.


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!
Join Our Discord

@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

Here's the PR! #324


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
discord_log_error(content)
def fetch_file_contents_with_retry():
retries = 3
error = None
for i in range(retries):
try:
logger.info(f"Fetching relevant files for the {i}th time...")
return search_snippets(
repo,
f"{title}\n{summary}\n{replies_text}",
num_files=num_of_snippets_to_query,
branch=None,
installation_id=installation_id,
)
except Exception as e:
error = e
continue
posthog.capture(
username, "fetching_failed", properties={"error": error, **metadata}
)
raise error
logger.info("Fetching relevant files...")
try:
snippets, tree = fetch_file_contents_with_retry()
assert len(snippets) > 0
except Exception as e:
logger.error(e)
comment_reply(
"It looks like an issue has occurred around fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists contact team@sweep.dev.",
-1
)
log_error("File Fetch", str(e))
raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[:-num_full_files]
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
    current_snippet = snippet
    _chunks, metadatas, _ids = chunker.call(
        current_snippet.content,
        current_snippet.file_path
    )
    segmented_snippets = [
        Snippet(
            content=current_snippet.content,
            start=metadata["start"],
            end=metadata["end"],
            file_path=metadata["file_path"],
        ) for metadata in metadatas
    ]
    # Find the last segment that starts at or before the current snippet
    index = 0
    while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
        index += 1
    index -= 1
    # Extend forward, then backward, by up to num_extended_snippets segments
    for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
        current_snippet += segmented_snippets[i]
    # The stop value is exclusive, so -1 (not 0) is needed to reach segment 0
    for i in range(index - 1, max(index - num_extended_snippets - 1, -1), -1):
        current_snippet = segmented_snippets[i] + current_snippet
    snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
    j = i + 1
    while j < len(snippets):
        if snippets[i] ^ snippets[j]: # this checks for overlap
            snippets[i] = snippets[i] | snippets[j] # merging
            snippets.pop(j)
        else:
            j += 1
    i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less often, but one way to mitigate this is to work with smaller files. If this error persists, contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},
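
The snippet-fusing loop above relies on `Snippet` overloading `^` as an overlap test and `|` as a merge. The `Snippet` class itself is not shown in this thread, so the following is only a minimal sketch using a hypothetical `Span` class; it assumes "overlap" means same file and intersecting inclusive line ranges, which may differ from the real implementation.

```python
from dataclasses import dataclass


@dataclass
class Span:
    """Hypothetical stand-in for Sweep's Snippet: a line range in one file."""
    file_path: str
    start: int
    end: int

    def __xor__(self, other: "Span") -> bool:
        # Assumed semantics: same file and intersecting inclusive line ranges.
        return (
            self.file_path == other.file_path
            and self.start <= other.end
            and other.start <= self.end
        )

    def __or__(self, other: "Span") -> "Span":
        # Merge into the smallest span that covers both inputs.
        return Span(self.file_path, min(self.start, other.start), max(self.end, other.end))


def fuse(spans: list[Span]) -> list[Span]:
    """Same loop shape as the snippet-fusing code: fold spans[j] into spans[i] on overlap."""
    spans = list(spans)
    i = 0
    while i < len(spans):
        j = i + 1
        while j < len(spans):
            if spans[i] ^ spans[j]:
                spans[i] = spans[i] | spans[j]
                spans.pop(j)
            else:
                j += 1
        i += 1
    return spans


fused = fuse([Span("a.py", 1, 10), Span("b.py", 1, 5), Span("a.py", 8, 20)])
print(fused)
```

One property of this loop shape is that a span skipped earlier in the inner loop is not rechecked after `spans[i]` grows, so a single pass can leave two spans that overlap only through a third.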

except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
    def change_file(self, file_change_request: FileChangeRequest):
        # Dispatch on the requested change type
        if file_change_request.change_type == "create":
            return self.create_file(file_change_request)
        elif file_change_request.change_type == "modify":
            return self.modify_file(file_change_request)
        else:
            raise Exception("Not a valid change type")
def change_files_in_github(
self,
file_change_requests: list[FileChangeRequest],
branch: str,
):
# should check if branch exists, if not, create it
logger.debug(file_change_requests)
for file_change_request in file_change_requests:
file_markdown = is_markdown(file_change_request.filename)
if file_change_request.change_type == "create":
try: # Try to create
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
except github.GithubException as e:
logger.info(e)
try: # Try to modify
contents = self.get_file(file_change_request.filename, branch=branch)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.update_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
contents.sha,
branch=branch,
)
except:
pass
elif file_change_request.change_type == "modify":
# TODO(sweep): Cleanup this
try:
contents = self.get_file(file_change_request.filename, branch=branch)
except github.UnknownObjectException as e:
logger.warning(f"Received error {e}, trying creating file...")
file_change_request.change_type = "create"
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
else:
new_file_contents, file_name = self.modify_file(
file_change_request, contents.decoded_content.decode("utf-8")
)
new_file_contents = format_contents(new_file_contents, file_markdown)
if contents.decoded_content.decode("utf-8").endswith("\n"):
new_file_contents += "\n"
logger.debug(
f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
)
self.repo.update_file(
file_name,
f'Update {file_name}',
new_file_contents,
contents.sha,
branch=branch,
)
else:
raise Exception("Invalid change type")
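
Before `modify_file` sends the old file to the model, it prefixes every line with its zero-based index and swaps triple double quotes for triple single quotes. The sketch below isolates that step; `number_lines` is a hypothetical helper name, not a function from the repository.

```python
def number_lines(contents: str) -> str:
    """Prefix each line with its zero-based index, as modify_file does for the prompt."""
    numbered = "\n".join(f"{i}:{line}" for i, line in enumerate(contents.split("\n")))
    # Mirror the source: replace triple double quotes with triple single quotes.
    return numbered.replace('"""', "'''")


sample = 'def f():\n    """Doc."""\n    return 1'
print(number_lines(sample))
```

Because the split is on `"\n"`, a file that ends with a newline produces one extra, empty numbered line at the end.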

...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variable to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
<new_file>
print("new file")
</new_file>
Example: Insert at end:
<new_file>
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}
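
`modify_file_prompt` asks the model to reference unchanged regions with `<copied>start-end</copied>` tags (inclusive, matching the zero-based line numbers in the prompt). The repository's `generate_new_file` is not shown in this thread, so what follows is a rough sketch of how such tags could be expanded; `expand_copied` is a hypothetical name and the handling of malformed ranges is an assumption.

```python
import re

# Matches tags such as <copied>0-50</copied>
COPIED = re.compile(r"<copied>(\d+)-(\d+)</copied>")


def expand_copied(response_body: str, old_file: str) -> str:
    """Replace each <copied>a-b</copied> tag with lines a..b (inclusive) of old_file."""
    old_lines = old_file.split("\n")

    def replace(match: re.Match) -> str:
        start, end = int(match.group(1)), int(match.group(2))
        # Slicing clamps silently if `end` runs past the last line.
        return "\n".join(old_lines[start : end + 1])

    return COPIED.sub(replace, response_body)


old = "a = 1\nb = 2\nc = 3"
body = "<copied>0-1</copied>\nprint(a + b)\n<copied>2-2</copied>"
print(expand_copied(body, old))
```

Using a replacement function rather than a replacement string keeps backslashes in the copied source from being interpreted as regex escapes.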


from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
FileChange,
FileChangeRequest,
FilesToChange,
PullRequest,
RegexMatchError,
Function,
Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
files_to_change_prompt,
pull_request_prompt,
create_file_prompt,
modify_file_prompt,
modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
def get_files_to_change(self):
file_change_requests: list[FileChangeRequest] = []
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change") # Dedup files to change here
files_to_change = FilesToChange.from_string(files_to_change_response)
files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
for file_change_request, change_type in zip(
files_to_create + files_to_modify,
["create"] * len(files_to_create)
+ ["modify"] * len(files_to_modify),
):
file_change_request = file_change_request.strip()
if not file_change_request or file_change_request == "* None":
continue
logger.debug(file_change_request)
logger.debug(change_type)
file_change_requests.append(
FileChangeRequest.from_string(
file_change_request, change_type=change_type
)
)
# Create a dictionary to hold file names and their corresponding instructions
file_instructions_dict = {}
for file_change_request in file_change_requests:
# If the file name is already in the dictionary, append the new instructions
if file_change_request.filename in file_instructions_dict:
instructions, change_type = file_instructions_dict[file_change_request.filename]
file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
else:
file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
if file_change_requests:
return file_change_requests
except RegexMatchError:
logger.warning("Failed to parse! Retrying...")
self.delete_messages_from_chat("files_to_change")
continue
raise Exception("Could not generate files to change")
    def generate_pull_request(self) -> PullRequest:
        pull_request = None
        for count in range(5):
            try:
                logger.info(f"Generating for the {count}th time...")
                pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
                # Parse inside the try so that a malformed response triggers a retry
                pull_request = PullRequest.from_string(pr_text_response)
            except Exception as e:
                logger.warning(f"Exception {e}. Failed to parse! Retrying...")
                self.undo()
                continue
            pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
            return pull_request
        raise Exception("Could not generate PR text")
class GithubBot(BaseModel):
class Config:
arbitrary_types_allowed = True # for repo: Repository
repo: Repository
def get_contents(self, path: str, branch: str = ""):
if not branch:
branch = self.repo.default_branch
try:
return self.repo.get_contents(path, ref=branch)
except Exception as e:
logger.warning(path)
raise e
def get_file(self, file_path: str, branch: str = "") -> ContentFile:
content = self.get_contents(file_path, branch)
assert not isinstance(content, list)
return content
def check_path_exists(self, path: str, branch: str = ""):
try:
self.get_contents(path, branch)
return True
except Exception:
return False
def create_branch(self, branch: str) -> str:
# Generate PR if nothing is supplied maybe
base_branch = self.repo.get_branch(self.repo.default_branch)
try:
self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
return branch
except GithubException as e:
logger.error(f"Error: {e}, trying with other branch names...")
for i in range(1, 100):
try:
logger.warning(f"Retrying {branch}_{i}...")
self.repo.create_git_ref(
f"refs/heads/{branch}_{i}", base_branch.commit.sha
)
return f"{branch}_{i}"
except GithubException:
pass
raise e
def populate_snippets(self, snippets: list[Snippet]):
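
`get_files_to_change` above deduplicates requests by filename: instructions for the same file are concatenated with a space, and the change type of the first occurrence is kept. A minimal sketch of that merge, using plain tuples instead of `FileChangeRequest` (the function name `merge_requests` is hypothetical):

```python
def merge_requests(
    requests: list[tuple[str, str, str]],
) -> list[tuple[str, str, str]]:
    """Merge (filename, instructions, change_type) tuples that share a filename."""
    merged: dict[str, tuple[str, str]] = {}
    for filename, instructions, change_type in requests:
        if filename in merged:
            # Append the new instructions; the first change type seen wins.
            previous_instructions, first_type = merged[filename]
            merged[filename] = (previous_instructions + " " + instructions, first_type)
        else:
            merged[filename] = (instructions, change_type)
    # Dicts preserve insertion order, so files come out in first-seen order.
    return [(name, ins, typ) for name, (ins, typ) in merged.items()]


merged_requests = merge_requests([
    ("README.md", "Add header.", "create"),
    ("a.py", "Fix bug.", "modify"),
    ("README.md", "Add badge.", "modify"),
])
print(merged_requests)
```

Since the source builds its list as creates followed by modifies, a file that appears in both sections ends up with change type `create` under this rule.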


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
|:---|:---|
| `README.md` | Add a generic header at the beginning of the file. |

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Add Header to README
sweep/update-readme-header

This PR adds a generic header to the README file. The header can be easily modified to suit the project's needs. This change was requested in issue #311.


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!
Join Our Discord

@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

Here's the PR! #325


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
discord_log_error(content)
def fetch_file_contents_with_retry():
retries = 3
error = None
for i in range(retries):
try:
logger.info(f"Fetching relevant files for the {i}th time...")
return search_snippets(
repo,
f"{title}\n{summary}\n{replies_text}",
num_files=num_of_snippets_to_query,
branch=None,
installation_id=installation_id,
)
except Exception as e:
error = e
continue
posthog.capture(
username, "fetching_failed", properties={"error": error, **metadata}
)
raise error
logger.info("Fetching relevant files...")
try:
snippets, tree = fetch_file_contents_with_retry()
assert len(snippets) > 0
except Exception as e:
logger.error(e)
comment_reply(
"It looks like an issue has occurred while fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists, contact team@sweep.dev.",
-1
)
log_error("File Fetch", str(e))
raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[num_full_files:]  # keep the rest; the first num_full_files are expanded and re-appended below
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
current_snippet = snippet
_chunks, metadatas, _ids = chunker.call(
current_snippet.content,
current_snippet.file_path
)
segmented_snippets = [
Snippet(
content=current_snippet.content,
start=metadata["start"],
end=metadata["end"],
file_path=metadata["file_path"],
) for metadata in metadatas
]
index = 0
while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
index += 1
index -= 1
for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
current_snippet += segmented_snippets[i]
for i in range(index - 1, max(index - num_extended_snippets - 1, -1), -1):  # stop is exclusive, so -1 reaches segment 0
current_snippet = segmented_snippets[i] + current_snippet
snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
j = i + 1
while j < len(snippets):
if snippets[i] ^ snippets[j]: # this checks for overlap
snippets[i] = snippets[i] | snippets[j] # merging
snippets.pop(j)
else:
j += 1
i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less, but one way to mitigate this is to code smaller files. If this error persists, contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},
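The snippet links built in the `comment_reply` call above clamp each line range so the anchor stays inside the file. A minimal sketch of that clamping follows; `snippet_permalink` and its parameter names are hypothetical and not part of the repository.

```python
def snippet_permalink(
    organization: str,
    repo_name: str,
    sha: str,
    file_path: str,
    start: int,
    end: int,
    content: str,
) -> str:
    """Build a GitHub blob permalink with a clamped line range (hypothetical helper)."""
    # Mirrors the quoted expression: the upper bound is the number of newline
    # characters in the content, so a file without a trailing newline is
    # clamped one line short of its real length.
    last_line = content.count("\n")
    first = max(start, 1)        # GitHub line anchors start at 1
    last = min(end, last_line)
    return (
        f"https://github.com/{organization}/{repo_name}"
        f"/blob/{sha}/{file_path}#L{first}-L{last}"
    )


link = snippet_permalink("org", "repo", "abc123", "README.md", 0, 10, "a\nb\nc\n")
print(link)
```

Pinning the link to a commit SHA rather than a branch name keeps the highlighted lines stable after later pushes.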

except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def change_file(self, file_change_request: FileChangeRequest):
if file_change_request.change_type == "create":
return self.create_file(file_change_request)
elif file_change_request.change_type == "modify":
return self.create_file(file_change_request)
else:
raise Exception("Not a valid file type")
def change_files_in_github(
self,
file_change_requests: list[FileChangeRequest],
branch: str,
):
# should check if branch exists, if not, create it
logger.debug(file_change_requests)
for file_change_request in file_change_requests:
file_markdown = is_markdown(file_change_request.filename)
if file_change_request.change_type == "create":
try: # Try to create
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
except github.GithubException as e:
logger.info(e)
try: # Try to modify
contents = self.get_file(file_change_request.filename, branch=branch)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.update_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
contents.sha,
branch=branch,
)
except:
pass
elif file_change_request.change_type == "modify":
# TODO(sweep): Cleanup this
try:
contents = self.get_file(file_change_request.filename, branch=branch)
except github.UnknownObjectException as e:
logger.warning(f"Received error {e}, trying creating file...")
file_change_request.change_type = "create"
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
else:
new_file_contents, file_name = self.modify_file(
file_change_request, contents.decoded_content.decode("utf-8")
)
new_file_contents = format_contents(new_file_contents, file_markdown)
if contents.decoded_content.decode("utf-8").endswith("\n"):
new_file_contents += "\n"
logger.debug(
f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
)
self.repo.update_file(
file_name,
f'Update {file_name}',
new_file_contents,
contents.sha,
branch=branch,
)
else:
raise Exception("Invalid change type")
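Both `create_file` and `modify_file` in the snippet above follow the same pattern: ask the model, try to parse the reply, and on failure undo the exchange and ask again. A minimal sketch of that loop, with the chat, parser, and undo step passed in as plain callables; `retry_until_parsed` is a hypothetical name, not a function from the repository.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_until_parsed(
    ask: Callable[[], str],
    parse: Callable[[str], T],
    undo: Callable[[], None],
    attempts: int = 5,
) -> T:
    """Call ask() until parse() accepts the reply, undoing each failed turn."""
    for _ in range(attempts):
        response = ask()
        try:
            return parse(response)
        except Exception:
            # Drop the unparseable exchange so it does not pollute the next attempt.
            undo()
    raise RuntimeError(f"Failed to parse response after {attempts} attempts.")


# Usage with stand-ins for the chat model: the first reply is unparseable.
replies = iter(["not a number", "42"])
value = retry_until_parsed(lambda: next(replies), int, lambda: None)
print(value)
```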

...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variable to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
<new_file>
print("new file")
</new_file>
Example: Insert at end:
<new_file>
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}
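The `<copied>start-end</copied>` convention described in `modify_file_prompt` can be expanded back into a full file with a small substitution pass. The sketch below is an illustration only: `expand_copied_ranges` is a hypothetical name, and it is not claimed to match the repository's `generate_new_file`. It assumes zero-based, inclusive line numbers, which is what the prompt text and the `enumerate`-based numbering in `modify_file` suggest.

```python
import re

COPIED_TAG = re.compile(r"<copied>(\d+)-(\d+)</copied>")
NEW_FILE_TAG = re.compile(r"<new_file>\n?(.*?)\n?</new_file>", re.DOTALL)


def expand_copied_ranges(response: str, old_file: str) -> str:
    """Replace each <copied>a-b</copied> tag with lines a..b of old_file."""
    old_lines = old_file.split("\n")
    match = NEW_FILE_TAG.search(response)
    # Fall back to the raw response if the model omitted the wrapper tags.
    body = match.group(1) if match else response

    def substitute(tag: re.Match) -> str:
        start, end = int(tag.group(1)), int(tag.group(2))
        # Zero-based and inclusive, so the slice end is end + 1.
        return "\n".join(old_lines[start : end + 1])

    return COPIED_TAG.sub(substitute, body)


old = "a\nb\nc\nd"
reply = "<new_file>\n<copied>0-1</copied>\nX\n<copied>2-3</copied>\n</new_file>"
print(expand_copied_ranges(reply, old))
```

Letting the model reference line ranges instead of repeating unchanged code keeps replies short, which matters when the file is close to the context limit.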

from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
FileChange,
FileChangeRequest,
FilesToChange,
PullRequest,
RegexMatchError,
Function,
Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
files_to_change_prompt,
pull_request_prompt,
create_file_prompt,
modify_file_prompt,
modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
def get_files_to_change(self):
file_change_requests: list[FileChangeRequest] = []
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change") # Dedup files to change here
files_to_change = FilesToChange.from_string(files_to_change_response)
files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
for file_change_request, change_type in zip(
files_to_create + files_to_modify,
["create"] * len(files_to_create)
+ ["modify"] * len(files_to_modify),
):
file_change_request = file_change_request.strip()
if not file_change_request or file_change_request == "* None":
continue
logger.debug(file_change_request)
logger.debug(change_type)
file_change_requests.append(
FileChangeRequest.from_string(
file_change_request, change_type=change_type
)
)
# Create a dictionary to hold file names and their corresponding instructions
file_instructions_dict = {}
for file_change_request in file_change_requests:
# If the file name is already in the dictionary, append the new instructions
if file_change_request.filename in file_instructions_dict:
instructions, change_type = file_instructions_dict[file_change_request.filename]
file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
else:
file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
if file_change_requests:
return file_change_requests
except RegexMatchError:
logger.warning("Failed to parse! Retrying...")
self.delete_messages_from_chat("files_to_change")
continue
raise Exception("Could not generate files to change")
def generate_pull_request(self) -> PullRequest:
pull_request = None
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
except Exception as e:
logger.warning(f"Exception {e}. Failed to parse! Retrying...")
self.undo()
continue
pull_request = PullRequest.from_string(pr_text_response)
pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
return pull_request
raise Exception("Could not generate PR text")
class GithubBot(BaseModel):
class Config:
arbitrary_types_allowed = True # for repo: Repository
repo: Repository
def get_contents(self, path: str, branch: str = ""):
if not branch:
branch = self.repo.default_branch
try:
return self.repo.get_contents(path, ref=branch)
except Exception as e:
logger.warning(path)
raise e
def get_file(self, file_path: str, branch: str = "") -> ContentFile:
content = self.get_contents(file_path, branch)
assert not isinstance(content, list)
return content
def check_path_exists(self, path: str, branch: str = ""):
try:
self.get_contents(path, branch)
return True
except Exception:
return False
def create_branch(self, branch: str) -> str:
# Generate PR if nothing is supplied maybe
base_branch = self.repo.get_branch(self.repo.default_branch)
try:
self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
return branch
except GithubException as e:
logger.error(f"Error: {e}, trying with other branch names...")
for i in range(1, 100):
try:
logger.warning(f"Retrying {branch}_{i}...")
self.repo.create_git_ref(
f"refs/heads/{branch}_{i}", base_branch.commit.sha
)
return f"{branch}_{i}"
except GithubException:
pass
raise e
def populate_snippets(self, snippets: list[Snippet]):
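The deduplication step in `get_files_to_change` above merges requests that name the same file, concatenating their instructions and keeping the first change type seen. A minimal sketch using plain tuples in place of `FileChangeRequest`; `merge_instructions` is a hypothetical name.

```python
def merge_instructions(
    requests: list[tuple[str, str, str]],
) -> list[tuple[str, str, str]]:
    """Merge (filename, instructions, change_type) tuples that share a filename."""
    merged: dict[str, tuple[str, str]] = {}
    for filename, instructions, change_type in requests:
        if filename in merged:
            previous, first_type = merged[filename]
            # Append the new instructions; the first change type wins.
            merged[filename] = (previous + " " + instructions, first_type)
        else:
            merged[filename] = (instructions, change_type)
    # Dicts preserve insertion order, so files keep their first-seen position.
    return [(name, text, kind) for name, (text, kind) in merged.items()]


print(merge_instructions([
    ("a.py", "Add a parser.", "create"),
    ("b.py", "Fix the import.", "modify"),
    ("a.py", "Add tests.", "modify"),
]))
```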


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
| --- | --- |
| README.md | Add a header "Welcome to Sweep!" at the top of the file. |
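The table above is produced in the quoted code by `tabulate` with `tablefmt="pipe"`. As a dependency-free illustration of the same layout, here is a minimal sketch; `pipe_table` is a hypothetical helper and does not reproduce `tabulate`'s column padding.

```python
def pipe_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render rows as a GitHub-flavored Markdown pipe table."""
    def render(cells: list[str]) -> str:
        # Escape pipes and flatten newlines so one row stays on one line.
        safe = [cell.replace("|", "\\|").replace("\n", " ") for cell in cells]
        return "| " + " | ".join(safe) + " |"

    lines = [render(headers), render(["---"] * len(headers))]
    lines.extend(render(row) for row in rows)
    return "\n".join(lines)


print(pipe_table(
    ["File Path", "Proposed Changes"],
    [["`README.md`", "Add a header at the top of the file."]],
))
```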

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Add Header to README
sweep/update-readme-header

This PR adds a new header "Welcome to Sweep!" to the README.md file as per the user's request in issue #311. The change is simple and straightforward, enhancing the readability of the README.
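The planned pull request summary above is assembled in the quoted code from the title, branch name, and body, with the body rendered as a Markdown block quote. A minimal sketch of that formatting; `format_pr_summary` is a hypothetical name wrapping the same string operations.

```python
def format_pr_summary(title: str, branch_name: str, content: str) -> str:
    """Render a bold title, a code-formatted branch name, and a block-quoted body."""
    # Prefix every line after the first with ">"; the first line gets its
    # ">" from the template below.
    quoted = content.strip().replace("\n", "\n>")
    return f"**{title}**\n`{branch_name}`\n>{quoted}\n"


print(format_pr_summary(
    "Add Header to README",
    "sweep/update-readme-header",
    "First line of the description.\nSecond line.\n",
))
```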


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀



@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

Here's the PR! #326


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
discord_log_error(content)
def fetch_file_contents_with_retry():
retries = 3
error = None
for i in range(retries):
try:
logger.info(f"Fetching relevant files for the {i}th time...")
return search_snippets(
repo,
f"{title}\n{summary}\n{replies_text}",
num_files=num_of_snippets_to_query,
branch=None,
installation_id=installation_id,
)
except Exception as e:
error = e
continue
posthog.capture(
username, "fetching_failed", properties={"error": error, **metadata}
)
raise error
logger.info("Fetching relevant files...")
try:
snippets, tree = fetch_file_contents_with_retry()
assert len(snippets) > 0
except Exception as e:
logger.error(e)
comment_reply(
"It looks like an issue has occurred around fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists, contact team@sweep.dev.",
-1
)
log_error("File Fetch", str(e))
raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[:-num_full_files]
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
current_snippet = snippet
_chunks, metadatas, _ids = chunker.call(
current_snippet.content,
current_snippet.file_path
)
segmented_snippets = [
Snippet(
content=current_snippet.content,
start=metadata["start"],
end=metadata["end"],
file_path=metadata["file_path"],
) for metadata in metadatas
]
index = 0
while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
index += 1
index -= 1
for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
current_snippet += segmented_snippets[i]
for i in range(index - 1, max(index - num_extended_snippets - 1, 0), -1):
current_snippet = segmented_snippets[i] + current_snippet
snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
j = i + 1
while j < len(snippets):
if snippets[i] ^ snippets[j]: # this checks for overlap
snippets[i] = snippets[i] | snippets[j] # merging
snippets.pop(j)
else:
j += 1
i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less, but one way to mitigate this is to code smaller files. If this error persists, contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},
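The "snippet fusing" loop in the snippet above merges overlapping snippets in place. The sketch below illustrates the idea under stated assumptions: it treats `^` on `Snippet` as an overlap test and `|` as a union, which the quoted comments suggest but the quoted code does not show. `Span`, `overlaps`, `merge`, and `fuse` are hypothetical names. Unlike the quoted loop, this version restarts the inner scan after each merge so that a span widened by a merge is rechecked against entries it previously skipped.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Span:
    """A line range in a file, inclusive on both ends (stand-in for Snippet)."""
    file_path: str
    start: int
    end: int


def overlaps(a: Span, b: Span) -> bool:
    return a.file_path == b.file_path and a.start <= b.end and b.start <= a.end


def merge(a: Span, b: Span) -> Span:
    return Span(a.file_path, min(a.start, b.start), max(a.end, b.end))


def fuse(spans: list[Span]) -> list[Span]:
    """Merge overlapping spans, keeping the position of the earliest member."""
    fused = list(spans)
    i = 0
    while i < len(fused):
        j = i + 1
        while j < len(fused):
            if overlaps(fused[i], fused[j]):
                fused[i] = merge(fused[i], fused[j])
                fused.pop(j)
                j = i + 1  # the widened span may now reach earlier entries
            else:
                j += 1
        i += 1
    return fused


print(fuse([Span("a.py", 1, 2), Span("a.py", 5, 6), Span("a.py", 2, 5)]))
```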

except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def change_file(self, file_change_request: FileChangeRequest):
if file_change_request.change_type == "create":
return self.create_file(file_change_request)
elif file_change_request.change_type == "modify":
return self.modify_file(file_change_request)
else:
raise Exception("Not a valid change type")
def change_files_in_github(
self,
file_change_requests: list[FileChangeRequest],
branch: str,
):
# should check if branch exists, if not, create it
logger.debug(file_change_requests)
for file_change_request in file_change_requests:
file_markdown = is_markdown(file_change_request.filename)
if file_change_request.change_type == "create":
try: # Try to create
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
except github.GithubException as e:
logger.info(e)
try: # Try to modify
contents = self.get_file(file_change_request.filename, branch=branch)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.update_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
contents.sha,
branch=branch,
)
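except Exception as update_error:
logger.warning(f"Failed to update {file_change_request.filename}: {update_error}")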
elif file_change_request.change_type == "modify":
# TODO(sweep): Cleanup this
try:
contents = self.get_file(file_change_request.filename, branch=branch)
except github.UnknownObjectException as e:
logger.warning(f"Received error {e}, trying creating file...")
file_change_request.change_type = "create"
file_change = self.create_file(file_change_request)
logger.debug(
f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
)
file_change.code = format_contents(file_change.code, file_markdown)
self.repo.create_file(
file_change_request.filename,
file_change.commit_message,
file_change.code,
branch=branch,
)
else:
new_file_contents, file_name = self.modify_file(
file_change_request, contents.decoded_content.decode("utf-8")
)
new_file_contents = format_contents(new_file_contents, file_markdown)
if contents.decoded_content.decode("utf-8").endswith("\n"):
new_file_contents += "\n"
logger.debug(
f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
)
self.repo.update_file(
file_name,
f'Update {file_name}',
new_file_contents,
contents.sha,
branch=branch,
)
else:
raise Exception("Invalid change type")
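
The "create" branch of `change_files_in_github` above first tries to create the file and falls back to updating it when creation fails. A minimal sketch of that pattern follows. `InMemoryRepo` and `FileExistsInRepo` are hypothetical stand-ins invented for illustration, not part of PyGithub or of this repository, and the `sha` argument that a real update call needs is left out.

```python
class FileExistsInRepo(Exception):
    """Hypothetical error raised when a path already exists on a branch."""


class InMemoryRepo:
    """Hypothetical in-memory repository keyed by (branch, path)."""

    def __init__(self):
        self.files = {}

    def create_file(self, path, message, content, branch):
        if (branch, path) in self.files:
            raise FileExistsInRepo(path)
        self.files[(branch, path)] = content

    def update_file(self, path, message, content, branch):
        if (branch, path) not in self.files:
            raise KeyError(path)
        self.files[(branch, path)] = content


def create_or_update(repo, path, message, content, branch):
    """Try to create the file; if it already exists, update it instead."""
    try:
        repo.create_file(path, message, content, branch)
        return "created"
    except FileExistsInRepo:
        repo.update_file(path, message, content, branch)
        return "updated"


repo = InMemoryRepo()
branch = "sweep/update-readme-header"
first = create_or_update(repo, "README.md", "sweep: add header", "# Welcome to Sweep!\n", branch)
second = create_or_update(repo, "README.md", "sweep: extend header", "# Welcome to Sweep!\n\nMore text.\n", branch)
print(first, second)
```

Catching only the specific "already exists" error keeps unrelated failures visible instead of silently swallowing them.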

...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variables to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
<new_file>
print("new file")
</new_file>
Example: Insert at end:
<new_file>
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}
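
The `modify_file_prompt` above asks the model to reference unchanged regions with `<copied>start-end</copied>` tags (0-based, inclusive) instead of repeating them. The sketch below shows one way such tags could be expanded back into a full file. `number_lines` and `expand_copied` are hypothetical helpers written for this illustration. They are not the repository's `generate_new_file`, whose implementation is not shown here.

```python
import re

COPIED_TAG = re.compile(r"<copied>(\d+)-(\d+)</copied>")


def number_lines(contents):
    """Prefix each line with its 0-based index, matching the numbered prompt input."""
    return "\n".join(f"{i}:{line}" for i, line in enumerate(contents.split("\n")))


def expand_copied(response_body, old_contents):
    """Replace each <copied>a-b</copied> tag with old lines a..b, inclusive."""
    old_lines = old_contents.split("\n")

    def substitute(match):
        start, end = int(match.group(1)), int(match.group(2))
        return "\n".join(old_lines[start:end + 1])

    return COPIED_TAG.sub(substitute, response_body)


old = "import os\n\nprint('done')"
body = "<copied>0-1</copied>\ndef main():\n    print('hello world')\n<copied>2-2</copied>"
new = expand_copied(body, old)
print(number_lines(old))
print(new)
```

The numbered view is only for the prompt. The expansion reads from the unnumbered original, so line numbers never end up in the committed file.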

from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
FileChange,
FileChangeRequest,
FilesToChange,
PullRequest,
RegexMatchError,
Function,
Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
files_to_change_prompt,
pull_request_prompt,
create_file_prompt,
modify_file_prompt,
modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
def get_files_to_change(self):
file_change_requests: list[FileChangeRequest] = []
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change") # Dedup files to change here
files_to_change = FilesToChange.from_string(files_to_change_response)
files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
for file_change_request, change_type in zip(
files_to_create + files_to_modify,
["create"] * len(files_to_create)
+ ["modify"] * len(files_to_modify),
):
file_change_request = file_change_request.strip()
if not file_change_request or file_change_request == "* None":
continue
logger.debug(file_change_request)
logger.debug(change_type)
file_change_requests.append(
FileChangeRequest.from_string(
file_change_request, change_type=change_type
)
)
# Create a dictionary to hold file names and their corresponding instructions
file_instructions_dict = {}
for file_change_request in file_change_requests:
# If the file name is already in the dictionary, append the new instructions
if file_change_request.filename in file_instructions_dict:
instructions, change_type = file_instructions_dict[file_change_request.filename]
file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
else:
file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
if file_change_requests:
return file_change_requests
except RegexMatchError:
logger.warning("Failed to parse! Retrying...")
self.delete_messages_from_chat("files_to_change")
continue
raise Exception("Could not generate files to change")
def generate_pull_request(self) -> PullRequest:
pull_request = None
for count in range(5):
try:
logger.info(f"Generating for the {count}th time...")
pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
except Exception as e:
logger.warning(f"Exception {e}. Failed to parse! Retrying...")
self.undo()
continue
pull_request = PullRequest.from_string(pr_text_response)
pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
return pull_request
raise Exception("Could not generate PR text")
class GithubBot(BaseModel):
class Config:
arbitrary_types_allowed = True # for repo: Repository
repo: Repository
def get_contents(self, path: str, branch: str = ""):
if not branch:
branch = self.repo.default_branch
try:
return self.repo.get_contents(path, ref=branch)
except Exception as e:
logger.warning(path)
raise e
def get_file(self, file_path: str, branch: str = "") -> ContentFile:
content = self.get_contents(file_path, branch)
assert not isinstance(content, list)
return content
def check_path_exists(self, path: str, branch: str = ""):
try:
self.get_contents(path, branch)
return True
except Exception:
return False
def create_branch(self, branch: str) -> str:
# Generate PR if nothing is supplied maybe
base_branch = self.repo.get_branch(self.repo.default_branch)
try:
self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
return branch
except GithubException as e:
logger.error(f"Error: {e}, trying with other branch names...")
for i in range(1, 100):
try:
logger.warning(f"Retrying {branch}_{i}...")
self.repo.create_git_ref(
f"refs/heads/{branch}_{i}", base_branch.commit.sha
)
return f"{branch}_{i}"
except GithubException:
pass
raise e
def populate_snippets(self, snippets: list[Snippet]):
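
Several methods in the snippets above (`get_files_to_change`, `generate_pull_request`, `create_file`) share one pattern: ask the model, try to parse the reply, and retry up to five times on a parse failure. A minimal sketch of that loop, assuming a hypothetical `ask` callable in place of `self.chat` and a hypothetical `ParseError` in place of `RegexMatchError`:

```python
import re


class ParseError(Exception):
    """Hypothetical stand-in for a reply that does not match the expected format."""


def parse_commit_message(reply):
    """Extract the text after 'Commit Message:' or raise ParseError."""
    match = re.search(r"Commit Message: (.+)", reply)
    if match is None:
        raise ParseError("no commit message found")
    return match.group(1).strip()


def chat_with_retries(ask, parse, attempts=5):
    """Call ask(attempt) until parse succeeds, giving up after `attempts` tries."""
    for attempt in range(attempts):
        reply = ask(attempt)
        try:
            return parse(reply)
        except ParseError:
            continue
    raise RuntimeError(f"Failed to parse response after {attempts} attempts.")


# Canned replies stand in for model output: the first one is unparseable.
replies = ["Sorry, I cannot help.", "Commit Message: Add header to README"]
result = chat_with_retries(lambda attempt: replies[attempt], parse_commit_message)
print(result)
```

The real methods also roll back the failed exchange (`self.undo()` or `self.delete_messages_from_chat`) before retrying. That step is omitted here.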


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
|---|---|
| `README.md` | Add a header at the beginning of the file with the text "Welcome to Sweep!". |
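
The table above is built from the deduplicated file change requests. As a rough sketch of that step, the helper below merges instructions that target the same file, mirroring the dictionary logic in `get_files_to_change`, and renders a plain pipe table. `merge_requests` and `pipe_table` are hypothetical names. The repository itself uses `tabulate` with `tablefmt="pipe"`, which pads and aligns columns differently.

```python
def merge_requests(requests):
    """Merge (filename, instructions) pairs so each file appears once, in first-seen order."""
    merged = {}
    for filename, instructions in requests:
        if filename in merged:
            merged[filename] = merged[filename] + " " + instructions
        else:
            merged[filename] = instructions
    return list(merged.items())


def pipe_table(rows, headers):
    """Render rows as a minimal Markdown pipe table."""
    header_line = "| " + " | ".join(headers) + " |"
    divider = "|" + "|".join("---" for _ in headers) + "|"
    body = ["| " + " | ".join(row) + " |" for row in rows]
    return "\n".join([header_line, divider] + body)


requests = [
    ("README.md", "Add a header at the beginning of the file."),
    ("README.md", 'Use the text "Welcome to Sweep!".'),
]
merged = merge_requests(requests)
table = pipe_table(
    [(f"`{name}`", text) for name, text in merged],
    ["File Path", "Proposed Changes"],
)
print(table)
```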

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Add Header to README
sweep/update-readme-header

This PR adds a new header to the README file. The header text is "Welcome to Sweep!". This change is in response to a user request to add a header to the README.
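
The branch name shown above follows two rules visible in the snippets: `generate_pull_request` prefixes the model's proposal with `sweep/` and truncates it to 250 characters, and `create_branch` retries with `_1`, `_2`, and so on when the name is taken. The sketch below combines both rules against a plain set of existing names. `pick_branch_name` is a hypothetical helper for illustration, and the set replaces the GitHub API call that the real code makes.

```python
def pick_branch_name(proposed, existing, max_suffix=100):
    """Prefix with 'sweep/', truncate, and append _1, _2, ... on collision."""
    base = "sweep/" + proposed[:250]
    if base not in existing:
        return base
    for i in range(1, max_suffix):
        candidate = f"{base}_{i}"
        if candidate not in existing:
            return candidate
    raise RuntimeError(f"No free branch name found for {base}")


existing = set()
first = pick_branch_name("update-readme-header", existing)
existing.add(first)
second = pick_branch_name("update-readme-header", existing)
print(first, second)
```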


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀


I'm a bot that handles simple bugs and feature requests but I might make mistakes. Please be kind!

@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly

sweep-nightly bot commented Jul 7, 2023

Here's the PR! #327


Step 1: 🔍 Code Search

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.

def log_error(error_type, exception):
content = f"**{error_type} Error**\n{username}: {issue_url}\n```{exception}```"
discord_log_error(content)
def fetch_file_contents_with_retry():
retries = 3
error = None
for i in range(retries):
try:
logger.info(f"Fetching relevant files for the {i}th time...")
return search_snippets(
repo,
f"{title}\n{summary}\n{replies_text}",
num_files=num_of_snippets_to_query,
branch=None,
installation_id=installation_id,
)
except Exception as e:
error = e
continue
posthog.capture(
username, "fetching_failed", properties={"error": error, **metadata}
)
raise error
logger.info("Fetching relevant files...")
try:
snippets, tree = fetch_file_contents_with_retry()
assert len(snippets) > 0
except Exception as e:
logger.error(e)
comment_reply(
"It looks like an issue has occurred while fetching the files. Perhaps the repo has not been initialized: try removing this repo and adding it back. I'll try again in a minute. If this error persists, contact team@sweep.dev.",
-1
)
log_error("File Fetch", str(e))
raise e
num_full_files = 2
num_extended_snippets = 2
most_relevant_snippets = snippets[:num_full_files]
snippets = snippets[:-num_full_files]
logger.info("Expanding snippets...")
for snippet in most_relevant_snippets:
current_snippet = snippet
_chunks, metadatas, _ids = chunker.call(
current_snippet.content,
current_snippet.file_path
)
segmented_snippets = [
Snippet(
content=current_snippet.content,
start=metadata["start"],
end=metadata["end"],
file_path=metadata["file_path"],
) for metadata in metadatas
]
index = 0
while index < len(segmented_snippets) and segmented_snippets[index].start <= current_snippet.start:
index += 1
index -= 1
for i in range(index + 1, min(index + num_extended_snippets + 1, len(segmented_snippets))):
current_snippet += segmented_snippets[i]
for i in range(index - 1, max(index - num_extended_snippets - 1, 0), -1):
current_snippet = segmented_snippets[i] + current_snippet
snippets.append(current_snippet)
# snippet fusing
i = 0
while i < len(snippets):
j = i + 1
while j < len(snippets):
if snippets[i] ^ snippets[j]: # this checks for overlap
snippets[i] = snippets[i] | snippets[j] # merging
snippets.pop(j)
else:
j += 1
i += 1
snippets = snippets[:min(len(snippets), max_num_of_snippets)]
human_message = HumanMessagePrompt(
repo_name=repo_name,
issue_url=issue_url,
username=username,
repo_description=repo_description,
title=title,
summary=summary + replies_text,
snippets=snippets,
tree=tree, # TODO: Anything in repo tree that has something going through is expanded
)
chat_logger = ChatLogger({
'repo_name': repo_name,
'title': title,
'summary': summary + replies_text,
"issue_number": issue_number,
"issue_url": issue_url,
"username": username,
"repo_full_name": repo_full_name,
"repo_description": repo_description,
"installation_id": installation_id,
"comment_id": comment_id,
})
sweep_bot = SweepBot.from_system_message_content(
human_message=human_message, repo=repo, is_reply=bool(comments), chat_logger=chat_logger
)
sweepbot_retries = 3
try:
for i in range(sweepbot_retries):
# ANALYZE SNIPPETS
logger.info("CoT retrieval...")
if sweep_bot.model == "gpt-4-32k-0613":
sweep_bot.cot_retrieval()
newline = '\n'
comment_reply(
"I found the following snippets in your repository. I will now analyze these snippets and come up with a plan."
+ "\n\n"
+ collapsible_template.format(
summary="Some code snippets I looked at (click to expand). If some file is missing from here, you can mention the path in the ticket description.",
body="\n".join(
[
f"https://github.com/{organization}/{repo_name}/blob/{repo.get_commits()[0].sha}/{snippet.file_path}#L{max(snippet.start, 1)}-L{min(snippet.end, snippet.content.count(newline))}\n"
for snippet in snippets
]
),
),
1
)
# COMMENT ON ISSUE
# TODO: removed issue commenting here
logger.info("Fetching files to modify/create...")
file_change_requests = sweep_bot.get_files_to_change()
file_change_requests = sweep_bot.validate_file_change_requests(file_change_requests)
table = tabulate(
[[f"`{file_change_request.filename}`", file_change_request.instructions] for file_change_request in file_change_requests],
headers=["File Path", "Proposed Changes"],
tablefmt="pipe"
)
comment_reply(
"From looking through the relevant snippets, I decided to make the following modifications:\n\n" + table + "\n\n",
2
)
# CREATE PR METADATA
logger.info("Generating PR...")
pull_request = sweep_bot.generate_pull_request()
pull_request_content = pull_request.content.strip().replace("\n", "\n>")
pull_request_summary = f"**{pull_request.title}**\n`{pull_request.branch_name}`\n>{pull_request_content}\n"
comment_reply(
f"I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:\n\n{pull_request_summary}",
3
)
# WRITE PULL REQUEST
logger.info("Making PR...")
response = create_pr(file_change_requests, pull_request, sweep_bot, username, installation_id, issue_number)
if not response or not response["success"]: raise Exception("Failed to create PR")
pr = response["pull_request"]
current_issue.create_reaction("rocket")
comment_reply(
"I have finished coding the issue. I am now reviewing it for completeness.",
4
)
try:
current_issue.delete_reaction(eyes_reaction.id)
except:
pass
try:
# CODE REVIEW
changes_required, review_comment = review_pr(repo=repo, pr=pr, issue_url=issue_url, username=username,
repo_description=repo_description, title=title,
summary=summary, replies_text=replies_text, tree=tree)
logger.info(f"Addressing review comment {review_comment}")
if changes_required:
on_comment(repo_full_name=repo_full_name,
repo_description=repo_description,
comment=review_comment,
username=username,
installation_id=installation_id,
pr_path=None,
pr_line_position=None,
pr_number=pr.number)
except Exception as e:
logger.error(e)
# Completed code review
comment_reply(
"Success! 🚀",
5,
pr_message=f"## Here's the PR! [https://github.com/{repo_full_name}/pull/{pr.number}](https://github.com/{repo_full_name}/pull/{pr.number})",
)
break
except openai.error.InvalidRequestError as e:
logger.error(e)
comment_reply(
"I'm sorry, but it looks like our model has run out of context length. We're trying to make this happen less often, but one way to mitigate this is to split the code into smaller files. If this error persists, contact team@sweep.dev.",
-1
)
log_error("Context Length", str(e))
posthog.capture(
username,
"failed",
properties={
"error": str(e),
"reason": "Invalid request error / context length",
**metadata,
},
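
The "snippet fusing" loop in the snippet above merges overlapping snippets in place, using `^` as the overlap test and `|` as the merge. The sketch below reproduces the same loop with explicit method names. `Span` is a hypothetical simplification of the repository's `Snippet` class, whose operator definitions are not shown here, so the overlap rule (same file, intersecting line ranges) is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Span:
    """Hypothetical simplified snippet: a line range within one file."""
    file_path: str
    start: int
    end: int

    def overlaps(self, other):
        return (
            self.file_path == other.file_path
            and self.start <= other.end
            and other.start <= self.end
        )

    def merge(self, other):
        return Span(self.file_path, min(self.start, other.start), max(self.end, other.end))


def fuse(spans):
    """Single-pass pairwise fusing, following the loop structure in the snippet."""
    fused = list(spans)
    i = 0
    while i < len(fused):
        j = i + 1
        while j < len(fused):
            if fused[i].overlaps(fused[j]):
                fused[i] = fused[i].merge(fused[j])
                fused.pop(j)
            else:
                j += 1
        i += 1
    return fused


simple = fuse([Span("a.py", 1, 10), Span("a.py", 8, 20), Span("b.py", 1, 5)])
chained = fuse([Span("a.py", 1, 5), Span("a.py", 20, 30), Span("a.py", 4, 21)])
print(simple)
print(chained)
```

With this loop structure, a span that was skipped before a later merge widened the current one is not rechecked, so `chained` keeps two overlapping spans. Repeating the pass until nothing changes would close that gap, at the cost of extra iterations.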

except:
file_change_request.change_type = "create"
return file_change_requests
class SweepBot(CodeGenBot, GithubBot):
def cot_retrieval(self):
# TODO(sweep): add semantic search using vector db
# TODO(sweep): add search using webpilot + github
functions = [
Function(
name="cat",
description="Cat files. Max 3 files per request.",
parameters={
"properties": {
"filepath": {
"type": "string",
"description": "Paths to files. One per line."
},
}
} # manage file too large
),
Function(
name="finish",
description="Indicate you have sufficient data to proceed.",
parameters={"properties": {}}
),
]
# self.chat(
# cot_retrieval_prompt,
# message_key="cot_retrieval",
# functions=functions,
# )
# is_function_call = self.messages[-1].function_call is not None
# for _retry in range(3):
# logger.info("Got response.")
# if not is_function_call:
# break
# response = self.messages[-1].function_call
# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
def modify_file(
self, file_change_request: FileChangeRequest, contents: str = ""
) -> tuple[str, str]:
if not contents:
contents = self.get_file(
file_change_request.filename
).decoded_content.decode("utf-8")
# Add line numbers to the contents; goes in prompts but not github
contents_line_numbers = "\n".join([f"{i}:{line}" for i, line in enumerate(contents.split("\n"))])
contents_line_numbers = contents_line_numbers.replace('"""', "'''")
for count in range(5):
if "0613" in self.model:
_ = self.chat( # We don't use the plan in the next call
modify_file_plan_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
code=contents_line_numbers,
),
message_key=f"file_change_{file_change_request.filename}",
)
modify_file_response = self.chat(
modify_file_prompt,
message_key=f"file_change_{file_change_request.filename}",
)
try:
logger.info(f"modify_file_response: {modify_file_response}")
new_file = generate_new_file(modify_file_response, contents)
if not is_markdown(file_change_request.filename):
code_repairer = CodeRepairer(chat_logger=self.chat_logger)
diff = generate_diff(old_code=contents, new_code=new_file)
new_file = code_repairer.repair_code(diff=diff, user_code=new_file, feature=file_change_request.instructions)
# new_file = revert_whitespace_changes(original_file_str=contents, modified_file_str=new_file)
return (new_file, file_change_request.filename)
except Exception as e:
logger.warning(f"Received error {e}")
logger.warning(
f"Failed to parse. Retrying for the {count}th time..."
)
self.undo()
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")
    def change_file(self, file_change_request: FileChangeRequest):
        if file_change_request.change_type == "create":
            return self.create_file(file_change_request)
        elif file_change_request.change_type == "modify":
            # Dispatch modifications to modify_file rather than create_file.
            return self.modify_file(file_change_request)
        else:
            raise Exception("Not a valid change type")
    def change_files_in_github(
        self,
        file_change_requests: list[FileChangeRequest],
        branch: str,
    ):
        # should check if branch exists, if not, create it
        logger.debug(file_change_requests)
        for file_change_request in file_change_requests:
            file_markdown = is_markdown(file_change_request.filename)
            if file_change_request.change_type == "create":
                try:  # Try to create
                    file_change = self.create_file(file_change_request)
                    logger.debug(
                        f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
                    )
                    file_change.code = format_contents(file_change.code, file_markdown)
                    self.repo.create_file(
                        file_change_request.filename,
                        file_change.commit_message,
                        file_change.code,
                        branch=branch,
                    )
                except github.GithubException as e:
                    logger.info(e)
                    try:  # Try to modify
                        contents = self.get_file(file_change_request.filename, branch=branch)
                        file_change.code = format_contents(file_change.code, file_markdown)
                        self.repo.update_file(
                            file_change_request.filename,
                            file_change.commit_message,
                            file_change.code,
                            contents.sha,
                            branch=branch,
                        )
                    except Exception as update_error:
                        # Log the failure instead of silently swallowing it
                        logger.warning(f"Failed to update {file_change_request.filename}: {update_error}")
            elif file_change_request.change_type == "modify":
                # TODO(sweep): Cleanup this
                try:
                    contents = self.get_file(file_change_request.filename, branch=branch)
                except github.UnknownObjectException as e:
                    logger.warning(f"Received error {e}, trying creating file...")
                    file_change_request.change_type = "create"
                    # Generate the file once; the earlier duplicate call discarded its result
                    file_change = self.create_file(file_change_request)
                    logger.debug(
                        f"{file_change_request.filename}, {file_change.commit_message}, {file_change.code}, {branch}"
                    )
                    file_change.code = format_contents(file_change.code, file_markdown)
                    self.repo.create_file(
                        file_change_request.filename,
                        file_change.commit_message,
                        file_change.code,
                        branch=branch,
                    )
                else:
                    new_file_contents, file_name = self.modify_file(
                        file_change_request, contents.decoded_content.decode("utf-8")
                    )
                    new_file_contents = format_contents(new_file_contents, file_markdown)
                    if contents.decoded_content.decode("utf-8").endswith("\n"):
                        new_file_contents += "\n"
                    logger.debug(
                        f"{file_name}, {f'Update {file_name}'}, {new_file_contents}, {branch}"
                    )
                    self.repo.update_file(
                        file_name,
                        f'Update {file_name}',
                        new_file_contents,
                        contents.sha,
                        branch=branch,
                    )
            else:
                raise Exception("Invalid change type")
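
The "create" branch above falls back to an update when creation fails. A minimal sketch of that create-then-update pattern follows. It assumes an in-memory stand-in: `InMemoryRepo`, `FileExists`, and `create_or_update` are hypothetical names for illustration and are not part of PyGithub or Sweep.

```python
class FileExists(Exception):
    """Raised by the stand-in repo when a path is already present."""


class InMemoryRepo:
    """Hypothetical stand-in for a GitHub repository (not the PyGithub API)."""

    def __init__(self) -> None:
        self.files: dict[str, str] = {}

    def create_file(self, path: str, content: str) -> None:
        # Creation fails if the path already exists.
        if path in self.files:
            raise FileExists(path)
        self.files[path] = content

    def update_file(self, path: str, content: str) -> None:
        # Updating requires the path to exist already.
        if path not in self.files:
            raise KeyError(path)
        self.files[path] = content


def create_or_update(repo: InMemoryRepo, path: str, content: str) -> str:
    """Try to create the file; if it already exists, update it instead."""
    try:
        repo.create_file(path, content)
        return "created"
    except FileExists:
        repo.update_file(path, content)
        return "updated"


repo = InMemoryRepo()
first = create_or_update(repo, "README.md", "## Project Description\n")
second = create_or_update(repo, "README.md", "## Project Description\n\nMore text.\n")
```

Catching only the specific "already exists" error keeps unrelated failures visible rather than hiding them behind the fallback.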

...
</create>
<modify>
* filename_3: instructions_3
* filename_4: instructions_4
...
</modify>
"""
reply_prompt = """
Write a 1-paragraph response to this user:
* Tell them you have started working on this PR and a rough summary of your plan.
* Do not start with "Here is a draft", just write the response.
* Use github markdown to format the response.
"""
create_file_prompt = """
Think step-by-step regarding the instructions and what should be added to the new file.
Then create a plan of parts of the code to create, with low-level, detailed references to functions and variables to create, and what each function does.
Then create the following file using the following instructions:
File Name: {filename}
Instructions: {instructions}
Reply in the following format. DO NOT write "pass" or "Rest of code". Do not literally write "{{new_file}}". You must use the new_file XML tags, and all text inside these tags will be placed in the newly created file.
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of additions:
* Addition 1
* Addition 2
...
Commit Message: {{commit_message}}
<new_file>
{{new_file}}
</new_file>
"""
"""
Reply in the format below.
* You MUST use the new_file XML tags
* DO NOT write ``` anywhere, unless it's markdown
* DO NOT write "pass" or "Rest of code"
* Do not literally write "{{new_file}}".
* Format:
"""
modify_file_plan_prompt = """
Think step-by-step regarding the instructions and how that can be applied to the current file to improve the current codebase.
Then create a plan of parts of the code to modify with detailed references to functions to modify.
File Name: {filename}
<old_file>
{code}
</old_file>
Your instructions to modify the file are: "{instructions}".
Step-by-step thoughts with explanations:
* Thought 1 - Explanation 1
* Thought 2 - Explanation 2
...
Detailed plan of modifications:
* Modification 1
* Modification 2
...
"""
modify_file_prompt = """
Generate a new_file based on the given plan, ensuring that you:
1. Do not write "pass" statements.
2. Provide complete functions with actual business logic. It is imperative that we do not leave any work to the user/future readers of this code.
3. Do not write new "todo" comments.
4. Do not write incomplete functions.
5. Do not write the original line numbers with the new code.
6. Make sure the new code follows the same programming language conventions as the old code.
Instead of writing "# Rest of Code", specify the lines to copy from the old file using an XML tag, inclusive (e.g., "<copied>0-25</copied>"). Make sure to use this exact format.
Copy the correct line numbers and copy as long of a prefix and suffix as possible. For instance, if you want to insert code after line 50, start with "<copied>0-50</copied>".
Example: New file:
<new_file>
print("new file")
</new_file>
Example: Insert at end:
<new_file>
<copied>0-100</copied>
print("inserted at end")
</new_file>
Example: If you want to insert code after lines 50 and 75:
<new_file>
<copied>0-50</copied>
def main():
    print("hello world")
<copied>51-75</copied>
print("debug statement")
<copied>76-100</copied>
</new_file>
"""
pr_code_prompt = "" # TODO: deprecate this
pull_request_prompt = """
Awesome! Could you also provide a PR message in the following format? Content should be in Github style markdown. Thanks!
Title: {title}
Branch Name: {branch_name}
<content>
{content}
</content>
"""
summarize_system_prompt = """
Your name is Sweep bot. You are an engineer assigned to helping summarize code instructions and code changes.
"""
user_file_change_summarize_prompt = """
Summarize the given instructions for making changes in a pull request.
Code Instructions:
{message_content}
"""
assistant_file_change_summarize_prompt = """
Please summarize the following file using the file stubs.
Be sure to repeat each method signature and docstring. You may also add additional comments to the docstring.
Do not repeat the code in the file stubs.
Code Changes:
{message_content}
"""
slack_system_message_prompt = "Your name is Sweep bot. You are an engineer assigned to assisting the following Slack user. You will be helpful and friendly, but informal and concise: get to the point. You will use Slack-style markdown when needed to structure your responses."
slack_slash_command_prompt = """
Relevant snippets provided by search engine (decreasing relevance):
<relevant_snippets_in_repo>
{relevant_snippets}
</relevant_snippets_in_repo>
<relevant_paths_in_repo>
{relevant_directories}
</relevant_paths_in_repo>
Repo: {repo_name}: {repo_description}
Username: {username}
Query: {query}
Gather information (i.e. fetch more snippets) to solve the problem. Use "create_pr" if the user asks for changes or you think code changes are needed.
"""
code_repair_system_prompt = """\
You are a genius trained for code repair.
You will be given two pieces of code marked by xml tags. The code inside <diff></diff> is the difference between the user_code and the original code, and the code inside <user_code></user_code> is a user's attempt at adding a change described as {feature}.
Our goal is to return a working version of user_code that follows {feature}.
"""
code_repair_prompt = """\
<diff>
{diff}
</diff>
<user_code>
{user_code}
</user_code>
This is the user_code.
Instructions:
* Keep the logic changes from user_code.
* Fix any issues using our knowledge of both the diff and user_code files.
* Fix syntax errors and accidentally deleted lines.
* Do not perform code style cleanup.
* Do not add or remove any whitespace besides what is necessary to fix syntax errors.
* Do not add or remove any comments.
Return the repaired user_code without xml tags. All of the text you return will be placed in the file. Revert any unrelated deletions to user_code, using the diff and described change.
"""
gradio_system_message_prompt = """Your name is Sweep bot. You are a brilliant and thorough engineer assigned to assist the following user with their problems in the Github repo. You will be helpful and friendly, but informal and concise: get to the point. When you write code to solve tickets, the code works on the first try and is formatted perfectly. You have the utmost care for the user that you write for, so you do not make mistakes. If the user asks you to create a PR, you will use the create_pr function.
Relevant snippets provided by search engine (decreasing relevance):
{snippets}
Repo: {repo_name}
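
The `<copied>start-end</copied>` convention described in `modify_file_prompt` can be expanded against the old file roughly as follows. This is a sketch only: it assumes zero-based, inclusive line ranges (matching the `enumerate`-based numbering and the word "inclusive" in the prompt) and one tag per line. `expand_copied` is a hypothetical helper, not Sweep's actual `generate_new_file`.

```python
import re

# Matches a line consisting solely of a copied-range tag, e.g. <copied>0-25</copied>
COPIED_TAG = re.compile(r"<copied>(\d+)-(\d+)</copied>")


def expand_copied(response_body: str, old_contents: str) -> str:
    """Replace each <copied>a-b</copied> line with old lines a..b (inclusive)."""
    old_lines = old_contents.split("\n")
    out: list[str] = []
    for line in response_body.split("\n"):
        match = COPIED_TAG.fullmatch(line.strip())
        if match:
            start, end = int(match.group(1)), int(match.group(2))
            out.extend(old_lines[start : end + 1])
        else:
            # Any other line is new code and is kept verbatim.
            out.append(line)
    return "\n".join(out)


old = "a\nb\nc\nd"
body = "<copied>0-1</copied>\nprint('inserted')\n<copied>2-3</copied>"
result = expand_copied(body, old)
```

Ranges that run past the end of the old file are silently truncated by slicing here; a stricter version might raise instead.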

# # response = json.loads(response)
# function_name = response["name"]
# arguments = response["arguments"]
# logger.info(f"Fetching file {function_name} with arguments {arguments}.")
# arguments = json.loads(arguments)
# if function_name == "finish":
# return
# elif function_name == "cat":
# path = arguments["filepath"]
# try:
# logger.info("Retrieving file...")
# content = self.get_file(path).decoded_content.decode("utf-8")
# logger.info("Received file")
# except github.GithubException:
# response = self.chat(
# f"File not found: {path}",
# message_key=path,
# functions=functions,
# )
# else:
# response = self.chat(
# f"Here is the file: <file path=\"{path}\">\n\n{content[:10000]}</file>. Fetch more content or call finish.",
# message_key=path,
# functions=functions
# ) # update this constant
return
def create_file(self, file_change_request: FileChangeRequest) -> FileChange:
file_change: FileChange | None = None
for count in range(5):
create_file_response = self.chat(
create_file_prompt.format(
filename=file_change_request.filename,
instructions=file_change_request.instructions,
),
message_key=f"file_change_{file_change_request.filename}",
)
# Add file to list of changed_files
self.file_change_paths.append(file_change_request.filename)
# self.delete_file_from_system_message(file_path=file_change_request.filename)
try:
file_change = FileChange.from_string(create_file_response)
assert file_change is not None
file_change.commit_message = f"sweep: {file_change.commit_message[:50]}"
return file_change
except Exception:
logger.warning(f"Failed to parse. Retrying for the {count}th time...")
self.undo()
continue
raise Exception("Failed to parse response after 5 attempts.")

from sweepai.utils.chat_logger import ChatLogger
from sweepai.core.entities import (
    FileChange,
    FileChangeRequest,
    FilesToChange,
    PullRequest,
    RegexMatchError,
    Function,
    Snippet
)
from sweepai.core.chat import ChatGPT
from sweepai.core.prompts import (
    files_to_change_prompt,
    pull_request_prompt,
    create_file_prompt,
    modify_file_prompt,
    modify_file_plan_prompt,
)
from sweepai.utils.constants import DB_NAME
from sweepai.utils.diff import format_contents, generate_diff, generate_new_file, is_markdown, revert_whitespace_changes
class CodeGenBot(ChatGPT):
    def get_files_to_change(self):
        file_change_requests: list[FileChangeRequest] = []
        for count in range(5):
            try:
                logger.info(f"Generating for the {count}th time...")
                files_to_change_response = self.chat(files_to_change_prompt, message_key="files_to_change")  # Dedup files to change here
                files_to_change = FilesToChange.from_string(files_to_change_response)
                files_to_create: list[str] = files_to_change.files_to_create.split("\n*")
                files_to_modify: list[str] = files_to_change.files_to_modify.split("\n*")
                for file_change_request, change_type in zip(
                    files_to_create + files_to_modify,
                    ["create"] * len(files_to_create)
                    + ["modify"] * len(files_to_modify),
                ):
                    file_change_request = file_change_request.strip()
                    if not file_change_request or file_change_request == "* None":
                        continue
                    logger.debug(file_change_request)
                    logger.debug(change_type)
                    file_change_requests.append(
                        FileChangeRequest.from_string(
                            file_change_request, change_type=change_type
                        )
                    )
                # Create a dictionary to hold file names and their corresponding instructions
                file_instructions_dict = {}
                for file_change_request in file_change_requests:
                    # If the file name is already in the dictionary, append the new instructions
                    if file_change_request.filename in file_instructions_dict:
                        instructions, change_type = file_instructions_dict[file_change_request.filename]
                        file_instructions_dict[file_change_request.filename] = (instructions + " " + file_change_request.instructions, change_type)
                    else:
                        file_instructions_dict[file_change_request.filename] = (file_change_request.instructions, file_change_request.change_type)
                file_change_requests = [FileChangeRequest(filename=file_name, instructions=instructions, change_type=change_type) for file_name, (instructions, change_type) in file_instructions_dict.items()]
                if file_change_requests:
                    return file_change_requests
            except RegexMatchError:
                logger.warning("Failed to parse! Retrying...")
                self.delete_messages_from_chat("files_to_change")
                continue
        raise Exception("Could not generate files to change")
    def generate_pull_request(self) -> PullRequest:
        pull_request = None
        for count in range(5):
            try:
                logger.info(f"Generating for the {count}th time...")
                pr_text_response = self.chat(pull_request_prompt, message_key="pull_request")
            except Exception as e:
                logger.warning(f"Exception {e}. Failed to parse! Retrying...")
                self.undo()
                continue
            pull_request = PullRequest.from_string(pr_text_response)
            pull_request.branch_name = "sweep/" + pull_request.branch_name[:250]
            return pull_request
        raise Exception("Could not generate PR text")
class GithubBot(BaseModel):
    class Config:
        arbitrary_types_allowed = True  # for repo: Repository
    repo: Repository
    def get_contents(self, path: str, branch: str = ""):
        if not branch:
            branch = self.repo.default_branch
        try:
            return self.repo.get_contents(path, ref=branch)
        except Exception as e:
            logger.warning(path)
            raise e
    def get_file(self, file_path: str, branch: str = "") -> ContentFile:
        content = self.get_contents(file_path, branch)
        assert not isinstance(content, list)
        return content
    def check_path_exists(self, path: str, branch: str = ""):
        try:
            self.get_contents(path, branch)
            return True
        except Exception:
            return False
    def create_branch(self, branch: str) -> str:
        # Generate PR if nothing is supplied maybe
        base_branch = self.repo.get_branch(self.repo.default_branch)
        try:
            self.repo.create_git_ref(f"refs/heads/{branch}", base_branch.commit.sha)
            return branch
        except GithubException as e:
            logger.error(f"Error: {e}, trying with other branch names...")
            for i in range(1, 100):
                try:
                    logger.warning(f"Retrying {branch}_{i}...")
                    self.repo.create_git_ref(
                        f"refs/heads/{branch}_{i}", base_branch.commit.sha
                    )
                    return f"{branch}_{i}"
                except GithubException:
                    pass
            raise e
    def populate_snippets(self, snippets: list[Snippet]):
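
The per-filename merge inside `get_files_to_change` above can be illustrated in isolation. The sketch below assumes a simplified stand-in for `FileChangeRequest`: the `Request` dataclass and `merge_by_filename` are hypothetical names, not Sweep's entities. As in the snippet, the first `change_type` seen for a file wins and later instructions are appended after a space.

```python
from dataclasses import dataclass


@dataclass
class Request:
    """Hypothetical stand-in for FileChangeRequest."""
    filename: str
    instructions: str
    change_type: str


def merge_by_filename(requests: list[Request]) -> list[Request]:
    """Combine requests that target the same file into a single request."""
    merged: dict[str, tuple[str, str]] = {}
    for request in requests:
        if request.filename in merged:
            # Append the new instructions; keep the first change_type seen.
            instructions, change_type = merged[request.filename]
            merged[request.filename] = (
                instructions + " " + request.instructions,
                change_type,
            )
        else:
            merged[request.filename] = (request.instructions, request.change_type)
    return [
        Request(filename=name, instructions=instructions, change_type=change_type)
        for name, (instructions, change_type) in merged.items()
    ]


merged_requests = merge_by_filename(
    [
        Request("README.md", "Add a header.", "modify"),
        Request("setup.py", "Bump the version.", "modify"),
        Request("README.md", "Add a badge.", "create"),
    ]
)
```

Because Python dictionaries preserve insertion order, the merged list keeps files in the order they were first mentioned.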


Step 2: 🧐 Snippet Analysis

From looking through the relevant snippets, I decided to make the following modifications:

| File Path | Proposed Changes |
| --- | --- |
| README.md | Add a header to the top of the file with the text "## Project Description" |

Step 3: 📝 Planning

I have created a plan for writing the pull request. I am now working on executing my plan and coding the required changes to address this issue. Here is the planned pull request:

Update readme with header
sweep/update-readme-header

I have added a header to the readme file to improve readability and organization. The header includes the text "## Project Description" at the top of the file.

Please review the changes and let me know if any further modifications are needed.

Thank you!


Step 4: ⌨️ Coding

I have finished coding the issue. I am now reviewing it for completeness.


Step 5: 🔁 Code Review

Success! 🚀



@sweep-nightly sweep-nightly bot linked a pull request Jul 7, 2023 that will close this issue
@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

0%


Error: 🚫 Unable to Complete PR

If you would like to report this bug, please join our Discord.



@lukejagg lukejagg added sweep Assigns Sweep to an issue or pull request. and removed sweep Assigns Sweep to an issue or pull request. labels Jul 7, 2023
@sweep-nightly
Contributor

sweep-nightly bot commented Jul 7, 2023

0%


Error: 🚫 Unable to Complete PR

If you would like to report this bug, please join our Discord.


