Skip to content

[INDY-1599] Add strategies for latency measurement #892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 10, 2018
79 changes: 78 additions & 1 deletion plenum/common/average_strategies.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from abc import ABC
from abc import ABC, abstractmethod, ABCMeta
from statistics import median_low, median, median_high
from typing import List

Expand All @@ -25,3 +25,80 @@ class MedianHighStrategy(AverageStrategyBase):
@staticmethod
def get_avg(metrics: List):
return median_high(metrics)


class MedianHighLatencyForAllClients(MedianHighStrategy):
    """
    Aggregates the per-client latency values into a single number
    using the high-median strategy inherited from MedianHighStrategy.
    """

    def get_latency_for_clients(self, metrics):
        """Return the high median of `metrics` (a list of latency values)."""
        return self.get_avg(metrics)


class LatencyMeasurement(metaclass=ABCMeta):
    """
    Abstract interface for latency measurement strategies.

    Implementations accumulate individual request durations and expose
    an aggregate average latency value.
    """

    @abstractmethod
    def add_duration(self, identifier, duration):
        """Record the latency `duration` of one request from client `identifier`."""

    @abstractmethod
    def get_avg_latency(self):
        """Return the measured average latency (implementations may return None)."""


class EMALatencyMeasurementForEachClient(LatencyMeasurement):
    """
    Tracks an exponential moving average (EMA) of request latency
    separately for each client, then aggregates the per-client averages
    with the strategy configured as ``config.AvgStrategyForAllClients``.
    """

    def __init__(self, config):
        # Minimal number of observed requests before get_avg_latency()
        # starts reporting a value.
        self.min_latency_count = config.MIN_LATENCY_COUNT
        # Maps client identifier -> (total_reqs, avg_latency).
        self.avg_latencies = {}  # type: Dict[str, Tuple[int, float]]
        # EMA smoothing factor: the degree of weighting decrease.
        self.alpha = 1 / (self.min_latency_count + 1)
        # Total number of durations observed across all clients.
        self.total_reqs = 0
        self.avg_for_clients_cls = config.AvgStrategyForAllClients()

    def add_duration(self, identifier, duration):
        """
        Record the latency `duration` of one request from client
        `identifier`, updating that client's EMA.
        """
        client_reqs, curr_avg_lat = self.avg_latencies.get(identifier, (0, 0.0))
        self.avg_latencies[identifier] = (client_reqs + 1,
                                          self._accumulate(curr_avg_lat,
                                                           duration))
        self.total_reqs += 1

    def _accumulate(self, old_accum, next_val):
        """
        Implement exponential moving average
        """
        return old_accum * (1 - self.alpha) + next_val * self.alpha

    def get_avg_latency(self):
        """
        Return the aggregated average latency over all clients, or None
        while fewer than `min_latency_count` requests have been observed.
        """
        if self.total_reqs < self.min_latency_count:
            return None
        # Iterate values directly: the keys were unused in the original
        # items() loop; each value is a (total_reqs, avg_latency) tuple.
        latencies = [avg_lat for _, avg_lat in self.avg_latencies.values()]

        return self.avg_for_clients_cls.get_latency_for_clients(latencies)


class EMALatencyMeasurementForAllClient(LatencyMeasurement):
    """
    Maintains one exponential moving average (EMA) of request latency
    over the durations reported for all clients combined.
    """

    def __init__(self, config):
        # Number of requests that must be seen before an average is reported.
        self.min_latency_count = config.MIN_LATENCY_COUNT
        # Single running EMA over every client's request durations.
        self.avg_latency = 0.0
        # Smoothing coefficient alpha: the degree of weighting decrease.
        self.alpha = 1 / (self.min_latency_count + 1)
        self.total_reqs = 0

    def add_duration(self, identifier, duration):
        """Fold one request duration into the running average."""
        self.total_reqs += 1
        self.avg_latency = self._accumulate(self.avg_latency, duration)

    def _accumulate(self, old_accum, next_val):
        """
        Implement exponential moving average
        """
        return next_val * self.alpha + old_accum * (1 - self.alpha)

    def get_avg_latency(self):
        """Return the EMA latency, or None until enough requests were seen."""
        if self.total_reqs >= self.min_latency_count:
            return self.avg_latency
        return None
8 changes: 6 additions & 2 deletions plenum/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
KeyValueStorageType
from plenum.common.types import PLUGIN_TYPE_STATS_CONSUMER
from plenum.common.measurements import RevivalSpikeResistantEMAThroughputMeasurement
from plenum.common.average_strategies import MedianLowStrategy, MedianHighStrategy
from plenum.common.average_strategies import MedianLowStrategy, MedianHighStrategy, MedianHighLatencyForAllClients, \
EMALatencyMeasurementForEachClient

