[INDY-1599] Add strategies for latency measurement #892
Changes from 4 commits
@@ -1,5 +1,5 @@
 import time
-from abc import ABCMeta
+from abc import ABCMeta, abstractmethod
 from datetime import datetime
 from statistics import mean, median_low, median, median_high
 from typing import Dict, Iterable, Optional
@@ -88,39 +88,87 @@ def get_throughput(self, request_time):
         return self.throughput


-class LatencyMeasurement:
+class LatencyMeasurement(metaclass=ABCMeta):
     """
     Measure latency params
     """
+    @abstractmethod
     def add_duration(self, identifier, duration):
         pass

+    @abstractmethod
+    def get_avg_latency(self):
+        pass
+
+
+class EMALatencyMeasurementForEachClient(LatencyMeasurement):
+
-    def __init__(self, min_latency_count=10):
-        self.min_latency_count = min_latency_count
+    def __init__(self, config):
+        self.min_latency_count = config.MIN_LATENCY_COUNT
         # map of client identifier and (total_reqs, avg_latency) tuple
         self.avg_latencies = {}  # type: Dict[str, (int, float)]
         # This parameter defines coefficient alpha, which represents the degree of weighting decrease.
         self.alpha = 1 / (self.min_latency_count + 1)
+        self.total_reqs = 0
+        self.avg_for_clients_cls = config.AvgStrategyForAllClients()

     def add_duration(self, identifier, duration):
-        total_reqs, curr_avg_lat = self.avg_latencies.get(identifier, (0, .0))
-        total_reqs += 1
-        self.avg_latencies[identifier] = (total_reqs,
+        client_reqs, curr_avg_lat = self.avg_latencies.get(identifier, (0, .0))
+        client_reqs += 1
+        self.avg_latencies[identifier] = (client_reqs,
                                           self._accumulate(curr_avg_lat,
                                                            duration))
+        self.total_reqs += 1

     def _accumulate(self, old_accum, next_val):
         """
         Implement exponential moving average
         """
         return old_accum * (1 - self.alpha) + next_val * self.alpha

-    def get_avg_latency(self, identifier):
-        if identifier not in self.avg_latencies:
+    def get_avg_latency(self):
+        if self.total_reqs < self.min_latency_count:
             return None
-        total_reqs, curr_avg_lat = self.avg_latencies[identifier]
-        if total_reqs < self.min_latency_count:
+        latencies = [lat[1] for _, lat in self.avg_latencies.items()]
+
+        return self.avg_for_clients_cls.get_latency_for_clients(latencies)
+
+
+class EMALatencyMeasurementForAllClient(LatencyMeasurement):
+    def __init__(self, config):
+        self.min_latency_count = config.MIN_LATENCY_COUNT
+        # map of client identifier and (total_reqs, avg_latency) tuple
+        self.avg_latency = 0.0
+        # This parameter defines coefficient alpha, which represents the degree of weighting decrease.
+        self.alpha = 1 / (self.min_latency_count + 1)
+        self.total_reqs = 0
+
+    def add_duration(self, identifier, duration):
+        self.avg_latency = self._accumulate(self.avg_latency, duration)
+        self.total_reqs += 1
+
+    def _accumulate(self, old_accum, next_val):
+        """
+        Implement exponential moving average
+        """
+        return old_accum * (1 - self.alpha) + next_val * self.alpha
+
+    def get_avg_latency(self):
+        if self.total_reqs < self.min_latency_count:
             return None

-        return curr_avg_lat
+        return self.avg_latency
+
+
+class ClientsLatencyForAll(metaclass=ABCMeta):

Review comment: Why do we need this class?

+
+    @abstractmethod
+    def get_latency_for_clients(self, metrics):
+        pass
+
+
+class MedianHighLatencyForAllClients(MedianHighStrategy):

Review comment: Why do we need this class? Why don't use MedianHighStrategy strategy?

+
+    def get_latency_for_clients(self, metrics):
+        return self.get_avg(metrics)


 class RequestTimeTracker:
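In both EMA classes the smoothing coefficient is alpha = 1 / (MIN_LATENCY_COUNT + 1), so roughly the last MIN_LATENCY_COUNT samples dominate the average and a single outlier moves it only slightly. Below is a small self-contained sketch of that behaviour; the accumulate() helper mirrors _accumulate() above, and MIN_LATENCY_COUNT = 10 is taken from the old default argument, not from this diff:

MIN_LATENCY_COUNT = 10          # old default value, assumed here for illustration
alpha = 1 / (MIN_LATENCY_COUNT + 1)

def accumulate(old_accum, next_val):
    # Exponential moving average: the new sample gets weight alpha,
    # the history decays by (1 - alpha) per step.
    return old_accum * (1 - alpha) + next_val * alpha

avg = 0.0
for duration in [100] * 100:    # steady 100 ms latencies
    avg = accumulate(avg, duration)
print(round(avg, 2))            # close to 100 after many samples

avg = accumulate(avg, 1000)     # one 10x spike
print(round(avg, 2))            # moves only about 1/11 of the way toward the spike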
@@ -321,7 +369,8 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
         self.sendPeriodicStats = lambda: None
         self.checkPerformance = lambda: None

-        self.latency_avg_strategy_cls = MedianHighStrategy
+        self.latency_avg_for_backup_cls = self.config.AvgStrategyForBackups
+        self.latency_measurement_cls = self.config.LatencyMeasurementCls
         self.throughput_avg_strategy_cls = MedianLowStrategy

     def __repr__(self):
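The Monitor no longer hard-codes MedianHighStrategy for latencies; both the measurement class and the backup averaging strategy now come from config (config.LatencyMeasurementCls, config.AvgStrategyForBackups). The config changes themselves are not shown in this excerpt, so the following is only a rough sketch of how the config-driven wiring can be exercised in isolation; the SimpleNamespace stub and the value 10 are assumptions for illustration, not the real plenum config:

from types import SimpleNamespace
from plenum.server.monitor import (EMALatencyMeasurementForEachClient,
                                   MedianHighLatencyForAllClients)

# Stub standing in for plenum's config object: only the two attributes
# the per-client measurement class actually reads.
stub_config = SimpleNamespace(MIN_LATENCY_COUNT=10,
                              AvgStrategyForAllClients=MedianHighLatencyForAllClients)

lm = EMALatencyMeasurementForEachClient(stub_config)
for d in [100] * 50:
    lm.add_duration("client_a", d)
for d in [10] * 50:
    lm.add_duration("client_b", d)
print(lm.get_avg_latency())   # median-high over the two per-client EMA averages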
@@ -344,7 +393,7 @@ def metrics(self):
             ("ordered request durations",
              {i: r[1] for i, r in enumerate(self.numOrderedRequests)}),
             ("master request latencies", self.masterReqLatencies),
-            ("client avg request latencies", {i: self.getLatencies(i)
+            ("client avg request latencies", {i: self.getLatency(i)
                                               for i in self.instances.ids}),
             ("throughput", {i: self.getThroughput(i)
                             for i in self.instances.ids}),
@@ -385,7 +434,7 @@ def reset(self):
                                        min_cnt=self.config.ThroughputMinActivityThreshold,
                                        first_ts=time.perf_counter())
             self.throughputs[i] = rm
-            lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
+            lm = self.latency_measurement_cls(self.config)
             self.clientAvgReqLatencies[i] = lm

     def addInstance(self):
@@ -400,7 +449,7 @@ def addInstance(self):
                                        first_ts=time.perf_counter())

         self.throughputs.append(rm)
-        lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
+        lm = self.latency_measurement_cls(self.config)
         self.clientAvgReqLatencies.append(lm)

     def removeInstance(self, index=None):
@@ -556,39 +605,27 @@ def isMasterAvgReqLatencyTooHigh(self):
         Return whether the average request latency of the master instance is
         greater than the acceptable threshold
         """
-        avgLatM = self.getLatencies(self.instances.masterId)
-        avgLatB = {}
-        for lat_item in [self.getLatencies(instId) for instId in self.instances.backupIds]:
-            for cid, lat in lat_item.items():
-                avgLatB.setdefault(cid, []).append(lat)
-
-        # If latency of the master for any client is greater than that of
-        # backups by more than the threshold `Omega`, then a view change
-        # needs to happen
-        for cid, latencies in avgLatB.items():
-            if cid not in avgLatM:
-                logger.trace("{} found master had no record yet for {}".
-                             format(self, cid))
-                return False
-            if not latencies:
-                continue
-
-            high_avg_lat = self.latency_avg_strategy_cls.get_avg(latencies)
-            avg_master_lat = avgLatM[cid]
-            if avg_master_lat - high_avg_lat < self.Omega:
-                continue
-
-            d = avg_master_lat - high_avg_lat
-            logger.info("{}{} found difference between master's and "
-                        "backups's avg latency {} to be higher than the "
-                        "threshold".format(MONITORING_PREFIX, self, d))
-            logger.trace(
-                "{}'s master's avg request latency is {} and backup's "
-                "avg request latency is {}".format(self, avgLatM, avgLatB))
-            return True
-        logger.trace("{} found difference between master and backups "
-                     "avg latencies to be acceptable".format(self))
-        return False
+        avgLatM = self.getLatency(self.instances.masterId)
+        avgLatB = []
+        for instId in self.instances.backupIds:
+            lat = self.getLatency(instId)
+            if lat:
+                avgLatB.append(lat)
+        if avgLatM is None or len(avgLatB) == 0:
+            return False
+
+        high_avg_lat = self.latency_avg_for_backup_cls.get_avg(avgLatB)
+        if avgLatM - high_avg_lat < self.Omega:
+            return False
+
+        d = avgLatM - high_avg_lat
+        logger.info("{}{} found difference between master's and "
+                    "backups's avg latency {} to be higher than the "
+                    "threshold".format(MONITORING_PREFIX, self, d))
+        logger.trace(
+            "{}'s master's avg request latency is {} and backup's "
+            "avg request latency is {}".format(self, avgLatM, avgLatB))
+        return True

     def getThroughputs(self, masterInstId: int):
         """
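With this change, isMasterAvgReqLatencyTooHigh compares a single averaged master latency against a strategy-averaged view of the backups instead of iterating over per-client dictionaries; the decision reduces to "request a view change when master_avg - strategy(backup_avgs) >= Omega". A minimal numeric sketch of that rule, assuming the backup strategy is median-high and an Omega of 20 (both values are purely illustrative):

from statistics import median_high

OMEGA = 20          # illustrative threshold; the real value comes from plenum config
master_avg = 130.0  # averaged latency reported for the master instance
backup_avgs = [95.0, 100.0, 110.0]   # one averaged latency per backup instance

high_avg_lat = median_high(backup_avgs)          # 100.0
too_high = master_avg - high_avg_lat >= OMEGA    # 30 >= 20 -> True, view change needed
print(high_avg_lat, too_high)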
@@ -652,33 +689,13 @@ def getInstanceMetrics(
         else:
             return None, None

-    def getAvgLatencyForClient(self, identifier: str, *instId: int) -> float:
-        """
-        Calculate and return the average latency of the requests of the
-        client(specified by identifier) for the specified protocol instances.
-        """
-        if len(self.clientAvgReqLatencies) == 0:
-            return 0
-        means = []
-        for i in instId:
-            avg_lat = self.clientAvgReqLatencies[i].get_avg_latency(identifier)
-            if avg_lat:
-                means.append(avg_lat)
-        return self.mean(means)
-
-    def getLatencies(self, instId: int) -> Dict[str, float]:
+    def getLatency(self, instId: int) -> float:
         """
         Return a dict with client identifier as a key and calculated latency as a value
         """
         if len(self.clientAvgReqLatencies) == 0:
-            return 0
-        latencies = {}
-        for cid in self.clientAvgReqLatencies[instId].avg_latencies.keys():
-            avg_lat = self.clientAvgReqLatencies[instId].get_avg_latency(cid)
-            if avg_lat:
-                latencies[cid] = avg_lat
-
-        return latencies
+            return 0.0
+        return self.clientAvgReqLatencies[instId].get_avg_latency()

     def sendPeriodicStats(self):
         thoughputData = self.sendThroughput()
@@ -2561,15 +2561,14 @@ def checkPerformance(self) -> Optional[bool]:
             if backup_throughput is not None:
                 self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_THROUGHPUT, backup_throughput)

-            master_latencies = self.monitor.getLatencies(self.instances.masterId).values()
-            if len(master_latencies) > 0:
-                self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, mean(master_latencies))
-
-            backup_latencies = {}
-            for lat_item in [self.monitor.getLatencies(instId) for instId in self.instances.backupIds]:
-                for cid, lat in lat_item.items():
-                    backup_latencies.setdefault(cid, []).append(lat)
-            backup_latencies = [mean(lat) for cid, lat in backup_latencies.items()]
+            master_latency = self.monitor.getLatency(self.instances.masterId)
+            self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, master_latency)
+
+            backup_latencies = []
+            for instId in self.instances.backupIds:
+                lat = self.monitor.getLatency(instId)
+                if lat:
+                    backup_latencies.append(lat)
             if len(backup_latencies) > 0:
                 self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_LATENCY, mean(backup_latencies))

Review comment: Please use …
@@ -0,0 +1,77 @@
+import copy
+
+import pytest
+
+from plenum.server.monitor import EMALatencyMeasurementForAllClient, EMALatencyMeasurementForEachClient

Review comment: The file name is …

+
+
+@pytest.fixture(scope='function', params=[EMALatencyMeasurementForEachClient])
+def latency_instance(tconf, request):
+    lm = request.param(tconf)
+    return lm
+
+
+def fill_durations(li, client):
+    for idf, durations in client.items():
+        durations = client[idf]
+        for d in durations:
+            li.add_duration(idf, d)
+
+
+def test_spike_from_one_client_independ_of_order(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    client1 = {"first_client": [master_lat] * num_reqs}
+    client2 = {"second_client": [master_lat / 10] * (int(num_reqs / 10))}
+    fill_durations(liM, client1)
+
+    fill_durations(liB, client1)
+    fill_durations(liB, client2)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_from_one_request_from_one_client(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    client1 = {"first_client": [master_lat] * num_reqs}
+    client2 = {"second_client": [master_lat / 10]}
+    fill_durations(liM, client1)
+
+    fill_durations(liB, client1)
+    fill_durations(liB, client2)
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_from_one_request_on_one_client(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    clientM = {"master_client": [master_lat] * num_reqs}
+    clientB = {"backup_client": [master_lat] * num_reqs + [master_lat / 10]}
+    fill_durations(liM, clientM)
+    fill_durations(liB, clientB)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_on_two_clients_on_backup(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    clientM1 = {"master_client1": [master_lat] * num_reqs}
+    clientM2 = {"master_client2": [master_lat / 10] * num_reqs}
+    clientB1 = {"backup_client1": [master_lat] * num_reqs + [master_lat / 10]}
+    clientB2 = {"backup_client2": [master_lat / 10] * num_reqs + [master_lat]}
+    fill_durations(liM, clientM1)
+    fill_durations(liM, clientM2)
+    fill_durations(liB, clientB1)
+    fill_durations(liB, clientB2)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
Review comment: Can we also add a test where liM.get_avg_latency() - liB.get_avg_latency() >= tconf.OMEGA?
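A test along the lines the reviewer asks for might look roughly like the sketch below, reusing the latency_instance fixture and the fill_durations helper from the new test module: the "master-side" instance sees consistently slow requests while the "backup-side" instance sees fast ones, so the gap should reach the view-change threshold. The client names and latency values are made up, and this is not part of the PR:

def test_master_latency_exceeds_backup_by_omega(latency_instance, tconf):
    # Hypothetical test, illustrative only: a sustained slowdown seen only by
    # the master-side measurement should push the difference past OMEGA.
    liM = latency_instance
    liB = copy.deepcopy(latency_instance)
    num_reqs = 100
    clientM = {"slow_client": [100 + tconf.OMEGA * 2] * num_reqs}
    clientB = {"fast_client": [100] * num_reqs}
    fill_durations(liM, clientM)
    fill_durations(liB, clientB)

    assert liM.get_avg_latency() - liB.get_avg_latency() >= tconf.OMEGA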
Review comment: Why not MedianHighStrategy directly?