INDY-2223: Improve simulation tests #1333

Merged
merged 6 commits on Sep 19, 2019

2 changes: 1 addition & 1 deletion plenum/common/messages/internal_messages.py
@@ -39,7 +39,7 @@
('stash_data', Optional[tuple])])

# by default view_no for StartViewChange is None meaning that we move to the next view
NeedViewChange = NamedTuple('StartViewChange',
NeedViewChange = NamedTuple('NeedViewChange',
[('view_no', int)])
NeedViewChange.__new__.__defaults__ = (None,) * len(NeedViewChange._fields)

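The rename fixes the typename passed to NamedTuple so it matches the variable. A self-contained sketch of how the default behaves (it mirrors the definition above; only the assert lines are new):

from typing import NamedTuple

# Mirror of the definition above: view_no defaults to None,
# which callers treat as "move to the next view".
NeedViewChange = NamedTuple('NeedViewChange', [('view_no', int)])
NeedViewChange.__new__.__defaults__ = (None,) * len(NeedViewChange._fields)

assert NeedViewChange().view_no is None        # next view
assert NeedViewChange(view_no=3).view_no == 3  # explicit target view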
1 change: 1 addition & 0 deletions plenum/server/consensus/ordering_service.py
@@ -74,6 +74,7 @@ def __init__(self,
self._network = network
self._write_manager = write_manager
self._name = self._data.name
# TODO: We shouldn't use get_utc_epoch here, time needs to be under full control through TimerService
self.get_time_for_3pc_batch = get_time_for_3pc_batch or get_utc_epoch
# Flag set by a node when it has set new primaries and needs to send a batch
self.primaries_batch_needed = False
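The TODO flags that get_utc_epoch leaks wall-clock time into 3PC batches. Since the clock is already injectable via get_time_for_3pc_batch, a test can pass a controlled callable; the FakeClock below is a hypothetical illustration, not part of this PR or of plenum:

class FakeClock:
    """Deterministic stand-in for get_utc_epoch: returns a counter the test controls."""
    def __init__(self, start=1_000_000):
        self.now = start

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds

clock = FakeClock()
# orderer = OrderingService(..., get_time_for_3pc_batch=clock)  # assumed wiring
clock.advance(5)  # later batches would carry the advanced timestamp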
13 changes: 13 additions & 0 deletions plenum/server/consensus/view_change_service.py
@@ -57,6 +57,8 @@ def view_change_votes(self):
return self._data.view_change_votes

def process_need_view_change(self, msg: NeedViewChange):
self._logger.info("{} processing {}".format(self, msg))

# 1. calculate new viewno
view_no = msg.view_no
if view_no is None:
@@ -77,6 +79,7 @@ def process_need_view_change(self, msg: NeedViewChange):
vc = self._build_view_change_msg()

# 5. Send ViewChangeStarted via internal bus to update other services
self._logger.info("{} sending {}".format(self, vc))
self._bus.send(ViewChangeStarted(view_no=self._data.view_no))

# 6. Send ViewChange msg to other nodes (via external bus)
@@ -128,6 +131,8 @@ def _build_view_change_msg(self):
)

def process_view_change_message(self, msg: ViewChange, frm: str):
self._logger.info("{} processing {} from {}".format(self, msg, frm))

result = self._validate(msg, frm)
if result != PROCESS:
return result, None
@@ -144,12 +149,15 @@ def process_view_change_message(self, msg: ViewChange, frm: str):
digest=view_change_digest(msg)
)
primary_node_name = getNodeName(self._data.primary_name)
self._logger.info("{} sending {}".format(self, vca))
self._network.send(vca, [primary_node_name])

self._finish_view_change_if_needed()
return PROCESS, None

def process_view_change_ack_message(self, msg: ViewChangeAck, frm: str):
self._logger.info("{} processing {} from {}".format(self, msg, frm))

result = self._validate(msg, frm)
if result != PROCESS:
return result, None
@@ -162,6 +170,8 @@ def process_view_change_ack_message(self, msg: ViewChangeAck, frm: str):
return PROCESS, None

def process_new_view_message(self, msg: NewView, frm: str):
self._logger.info("{} processing {} from {}".format(self, msg, frm))

result = self._validate(msg, frm)
if result != PROCESS:
return result, None
@@ -181,6 +191,8 @@ def process_new_view_message(self, msg: NewView, frm: str):
return PROCESS, None

def process_new_view_accepted(self, msg: NewViewAccepted):
self._logger.info("{} processing {}".format(self, msg))

self._data.prev_view_prepare_cert = msg.batches[-1].pp_seq_no if msg.batches else None

def _validate(self, msg: Union[ViewChange, ViewChangeAck, NewView], frm: str) -> int:
@@ -217,6 +229,7 @@ def _send_new_view_if_needed(self):
checkpoint=cp,
batches=batches
)
self._logger.info("{} sending {}".format(self, nv))
self._network.send(nv)
self._new_view = nv
self._finish_view_change()
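The new log lines trace each stage of the flow (NeedViewChange → ViewChange → ViewChangeAck → NewView → NewViewAccepted). For orientation, the simulation tests below drive that flow roughly like this (a sketch; pool and its nodes come from the sim helpers, and the private attributes are the same ones the tests use):

from plenum.common.messages.internal_messages import NeedViewChange

# Trigger a view change on every node, then advance the mock timer until
# the whole ViewChange / ViewChangeAck / NewView exchange has completed.
for node in pool.nodes:
    node._view_changer.process_need_view_change(NeedViewChange(view_no=1))

pool.timer.wait_for(lambda: all(n._view_changer._data.view_no == 1 for n in pool.nodes))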
1 change: 1 addition & 0 deletions plenum/test/consensus/order_service/sim_helper.py
@@ -41,6 +41,7 @@ def setup_pool(random, req_count):
pool = create_pool(random)

for node in pool.nodes:
# TODO: This propagates to global config and sometimes breaks other tests
node._orderer._config.Max3PCBatchSize = MAX_BATCH_SIZE
node._orderer._config.CHK_FREQ = 5
node._orderer._config.LOG_SIZE = 3 * node._orderer._config.CHK_FREQ
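The TODO notes that these assignments mutate a config object shared with other tests. One possible mitigation, sketched here as a hypothetical helper that is not part of the PR, is to save and restore the touched attributes around each test:

from contextlib import contextmanager

@contextmanager
def patched_orderer_config(pool, batch_size, chk_freq):
    # Remember the original values and restore them afterwards so the tweak
    # cannot leak into other tests through the shared config object.
    saved = []
    for node in pool.nodes:
        cfg = node._orderer._config
        saved.append((cfg, cfg.Max3PCBatchSize, cfg.CHK_FREQ, cfg.LOG_SIZE))
        cfg.Max3PCBatchSize = batch_size
        cfg.CHK_FREQ = chk_freq
        cfg.LOG_SIZE = 3 * chk_freq
    try:
        yield pool
    finally:
        for cfg, size, freq, log in saved:
            cfg.Max3PCBatchSize, cfg.CHK_FREQ, cfg.LOG_SIZE = size, freq, log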
@@ -30,11 +30,11 @@ def test_view_change_while_ordering_with_real_msgs(seed):
partial(node._view_changer.process_need_view_change, NeedViewChange(view_no=1)))
# 3. Make sure that view_change is completed
for node in pool.nodes:
pool.timer.wait_for(lambda: node._view_changer._data.view_no == 1, timeout=20000)
pool.timer.wait_for(lambda: node._view_changer._data.view_no == 1)

# 3. Make sure all nodes ordered all the requests
for node in pool.nodes:
pool.timer.wait_for(partial(check_batch_count, node, batches_count), timeout=20000)
pool.timer.wait_for(partial(check_batch_count, node, batches_count))

# 4. Check data consistency
check_consistency(pool)
6 changes: 4 additions & 2 deletions plenum/test/consensus/view_change/helper.py
@@ -19,6 +19,7 @@ def some_checkpoint(random: SimRandom, view_no: int, pp_seq_no: int) -> Checkpoi
def some_pool(random: SimRandom) -> (SimPool, List):
pool_size = random.integer(4, 8)
pool = SimPool(pool_size, random)
log_size = pool.nodes[0].config.LOG_SIZE

# Create simulated history
# TODO: Move into helper?
@@ -41,8 +42,9 @@ def some_pool(random: SimRandom) -> (SimPool, List):

# Initialize consensus data
for i, node in enumerate(pool.nodes):
node._data.preprepared = batches[:pp_count[i]]
node._data.prepared = batches[:p_count[i]]
high_watermark = stable_cp[i] + log_size
node._data.preprepared = batches[:min(high_watermark, pp_count[i])]
node._data.prepared = batches[:min(high_watermark, p_count[i])]
node._data.checkpoints.update(checkpoints[:cp_count[i]])
node._data.stable_checkpoint = stable_cp[i]

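The new cap keeps the generated consensus data inside each node's watermark window: a node may not report batches pre-prepared or prepared beyond stable_checkpoint + LOG_SIZE. A small worked example with illustrative numbers:

log_size = 15                            # pool.nodes[0].config.LOG_SIZE
stable_cp = 10                           # this node's stable checkpoint
high_watermark = stable_cp + log_size    # = 25

pp_count = 30                            # randomly chosen pre-prepared count
# Without the cap the node would claim batches up to seq 30, i.e. past its
# watermark window; with the cap it claims at most 25.
assert min(high_watermark, pp_count) == 25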
7 changes: 2 additions & 5 deletions plenum/test/consensus/view_change/test_sim_view_change.py
@@ -21,8 +21,7 @@ def check_view_change_completes_under_normal_conditions(random: SimRandom):
# Make sure all nodes complete view change
pool.timer.wait_for(lambda: all(not node._data.waiting_for_new_view
and node._data.view_no > 0
for node in pool.nodes),
timeout=5 * 30 * 1000) # 5 NEW_VIEW_TIMEOUT intervals
for node in pool.nodes))

# Make sure all nodes end up in same state
for node_a, node_b in zip(pool.nodes, pool.nodes[1:]):
@@ -58,9 +57,7 @@ def calc_committed(view_changes):
return committed


# Increased count from 200 to 150 because of jenkin's failures.
# After integration, need to get it back
@pytest.mark.parametrize("seed", range(150))
@pytest.mark.parametrize("seed", range(200))
def test_view_change_completes_under_normal_conditions(seed):
random = DefaultSimRandom(seed)
check_view_change_completes_under_normal_conditions(random)
13 changes: 9 additions & 4 deletions plenum/test/helper.py
@@ -1402,19 +1402,24 @@ def run_for(self, seconds):
"""
self.advance_until(self._ts.value + seconds)

def wait_for(self, condition: Callable[[], bool], timeout: Optional = None):
def wait_for(self, condition: Callable[[], bool], timeout: Optional = None, max_iterations: int = 500000):
"""
Advance time in steps until the condition is reached, running scheduled callbacks in the process.
Throws TimeoutError if the condition cannot be reached (within the required timeout, if defined).
"""
counter = 0
deadline = self._ts.value + timeout if timeout else None
while self._events and not condition():
while self._events and not condition() and counter < max_iterations:
if deadline and self._next_timestamp() > deadline:
raise TimeoutError("Failed to reach condition in required time")
raise TimeoutError("Failed to reach condition in required time, {} iterations passed".format(counter))
self.advance()
counter += 1

if not condition():
raise TimeoutError("Condition will be never reached")
if not self._events:
raise TimeoutError("Condition will be never reached, {} iterations passed".format(counter))
else:
raise TimeoutError("Failed to reach condition in {} iterations".format(max_iterations))

def run_to_completion(self):
"""
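A usage sketch of the reworked wait_for, mirroring the simulation tests above (pool.timer is the MockTimer; the schedule call is the TimerService API assumed here). With no explicit timeout, the loop now stops either when the event queue drains or when max_iterations is exceeded:

done = []
pool.timer.schedule(1000, lambda: done.append(True))

# Succeeds as soon as the condition holds; raises TimeoutError if the event
# queue drains first or the iteration cap is hit.
pool.timer.wait_for(lambda: len(done) > 0, max_iterations=10_000)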
2 changes: 1 addition & 1 deletion plenum/test/simulation/sim_network.py
@@ -62,7 +62,7 @@ def _send_message(self, frm: str, msg: Any, dst: ExternalBus.Destination):
else:
assert False, "{} tried to send message {} to unsupported destination {}".format(frm, msg, dst)

for name in dst:
for name in sorted(dst):
assert name != frm, "{} tried to send message {} to itself".format(frm, msg)

peer = self._peers.get(name)
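Sorting the destinations makes the broadcast delivery order independent of set/dict iteration order, so a seeded simulation replays identically. A minimal, standalone illustration of why the order matters when latencies are drawn from a seeded RNG:

import random

def deliver(dst, seed):
    rng = random.Random(seed)
    # Each destination gets a latency drawn from the seeded RNG; if the
    # iteration order varied between runs, the same seed would assign
    # different latencies to different peers. Sorting pins the draw order.
    return {name: rng.randint(1, 10) for name in sorted(dst)}

assert deliver({"Gamma", "Alpha", "Beta"}, 42) == deliver({"Beta", "Gamma", "Alpha"}, 42)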
4 changes: 2 additions & 2 deletions plenum/test/simulation/test_sim_network.py
@@ -159,10 +159,10 @@ def test_sim_network_respects_latencies(random, test_nodes, mock_timer, initial_
assert all(min_ts <= ts <= max_ts for ts in node.receive_timestamps)


def test_sim_network_broadcast_preserves_order(mock_timer, sim_network, test_nodes, some_node):
def test_sim_network_broadcast_in_lexicographic_order(mock_timer, sim_network, test_nodes, some_node):
latency = 10
sim_network.set_latency(latency, latency)
should_receive = [node for node in test_nodes if node != some_node]
should_receive = sorted([node for node in test_nodes if node != some_node], key=lambda n: n.name)

message = create_some_message()
some_node.network.send(message)