
[INDY-1599] Add strategies for latency measurement #892


Merged: 10 commits, Sep 10, 2018
8 changes: 7 additions & 1 deletion plenum/config.py
@@ -7,6 +7,8 @@
 from plenum.common.constants import ClientBootStrategy, HS_FILE, HS_LEVELDB, \
     HS_ROCKSDB, HS_MEMORY, KeyValueStorageType
 from plenum.common.types import PLUGIN_TYPE_STATS_CONSUMER
+from plenum.server.monitor import MedianHighLatencyForAllClients, MedianHighStrategy, \
+    EMALatencyMeasurementForEachClient
 
 walletsDir = 'wallets'
 clientDataDir = 'data/clients'
@@ -138,7 +140,7 @@
 LatencyGraphDuration = 240
 
 # This parameter defines minimal count of accumulated latencies for each client
-MIN_LATENCY_COUNT = 10
+MIN_LATENCY_COUNT = 20
 
 # Two following parameters define collecting statistic timeout for
 # collecting ordered request and throughput evaluating them.
@@ -322,3 +324,7 @@
 METRICS_KV_STORAGE = KeyValueStorageType.Rocksdb
 METRICS_KV_DB_NAME = 'metrics_db'
 METRICS_KV_CONFIG = rocksdb_default_config.copy()
+
+AvgStrategyForAllClients = MedianHighLatencyForAllClients

Review comment (Contributor): Why not MedianHighStrategy directly?

+AvgStrategyForBackups = MedianHighStrategy
+LatencyMeasurementCls = EMALatencyMeasurementForEachClient
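These three hooks make the latency machinery pluggable: the Monitor reads the strategy classes off the config object instead of hard-coding them (see monitor.py below). A minimal sketch of how the hooks are consumed; DummyConfig is a hypothetical stand-in for plenum.config, while the attribute names and the instantiation style follow the diffs in this PR:

```python
# DummyConfig is illustrative only; Monitor reads LatencyMeasurementCls
# from config and instantiates it with the whole config object.
from plenum.server.monitor import (EMALatencyMeasurementForAllClient,
                                   MedianHighLatencyForAllClients,
                                   MedianHighStrategy)


class DummyConfig:
    MIN_LATENCY_COUNT = 20
    AvgStrategyForAllClients = MedianHighLatencyForAllClients
    AvgStrategyForBackups = MedianHighStrategy
    # swap in the pooled EMA instead of the per-client default
    LatencyMeasurementCls = EMALatencyMeasurementForAllClient


lm = DummyConfig.LatencyMeasurementCls(DummyConfig)  # as Monitor.reset() does
lm.add_duration("client-1", 0.25)
print(lm.get_avg_latency())  # None until MIN_LATENCY_COUNT durations are seen
```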
161 changes: 89 additions & 72 deletions plenum/server/monitor.py
@@ -1,5 +1,5 @@
 import time
-from abc import ABCMeta
+from abc import ABCMeta, abstractmethod
 from datetime import datetime
 from statistics import mean, median_low, median, median_high
 from typing import Dict, Iterable, Optional
@@ -88,39 +88,87 @@ def get_throughput(self, request_time):
         return self.throughput
 
 
-class LatencyMeasurement:
+class LatencyMeasurement(metaclass=ABCMeta):
     """
     Measure latency params
     """
+    @abstractmethod
     def add_duration(self, identifier, duration):
         pass
 
+    @abstractmethod
+    def get_avg_latency(self):
+        pass
+
+
+class EMALatencyMeasurementForEachClient(LatencyMeasurement):
 
-    def __init__(self, min_latency_count=10):
-        self.min_latency_count = min_latency_count
+    def __init__(self, config):
+        self.min_latency_count = config.MIN_LATENCY_COUNT
         # map of client identifier and (total_reqs, avg_latency) tuple
         self.avg_latencies = {}  # type: Dict[str, (int, float)]
         # This parameter defines coefficient alpha, which represents the degree of weighting decrease.
         self.alpha = 1 / (self.min_latency_count + 1)
+        self.total_reqs = 0
+        self.avg_for_clients_cls = config.AvgStrategyForAllClients()
 
     def add_duration(self, identifier, duration):
-        total_reqs, curr_avg_lat = self.avg_latencies.get(identifier, (0, .0))
-        total_reqs += 1
-        self.avg_latencies[identifier] = (total_reqs,
+        client_reqs, curr_avg_lat = self.avg_latencies.get(identifier, (0, .0))
+        client_reqs += 1
+        self.avg_latencies[identifier] = (client_reqs,
                                           self._accumulate(curr_avg_lat,
                                                            duration))
+        self.total_reqs += 1
 
     def _accumulate(self, old_accum, next_val):
         """
        Implement exponential moving average
         """
         return old_accum * (1 - self.alpha) + next_val * self.alpha
 
-    def get_avg_latency(self, identifier):
-        if identifier not in self.avg_latencies:
-            return None
-        total_reqs, curr_avg_lat = self.avg_latencies[identifier]
-        if total_reqs < self.min_latency_count:
-            return None
-
-        return curr_avg_lat
+    def get_avg_latency(self):
+        if self.total_reqs < self.min_latency_count:
+            return None
+        latencies = [lat[1] for _, lat in self.avg_latencies.items()]
+
+        return self.avg_for_clients_cls.get_latency_for_clients(latencies)
+
+
+class EMALatencyMeasurementForAllClient(LatencyMeasurement):
+    def __init__(self, config):
+        self.min_latency_count = config.MIN_LATENCY_COUNT
+        # single EMA-smoothed latency over requests from all clients
+        self.avg_latency = 0.0
+        # This parameter defines coefficient alpha, which represents the degree of weighting decrease.
+        self.alpha = 1 / (self.min_latency_count + 1)
+        self.total_reqs = 0
+
+    def add_duration(self, identifier, duration):
+        self.avg_latency = self._accumulate(self.avg_latency, duration)
+        self.total_reqs += 1
+
+    def _accumulate(self, old_accum, next_val):
+        """
+        Implement exponential moving average
+        """
+        return old_accum * (1 - self.alpha) + next_val * self.alpha
+
+    def get_avg_latency(self):
+        if self.total_reqs < self.min_latency_count:
+            return None
+
+        return self.avg_latency
+
+
+class ClientsLatencyForAll(metaclass=ABCMeta):

Review comment (Contributor): Why do we need this class?

+    @abstractmethod
+    def get_latency_for_clients(self, metrics):
+        pass
+
+
+class MedianHighLatencyForAllClients(MedianHighStrategy):

Review comment (Contributor): Why do we need this class? Why not use MedianHighStrategy directly?

+    def get_latency_for_clients(self, metrics):
+        return self.get_avg(metrics)
 
 
 class RequestTimeTracker:
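Both measurement classes smooth request durations with the same exponential moving average, new_avg = old_avg * (1 - alpha) + duration * alpha, with alpha = 1 / (MIN_LATENCY_COUNT + 1), so roughly the last MIN_LATENCY_COUNT requests dominate the estimate. A small self-contained sketch (illustrative numbers, not project code) of how _accumulate converges and how little a single outlier moves it:

```python
# Stand-alone EMA sketch mirroring _accumulate above.
MIN_LATENCY_COUNT = 20
alpha = 1 / (MIN_LATENCY_COUNT + 1)  # ~0.048


def accumulate(old_avg, value):
    # exponential moving average, same formula as _accumulate
    return old_avg * (1 - alpha) + value * alpha


avg = 0.0
for _ in range(100):           # 100 requests at 100 ms each
    avg = accumulate(avg, 100.0)
print(round(avg, 1))           # ~99.2: converged near the true latency

avg = accumulate(avg, 1000.0)  # a single 10x spike
print(round(avg, 1))           # ~142.1: shifted by only alpha * (1000 - old_avg)
```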
@@ -321,7 +369,8 @@ def __init__(self, name: str, Delta: float, Lambda: float, Omega: float,
         self.sendPeriodicStats = lambda: None
         self.checkPerformance = lambda: None
 
-        self.latency_avg_strategy_cls = MedianHighStrategy
+        self.latency_avg_for_backup_cls = self.config.AvgStrategyForBackups
+        self.latency_measurement_cls = self.config.LatencyMeasurementCls
         self.throughput_avg_strategy_cls = MedianLowStrategy
 
     def __repr__(self):
@@ -344,7 +393,7 @@ def metrics(self):
             ("ordered request durations",
              {i: r[1] for i, r in enumerate(self.numOrderedRequests)}),
             ("master request latencies", self.masterReqLatencies),
-            ("client avg request latencies", {i: self.getLatencies(i)
+            ("client avg request latencies", {i: self.getLatency(i)
                                               for i in self.instances.ids}),
             ("throughput", {i: self.getThroughput(i)
                             for i in self.instances.ids}),
@@ -385,7 +434,7 @@ def reset(self):
                 min_cnt=self.config.ThroughputMinActivityThreshold,
                 first_ts=time.perf_counter())
             self.throughputs[i] = rm
-            lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
+            lm = self.latency_measurement_cls(self.config)
             self.clientAvgReqLatencies[i] = lm
 
     def addInstance(self):
@@ -400,7 +449,7 @@ def addInstance(self):
             first_ts=time.perf_counter())
 
         self.throughputs.append(rm)
-        lm = LatencyMeasurement(min_latency_count=self.config.MIN_LATENCY_COUNT)
+        lm = self.latency_measurement_cls(self.config)
         self.clientAvgReqLatencies.append(lm)
 
     def removeInstance(self, index=None):
@@ -556,39 +605,27 @@ def isMasterAvgReqLatencyTooHigh(self):
         Return whether the average request latency of the master instance is
         greater than the acceptable threshold
         """
-        avgLatM = self.getLatencies(self.instances.masterId)
-        avgLatB = {}
-        for lat_item in [self.getLatencies(instId) for instId in self.instances.backupIds]:
-            for cid, lat in lat_item.items():
-                avgLatB.setdefault(cid, []).append(lat)
-
-        # If latency of the master for any client is greater than that of
-        # backups by more than the threshold `Omega`, then a view change
-        # needs to happen
-        for cid, latencies in avgLatB.items():
-            if cid not in avgLatM:
-                logger.trace("{} found master had no record yet for {}".
-                             format(self, cid))
-                return False
-            if not latencies:
-                continue
-
-            high_avg_lat = self.latency_avg_strategy_cls.get_avg(latencies)
-            avg_master_lat = avgLatM[cid]
-            if avg_master_lat - high_avg_lat < self.Omega:
-                continue
-
-            d = avg_master_lat - high_avg_lat
-            logger.info("{}{} found difference between master's and "
-                        "backups's avg latency {} to be higher than the "
-                        "threshold".format(MONITORING_PREFIX, self, d))
-            logger.trace(
-                "{}'s master's avg request latency is {} and backup's "
-                "avg request latency is {}".format(self, avgLatM, avgLatB))
-            return True
-        logger.trace("{} found difference between master and backups "
-                     "avg latencies to be acceptable".format(self))
-        return False
+        avgLatM = self.getLatency(self.instances.masterId)
+        avgLatB = []
+        for instId in self.instances.backupIds:
+            lat = self.getLatency(instId)
+            if lat:
+                avgLatB.append(lat)
+        if avgLatM is None or len(avgLatB) == 0:
+            return False
+
+        high_avg_lat = self.latency_avg_for_backup_cls.get_avg(avgLatB)
+        if avgLatM - high_avg_lat < self.Omega:
+            return False
+
+        d = avgLatM - high_avg_lat
+        logger.info("{}{} found difference between master's and "
+                    "backups's avg latency {} to be higher than the "
+                    "threshold".format(MONITORING_PREFIX, self, d))
+        logger.trace(
+            "{}'s master's avg request latency is {} and backup's "
+            "avg request latency is {}".format(self, avgLatM, avgLatB))
+        return True
 
     def getThroughputs(self, masterInstId: int):
         """
@@ -652,33 +689,13 @@ def getInstanceMetrics(
         else:
             return None, None
 
-    def getAvgLatencyForClient(self, identifier: str, *instId: int) -> float:
-        """
-        Calculate and return the average latency of the requests of the
-        client (specified by identifier) for the specified protocol instances.
-        """
-        if len(self.clientAvgReqLatencies) == 0:
-            return 0
-        means = []
-        for i in instId:
-            avg_lat = self.clientAvgReqLatencies[i].get_avg_latency(identifier)
-            if avg_lat:
-                means.append(avg_lat)
-        return self.mean(means)
-
-    def getLatencies(self, instId: int) -> Dict[str, float]:
-        """
-        Return a dict with client identifier as a key and calculated latency as a value
-        """
-        if len(self.clientAvgReqLatencies) == 0:
-            return 0
-        latencies = {}
-        for cid in self.clientAvgReqLatencies[instId].avg_latencies.keys():
-            avg_lat = self.clientAvgReqLatencies[instId].get_avg_latency(cid)
-            if avg_lat:
-                latencies[cid] = avg_lat
-
-        return latencies
+    def getLatency(self, instId: int) -> float:
+        """
+        Return the averaged request latency for the given protocol instance
+        """
+        if len(self.clientAvgReqLatencies) == 0:
+            return 0.0
+        return self.clientAvgReqLatencies[instId].get_avg_latency()
 
     def sendPeriodicStats(self):
         thoughputData = self.sendThroughput()
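With these changes isMasterAvgReqLatencyTooHigh compares one scalar per instance instead of per-client dictionaries: the master's average latency against the median-high of the backups' averages. A toy walk-through with made-up numbers, assuming MedianHighStrategy.get_avg reduces to statistics.median_high (consistent with the import at the top of monitor.py):

```python
from statistics import median_high

Omega = 5.0                # degradation threshold; the real value comes from config
avgLatM = 12.0             # master instance's get_avg_latency()
avgLatB = [3.0, 4.0, 6.0]  # backups' averages, with None results filtered out

high_avg_lat = median_high(avgLatB)     # 4.0
print(avgLatM - high_avg_lat >= Omega)  # True: master lags by 8.0 >= Omega,
                                        # so the monitor reports degraded latency
```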
17 changes: 8 additions & 9 deletions plenum/server/node.py
@@ -2561,15 +2561,14 @@ def checkPerformance(self) -> Optional[bool]:
         if backup_throughput is not None:
             self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_THROUGHPUT, backup_throughput)
 
-        master_latencies = self.monitor.getLatencies(self.instances.masterId).values()
-        if len(master_latencies) > 0:
-            self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, mean(master_latencies))
-
-        backup_latencies = {}
-        for lat_item in [self.monitor.getLatencies(instId) for instId in self.instances.backupIds]:
-            for cid, lat in lat_item.items():
-                backup_latencies.setdefault(cid, []).append(lat)
-        backup_latencies = [mean(lat) for cid, lat in backup_latencies.items()]
+        master_latency = self.monitor.getLatency(self.instances.masterId)
+        self.metrics.add_event(MetricsName.MONITOR_AVG_LATENCY, master_latency)
+
+        backup_latencies = []
+        for instId in self.instances.backupIds:
+            lat = self.monitor.getLatency(instId)
+            if lat:
+                backup_latencies.append(lat)
         if len(backup_latencies) > 0:
             self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_LATENCY, mean(backup_latencies))

Review comment (Contributor): Please use latency_avg_for_backup_cls here.
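A sketch of the change the reviewer suggests: delegate the averaging to the monitor's configured backup strategy instead of a hard-coded mean(). Untested, and it mirrors the get_avg call style used in isMasterAvgReqLatencyTooHigh:

```python
if len(backup_latencies) > 0:
    # e.g. MedianHighStrategy, wired through config.AvgStrategyForBackups
    backup_avg = self.monitor.latency_avg_for_backup_cls.get_avg(backup_latencies)
    self.metrics.add_event(MetricsName.BACKUP_MONITOR_AVG_LATENCY, backup_avg)
```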


77 changes: 77 additions & 0 deletions plenum/test/monitoring/test_EMALatencyMeasurementForAllClient.py
@@ -0,0 +1,77 @@
+import copy
+
+import pytest
+
+from plenum.server.monitor import EMALatencyMeasurementForAllClient, EMALatencyMeasurementForEachClient

Review comment (Contributor): The file name is test_EMALatencyMeasurementForAllClient.py while we are testing EMALatencyMeasurementForEachClient. Can we test both, BTW?

+
+
+@pytest.fixture(scope='function', params=[EMALatencyMeasurementForEachClient])
+def latency_instance(tconf, request):
+    lm = request.param(tconf)
+    return lm
+
+
+def fill_durations(li, client):
+    for idf, durations in client.items():
+        for d in durations:
+            li.add_duration(idf, d)
+
+
+def test_spike_from_one_client_independ_of_order(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    client1 = {"first_client": [master_lat] * num_reqs}
+    client2 = {"second_client": [master_lat / 10] * (int(num_reqs / 10))}
+    fill_durations(liM, client1)
+
+    fill_durations(liB, client1)
+    fill_durations(liB, client2)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_from_one_request_from_one_client(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    client1 = {"first_client": [master_lat] * num_reqs}
+    client2 = {"second_client": [master_lat / 10]}
+    fill_durations(liM, client1)
+
+    fill_durations(liB, client1)
+    fill_durations(liB, client2)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_from_one_request_on_one_client(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    clientM = {"master_client": [master_lat] * num_reqs}
+    clientB = {"backup_client": [master_lat] * num_reqs + [master_lat / 10]}
+    fill_durations(liM, clientM)
+    fill_durations(liB, clientB)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
+
+
+def test_spike_on_two_clients_on_backup(latency_instance, tconf):
+    liM = latency_instance
+    liB = copy.deepcopy(latency_instance)
+    master_lat = 100
+    num_reqs = 100
+    clientM1 = {"master_client1": [master_lat] * num_reqs}
+    clientM2 = {"master_client2": [master_lat / 10] * num_reqs}
+    clientB1 = {"backup_client1": [master_lat] * num_reqs + [master_lat / 10]}
+    clientB2 = {"backup_client2": [master_lat / 10] * num_reqs + [master_lat]}
+    fill_durations(liM, clientM1)
+    fill_durations(liM, clientM2)
+    fill_durations(liB, clientB1)
+    fill_durations(liB, clientB2)
+
+    assert liM.get_avg_latency() - liB.get_avg_latency() < tconf.OMEGA
Review comment (Contributor): Can we also add a test where liM.get_avg_latency() - liB.get_avg_latency() >= tconf.OMEGA
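A sketch of the extra case the reviewer asks for, reusing the fixtures above. The client names and the 100x latency gap are arbitrary choices, and it assumes tconf.OMEGA is positive:

```python
def test_master_latency_degraded_beyond_omega(latency_instance, tconf):
    liM = latency_instance
    liB = copy.deepcopy(latency_instance)
    num_reqs = 100
    # master serves requests two orders of magnitude slower than the backup
    slow_client = {"slow_client": [100 * tconf.OMEGA] * num_reqs}
    fast_client = {"fast_client": [tconf.OMEGA / 100] * num_reqs}
    fill_durations(liM, slow_client)
    fill_durations(liB, fast_client)

    assert liM.get_avg_latency() - liB.get_avg_latency() >= tconf.OMEGA
```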

2 changes: 1 addition & 1 deletion plenum/test/monitoring/test_avg_latency.py
@@ -15,7 +15,7 @@ def tconf(tconf):
     yield tconf
     tconf.MIN_LATENCY_COUNT = old_min_cnt
 
-
+@pytest.mark.skip(reason="Not used now")
 def testAvgReqLatency(looper, tconf, txnPoolNodeSet, sdk_wallet_client, sdk_pool_handle):
     """
     Checking if average latency is being set