walletsDir = 'wallets'
clientDataDir = 'data/clients'
Expand Down Expand Up @@ -137,7 +138,7 @@
LatencyGraphDuration = 240

# This parameter defines minimal count of accumulated latencies for each client
MIN_LATENCY_COUNT = 10
MIN_LATENCY_COUNT = 20

latency_averaging_strategy_class = MedianHighStrategy
throughput_averaging_strategy_class = MedianLowStrategy
Expand Down Expand Up @@ -325,6 +326,9 @@
METRICS_KV_DB_NAME = 'metrics_db'
METRICS_KV_CONFIG = rocksdb_default_config.copy()

AvgStrategyForAllClients = MedianHighLatencyForAllClients
AvgStrategyForBackups = MedianHighStrategy
LatencyMeasurementCls = EMALatencyMeasurementForEachClient

# Accumulating performance monitor controls
#
Expand Down
128 changes: 67 additions & 61 deletions plenum/server/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,43 @@
logger = getlogger()


class ThroughputMeasurement:
    """
    Measures request throughput as an exponential moving average (EMA)
    over fixed-size time windows.
    """

    def __init__(self, window_size=15, min_cnt=16, first_ts=None):
        # BUG FIX: the default used to be `first_ts=time.perf_counter()`,
        # which Python evaluates once at class-definition time, so every
        # instance created without an explicit timestamp shared the same
        # stale start time. Use a None sentinel and evaluate per instance.
        self.reqs_in_window = 0
        self.throughput = 0
        self.window_size = window_size
        # Minimal number of windows before get_throughput() reports a value.
        self.min_cnt = min_cnt
        self.first_ts = time.perf_counter() if first_ts is None else first_ts
        self.window_start_ts = self.first_ts
        # EMA smoothing factor: the degree of weighting decrease.
        self.alpha = 2 / (self.min_cnt + 1)

    def add_request(self, ordered_ts):
        """Count one ordered request at timestamp `ordered_ts`."""
        self.update_time(ordered_ts)
        self.reqs_in_window += 1

    def _accumulate(self, old_accum, next_val):
        """
        Implement exponential moving average
        """
        return old_accum * (1 - self.alpha) + next_val * self.alpha

    def update_time(self, current_ts):
        """
        Close every window that has fully elapsed by `current_ts`,
        folding each window's request rate into the EMA throughput.
        """
        while current_ts >= self.window_start_ts + self.window_size:
            self.throughput = self._accumulate(self.throughput, self.reqs_in_window / self.window_size)
            self.window_start_ts = self.window_start_ts + self.window_size
            self.reqs_in_window = 0

    def get_throughput(self, request_time):
        """
        Return the EMA throughput as of `request_time`, or None during the
        warm-up period (fewer than `min_cnt` windows since `first_ts`).
        """
        if request_time < self.first_ts + (self.window_size * self.min_cnt):
            return None
        self.update_time(request_time)
        return self.throughput


class RequestTimeTracker:
"""
Request time tracking utility
Expand Down Expand Up @@ -230,7 +267,8 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
self.sendPeriodicStats = lambda: None
self.checkPerformance = lambda: None

self.latency_avg_strategy_cls = self.config.latency_averaging_strategy_class
self.latency_avg_for_backup_cls = self.config.latency_averaging_strategy_class
self.latency_measurement_cls = self.config.LatencyMeasurementCls
self.throughput_avg_strategy_cls = self.config.throughput_averaging_strategy_class

self.acc_monitor = None
Expand Down Expand Up @@ -263,7 +301,7 @@ def metrics(self):
("ordered request durations",
{i: r[1] for i, r in enumerate(self.numOrderedRequests)}),
("master request latencies", self.masterReqLatencies),
("client avg request latencies", {i: self.getLatencies(i)
("client avg request latencies", {i: self.getLatency(i)
for i in self.instances.ids}),
("throughput", {i: self.getThroughput(i)
for i in self.instances.ids}),
Expand Down Expand Up @@ -311,7 +349,7 @@ def reset(self):
for i in range(num_instances):
rm = self.create_throughput_measurement(self.config)
self.throughputs[i] = rm
lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
lm = self.latency_measurement_cls(self.config)
self.clientAvgReqLatencies[i] = lm

def addInstance(self):
Expand All @@ -324,7 +362,7 @@ def addInstance(self):
rm = self.create_throughput_measurement(self.config)

self.throughputs.append(rm)
lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
lm = self.latency_measurement_cls(self.config)
self.clientAvgReqLatencies.append(lm)
if self.acc_monitor:
self.acc_monitor.add_instance()
Expand Down Expand Up @@ -499,39 +537,27 @@ def isMasterAvgReqLatencyTooHigh(self):
Return whether the average request latency of the master instance is
greater than the acceptable threshold
"""
avgLatM = self.getLatencies(self.instances.masterId)
avgLatB = {}
for lat_item in [self.getLatencies(instId) for instId in self.instances.backupIds]:
for cid, lat in lat_item.items():
avgLatB.setdefault(cid, []).append(lat)

