|
| 1 | +import json |
| 2 | +import tempfile |
| 3 | +import textwrap |
| 4 | +from pathlib import Path |
| 5 | +from collections.abc import Generator |
| 6 | + |
| 7 | +import pytest |
| 8 | +from loguru import logger |
| 9 | + |
| 10 | +from app.agents.agent_common import InvalidLLMResponse |
| 11 | +from app.agents.agent_reviewer import Review, ReviewDecision |
| 12 | +from app.agents.agent_write_patch import PatchAgent, PatchHandle |
| 13 | +from app.agents.agent_reproducer import TestAgent, TestHandle |
| 14 | +from app.data_structures import BugLocation, MessageThread, ReproResult |
| 15 | +from app.api import review_manage |
| 16 | +from app.api.review_manage import ReviewManager |
| 17 | +from app.search.search_manage import SearchManager |
| 18 | +from app.task import SweTask, Task |
| 19 | + |
| 20 | +from test.pytest_utils import DummyTask |
| 21 | + |
| 22 | +# --- Dummy Objects for Testing --- |
| 23 | + |
| 24 | +class DummyPatchHandle(PatchHandle): |
| 25 | + def __str__(self): |
| 26 | + return "dummy_patch" |
| 27 | + |
| 28 | +class DummyTestHandle(TestHandle): |
| 29 | + def __str__(self): |
| 30 | + return "dummy_test" |
| 31 | + |
| 32 | +class DummyReproResult(ReproResult): |
| 33 | + def __init__(self): |
| 34 | + self.stdout = "out" |
| 35 | + self.stderr = "err" |
| 36 | + self.returncode = 0 |
| 37 | + self.reproduced = True |
| 38 | + |
| 39 | +class DummyTestAgent(TestAgent): |
| 40 | + def __init__(self): |
| 41 | + # Simulate no history initially. |
| 42 | + self._history = [] |
| 43 | + self._tests = {} |
| 44 | + # Record feedback calls for later inspection. |
| 45 | + self.feedback = {} |
| 46 | + |
| 47 | + def write_reproducing_test_without_feedback(self): |
| 48 | + # Return a test handle, content, and a dummy reproduction result. |
| 49 | + test_handle = DummyTestHandle() |
| 50 | + test_content = "def test_dummy(): pass" |
| 51 | + self._history.append(test_handle) |
| 52 | + self._tests[test_handle] = test_content |
| 53 | + return test_handle, test_content, DummyReproResult() |
| 54 | + |
| 55 | + def save_test(self, test_handle: TestHandle) -> None: |
| 56 | + # For testing, we simply record that save_test was called. |
| 57 | + self._tests[test_handle] = self._tests.get(test_handle, "saved") |
| 58 | + |
| 59 | + def add_feedback(self, test_handle: TestHandle, feedback: str) -> None: |
| 60 | + self.feedback[str(test_handle)] = feedback |
| 61 | + |
| 62 | +class DummyPatchAgent(PatchAgent): |
| 63 | + def __init__(self, task, search_manager, issue_stmt, context_thread, bug_locs, output_dir): |
| 64 | + # For testing, we record calls. |
| 65 | + self.task = task |
| 66 | + self.search_manager = search_manager |
| 67 | + self.issue_stmt = issue_stmt |
| 68 | + self.context_thread = context_thread |
| 69 | + self.bug_locs = bug_locs |
| 70 | + self.output_dir = output_dir |
| 71 | + self.feedback = {} |
| 72 | + |
| 73 | + def write_applicable_patch_without_feedback(self): |
| 74 | + # Return a dummy patch handle and content. |
| 75 | + return DummyPatchHandle(), "patch content v1" |
| 76 | + |
| 77 | + def write_applicable_patch_with_feedback(self): |
| 78 | + # Return a new dummy patch after feedback. |
| 79 | + return DummyPatchHandle(), "patch content v2" |
| 80 | + |
| 81 | + def add_feedback(self, patch_handle: PatchHandle, feedback: str): |
| 82 | + self.feedback[str(patch_handle)] = feedback |
| 83 | + |
| 84 | +class DummyReview(Review): |
| 85 | + def __init__(self, patch_decision, test_decision): |
| 86 | + self.patch_decision = patch_decision |
| 87 | + self.test_decision = test_decision |
| 88 | + self.patch_analysis = "analysis" |
| 89 | + self.patch_advice = "advice" |
| 90 | + self.test_analysis = "test analysis" |
| 91 | + self.test_advice = "test advice" |
| 92 | + |
| 93 | + def to_json(self): |
| 94 | + return { |
| 95 | + "patch_decision": self.patch_decision.name, |
| 96 | + "test_decision": self.test_decision.name, |
| 97 | + "patch_analysis": self.patch_analysis, |
| 98 | + "patch_advice": self.patch_advice, |
| 99 | + "test_analysis": self.test_analysis, |
| 100 | + "test_advice": self.test_advice, |
| 101 | + } |
| 102 | + |
| 103 | +class DummyReviewThread: |
| 104 | + def __init__(self): |
| 105 | + self.messages = [] |
| 106 | + |
| 107 | + def save_to_file(self, path: Path) -> None: |
| 108 | + # Write a dummy JSON content. |
| 109 | + path.write_text(json.dumps({"dummy": "review_thread"}, indent=4)) |
| 110 | + |
| 111 | +# DummySweTask for testing the generator. |
| 112 | +class DummySweTask(SweTask): |
| 113 | + def __init__(self, issue_statement: str, output_dir: str): |
| 114 | + self._issue_statement = issue_statement |
| 115 | + self.output_dir = output_dir |
| 116 | + |
| 117 | + def get_issue_statement(self) -> str: |
| 118 | + return self._issue_statement |
| 119 | + |
| 120 | + def execute_reproducer(self, test_content: str, patch_content: str) -> ReproResult: |
| 121 | + # Return a dummy reproduction result that changes with patch content. |
| 122 | + result = DummyReproResult() |
| 123 | + if "v2" in patch_content: |
| 124 | + result.stdout = "changed out" |
| 125 | + return result |
| 126 | + |
| 127 | +# Dummy SearchManager: fix by accepting project_path and output_dir. |
| 128 | +class DummySearchManager(SearchManager): |
| 129 | + def __init__(self, project_path="dummy_project", output_dir="dummy_output"): |
| 130 | + self.project_path = project_path |
| 131 | + self.output_dir = output_dir |
| 132 | + |
| 133 | +# Dummy MessageThread. |
| 134 | +class DummyMessageThread(MessageThread): |
| 135 | + def __init__(self): |
| 136 | + self.messages = [] |
| 137 | + |
| 138 | + def add_system(self, content: str): pass |
| 139 | + def add_user(self, content: str): pass |
| 140 | + def add_model(self, content: str, attachments: list): pass |
| 141 | + |
| 142 | +# Monkey-patch print_review and print_acr to do nothing during tests. |
| 143 | +def dummy_print_review(msg: str): |
| 144 | + pass |
| 145 | + |
| 146 | +def dummy_print_acr(msg: str, title: str): |
| 147 | + pass |
| 148 | + |
| 149 | +@pytest.fixture(autouse=True) |
| 150 | +def monkey_patch_prints(monkeypatch): |
| 151 | + monkeypatch.setattr(review_manage, "print_review", dummy_print_review) |
| 152 | + monkeypatch.setattr(review_manage, "print_acr", dummy_print_acr) |
| 153 | + |
| 154 | +# Monkey-patch agent_reviewer.run to return our dummy review and review thread. |
| 155 | +@pytest.fixture(autouse=True) |
| 156 | +def monkey_patch_reviewer(monkeypatch): |
| 157 | + def fake_run(issue_stmt, test_content, patch_content, orig_repro_result, patched_repro_result): |
| 158 | + # For testing, we return a review with YES decisions. |
| 159 | + review = DummyReview(ReviewDecision.YES, ReviewDecision.YES) |
| 160 | + review_thread = DummyReviewThread() |
| 161 | + return review, review_thread |
| 162 | + monkeypatch.setattr(review_manage.agent_reviewer, "run", fake_run) |
| 163 | + |
| 164 | +# --- Test Cases --- |
| 165 | + |
| 166 | +def test_compose_feedback_for_patch_generation(): |
| 167 | + dummy_review = DummyReview(ReviewDecision.NO, ReviewDecision.YES) |
| 168 | + test_content = "def test_example(): pass" |
| 169 | + feedback = ReviewManager.compose_feedback_for_patch_generation(dummy_review, test_content) |
| 170 | + # Check that the feedback message contains the test content and parts from the review. |
| 171 | + assert "test_example" in feedback |
| 172 | + assert "analysis" in feedback |
| 173 | + assert "advice" in feedback |
| 174 | + |
| 175 | +def test_compose_feedback_for_test_generation(): |
| 176 | + dummy_review = DummyReview(ReviewDecision.YES, ReviewDecision.NO) |
| 177 | + patch = "patch content" |
| 178 | + feedback = ReviewManager.compose_feedback_for_test_generation(dummy_review, patch) |
| 179 | + assert "patch content" in feedback |
| 180 | + assert "test analysis" in feedback |
| 181 | + assert "test advice" in feedback |
| 182 | + |
| 183 | +def test_patch_only_generator(tmp_path): |
| 184 | + # Set up a temporary output directory. |
| 185 | + output_dir = str(tmp_path / "output") |
| 186 | + Path(output_dir).mkdir() |
| 187 | + |
| 188 | + # Create a dummy MessageThread. |
| 189 | + thread = DummyMessageThread() |
| 190 | + # Create a dummy bug location list. |
| 191 | + bug_locs = [] |
| 192 | + |
| 193 | + # Create a dummy SearchManager with required args. |
| 194 | + search_manager = DummySearchManager(project_path="dummy_project", output_dir=output_dir) |
| 195 | + |
| 196 | + # Create a dummy Task (non-SweTask is fine for patch_only_generator). |
| 197 | + # This uses the DummyTask class from pytest_utils.py. |
| 198 | + task = DummyTask() |
| 199 | + |
| 200 | + # Create a dummy test agent. |
| 201 | + test_agent = DummyTestAgent() |
| 202 | + |
| 203 | + # Create a dummy patch agent. |
| 204 | + dummy_patch_agent = DummyPatchAgent(task, search_manager, task.get_issue_statement(), thread, bug_locs, output_dir) |
| 205 | + |
| 206 | + # Create our ReviewManager and override its patch_agent with our dummy. |
| 207 | + manager = ReviewManager(thread, bug_locs, search_manager, task, output_dir, test_agent) |
| 208 | + manager.patch_agent = dummy_patch_agent |
| 209 | + |
| 210 | + gen = manager.patch_only_generator() |
| 211 | + # Get first yielded patch. |
| 212 | + patch_handle, patch_content = next(gen) |
| 213 | + # Verify that the saved patch file exists. |
| 214 | + patch_file = Path(output_dir, f"extracted_patch_{patch_handle}.diff") |
| 215 | + assert patch_file.read_text() == patch_content |
| 216 | + |
| 217 | + # Now, force the generator to abort by making write_applicable_patch_without_feedback raise an exception. |
| 218 | + def raise_invalid(): |
| 219 | + raise InvalidLLMResponse("dummy error") |
| 220 | + dummy_patch_agent.write_applicable_patch_without_feedback = raise_invalid |
| 221 | + # Advance generator to trigger exception and termination. |
| 222 | + with pytest.raises(StopIteration): |
| 223 | + next(gen) |
| 224 | + |
| 225 | +def test_generator_with_patch_decision_yes(tmp_path): |
| 226 | + # Set up a temporary output directory. |
| 227 | + output_dir = str(tmp_path / "output") |
| 228 | + Path(output_dir).mkdir() |
| 229 | + |
| 230 | + # Create a dummy MessageThread. |
| 231 | + thread = DummyMessageThread() |
| 232 | + bug_locs = [] |
| 233 | + search_manager = DummySearchManager(project_path="dummy_project", output_dir=output_dir) |
| 234 | + # Create a dummy SweTask. |
| 235 | + task = DummySweTask("dummy issue statement", output_dir) |
| 236 | + |
| 237 | + # Create dummy test agent and patch agent. |
| 238 | + test_agent = DummyTestAgent() |
| 239 | + dummy_patch_agent = DummyPatchAgent(task, search_manager, task.get_issue_statement(), thread, bug_locs, output_dir) |
| 240 | + |
| 241 | + manager = ReviewManager(thread, bug_locs, search_manager, task, output_dir, test_agent) |
| 242 | + manager.patch_agent = dummy_patch_agent |
| 243 | + |
| 244 | + # Prepare a generator from the full generator. |
| 245 | + gen: Generator = manager.generator(rounds=1) |
| 246 | + |
| 247 | + # The generator should first write the test if history is empty. |
| 248 | + # Then, it writes the first patch and then goes into the loop. |
| 249 | + # Advance generator until we reach a yield for a patch decision YES. |
| 250 | + yielded = next(gen) |
| 251 | + # At this point, yielded is from the first iteration of _generator. |
| 252 | + # Now, simulate sending an evaluation message. |
| 253 | + evaluation_msg = "Evaluation OK" |
| 254 | + try: |
| 255 | + next_val = gen.send(evaluation_msg) |
| 256 | + except StopIteration: |
| 257 | + next_val = None |
| 258 | + |
| 259 | + # Check that the evaluation message was used to add feedback. |
| 260 | + # Our dummy_patch_agent.feedback should have a key corresponding to the patch_handle. |
| 261 | + assert "dummy_patch" in dummy_patch_agent.feedback |
| 262 | + # Also, check that a review file has been written. |
| 263 | + review_file = Path(output_dir, f"review_pdummy_patch_tdummy_test.json") |
| 264 | + assert review_file.is_file() |
| 265 | + |
| 266 | + # Also, check that an execution result file has been written. |
| 267 | + exec_file = Path(output_dir, f"execution_dummy_patch_dummy_test.json") |
| 268 | + assert exec_file.is_file() |
0 commit comments