Skip to content

Commit 3641daf

Browse files
authored
Merge pull request #850 from skhoroshavin/indy-1475
INDY-1475: Metrics improvements
2 parents ad789e6 + 08449d9 commit 3641daf

File tree

3 files changed

+30
-9
lines changed

3 files changed

+30
-9
lines changed

plenum/common/metrics_collector.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ class MetricsName(IntEnum):
1818
INCOMING_NODE_MESSAGE_SIZE = 6 # Incoming node message size, bytes
1919
OUTGOING_CLIENT_MESSAGE_SIZE = 7 # Outgoing client message size, bytes
2020
INCOMING_CLIENT_MESSAGE_SIZE = 8 # Incoming client message size, bytes
21+
ORDERED_BATCH_SIZE = 9 # Number of requests ordered
22+
REQUEST_PROCESSING_TIME = 10 # Time spent on requests processing (including dynamic validation)
23+
MASTER_3PC_BATCH_SIZE = 11 # Number of requests in one 3PC batch created on master instance
24+
MASTER_ORDERED_BATCH_SIZE = 12 # Number of requests ordered on master instance
25+
MASTER_REQUEST_PROCESSING_TIME = 13 # Time spent on requests processing on master instance
2126

2227

2328
MetricsEvent = NamedTuple('MetricsEvent', [('timestamp', datetime), ('name', MetricsName), ('value', float)])
@@ -42,12 +47,15 @@ class KvStoreMetricsFormat:
4247
seq_mask = (1 << seq_bits) - 1
4348

4449
@staticmethod
45-
def encode(event: MetricsEvent, seq_no: int = 0) -> (bytes, bytes):
46-
int_ts = int(1000000 * event.timestamp.replace(tzinfo=timezone.utc).timestamp())
50+
def encode_key(ts: datetime, seq_no: int):
51+
int_ts = int(1000000 * ts.replace(tzinfo=timezone.utc).timestamp())
4752
int_ts = int_ts & KvStoreMetricsFormat.ts_mask
4853
seq_no = seq_no & KvStoreMetricsFormat.seq_mask
49-
key = ((int_ts << KvStoreMetricsFormat.seq_bits) | seq_no).to_bytes(64, byteorder='big', signed=False)
54+
return ((int_ts << KvStoreMetricsFormat.seq_bits) | seq_no).to_bytes(64, byteorder='big', signed=False)
5055

56+
@staticmethod
57+
def encode(event: MetricsEvent, seq_no: int = 0) -> (bytes, bytes):
58+
key = KvStoreMetricsFormat.encode_key(event.timestamp, seq_no)
5159
value = event.name.to_bytes(32, byteorder='big', signed=False) + struct.pack('d', event.value)
5260
return key, value
5361

plenum/common/metrics_stats.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def __init__(self):
106106
def add(self, id: MetricsName, value: float):
107107
self._stats[id].add(value)
108108

109-
def get(self, id: MetricsName):
109+
def get(self, id: MetricsName) -> ValueAccumulator:
110110
return self._stats[id]
111111

112112
def merge(self, other):
@@ -186,13 +186,11 @@ def load_metrics_from_kv_store(storage: KeyValueStorage,
186186
step: timedelta = timedelta(minutes=1)) -> MetricsStats:
187187
result = MetricsStats(step)
188188

189-
# TODO: Implement faster filtering by timestamps
190-
for k, v in storage.iterator():
189+
start = KvStoreMetricsFormat.encode_key(min_ts, 0) if min_ts else None
190+
for k, v in storage.iterator(start=start):
191191
ev = KvStoreMetricsFormat.decode(k, v)
192-
if min_ts is not None and ev.timestamp < min_ts:
193-
continue
194192
if max_ts is not None and ev.timestamp > max_ts:
195-
continue
193+
break
196194
result.add(ev.timestamp, ev.name, ev.value)
197195

198196
return result

plenum/server/replica.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,8 @@ def trackBatches(self, pp: PrePrepare, prevStateRootHash):
704704
self.logger.trace('{} tracking batch for {} with state root {}'.format(
705705
self, pp, prevStateRootHash))
706706
self.metrics.add_event(MetricsName.THREE_PC_BATCH_SIZE, len(pp.reqIdr))
707+
if self.isMaster:
708+
self.metrics.add_event(MetricsName.MASTER_3PC_BATCH_SIZE, len(pp.reqIdr))
707709
self.batches[(pp.viewNo, pp.ppSeqNo)] = [pp.ledgerId, pp.discarded,
708710
pp.ppTime, prevStateRootHash]
709711

@@ -816,6 +818,7 @@ def consume_req_queue_for_pre_prepare(self, ledger_id, view_no, pp_seq_no):
816818
validReqs = []
817819
inValidReqs = []
818820
rejects = []
821+
start = time.perf_counter()
819822
while len(validReqs) + len(inValidReqs) < self.config.Max3PCBatchSize \
820823
and self.requestQueues[ledger_id]:
821824
key = self.requestQueues[ledger_id].pop(0)
@@ -826,6 +829,10 @@ def consume_req_queue_for_pre_prepare(self, ledger_id, view_no, pp_seq_no):
826829
else:
827830
self.logger.debug('{} found {} in its request queue but the '
828831
'corresponding request was removed'.format(self, key))
832+
duration = time.perf_counter() - start
833+
self.metrics.add_event(MetricsName.REQUEST_PROCESSING_TIME, duration)
834+
if self.isMaster:
835+
self.metrics.add_event(MetricsName.MASTER_REQUEST_PROCESSING_TIME, duration)
829836

830837
return validReqs, inValidReqs, rejects, tm
831838

@@ -1232,6 +1239,7 @@ def _apply_pre_prepare(self, pre_prepare: PrePrepare, sender: str) -> Optional[i
12321239
old_state_root,
12331240
old_txn_root))
12341241

1242+
start = time.perf_counter()
12351243
for req_key in pre_prepare.reqIdr:
12361244
req = self.requests[req_key].finalised
12371245

@@ -1240,6 +1248,10 @@ def _apply_pre_prepare(self, pre_prepare: PrePrepare, sender: str) -> Optional[i
12401248
valid_reqs,
12411249
invalid_reqs,
12421250
rejects)
1251+
duration = time.perf_counter() - start
1252+
self.metrics.add_event(MetricsName.REQUEST_PROCESSING_TIME, duration)
1253+
if self.isMaster:
1254+
self.metrics.add_event(MetricsName.MASTER_REQUEST_PROCESSING_TIME, duration)
12431255

12441256
def revert():
12451257
self.revert(pre_prepare.ledgerId,
@@ -1735,6 +1747,9 @@ def order_3pc_key(self, key):
17351747
format(self, pp.viewNo, pp.ppSeqNo, pp.ledgerId,
17361748
pp.stateRootHash, pp.txnRootHash, len(pp.reqIdr[:pp.discarded]),
17371749
len(pp.reqIdr[pp.discarded:])))
1750+
self.metrics.add_event(MetricsName.ORDERED_BATCH_SIZE, pp.discarded)
1751+
if self.isMaster:
1752+
self.metrics.add_event(MetricsName.MASTER_ORDERED_BATCH_SIZE, pp.discarded)
17381753

17391754
self.addToCheckpoint(pp.ppSeqNo, pp.digest, pp.ledgerId)
17401755

0 commit comments

Comments
 (0)