-
-
Notifications
You must be signed in to change notification settings - Fork 367
/
Copy pathlogger.py
185 lines (153 loc) · 6.61 KB
/
logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import json
import os
import pickle
import sys
from contextlib import contextmanager
from datetime import datetime, timezone
from functools import partial
from logging import LogRecord
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Generator, Union
from loguru import logger
if TYPE_CHECKING:
from loguru import Record
from psutil import Process
from rdagent.core.conf import RD_AGENT_SETTINGS
from rdagent.core.utils import SingletonBaseClass
from .ui.utils import format_pkl
from .storage import FileStorage
from .utils import LogColors, get_caller_info
class RDAgentLog(SingletonBaseClass):
    """
    Tag- and PID-aware logging facade on top of loguru.

    The files are organized based on the tag & PID.
    Here is an example tag layout (indentation reconstructed; treat it as
    illustrative):

    .. code-block::

        a
        - b
        - c
            - 123
              - common_logs.log
            - 1322
              - common_logs.log
            - 1233
              - <timestamp>.pkl
            - d
                - 1233-673 ...
                - 1233-4563 ...
                - 1233-365 ...

    Every log call resolves a full tag of the form
    ``<context tag>.<call tag>.<pid chain>`` and writes to
    ``<log_trace_path>/<full tag with '.' replaced by '/'>/common_logs.log``.
    """

    # TODO: Simplify it to introduce less concepts ( We may merge RDAgentLog, Storage &)
    # Solution:  Storage => PipeLog, View => PipeLogView, RDAgentLog is an instance of PipeLogger
    # PipeLogger.info(...) ,  PipeLogger.get_resp() to get feedback from frontend.
    # def f():
    #   logger = PipeLog()
    #   logger.info("<code>")
    #   feedback = logger.get_reps()

    # Current dotted context tag; extended and restored by the `tag` context manager.
    _tag: str = ""

    def __init__(self, log_trace_path: Union[str, None] = RD_AGENT_SETTINGS.log_trace_path) -> None:
        """
        Set up the trace directory, the object storage and the main PID.

        Parameters
        ----------
        log_trace_path :
            Directory that receives all logs. When ``None``, a UTC-timestamped
            directory under ``<cwd>/log`` is used instead.
        """
        if log_trace_path is None:
            timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S-%f")
            self.log_trace_path = Path.cwd() / "log" / timestamp
        else:
            self.log_trace_path = Path(log_trace_path)

        self.log_trace_path.mkdir(parents=True, exist_ok=True)
        self.storage = FileStorage(self.log_trace_path)

        # PID of the process that created the logger; `get_pids` walks up to it.
        self.main_pid = os.getpid()

    def set_trace_path(self, log_trace_path: Union[str, Path]) -> None:
        """Point the logger and its storage at a different trace directory."""
        # `Union` (not `str | Path`) matches the rest of the file and does not
        # fail at definition time on interpreters older than 3.10.
        self.log_trace_path = Path(log_trace_path)
        # Hand the storage the same normalised Path that __init__ passes.
        self.storage = FileStorage(self.log_trace_path)

    @contextmanager
    def tag(self, tag: str) -> Generator[None, None, None]:
        """
        Temporarily append ``tag`` to the dotted context tag.

        Raises
        ------
        ValueError
            If ``tag`` is empty or whitespace only.
        """
        if not tag.strip():
            raise ValueError("Tag cannot be empty.")
        previous_tag = self._tag
        # TODO: It may result in error in multithreading or co-routine
        self._tag = f"{previous_tag}.{tag}" if previous_tag else tag
        try:
            yield
        finally:
            # Restore the saved value instead of slicing the suffix off, so the
            # state is exact even if `_tag` was touched inside the block.
            self._tag = previous_tag

    def get_pids(self) -> str:
        """
        Returns a string of pids from the current process to the main process.
        Split by '-'.

        If the main process is not among the ancestors of the current process,
        the walk stops at the top of the process tree and the chain collected
        so far is returned.
        """
        pid = os.getpid()
        process = Process(pid)
        pid_chain = f"{pid}"
        while process.pid != self.main_pid:
            parent_pid = process.ppid()
            # Stop at the root of the process tree (no parent, or a process that
            # is its own parent) rather than raising or looping forever when
            # main_pid is not an ancestor.
            if not parent_pid or parent_pid == process.pid:
                break
            pid_chain = f"{parent_pid}-{pid_chain}"
            process = Process(parent_pid)
        return pid_chain

    def file_format(self, record: "Record", raw: bool = False) -> str:
        """
        Loguru format callable for the file sinks.

        Strips ANSI colour codes from the message (mutating ``record``) and
        returns the format template: the bare message when ``raw`` is true,
        otherwise a timestamp/level/location prefixed line ending in a newline.
        """
        # FIXME: the format is tightly coupled with the message reading in storage.
        record["message"] = LogColors.remove_ansi_codes(record["message"])
        if raw:
            return "{message}"
        return "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}\n"

    def _full_tag(self, tag: str) -> str:
        """Combine the context tag, the call tag and the PID chain into one dotted tag."""
        return f"{self._tag}.{tag}.{self.get_pids()}".strip(".")

    @contextmanager
    def _file_handler(self, full_tag: str, *, raw: bool = False) -> Generator[None, None, None]:
        """Attach a temporary loguru file sink for ``full_tag`` and always detach it."""
        log_file_path = self.log_trace_path / full_tag.replace(".", "/") / "common_logs.log"
        sink_format = partial(self.file_format, raw=True) if raw else self.file_format
        handler_id = logger.add(log_file_path, format=sink_format)
        try:
            yield
        finally:
            # Guarantees the sink does not leak if emitting the record raises.
            logger.remove(handler_id)

    def log_object(self, obj: object, *, tag: str = "") -> None:
        """
        Persist ``obj`` and log where it was stored.

        Objects whose resolved tag contains ``"debug_"`` are appended to
        ``debug_llm.pkl`` in the trace directory and nothing else is done.
        All other objects are saved through the storage as a pickle, passed to
        ``format_pkl``, and an info line with the saved path is written to the
        tag's ``common_logs.log``.
        """
        # TODO: I think we can merge the log_object function with other normal log methods to make the interface simpler.
        # NOTE(review): get_caller_info presumably inspects the call stack, so it
        # is called directly in each public method rather than in a shared helper.
        caller_info = get_caller_info()
        tag = self._full_tag(tag)

        # FIXME: it looks like a hacking... We should redesign it...
        if "debug_" in tag:
            debug_log_path = self.log_trace_path / "debug_llm.pkl"
            # set_trace_path does not create the directory, so make sure it exists.
            debug_log_path.parent.mkdir(parents=True, exist_ok=True)
            if debug_log_path.exists():
                # NOTE: pickle.load executes arbitrary code from the file; this is
                # only acceptable because the file is written by this logger.
                with debug_log_path.open("rb") as f:
                    existing_data = pickle.load(f)
            else:
                existing_data = []
            existing_data.append({"tag": tag, "obj": obj})
            with debug_log_path.open("wb") as f:
                pickle.dump(existing_data, f)
            return

        logp = self.storage.log(obj, name=tag, save_type="pkl")
        format_pkl(obj=obj, tag=tag, log_trace_path=self.log_trace_path)

        with self._file_handler(tag):
            logger.patch(lambda r: r.update(caller_info)).info(f"Logging object in {Path(logp).absolute()}")

    def info(self, msg: str, *, tag: str = "", raw: bool = False) -> None:
        """
        Log ``msg`` at INFO level to the existing sinks and the tag's log file.

        With ``raw=True`` the message is emitted without any prefix or trailing
        newline. To do so, ALL loguru handlers are removed and replaced by a
        bare stderr handler for the duration of the call; afterwards all
        handlers are removed again and a default stderr handler is installed.
        """
        # TODO: too much duplicated. due to we have no logger with stream context;
        caller_info = get_caller_info()
        if raw:
            logger.remove()
            logger.add(sys.stderr, format=lambda r: "{message}")
        try:
            with self._file_handler(self._full_tag(tag), raw=raw):
                logger.patch(lambda r: r.update(caller_info)).info(msg)
        finally:
            # Restore the default stderr handler even if logging failed, so the
            # process is not left stuck in raw output mode.
            if raw:
                logger.remove()
                logger.add(sys.stderr)

    def warning(self, msg: str, *, tag: str = "") -> None:
        """Log ``msg`` at WARNING level to the existing sinks and the tag's log file."""
        # TODO: reuse code
        # _log(self, msg: str, *, tag: str = "", level=Literal["warning", "error", ..]) -> None:
        # getattr(logger.patch(lambda r: r.update(caller_info)), level)(msg)
        caller_info = get_caller_info()
        with self._file_handler(self._full_tag(tag)):
            logger.patch(lambda r: r.update(caller_info)).warning(msg)

    def error(self, msg: str, *, tag: str = "") -> None:
        """Log ``msg`` at ERROR level to the existing sinks and the tag's log file."""
        caller_info = get_caller_info()
        with self._file_handler(self._full_tag(tag)):
            logger.patch(lambda r: r.update(caller_info)).error(msg)