Skip to content

Commit bb5ec6c

Browse files
author
Sergey Khoroshavin
committed
Log processor: include identifier into reqId, refactor
Signed-off-by: Sergey Khoroshavin <[email protected]>
1 parent e5d90f7 commit bb5ec6c

File tree

2 files changed

+150
-129
lines changed

2 files changed

+150
-129
lines changed

scripts/process_logs/README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ provided, if attribute contains given value. For example, matcher
118118
```
119119
checks that message has attribute `is_request`, and matcher
120120
```yaml
121-
- reqId: 42
121+
- reqId: xz 42
122122
```
123-
checks that message has attribute `reqId` containing value `42`
123+
checks that message has attribute `reqId` containing value `xz 42`
124124

125125

126126
#### Builtin matchers
@@ -279,7 +279,10 @@ commands.
279279
- `drop`: action to perform is to drop message altogether
280280
- `track_requests`: track requests, adding multiple attributes to relevant
281281
messages:
282-
- reqId: request identifier
282+
- `reqId`: request identifier, composed from `identifier` and `reqId`
283+
separated by space
284+
- `viewNo`: view number
285+
- `ppSeqNo`: pre-prepare sequence number
283286
- TODO: list other attibutes
284287
- `tag`: optionally checks if message matches some regex pattern and sets
285288
custom tags and/or attributes on it. Parameter for this command is dictionary

scripts/process_logs/process_logs

Lines changed: 144 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python3
22

33
import os, sys, re, gzip, yaml
4+
from typing import Iterable
45
from collections import namedtuple
56
from datetime import datetime, timedelta
67
from string import Formatter
@@ -83,6 +84,36 @@ def input_logs():
8384
REPLICA_NONE = "-"
8485

8586

87+
class MessageAttrs:
88+
def __init__(self):
89+
self._data = {}
90+
91+
def __contains__(self, item):
92+
return item in self._data
93+
94+
def __getitem__(self, item):
95+
return self._data[item]
96+
97+
def add(self, name, value=None):
98+
if isinstance(value, Iterable) and not isinstance(value, str):
99+
for v in value:
100+
self.add(name, v)
101+
return
102+
103+
if value is None:
104+
self._data.setdefault(name, set())
105+
return
106+
107+
try:
108+
self._data[name].add(value)
109+
except KeyError:
110+
self._data[name] = set([value])
111+
112+
def merge(self, other):
113+
for name, values in other._data.items():
114+
self.add(name, values)
115+
116+
86117
class LogMessage:
87118
def __init__(self, body, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None):
88119
self.body = body
@@ -92,16 +123,7 @@ class LogMessage:
92123
self.level = level
93124
self.source = source
94125
self.func = func
95-
self.attributes = {}
96-
97-
def set_attribute(self, name, value=None):
98-
if value:
99-
try:
100-
self.attributes[name].add(value)
101-
except KeyError:
102-
self.attributes[name] = set([value])
103-
else:
104-
self.attributes.setdefault(name, set())
126+
self.attrs = MessageAttrs()
105127

106128

107129
_replica_matcher = re.compile("^REPLICA:\((\w+):(\d+)\)").search
@@ -233,11 +255,11 @@ def match_replica(replica):
233255
def match_attribute(params):
234256
name, value = kv_from_item(params)
235257
if value is None:
236-
return lambda message: name in message.attributes
258+
return lambda message: name in message.attrs
237259

238260
def match(message):
239261
try:
240-
return str(value) in message.attributes[name]
262+
return str(value) in message.attrs[name]
241263
except KeyError:
242264
return False
243265

@@ -320,18 +342,18 @@ def rule_tag(params):
320342
match = re.compile(params.get("pattern", "")).search
321343
attributes = params.get("attributes", {})
322344

323-
def process(message, output):
345+
def process(message: LogMessage, output):
324346
m = match(message.body)
325347
if m is None:
326348
return
327349
for name, value in attributes.items():
328350
if value is None:
329-
message.set_attribute(name)
351+
message.attrs.add(name)
330352
return
331353
if isinstance(value, str) and value.startswith("group "):
332-
message.set_attribute(name, m.group(int(value[6:])))
354+
message.attrs.add(name, m.group(int(value[6:])))
333355
return
334-
message.set_attribute(name, value)
356+
message.attrs.add(name, value)
335357

336358
return process
337359

@@ -452,7 +474,7 @@ class OutputLog:
452474
self.log_files = {}
453475

454476
def add_message(self, message):
455-
line = self.pattern.format(**vars(message), **message.attributes)
477+
line = self.pattern.format(**vars(message), **message.attrs._data)
456478
filename = self.filename.format(node=message.node,
457479
replica=message.replica if message.replica != REPLICA_NONE else 0)
458480
self._log_file(filename).append(message.timestamp, line)
@@ -671,29 +693,51 @@ class RequestData:
671693
self.ordered = _merge_timestamps(self.ordered, other.ordered)
672694

673695

696+
class MessageParser:
697+
def __init__(self, substr, pattern):
698+
self.substr = substr
699+
self.search = re.compile(pattern).search
700+
701+
def parse(self, message):
702+
if self.substr not in message.body:
703+
return None
704+
m = self.search(message.body)
705+
if not m: return None
706+
g = m.groups()
707+
return g if len(g) > 1 else g[0]
708+
709+
674710
class NodeRequestData:
675711
def __init__(self):
676712
self.requests = {}
677-
self._match_req_id = re.compile("'reqId': (\d+)").search
678-
self._match_req_idr = re.compile("'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])").search
679-
self._match_pp_seq_no = re.compile("'ppSeqNo': (\d+)").search
680-
self._match_view_no = re.compile("'viewNo': (\d+)").search
681-
self._match_prepare = re.compile("PREPARE\s?\((\d+), (\d+)\)").search
682-
self._match_auth_request = re.compile("signature on (?:[\w\s]*) request (\d+)").search
683-
self._pattern_req = "\('\w+', (\d+)\)"
684-
self._match_propagate_req = re.compile("propagating request {} from client".format(self._pattern_req)).search
685-
self._match_forward_req = re.compile("forwarding request {} to".format(self._pattern_req)).search
686-
self._pattern_req_list = "\[(?:\('\w+', \d+\)(?:, )?)*\]"
687-
self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search
688-
self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search
713+
self._identifier = MessageParser("'identifier':",
714+
"'identifier': '(\w+)'")
715+
self._req_idr = MessageParser("'reqIdr':",
716+
"'reqIdr': (\[(?:\['\w+', \d+\](?:, )?)*\])")
717+
self._req_id = MessageParser("'reqId':",
718+
"'reqId': (\d+)")
719+
self._pp_seq_no = MessageParser("'ppSeqNo':",
720+
"'ppSeqNo': (\d+)")
721+
self._view_no = MessageParser("'viewNo':",
722+
"'viewNo': (\d+)")
723+
self._prepare = MessageParser("PREPARE",
724+
"PREPARE\s?\((\d+), (\d+)\)")
725+
self._propagate_req = MessageParser("propagating request",
726+
"propagating request \('(\w+)', (\d+)\) from client")
727+
self._forward_req = MessageParser("forwarding request",
728+
"forwarding request \('(\w+)', (\d+)\) to")
729+
self._ordered = MessageParser("ordered batch request",
730+
"ordered batch request, view no (\d+), ppSeqNo (\d+), ledger (\d+), "\
731+
"state root \w+, txn root \w+, "\
732+
"requests ordered (\[(?:\('\w+', \d+\)(?:, )?)*\]), "\
733+
"discarded (\[(?:\('\w+', \d+\)(?:, )?)*\])")
689734

690735
def process_message(self, message):
691-
self._set_attributes(message)
692-
if self._check_received(message):
693-
return
694-
if self._check_already_processed(message):
736+
attrs = self._extract_attributes(message)
737+
message.attrs.merge(attrs)
738+
if self._check_received(message, attrs):
695739
return
696-
if self._check_batch_ordered(message):
740+
if self._check_already_processed(message, attrs):
697741
return
698742

699743
def merge(self, other):
@@ -723,106 +767,80 @@ class NodeRequestData:
723767
self.requests[id] = request
724768
return request
725769

726-
def _parse_reqId(self, message):
727-
if "'reqId':" not in message.body:
728-
return
729-
m = self._match_req_id(message.body)
730-
if not m: return
731-
return m.group(1)
732-
733-
def _parse_reqIdr(self, message):
734-
if "'reqIdr':" not in message.body:
735-
return []
736-
m = self._match_req_idr(message.body)
737-
if not m: return []
738-
return [str(r[1]) for r in literal_eval(m.group(1))]
739-
740-
def _parse_ppSeqNo(self, message):
741-
if "'ppSeqNo':" not in message.body:
742-
return
743-
m = self._match_pp_seq_no(message.body)
744-
if not m: return
745-
return m.group(1)
746-
747-
def _parse_viewNo(self, message):
748-
if "'viewNo':" not in message.body:
749-
return
750-
m = self._match_view_no(message.body)
751-
if not m: return
752-
return m.group(1)
753-
754-
def _process_prepare(self, message):
755-
if "PREPARE" not in message.body:
756-
return
757-
m = self._match_prepare(message.body)
758-
if not m: return
759-
message.set_attribute("viewNo", m.group(1))
760-
message.set_attribute("ppSeqNo", m.group(2))
761-
762-
def _process_auth_request(self, message):
763-
if "authenticated" not in message.body:
764-
return
765-
if "signature on" not in message.body:
766-
return
767-
m = self._match_auth_request(message.body)
768-
if not m: return
769-
message.set_attribute("reqId", m.group(1))
770-
771-
def _process_propagate_req(self, message):
772-
if "propagating request" not in message.body:
773-
return
774-
m = self._match_propagate_req(message.body)
775-
if not m: return
776-
message.set_attribute("reqId", m.group(1))
777-
778-
def _process_forward_req(self, message):
779-
if "forwarding request" not in message.body:
780-
return
781-
m = self._match_forward_req(message.body)
782-
if not m: return
783-
message.set_attribute("reqId", m.group(1))
784-
785-
def _set_attributes(self, message):
786-
reqId = self._parse_reqId(message)
787-
if reqId: message.set_attribute("reqId", reqId)
788-
reqIdr = self._parse_reqIdr(message)
789-
for reqId in reqIdr:
790-
message.set_attribute("reqId", reqId)
791-
ppSeqNo = self._parse_ppSeqNo(message)
792-
if ppSeqNo: message.set_attribute("ppSeqNo", ppSeqNo)
793-
viewNo = self._parse_viewNo(message)
794-
if viewNo: message.set_attribute("viewNo", viewNo)
795-
796-
self._process_prepare(message)
797-
self._process_auth_request(message)
798-
self._process_propagate_req(message)
799-
self._process_forward_req(message)
800-
801-
def _check_received(self, message):
770+
def _extract_attributes(self, message) -> MessageAttrs:
771+
attrs = MessageAttrs()
772+
self._extract_identifier_reqId(message, attrs)
773+
self._extract_reqIdr(message, attrs)
774+
self._extract_ppSeqNo(message, attrs)
775+
self._extract_viewNo(message, attrs)
776+
self._process_prepare(message, attrs)
777+
self._process_propagate_req(message, attrs)
778+
self._process_forward_req(message, attrs)
779+
self._process_ordered(message, attrs)
780+
return attrs
781+
782+
def _check_received(self, message, attrs):
802783
if "received client request" not in message.body:
803784
return
804-
message.set_attribute("request", "received")
805-
reqId = self._parse_reqId(message)
806-
self._request(reqId).set_received(message.timestamp)
785+
message.attrs.add("request", "received")
786+
self._request(next(iter(attrs['reqId']))).set_received(message.timestamp)
807787
return True
808788

809-
def _check_already_processed(self, message):
789+
def _check_already_processed(self, message, attrs):
810790
if "returning REPLY from already processed REQUEST" not in message.body:
811791
return
812-
message.set_attribute("request", "already_processed")
792+
message.attrs.add("request", "already_processed")
813793
return True
814794

815-
def _check_batch_ordered(self, message):
816-
if "ordered batch request" not in message.body:
817-
return
818-
message.set_attribute("request", "ordered")
819-
ordered = self._match_ordered_req_list(message.body)
820-
ordered = literal_eval(ordered.group(1))
821-
for _, reqId in ordered:
822-
reqId = str(reqId)
823-
message.set_attribute("reqId", reqId)
824-
self._request(reqId).set_ordered(message.timestamp)
825-
return True
795+
def _extract_identifier_reqId(self, message, attrs):
796+
m1 = self._identifier.parse(message)
797+
m2 = self._req_id.parse(message)
798+
if m1 is not None and m2 is not None:
799+
attrs.add('reqId', "{} {}".format(m1, m2))
800+
801+
def _extract_reqIdr(self, message, attrs):
802+
m = self._req_idr.parse(message)
803+
if m is not None:
804+
for r in literal_eval(m):
805+
attrs.add('reqId', "{} {}".format(r[0], str(r[1])))
806+
807+
def _extract_ppSeqNo(self, message, attrs):
808+
m = self._pp_seq_no.parse(message)
809+
if m is not None:
810+
attrs.add('ppSeqNo', m)
811+
812+
def _extract_viewNo(self, message, attrs):
813+
m = self._view_no.parse(message)
814+
if m is not None:
815+
attrs.add('viewNo', m)
816+
817+
def _process_prepare(self, message, attrs):
818+
m = self._prepare.parse(message)
819+
if m is not None:
820+
attrs.add('viewNo', m[0])
821+
attrs.add('ppSeqNo', m[1])
822+
823+
def _process_propagate_req(self, message, attrs):
824+
m = self._propagate_req.parse(message)
825+
if m is not None:
826+
attrs.add('reqId', "{} {}".format(m[0], m[1]))
827+
828+
def _process_forward_req(self, message, attrs):
829+
m = self._forward_req.parse(message)
830+
if m is not None:
831+
attrs.add('reqId', "{} {}".format(m[0], m[1]))
832+
833+
def _process_ordered(self, message, attrs):
834+
m = self._ordered.parse(message)
835+
if m is None: return
836+
message.attrs.add("request", "ordered")
837+
attrs.add('viewNo', m[0])
838+
attrs.add('ppSeqNo', m[1])
839+
attrs.add('ledger', m[2])
840+
for r in literal_eval(m[3]):
841+
req_id = "{} {}".format(r[0], str(r[1]))
842+
attrs.add('reqId', req_id)
843+
self._request(req_id).set_ordered(message.timestamp)
826844

827845

828846
class AllRequestData:

0 commit comments

Comments
 (0)