Skip to content

refactor(event-queue): improve event ordering #738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion hathor/consensus/consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from hathor.profiler import get_cpu_profiler
from hathor.pubsub import HathorEvents, PubSubManager
from hathor.transaction import BaseTransaction
from hathor.util import not_none

logger = get_logger()
settings = HathorSettings()
Expand Down Expand Up @@ -135,7 +136,8 @@ def _unsafe_update(self, base: BaseTransaction) -> None:
reorg_size=reorg_size)

# finally signal an index update for all affected transactions
for tx_affected in context.txs_affected:
sorted_txs_affected = sorted(context.txs_affected, key=lambda tx: not_none(tx.hash))
for tx_affected in sorted_txs_affected:
assert tx_affected.storage is not None
assert tx_affected.storage.indexes is not None
tx_affected.storage.indexes.update(tx_affected)
Expand Down
107 changes: 45 additions & 62 deletions tests/event/test_event_reorg.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import Any

from hathor.conf import HathorSettings
from hathor.event.model.event_type import EventType
from hathor.event.storage import EventMemoryStorage
from tests import unittest
from tests.utils import add_new_blocks, get_genesis_key, zip_chunkify
from tests.utils import add_new_blocks, get_genesis_key

settings = HathorSettings()

Expand Down Expand Up @@ -47,76 +48,58 @@ def test_reorg_events(self):
# check events
actual_events = list(self.event_storage.iter_from_event(0))

# events are separated into portions that are sorted (indicated by using lists) and portions that are unsorted
# (indicated by using a custom class), the unsorted parts mean that the given events must be present, but not
# necessarily in the given order, to check that we sort both the expected and actual events by tx hash to be
# able to match them, but only for the "unsorted" portions will, for the "sorted" portions the order is
# expected to be the given one
class unsorted(list):
pass
expected_events_grouped = [
[
(EventType.LOAD_STARTED, {}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_BLOCK_HASH.hex()}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX1_HASH.hex()}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX2_HASH.hex()}),
(EventType.LOAD_FINISHED, {})
],
# XXX: the order of the following events can vary depending on which genesis is spent/confirmed first
unsorted([
expected_events = [
(EventType.LOAD_STARTED, {}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_BLOCK_HASH.hex()}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX1_HASH.hex()}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': settings.GENESIS_TX2_HASH.hex()}),
(EventType.LOAD_FINISHED, {}),
*sorted_by_hash(
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[0].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': settings.GENESIS_TX1_HASH.hex()}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': settings.GENESIS_TX2_HASH.hex()}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[0].hash_hex}),
]),
# XXX: these events must always have this order
[
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[0].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[1].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[1].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[2].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[2].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[3].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[3].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[4].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[4].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[5].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[5].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[6].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[6].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[7].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[7].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[8].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[8].hash_hex}),
),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[0].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[1].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[1].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[2].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[2].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[3].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[3].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[4].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[4].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[5].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[5].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[6].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[6].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[7].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[7].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[8].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[8].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[9].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[9].hash_hex}),
(EventType.REORG_STARTED, {'reorg_size': 2, 'previous_best_block': blocks[9].hash_hex,
'new_best_block': b0.hash_hex}),
*sorted_by_hash(
(EventType.VERTEX_METADATA_CHANGED, {'hash': b0.hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[9].hash_hex}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': blocks[9].hash_hex}),
(EventType.REORG_STARTED, {'reorg_size': 2, 'previous_best_block': blocks[9].hash_hex,
'new_best_block': b0.hash_hex}),
],
# XXX: for some reason the metadata update order of these events isn't always the same
unsorted([
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[8].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': blocks[9].hash_hex}),
(EventType.VERTEX_METADATA_CHANGED, {'hash': b0.hash_hex}),
]),
# XXX: these events must always have this order
[
(EventType.REORG_FINISHED, {}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': b0.hash_hex}),
],
),
(EventType.REORG_FINISHED, {}),
(EventType.NEW_VERTEX_ACCEPTED, {'hash': b0.hash_hex}),
]

for actual_events, expected_events in zip_chunkify(actual_events, expected_events_grouped):
if isinstance(expected_events, unsorted):
actual_events.sort(key=lambda i: i.data.hash)
expected_events.sort(key=lambda i: i[1].get('hash', ''))
for actual_event, expected_event in zip(actual_events, expected_events):
expected_event_type, expected_partial_data = expected_event

self.assertEqual(EventType(actual_event.type), expected_event_type)

for actual_event, expected_event in zip(actual_events, expected_events):
expected_event_type, expected_partial_data = expected_event
for expected_data_key, expected_data_value in expected_partial_data.items():
self.assertEqual(actual_event.data.dict()[expected_data_key], expected_data_value)

self.assertEqual(EventType(actual_event.type), expected_event_type)

for expected_data_key, expected_data_value in expected_partial_data.items():
self.assertEqual(actual_event.data.dict()[expected_data_key], expected_data_value)
def sorted_by_hash(*events: tuple[EventType, dict[str, Any]]) -> list[tuple[EventType, dict[str, Any]]]:
return sorted(events, key=lambda event: event[1]['hash'])


class SyncV1EventReorgTest(unittest.SyncV1Params, BaseEventReorgTest):
Expand Down
Loading