# If latency of the master for any client is greater than that of
# backups by more than the threshold `Omega`, then a view change
# needs to happen
for cid, latencies in avgLatB.items():
if cid not in avgLatM:
logger.trace("{} found master had no record yet for {}".
format(self, cid))
return False
if not latencies:
continue

high_avg_lat = self.latency_avg_strategy_cls.get_avg(latencies)
avg_master_lat = avgLatM[cid]
if avg_master_lat - high_avg_lat < self.Omega:
continue

d = avg_master_lat - high_avg_lat
logger.info("{}{} found difference between master's and "
"backups's avg latency {} to be higher than the "
"threshold".format(MONITORING_PREFIX, self, d))
logger.trace(
"{}'s master's avg request latency is {} and backup's "
"avg request latency is {}".format(self, avgLatM, avgLatB))
return True
logger.trace("{} found difference between master and backups "
"avg latencies to be acceptable".format(self))
return False
avgLatM = self.getLatency(self.instances.masterId)
avgLatB = []
for instId in self.instances.backupIds:
lat = self.getLatency(instId)
if lat:
avgLatB.append(lat)
if avgLatM is None or len(avgLatB) == 0:
return False

high_avg_lat = self.latency_avg_for_backup_cls.get_avg(avgLatB)
if avgLatM - high_avg_lat < self.Omega:
return False

d = avgLatM - high_avg_lat
logger.info("{}{} found difference between master's and "
"backups's avg latency {} to be higher than the "
"threshold".format(MONITORING_PREFIX, self, d))
logger.trace(
"{}'s master's avg request latency is {} and backup's "
"avg request latency is {}".format(self, avgLatM, avgLatB))
return True

def getThroughputs(self, masterInstId: int):
"""
Expand Down Expand Up @@ -595,33 +621,13 @@ def getInstanceMetrics(
else:
return None, None

def getAvgLatencyForClient(self, identifier: str, *instId: int) -> float:
"""
Calculate and return the average latency of the requests of the
client(specified by identifier) for the specified protocol instances.
"""
if len(self.clientAvgReqLatencies) == 0:
return 0
means = []
for i in instId:
avg_lat = self.clientAvgReqLatencies[i].get_avg_latency(identifier)
if avg_lat:
means.append(avg_lat)
return self.mean(means)

def getLatencies(self, instId: int) -> Dict[str, float]:
def getLatency(self, instId: int) -> float:
"""
Return a dict with client identifier as a key and calculated latency as a value
"""
if len(self.clientAvgReqLatencies) == 0:
return 0
latencies = {}
for cid in self.clientAvgReqLatencies[instId].avg_latencies.keys():
avg_lat = self.clientAvgReqLatencies[instId].get_avg_latency(cid)
if avg_lat:
latencies[cid] = avg_lat

return latencies
return 0.0
return self.clientAvgReqLatencies[instId].get_avg_latency()

def sendPeriodicStats(self):
thoughputData = self.sendThroughput()
Expand Down Expand Up @@ -706,7 +712,7 @@ def avgBackupLatency(self):
len(latencies) > 0 else 0)
self.latenciesByBackupsInLast[instId] = latencies

return self.mean(backupLatencies)
return self.latency_avg_for_backup_cls.get_avg(backupLatencies) if backupLatencies else None

def sendLatencies(self):
logger.debug("{} sending latencies".format(self))
Expand Down
21 changes: 11 additions & 10 deletions plenum/server/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2565,17 +2565,18 @@ def checkPerformance(self) -> Optional[bool]:
if backup_throughput is not None:
self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_THROUGHPUT, backup_throughput)

master_latencies = self.monitor.getLatencies(self.instances.masterId).values()
if len(master_latencies) > 0:
self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, mean(master_latencies))

backup_latencies = {}
for lat_item in [self.monitor.getLatencies(instId) for instId in self.instances.backupIds]:
for cid, lat in lat_item.items():
backup_latencies.setdefault(cid, []).append(lat)
backup_latencies = [mean(lat) for cid, lat in backup_latencies.items()]
master_latency = self.monitor.getLatency(self.instances.masterId)
if master_latency:
self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, master_latency)

backup_latencies = []
for instId in self.instances.backupIds:
lat = self.monitor.getLatency(instId)
if lat:
backup_latencies.append(lat)
if len(backup_latencies) > 0:
self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_LATENCY, mean(backup_latencies))
self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_LATENCY,
self.monitor.latency_avg_for_backup_cls.get_avg(backup_latencies))

if self.monitor.isMasterDegraded():
logger.display('{} master instance performance degraded'.format(self))
Expand Down
Loading