Skip to content

refactor: Switch to let-chains where possible #435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG RUST_VERSION=1.85.1
ARG RUST_VERSION=1.88.0
FROM rust:${RUST_VERSION}-bullseye AS builder
RUN apt update && apt dist-upgrade -y && apt install -y cmake libclang-dev
COPY . anchor
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.devnet
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG RUST_VERSION=1.85.1
ARG RUST_VERSION=1.88.0
FROM rust:${RUST_VERSION}-bullseye AS builder
RUN apt update && apt dist-upgrade -y && apt install -y cmake libclang-dev
COPY . anchor
Expand Down
2 changes: 1 addition & 1 deletion anchor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "anchor"
version = "0.2.0"
edition = { workspace = true }
authors = ["Sigma Prime <[email protected]>"]
rust-version = "1.85.0"
rust-version = "1.88.0"
default-run = "anchor"

[features]
Expand Down
31 changes: 15 additions & 16 deletions anchor/common/qbft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,23 +874,22 @@ where
vec![]
};

if matches!(msg_type, QbftMessageType::RoundChange) {
if let (Some(last_prepared_value), Some(last_prepared_round)) =
if matches!(msg_type, QbftMessageType::RoundChange)
&& let (Some(last_prepared_value), Some(last_prepared_round)) =
(self.last_prepared_value, self.last_prepared_round)
{
return MessageData::new(
last_prepared_round.get() as u64,
self.current_round.get() as u64,
last_prepared_value,
self.data
.get(&last_prepared_value)
.map(|d| d.as_ssz_bytes())
.unwrap_or_else(|| {
warn!("Data misisng for last prepared value");
vec![]
}),
);
}
{
return MessageData::new(
last_prepared_round.get() as u64,
self.current_round.get() as u64,
last_prepared_value,
self.data
.get(&last_prepared_value)
.map(|d| d.as_ssz_bytes())
.unwrap_or_else(|| {
warn!("Data misisng for last prepared value");
vec![]
}),
);
}

// Standard message data for Proposal, Prepare, and Commit
Expand Down
14 changes: 7 additions & 7 deletions anchor/common/qbft/src/msg_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ impl MessageContainer {
/// If we have a quorum for the round, get all of the messages that correspond to that quorum
pub fn get_quorum_of_messages(&self, round: Round) -> Vec<WrappedQbftMessage> {
let mut msgs = vec![];
if let Some(hash) = self.has_quorum(round) {
// collect all of the messages where root = quorum hash
if let Some(round_messages) = self.messages.get(&round) {
for msg in round_messages.values() {
if msg.qbft_message.root == hash {
msgs.push(msg.clone());
}
// collect all of the messages where root = quorum hash
if let Some(hash) = self.has_quorum(round)
&& let Some(round_messages) = self.messages.get(&round)
{
for msg in round_messages.values() {
if msg.qbft_message.root == hash {
msgs.push(msg.clone());
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,18 @@ impl NetworkState {
);

// Process this validators shares
if let Some(share_map) = &share_map {
if let Some(shares) = share_map.get(cluster_id) {
for share in shares {
if share.validator_pubkey == validator.public_key {
shares_multi.insert(
&validator.public_key,
cluster_id,
&cluster.owner,
&cluster.committee_id(),
share.clone(),
);
}
if let Some(share_map) = &share_map
&& let Some(shares) = share_map.get(cluster_id)
{
for share in shares {
if share.validator_pubkey == validator.public_key {
shares_multi.insert(
&validator.public_key,
cluster_id,
&cluster.owner,
&cluster.committee_id(),
share.clone(),
);
}
}
}
Expand Down
70 changes: 35 additions & 35 deletions anchor/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,46 +84,46 @@ async fn metrics_handler<E: EthSpec>(

{
let shared = state.read();
if let Some(genesis_time) = shared.genesis_time {
if let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH) {
let distance = now.as_secs() as i64 - genesis_time as i64;
set_gauge(&GENESIS_DISTANCE, distance);
}
if let Some(genesis_time) = shared.genesis_time
&& let Ok(now) = SystemTime::now().duration_since(UNIX_EPOCH)
{
let distance = now.as_secs() as i64 - genesis_time as i64;
set_gauge(&GENESIS_DISTANCE, distance);
}

// Duties services
if let Some(duties_service) = &shared.duties_service {
if let Some(slot) = duties_service.slot_clock.now() {
let current_epoch = slot.epoch(E::slots_per_epoch());
let next_epoch = current_epoch + 1;

set_int_gauge(
&PROPOSER_COUNT,
&[CURRENT_EPOCH],
duties_service.proposer_count(current_epoch) as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[CURRENT_EPOCH],
duties_service.attester_count(current_epoch) as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[NEXT_EPOCH],
duties_service.attester_count(next_epoch) as i64,
);
}
if let Some(duties_service) = &shared.duties_service
&& let Some(slot) = duties_service.slot_clock.now()
{
let current_epoch = slot.epoch(E::slots_per_epoch());
let next_epoch = current_epoch + 1;

set_int_gauge(
&PROPOSER_COUNT,
&[CURRENT_EPOCH],
duties_service.proposer_count(current_epoch) as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[CURRENT_EPOCH],
duties_service.attester_count(current_epoch) as i64,
);
set_int_gauge(
&ATTESTER_COUNT,
&[NEXT_EPOCH],
duties_service.attester_count(next_epoch) as i64,
);
}

if let Some(network_metrics) = &shared.network_registry {
// Network metrics
if let Err(e) = encode(&mut buffer, network_metrics) {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to encode promethus data: {e}"),
)
.into_response();
}
// Network metrics
if let Some(network_metrics) = &shared.network_registry
&& let Err(e) = encode(&mut buffer, network_metrics)
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to encode promethus data: {e}"),
)
.into_response();
}
}

Expand Down
10 changes: 5 additions & 5 deletions anchor/logging/src/tracing_libp2p_discv5_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ pub fn create_libp2p_discv5_tracing_layer(
// Ensure that `tracing_log_path` only contains directories.
for p in tracing_log_path.clone().iter() {
tracing_log_path = tracing_log_path.join(p);
if let Ok(metadata) = tracing_log_path.metadata() {
if !metadata.is_dir() {
tracing_log_path.pop();
break;
}
if let Ok(metadata) = tracing_log_path.metadata()
&& !metadata.is_dir()
{
tracing_log_path.pop();
break;
}
}

Expand Down
24 changes: 12 additions & 12 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,19 @@ impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageSender<S, D> {
fn do_send(&self, message: SignedSSVMessage, committee_id: CommitteeId) {
let message_bytes = message.as_ssz_bytes();

if let Some(validator) = self.validator.as_ref() {
if let Err(err) = validator.validate(&message_bytes).as_result() {
// `Reject` is more severe and can be punished by other peers. We should not have
// created this message ever, while `Ignore` can be triggered simply because the
// message is irrelevant by now.
if let MessageAcceptance::Reject = MessageAcceptance::from(err) {
warn!(?err, "Validation of outgoing message failed (Reject)");
debug!(msg = %message, "Failing message");
} else {
debug!(?err, "Validation of outgoing message failed (Ignore)");
}
return;
if let Some(validator) = self.validator.as_ref()
&& let Err(err) = validator.validate(&message_bytes).as_result()
{
// `Reject` is more severe and can be punished by other peers. We should not have
// created this message ever, while `Ignore` can be triggered simply because the message
// is irrelevant by now.
if let MessageAcceptance::Reject = MessageAcceptance::from(err) {
warn!(?err, "Validation of outgoing message failed (Reject)");
debug!(msg = %message, "Failing message");
} else {
debug!(?err, "Validation of outgoing message failed (Ignore)");
}
return;
}

let subnet = SubnetId::from_committee(committee_id, self.subnet_count);
Expand Down
16 changes: 8 additions & 8 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ impl<R: MessageReceiver> Network<R> {
event = self.message_rx.recv() => {
match event {
Some((subnet_id, message)) => {
if let Err(err) = self.gossipsub().publish(subnet_to_topic(subnet_id), message) {
if !matches!(err, PublishError::Duplicate) {
error!(?err, "Failed to publish message");
}
if let Err(err) = self.gossipsub().publish(subnet_to_topic(subnet_id), message)
&& let PublishError::Duplicate = err
{
error!(?err, "Failed to publish message");
}
}
None => {
Expand Down Expand Up @@ -368,10 +368,10 @@ impl<R: MessageReceiver> Network<R> {

// update enr and metadata to new state
self.discovery().set_subscribed(subnet, subscribed);
if let Some(metadata) = &mut self.node_info.metadata {
if let Err(err) = metadata.set_subscribed(subnet, subscribed) {
error!(?err, "unable to update node info");
}
if let Some(metadata) = &mut self.node_info.metadata
&& let Err(err) = metadata.set_subscribed(subnet, subscribed)
{
error!(?err, "unable to update node info");
}
}

Expand Down
8 changes: 4 additions & 4 deletions anchor/network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ impl NetworkBehaviour for PeerManager {
}

// Check heartbeat timer
if self.heartbeat_manager.poll_tick(cx).is_ready() {
if let Some(actions) = self.heartbeat() {
return Poll::Ready(ToSwarm::GenerateEvent(Event::ConnectActions(actions)));
}
if self.heartbeat_manager.poll_tick(cx).is_ready()
&& let Some(actions) = self.heartbeat()
{
return Poll::Ready(ToSwarm::GenerateEvent(Event::ConnectActions(actions)));
}

Poll::Pending
Expand Down
18 changes: 9 additions & 9 deletions anchor/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,15 @@ async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecu
&[name, received.queue.name()],
);

if let Some(expiry) = received.work_item.expiry() {
if expiry < &Instant::now() {
warn!(task = name, "Processor skipped expired work");
metrics::inc_counter_vec(
&metrics::ANCHOR_PROCESSOR_WORK_EVENTS_EXPIRED_COUNT,
&[name],
);
continue;
}
if let Some(expiry) = received.work_item.expiry()
&& expiry < &Instant::now()
{
warn!(task = name, "Processor skipped expired work");
metrics::inc_counter_vec(
&metrics::ANCHOR_PROCESSOR_WORK_EVENTS_EXPIRED_COUNT,
&[name],
);
continue;
}

// update metrics
Expand Down
50 changes: 25 additions & 25 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,16 +481,16 @@ async fn signature_collector(mut rx: mpsc::UnboundedReceiver<CollectorMessage>)
} else {
// Register the notifier and threshold.
notifiers.push(notify);
if let Some(old_threshold) = threshold {
if new_threshold != old_threshold {
// Different tasks expect different thresholds. We can not know which
// is correct, so we exit this instance.
error!(
new_threshold,
old_threshold, "Conflicting thresholds passed!"
);
return;
}
if let Some(old_threshold) = threshold
&& new_threshold != old_threshold
{
// Different tasks expect different thresholds. We can not know which is
// correct, so we exit this instance.
error!(
new_threshold,
old_threshold, "Conflicting thresholds passed!"
);
return;
}
threshold = Some(new_threshold);
}
Expand Down Expand Up @@ -523,25 +523,25 @@ async fn signature_collector(mut rx: mpsc::UnboundedReceiver<CollectorMessage>)
}
}

if let Some(threshold) = threshold {
if signature_share.len() as u64 >= threshold {
let signature = match combine_signatures(mem::take(&mut signature_share)) {
Ok(signature) => Arc::new(signature),
Err(err) => {
error!(?err, "Failed to recover signature");
return;
}
};
if let Some(threshold) = threshold
&& signature_share.len() as u64 >= threshold
{
let signature = match combine_signatures(mem::take(&mut signature_share)) {
Ok(signature) => Arc::new(signature),
Err(err) => {
error!(?err, "Failed to recover signature");
return;
}
};

debug!(?signature, "Successfully recovered signature");
debug!(?signature, "Successfully recovered signature");

for notifier in mem::take(&mut notifiers) {
if notifier.send(Arc::clone(&signature)).is_err() {
warn!("Callback dropped - signature is no longer relevant");
}
for notifier in mem::take(&mut notifiers) {
if notifier.send(Arc::clone(&signature)).is_err() {
warn!("Callback dropped - signature is no longer relevant");
}
full_signature = Some(signature);
}
full_signature = Some(signature);
}
}
}
Expand Down
Loading
Loading