
feat: Gossipsub interop testing #648


Merged: 28 commits, Jul 10, 2025
Changes from 14 commits
Commits (28)
e4e5b1c
Squashed 'gossipsub-interop/' content from commit fa401a80
MarcoPolo May 14, 2025
93a51a7
Merge commit 'e4e5b1c500f21561a8810466cf5b20368e62ab74' as 'gossipsub…
MarcoPolo May 14, 2025
eec0fa7
Update test specs
MarcoPolo May 15, 2025
c44ceea
Remove "experiment" option. Make it a part of script actions
MarcoPolo May 15, 2025
9a0c644
Rename go-libp2p impl folder
MarcoPolo May 15, 2025
ace4432
Tidy up go code by removing go.mod replaced directives
MarcoPolo May 15, 2025
8e55af4
nit
MarcoPolo May 15, 2025
2c83915
Add rust test for peer id generation
MarcoPolo May 16, 2025
6f1a5c3
Add go-libp2p test for peer ID generation
MarcoPolo May 16, 2025
baa7fbf
Fix message ID function in Go
MarcoPolo May 16, 2025
ab8d056
Update gossipsub-interop/pyproject.toml
MarcoPolo May 16, 2025
62e6da5
Update gossipsub-interop/README.md
MarcoPolo May 16, 2025
a267aec
review rust impl (#649)
jxs May 16, 2025
4f4ebde
Add seed to output folder name
MarcoPolo May 16, 2025
5bdcf39
also plot average message count per message id (#651)
jxs May 29, 2025
8ec7340
Add timestamp and git hash to default output directory
MarcoPolo May 29, 2025
c434d77
Rename action to instruction
MarcoPolo Jun 4, 2025
f4e444d
Enable syscall latency modeling in shadow config
MarcoPolo Jun 4, 2025
6860e16
Allow disabling gossip emission. Improvements on analysis tools
MarcoPolo Jul 2, 2025
c591f41
introduce sleep to simulate blob verification time (#660)
jxs Jul 2, 2025
d409aaa
Clean up git describe meta
MarcoPolo Jul 10, 2025
cb61125
Go: panic if there's unexpected error
MarcoPolo Jul 10, 2025
ec51610
Add simple-fanout scenario
MarcoPolo Jul 10, 2025
13af098
PR nits
MarcoPolo Jul 10, 2025
c953269
nit
MarcoPolo Jul 10, 2025
324086a
Fix Gossipsub param parsing in Rust
MarcoPolo Jul 10, 2025
6e6548d
Fix comments and type in script_instruction.py
MarcoPolo Jul 10, 2025
94e74d5
Add warnings when using unsupported parameters in rust
MarcoPolo Jul 10, 2025
21 changes: 21 additions & 0 deletions gossipsub-interop/.gitignore
@@ -0,0 +1,21 @@
*.swp
/shadow.data
synctest*.data
/shadow.yaml
/graph.gml
=======
/*.data
/**/*/*.tar.gz
/**.tar.gz
/pubsub-shadow
**/gossipsub-bin
params.json
/venv
**/plots
.DS_Store
Dockerfile
docker-compose.yml
backup
timelines
experiment-results/**
__pycache__
1 change: 1 addition & 0 deletions gossipsub-interop/.python-version
@@ -0,0 +1 @@
3.12
13 changes: 13 additions & 0 deletions gossipsub-interop/Makefile
@@ -0,0 +1,13 @@
# Default target
all: binaries

binaries:
cd go-libp2p && go build -linkshared -o gossipsub-bin
cd rust-libp2p && cargo build

# Clean all generated shadow simulation files
clean:
rm -rf *.data || true
rm plots/* || true

.PHONY: binaries all clean
85 changes: 85 additions & 0 deletions gossipsub-interop/README.md
@@ -0,0 +1,85 @@
# GossipSub Interop testing framework

## Overview

This framework is designed to reproducibly test interoperability between
different GossipSub implementations. It can also be used to benchmark
different implementations and protocol designs in a controlled simulation.

This framework leverages [Shadow](https://shadow.github.io/) as its simulator.

There are two components to our interoperability test:

1. The _scenario_ we are running. This defines the specific actions each node in
the network takes at a specific point in time, such as publishing a message,
connecting to other nodes, or subscribing to a topic. See `script_action.py` for
a list of actions.
2. The _composition_ of the network. This defines what percent of the network is
running which implementation. For example, you can have a network composed of
50% go-libp2p nodes and 50% rust-libp2p nodes.

A key aspect of this framework is that scenarios, compositions, and GossipSub
parameters can be modified without modifying the implementations. See
`experiment.py`, where all of this is configured.
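
As a rough illustration, a scenario is simply an ordered list of script
actions. The sketch below uses the constructors from `script_action.py` the
same way `experiment.py` (included in this PR) does; the three-node layout
itself is made up for illustration and is not one of the shipped scenarios.

```python
import script_action
from script_action import GossipSubParams

# Hypothetical three-node scenario: node 0 dials nodes 1 and 2, every node
# subscribes to "foobar", and node 1 publishes one 96 KiB message at t=120s.
actions = [
    script_action.InitGossipSub(gossipSubParams=GossipSubParams()),
    script_action.IfNodeIDEquals(
        nodeID=0,
        action=script_action.Connect(connectTo=[1, 2]),
    ),
    script_action.SubscribeToTopic(topicID="foobar"),
    script_action.WaitUntil(elapsedSeconds=120),
    script_action.IfNodeIDEquals(
        nodeID=1,
        action=script_action.Publish(
            messageID=0, topicID="foobar", messageSizeBytes=2 * 1024 * 48
        ),
    ),
]
```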

After running a test, there are three key results we can extract from the simulation:

1. The _reliability_ of message dissemination: the percentage of
nodes that a message was delivered to.
2. The dissemination _latency_: how long the message took to reach every node
that received it.
3. The _bandwidth efficiency_, in terms of the number of _duplicate messages_
received (see the sketch below).
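
These numbers are computed per message by `analyze_message_deliveries.py`
(included in this PR) from the per-node logs. The following is only a rough
sketch of that computation; the `summarize` helper and its arguments are
illustrative.

```python
from datetime import datetime
from typing import List, Tuple


def summarize(
    deliveries: List[Tuple[datetime, str]],  # (timestamp, node_id) of each first receipt
    duplicate_count: int,                    # total duplicate receipts for this message
    total_nodes: int,
) -> Tuple[float, float, float]:
    """Return (reliability, latency in seconds, avg duplicates per node) for one message."""
    deliveries = sorted(deliveries, key=lambda d: d[0])
    # Reliability: fraction of nodes reached, excluding the original publisher.
    reliability = len(deliveries) / (total_nodes - 1)
    # Latency: time from the first receipt anywhere to the last receipt.
    latency = (deliveries[-1][0] - deliveries[0][0]).total_seconds()
    # Bandwidth efficiency: average number of duplicate receipts per node.
    avg_duplicates = duplicate_count / total_nodes
    return reliability, latency, avg_duplicates
```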

Implementations are deemed interoperable if variations in composition do not
result in any significant differences in observed behavior or outputs. For
example, a network of all go-libp2p nodes should behave the same as a network
with an even mix of go-libp2p and rust-libp2p nodes.

## Requirements

- [Shadow](https://shadow.github.io/) for running the simulations.
- [uv](https://docs.astral.sh/uv/) for Python dependencies.
- Implementation-specific requirements for building the implementations (Go, Rust, etc.).

## Running a simulation

```bash
uv run run.py --help
```

For example, to run a simulation of a 700-node network with an even mix of
go-libp2p and rust-libp2p nodes, default GossipSub parameters, and large
messages:

```bash
uv run run.py --node_count 700 --composition "rust-and-go" --scenario "subnet-blob-msg"
```

The experiment, compositions, and scenarios are defined in `experiment.py`.

After running an experiment, all the results and configuration needed to
reproduce the test are saved in an output folder which, by default, is named
after the specific scenario, node count, and composition. For the above
example, the output folder is `subnet-blob-msg-700-rust-and-go.data`. It
contains the following files (a sketch of re-running the analysis on such a
folder follows the list):

- shadow.yaml: The Shadow config defining the binaries and network.
- graph.gml: The graph of the network links for Shadow.
- params.json: The parameters passed to each binary, including the GossipSub
  parameters and the actions to run.
- plots/
  - analysis_*.txt: A text file containing a high-level analysis of the three
    key results.
  - Charts visualizing the results.
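
As a sketch, the analysis can be re-run against an existing output folder from
Python, assuming the folder contains Shadow's `hosts/` subdirectory of per-node
`.stdout` logs (the layout `logfile_iterator` special-cases) and that this is
invoked from the `gossipsub-interop` directory:

```python
from analyze_message_deliveries import analyse_message_deliveries

# Regenerates plots/analysis_<folder>.txt and the delivery-time chart
# for the given output folder.
analyse_message_deliveries("subnet-blob-msg-700-rust-and-go.data")
```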

## Adding an implementation

To build the implementation, reference `./test-specs/implementation.md`.

After implementing it, make sure to add its build commands to the Makefile's `binaries` recipe.

Finally, add it to the `composition` function in `experiment.py`, as sketched below.
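
For example, a hypothetical nim-libp2p binary could be wired in like this; the
preset names and binary path are made up, and only the pattern mirrors the
existing `composition` function:

```python
from experiment import Binary


def composition_with_new_impl(preset_name: str):
    match preset_name:
        case "all-nim":
            # Hypothetical binary built by the Makefile's `binaries` recipe.
            return [Binary("nim-libp2p/gossipsub-bin", percent_of_nodes=100)]
        case "nim-and-go":
            return [
                Binary("nim-libp2p/gossipsub-bin", percent_of_nodes=50),
                Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50),
            ]
    raise ValueError(f"Unknown preset name: {preset_name}")
```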

## Future work (contributions welcome)

- Add more scenarios.
- Add other implementations.
- Add more plots and visualizations.
- Add a helper to make it easier to rerun an experiment given an output folder.
- Add to CI.
128 changes: 128 additions & 0 deletions gossipsub-interop/analyze_message_deliveries.py
@@ -0,0 +1,128 @@
from collections import defaultdict
import json
import os
import sys
from datetime import datetime
import matplotlib.pyplot as plt

messages = defaultdict(list)
duplicate_count = defaultdict(lambda: 0)
duplicate_count_by_message_and_node = defaultdict(lambda: defaultdict(int))
peer_id_to_node_id = dict()
node_id_to_peer_id = dict()


def nodeIDFromFilename(filename):
return filename.split(".")[0]


def logfile_iterator(folder):
"""
    Yields the path of each log file in the folder.

    Special-cases Shadow data folders by looking for the "hosts" subfolder and
    yielding each host's .stdout file.

    Otherwise, yields the path of every file in the folder.
"""
files = os.listdir(folder)
if "hosts" in files:
for host in os.listdir(os.path.join(folder, "hosts")):
for file in os.listdir(os.path.join(folder, "hosts", host)):
if file.endswith(".stdout"):
yield os.path.join(folder, "hosts", host, file)
else:
for file in files:
yield os.path.join(folder, file)


def analyse_message_deliveries(folder):
analysis_txt = []

for file in logfile_iterator(folder):
with open(file, "r") as f:
node_id = ""
seen_message_ids = set()
for line in f:
try:
parsed = json.loads(line)
except json.JSONDecodeError:
continue

if parsed["msg"] == "PeerID":
node_id = parsed["node_id"]
peer_id_to_node_id[parsed["id"]] = node_id
node_id_to_peer_id[node_id] = parsed["id"]
continue

if "msg" not in parsed or "time" not in parsed:
continue

# Parse timestamp RFC3339
ts = datetime.fromisoformat(parsed["time"])

match parsed["msg"]:
case "Received Message":
msgID = parsed["id"]
if msgID not in seen_message_ids:
seen_message_ids.add(msgID)
messages[msgID].append((ts, node_id))
else:
duplicate_count[msgID] += 1
duplicate_count_by_message_and_node[msgID][node_id] += 1

# Prepare data for plotting
msg_ids = []
time_diffs = []

total_nodes = len(node_id_to_peer_id)
messagesIDs = list(messages.keys())
messagesIDs.sort(key=lambda x: int(x))

for msgID in messagesIDs:
deliveries = messages[msgID]
deliveries.sort(key=lambda x: x[0])
time_diff = (deliveries[-1][0] - deliveries[0][0]).total_seconds()
msg_ids.append(msgID)
time_diffs.append(time_diff)
avg_duplicate_count = duplicate_count[msgID] / total_nodes
reached = len(deliveries) / (total_nodes - 1) # Minus 1 for the original sender
if reached > 1.0:
if len(deliveries) > total_nodes:
raise ValueError(
f"Message {msgID} was delivered to more nodes than exist"
)
# We overshot because the original publisher received a duplicate message
reached = 1.0
analysis_txt.append(f"{msgID}, {time_diff}s, {avg_duplicate_count}, {reached}")

# Create the plot
plt.figure(figsize=(12, 6))
plt.bar(range(len(msg_ids)), time_diffs)
plt.xlabel("Message Index")
plt.ylabel("Delivery Time Difference (seconds)")
plt.title("Message Delivery Time Differences")
plt.xticks(range(len(msg_ids)), msg_ids, rotation=45, ha="right")
plt.tight_layout()

if not os.path.exists("plots"):
os.makedirs("plots")
plt.savefig(f"plots/message_delivery_times_{folder}.png")
plt.close()

# Print the analysis and save it to a file
with open(f"plots/analysis_{folder}.txt", "w") as f:
f.write(
"Message ID, Time to Disseminate, Avg Duplicate Count, Reached percent\n"
)
for line in analysis_txt:
f.write(line + "\n")


def main():
# Read folder from input
folder = sys.argv[1]
analyse_message_deliveries(folder)


if __name__ == "__main__":
main()
124 changes: 124 additions & 0 deletions gossipsub-interop/experiment.py
@@ -0,0 +1,124 @@
from collections import defaultdict
from dataclasses import dataclass, field
import random
from typing import List, Dict, Set

from script_action import GossipSubParams, ScriptAction, NodeID
import script_action


@dataclass
class Binary:
path: str
percent_of_nodes: int


@dataclass
class ExperimentParams:
script: List[ScriptAction] = field(default_factory=list)


def scenario(scenario_name: str, node_count: int) -> ExperimentParams:
actions: List[ScriptAction] = []
match scenario_name:
case "subnet-blob-msg":
actions.extend(init_gossipsub())
number_of_conns_per_node = 10
if number_of_conns_per_node >= node_count:
number_of_conns_per_node = node_count - 1
actions.extend(random_network_mesh(node_count, number_of_conns_per_node))
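            # 2 * 1024 * 48 bytes = 96 KiB per published message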
message_size = 2 * 1024 * 48
num_messages = 32
actions.extend(
random_publish_every_12s(node_count, num_messages, message_size)
)
case _:
raise ValueError(f"Unknown scenario name: {scenario_name}")

return ExperimentParams(script=actions)


def composition(preset_name: str) -> List[Binary]:
match preset_name:
case "all-go":
return [Binary("go-libp2p/gossipsub-bin", percent_of_nodes=100)]
case "all-rust":
# Always use debug. We don't measure compute performance here.
return [
Binary(
"rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=100
)
]
case "rust-and-go":
return [
Binary(
"rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=50
),
Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50),
]
raise ValueError(f"Unknown preset name: {preset_name}")


def init_gossipsub() -> List[ScriptAction]:
# Default gossipsub parameters
return [script_action.InitGossipSub(gossipSubParams=GossipSubParams())]


def random_network_mesh(
node_count: int, number_of_connections: int
) -> List[ScriptAction]:
connections: Dict[NodeID, Set[NodeID]] = defaultdict(set)
connect_to: Dict[NodeID, List[NodeID]] = defaultdict(list)
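    # `connections` tracks each node's undirected degree so every node ends up
    # with at least `number_of_connections` links, while `connect_to` records
    # only the dialing side so each link is dialed exactly once.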
for node_id in range(node_count):
while len(connections[node_id]) < number_of_connections:
target = random.randint(0, node_count - 1)
if target == node_id:
continue
connections[node_id].add(target)
connections[target].add(node_id)

connect_to[node_id].append(target)

actions = []
for node_id, node_connections in connect_to.items():
actions.append(
script_action.IfNodeIDEquals(
nodeID=node_id,
action=script_action.Connect(
connectTo=list(node_connections),
),
)
)
return actions


def random_publish_every_12s(
node_count: int, numMessages: int, messageSize: int
) -> List[ScriptAction]:
topicStr = "foobar"
actions = []
actions.append(script_action.SubscribeToTopic(topicID=topicStr))

# Start at 120 seconds (2 minutes) to allow for setup time
elapsed_seconds = 120
actions.append(script_action.WaitUntil(elapsedSeconds=elapsed_seconds))

for i in range(numMessages):
random_node = random.randint(0, node_count - 1)
actions.append(
script_action.IfNodeIDEquals(
nodeID=random_node,
action=script_action.Publish(
messageID=i,
topicID=topicStr,
messageSizeBytes=messageSize,
),
)
)
elapsed_seconds += 12 # Add 12 seconds for each subsequent message
actions.append(script_action.WaitUntil(elapsedSeconds=elapsed_seconds))

elapsed_seconds += 30 # wait a bit more to allow all messages to flush
actions.append(script_action.WaitUntil(elapsedSeconds=elapsed_seconds))

return actions