Skip to content

Commit bfce2b4

Browse files
committed
feat: add HistoryProcessors wrapper
1 parent 49658f3 commit bfce2b4

File tree

5 files changed

+209
-17
lines changed

5 files changed

+209
-17
lines changed

docs/message-history.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,64 @@ agent = Agent('openai:gpt-4o', history_processors=[filter_responses, summarize_o
522522
In this case, the `filter_responses` processor will be applied first, and the
523523
`summarize_old_messages` processor will be applied second.
524524

525+
### Modifying Message History
526+
527+
By default, history processors only modify the messages sent to the model without changing the original conversation history. However, you can use `HistoryProcessors` with `replace_history=True` to actually modify the original message history stored in the agent.
528+
529+
This is useful for scenarios like permanently compressing long conversations, implementing sliding window memory management, or removing sensitive information from the conversation history.
530+
531+
```python {title="modify_message_history.py"}
532+
from pydantic_ai import Agent, HistoryProcessors
533+
from pydantic_ai.messages import ModelMessage, ModelRequest
534+
535+
def keep_recent_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
536+
"""Keep only the last 3 messages to manage memory."""
537+
return messages[-3:] if len(messages) > 3 else messages
538+
539+
# Create HistoryProcessors with replace_history=True
540+
processors = HistoryProcessors(
541+
funcs=[keep_recent_messages],
542+
replace_history=True
543+
)
544+
545+
agent = Agent('openai:gpt-4o', history_processors=processors)
546+
547+
# Long conversation history
548+
long_history = [
549+
# ... assume this contains many messages
550+
]
551+
552+
result = agent.run_sync('What did we discuss recently?', message_history=long_history)
553+
554+
# The original history has been modified - only recent messages remain
555+
print(len(result.all_messages())) # Much shorter than the original long_history
556+
```
557+
558+
You can also combine multiple processors with `replace_history=True`:
559+
560+
```python {title="multiple_processors_with_replace.py"}
561+
from pydantic_ai import Agent
562+
from pydantic_ai import HistoryProcessors
563+
from pydantic_ai.messages import ModelMessage, ModelRequest
564+
565+
def remove_system_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
566+
# Remove system messages for privacy
567+
return [msg for msg in messages if not is_system_message(msg)]
568+
569+
def summarize_old_messages(messages: list[ModelMessage]) -> list[ModelMessage]:
570+
# Keep only the last 5 messages
571+
return messages[-5:] if len(messages) > 5 else messages
572+
573+
processors = HistoryProcessors(
574+
funcs=[remove_system_messages, summarize_old_messages],
575+
replace_history=True
576+
)
577+
578+
agent = Agent('openai:gpt-4o', history_processors=processors)
579+
```
580+
581+
**Note:** When `replace_history=False` (the default), the behavior is the same as using a list of processors directly - the original conversation history remains unchanged.
582+
525583
## Examples
526584

527585
For a more complete example of using messages in conversations, see the [chat app](examples/chat-app.md) example.

pydantic_ai_slim/pydantic_ai/__init__.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
from importlib.metadata import version as _metadata_version
22

3-
from .agent import Agent, CallToolsNode, EndStrategy, ModelRequestNode, UserPromptNode, capture_run_messages
3+
from .agent import (
4+
Agent,
5+
CallToolsNode,
6+
EndStrategy,
7+
HistoryProcessors,
8+
ModelRequestNode,
9+
UserPromptNode,
10+
capture_run_messages,
11+
)
412
from .exceptions import (
513
AgentRunError,
614
FallbackExceptionGroup,
@@ -19,6 +27,7 @@
1927
'__version__',
2028
# agent
2129
'Agent',
30+
'HistoryProcessors',
2231
'EndStrategy',
2332
'CallToolsNode',
2433
'ModelRequestNode',

pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
'build_run_context',
3636
'capture_run_messages',
3737
'HistoryProcessor',
38+
'HistoryProcessors',
3839
)
3940

4041

@@ -68,6 +69,16 @@
6869
"""
6970

7071

72+
@dataclasses.dataclass
73+
class HistoryProcessors[DepsT]:
74+
"""A wrapper for a list of history processors."""
75+
76+
funcs: list[HistoryProcessor[DepsT]]
77+
"""A list of functions to process the message history."""
78+
replace_history: bool = False
79+
"""Whether to replace the message history with the processed history."""
80+
81+
7182
@dataclasses.dataclass
7283
class GraphAgentState:
7384
"""State kept across the execution of the agent graph."""
@@ -106,7 +117,7 @@ class GraphAgentDeps(Generic[DepsT, OutputDataT]):
106117
output_schema: _output.OutputSchema[OutputDataT]
107118
output_validators: list[_output.OutputValidator[DepsT, OutputDataT]]
108119

109-
history_processors: Sequence[HistoryProcessor[DepsT]]
120+
history_processors: HistoryProcessors[DepsT]
110121

111122
function_tools: dict[str, Tool[DepsT]] = dataclasses.field(repr=False)
112123
mcp_servers: Sequence[MCPServer] = dataclasses.field(repr=False)
@@ -358,9 +369,7 @@ async def _stream(
358369

359370
model_settings, model_request_parameters = await self._prepare_request(ctx)
360371
model_request_parameters = ctx.deps.model.customize_request_parameters(model_request_parameters)
361-
message_history = await _process_message_history(
362-
ctx.state.message_history, ctx.deps.history_processors, build_run_context(ctx)
363-
)
372+
message_history = await _process_message_history(ctx.state, ctx.deps.history_processors, build_run_context(ctx))
364373
async with ctx.deps.model.request_stream(
365374
message_history, model_settings, model_request_parameters
366375
) as streamed_response:
@@ -384,9 +393,7 @@ async def _make_request(
384393

385394
model_settings, model_request_parameters = await self._prepare_request(ctx)
386395
model_request_parameters = ctx.deps.model.customize_request_parameters(model_request_parameters)
387-
message_history = await _process_message_history(
388-
ctx.state.message_history, ctx.deps.history_processors, build_run_context(ctx)
389-
)
396+
message_history = await _process_message_history(ctx.state, ctx.deps.history_processors, build_run_context(ctx))
390397
model_response = await ctx.deps.model.request(message_history, model_settings, model_request_parameters)
391398
ctx.state.usage.incr(_usage.Usage())
392399

@@ -955,12 +962,13 @@ def build_agent_graph(
955962

956963

957964
async def _process_message_history(
958-
messages: list[_messages.ModelMessage],
959-
processors: Sequence[HistoryProcessor[DepsT]],
965+
state: GraphAgentState,
966+
processors: HistoryProcessors[DepsT],
960967
run_context: RunContext[DepsT],
961968
) -> list[_messages.ModelMessage]:
962969
"""Process message history through a sequence of processors."""
963-
for processor in processors:
970+
messages = state.message_history
971+
for processor in processors.funcs:
964972
takes_ctx = is_takes_ctx(processor)
965973

966974
if is_async_callable(processor):
@@ -976,4 +984,6 @@ async def _process_message_history(
976984
else:
977985
sync_processor = cast(_HistoryProcessorSync, processor)
978986
messages = await run_in_executor(sync_processor, messages)
987+
if processors.replace_history:
988+
state.message_history = messages
979989
return messages

pydantic_ai_slim/pydantic_ai/agent.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
result,
3131
usage as _usage,
3232
)
33-
from ._agent_graph import HistoryProcessor
33+
from ._agent_graph import HistoryProcessor, HistoryProcessors
3434
from .models.instrumented import InstrumentationSettings, InstrumentedModel, instrument_model
3535
from .output import OutputDataT, OutputSpec
3636
from .result import FinalResult, StreamedRunResult
@@ -78,6 +78,7 @@
7878
'ModelRequestNode',
7979
'UserPromptNode',
8080
'InstrumentationSettings',
81+
'HistoryProcessors',
8182
)
8283

8384

@@ -181,7 +182,7 @@ def __init__(
181182
defer_model_check: bool = False,
182183
end_strategy: EndStrategy = 'early',
183184
instrument: InstrumentationSettings | bool | None = None,
184-
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
185+
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | HistoryProcessors[AgentDepsT] | None = None,
185186
) -> None: ...
186187

187188
@overload
@@ -211,7 +212,7 @@ def __init__(
211212
defer_model_check: bool = False,
212213
end_strategy: EndStrategy = 'early',
213214
instrument: InstrumentationSettings | bool | None = None,
214-
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
215+
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | HistoryProcessors[AgentDepsT] | None = None,
215216
) -> None: ...
216217

217218
def __init__(
@@ -236,7 +237,7 @@ def __init__(
236237
defer_model_check: bool = False,
237238
end_strategy: EndStrategy = 'early',
238239
instrument: InstrumentationSettings | bool | None = None,
239-
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
240+
history_processors: Sequence[HistoryProcessor[AgentDepsT]] | HistoryProcessors[AgentDepsT] | None = None,
240241
**_deprecated_kwargs: Any,
241242
):
242243
"""Create an agent.
@@ -359,7 +360,13 @@ def __init__(
359360
self._max_result_retries = output_retries if output_retries is not None else retries
360361
self._mcp_servers = mcp_servers
361362
self._prepare_tools = prepare_tools
362-
self.history_processors = history_processors or []
363+
history_processors = history_processors or []
364+
self.history_processors = cast(
365+
HistoryProcessors[AgentDepsT],
366+
HistoryProcessors(funcs=list(history_processors))
367+
if not isinstance(history_processors, HistoryProcessors)
368+
else history_processors,
369+
)
363370
for tool in tools:
364371
if isinstance(tool, Tool):
365372
self._register_tool(tool)

tests/test_history_processor.py

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import pytest
55
from inline_snapshot import snapshot
66

7-
from pydantic_ai import Agent
7+
from pydantic_ai import Agent, HistoryProcessors
88
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelRequestPart, ModelResponse, TextPart, UserPromptPart
99
from pydantic_ai.models.function import AgentInfo, FunctionModel
1010
from pydantic_ai.tools import RunContext
@@ -301,3 +301,111 @@ class Deps:
301301
user_part = msg.parts[0]
302302
assert isinstance(user_part, UserPromptPart)
303303
assert cast(str, user_part.content).startswith('TEST: ')
304+
305+
306+
async def test_history_processors_replace_history_true(function_model: FunctionModel):
307+
"""Test HistoryProcessors with replace_history=True modifies original history."""
308+
309+
def keep_only_requests(messages: list[ModelMessage]) -> list[ModelMessage]:
310+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
311+
312+
processors = HistoryProcessors(funcs=[keep_only_requests], replace_history=True) # type: ignore
313+
agent = Agent(function_model, history_processors=processors) # type: ignore
314+
315+
original_history = [
316+
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
317+
ModelResponse(parts=[TextPart(content='Answer 1')]),
318+
ModelRequest(parts=[UserPromptPart(content='Question 2')]),
319+
ModelResponse(parts=[TextPart(content='Answer 2')]),
320+
]
321+
322+
result = await agent.run('Question 3', message_history=original_history.copy())
323+
324+
# Verify the history was modified - responses should be removed
325+
all_messages = result.all_messages()
326+
requests = [msg for msg in all_messages if isinstance(msg, ModelRequest)]
327+
responses = [msg for msg in all_messages if isinstance(msg, ModelResponse)]
328+
329+
# Should have 3 requests (2 original + 1 new) and 1 response (only the new one)
330+
assert len(requests) == 3
331+
assert len(responses) == 1
332+
333+
334+
async def test_history_processors_multiple_with_replace_history(function_model: FunctionModel):
335+
"""Test multiple processors with replace_history=True."""
336+
337+
def remove_responses(messages: list[ModelMessage]) -> list[ModelMessage]:
338+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
339+
340+
def keep_recent(messages: list[ModelMessage]) -> list[ModelMessage]:
341+
return messages[-2:] if len(messages) > 2 else messages
342+
343+
processors = HistoryProcessors( # type: ignore
344+
funcs=[remove_responses, keep_recent], replace_history=True
345+
)
346+
agent = Agent(function_model, history_processors=processors) # type: ignore
347+
348+
# Create history with 4 requests and 4 responses
349+
original_history: list[ModelMessage] = []
350+
for i in range(4):
351+
original_history.append(ModelRequest(parts=[UserPromptPart(content=f'Question {i + 1}')]))
352+
original_history.append(ModelResponse(parts=[TextPart(content=f'Answer {i + 1}')]))
353+
354+
result = await agent.run('Final question', message_history=original_history.copy())
355+
356+
# After processing: remove responses -> keep recent 2 -> add new exchange
357+
all_messages = result.all_messages()
358+
requests = [msg for msg in all_messages if isinstance(msg, ModelRequest)]
359+
responses = [msg for msg in all_messages if isinstance(msg, ModelResponse)]
360+
361+
# Should have 2 requests (1 original request + 1 new) and 1 response (new only); original responses removed
362+
assert len(requests) == 2
363+
assert len(responses) == 1
364+
365+
366+
async def test_history_processors_streaming_with_replace_history(function_model: FunctionModel):
367+
"""Test replace_history=True works with streaming runs."""
368+
369+
def summarize_history(messages: list[ModelMessage]) -> list[ModelMessage]:
370+
# Simple summarization - keep only the last message
371+
return messages[-1:] if messages else []
372+
373+
processors = HistoryProcessors(funcs=[summarize_history], replace_history=True) # type: ignore
374+
agent = Agent(function_model, history_processors=processors) # type: ignore
375+
376+
original_history = [
377+
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
378+
ModelResponse(parts=[TextPart(content='Answer 1')]),
379+
ModelRequest(parts=[UserPromptPart(content='Question 2')]),
380+
ModelResponse(parts=[TextPart(content='Answer 2')]),
381+
]
382+
383+
async with agent.run_stream('Question 3', message_history=original_history.copy()) as result:
384+
async for _ in result.stream_text():
385+
pass
386+
387+
# Verify history was modified during streaming
388+
all_messages = result.all_messages()
389+
# Should only have: new request + new response = 2 total
390+
assert len(all_messages) == 2
391+
392+
393+
async def test_history_processors_replace_history_false_default(function_model: FunctionModel):
394+
"""Test HistoryProcessors with replace_history=False (default) preserves original history."""
395+
396+
def keep_only_requests(messages: list[ModelMessage]) -> list[ModelMessage]:
397+
return [msg for msg in messages if isinstance(msg, ModelRequest)]
398+
399+
processors = HistoryProcessors(funcs=[keep_only_requests]) # replace_history=False by default # type: ignore
400+
agent = Agent(function_model, history_processors=processors) # type: ignore
401+
402+
original_history = [
403+
ModelRequest(parts=[UserPromptPart(content='Question 1')]),
404+
ModelResponse(parts=[TextPart(content='Answer 1')]),
405+
]
406+
407+
result = await agent.run('Question 2', message_history=original_history.copy())
408+
409+
# Verify original history is preserved
410+
all_messages = result.all_messages()
411+
assert len(all_messages) == 4 # 2 original + 1 new request + 1 new response

0 commit comments

Comments
 (0)