diff --git a/Cargo.lock b/Cargo.lock index 7526a609..d125a866 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -805,6 +805,7 @@ dependencies = [ "swss-serde", "tokio", "tracing", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ce456428..96d62d40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" serde_yaml = "0.9" serde_with = "3.12" + # Command line utils clap = { version = "4", features = ["derive", "cargo", "wrap_help", "unicode", "string", "unstable-styles"] } color-eyre = "0.6" diff --git a/crates/hamgrd/Cargo.toml b/crates/hamgrd/Cargo.toml index f29e48f5..71fe852f 100644 --- a/crates/hamgrd/Cargo.toml +++ b/crates/hamgrd/Cargo.toml @@ -29,6 +29,7 @@ serde_with.workspace = true clap.workspace = true tracing.workspace = true chrono.workspace = true +uuid.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/crates/hamgrd/src/actors.rs b/crates/hamgrd/src/actors.rs index a691ff65..9c03766a 100644 --- a/crates/hamgrd/src/actors.rs +++ b/crates/hamgrd/src/actors.rs @@ -1,9 +1,6 @@ //! Actors //! //! -// temporarily disable unused warning until vdpu/ha-set actors are implemented -#![allow(unused)] - pub mod dpu; pub mod ha_scope; pub mod ha_set; @@ -12,7 +9,6 @@ pub mod vdpu; #[cfg(test)] pub mod test; use anyhow::Result as AnyhowResult; -use std::any; use std::sync::Arc; use swbus_actor::{spawn, Actor, ActorMessage}; use swbus_edge::swbus_proto::message_id_generator::MessageIdGenerator; @@ -23,10 +19,10 @@ use swss_common::{ KeyOpFieldValues, KeyOperation, ProducerStateTable, SonicDbTable, SubscriberStateTable, ZmqClient, ZmqProducerStateTable, }; -use swss_common_bridge::{consumer::spawn_consumer_bridge, consumer::ConsumerBridge, producer::spawn_producer_bridge}; +use swss_common_bridge::{consumer::ConsumerBridge, producer::spawn_producer_bridge}; use tokio::sync::mpsc::{channel, Receiver}; use tokio::task::JoinHandle; -use tracing::error; +use tracing::{error, info}; pub trait DbBasedActor: Actor { fn name() -> &'static str; @@ -35,7 +31,7 @@ pub trait DbBasedActor: Actor { where Self: Sized; - async fn start_actor_creator(edge_runtime: Arc) -> AnyhowResult<()> + async fn start_actor_creator(edge_runtime: Arc) -> AnyhowResult> where Self: Sized, T: SonicDbTable + 'static, @@ -53,7 +49,7 @@ pub trait DbBasedActor: Actor { let sst = SubscriberStateTable::new_async(config_db, T::table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - spawn_consumer_bridge( + Ok(vec![ConsumerBridge::spawn( edge_runtime.clone(), addr, sst, @@ -64,8 +60,7 @@ pub trait DbBasedActor: Actor { (addr, T::table_name().to_owned()) }, |_| true, - ); - Ok(()) + )]) } } @@ -274,6 +269,11 @@ where let zpst = ZmqProducerStateTable::new(dpu_appl_db, T::table_name(), zmqc, false).unwrap(); let sp = crate::common_bridge_sp::(&edge_runtime); + info!( + "spawned ZMQ producer bridge for {} at {}", + T::table_name(), + sp.to_longest_path() + ); Ok(spawn_producer_bridge(edge_runtime.clone(), sp, zpst)) } else { if std::env::var("SIM").is_err() { @@ -283,6 +283,11 @@ where let pst = ProducerStateTable::new(dpu_appl_db, T::table_name()).unwrap(); let sp = crate::common_bridge_sp::(&edge_runtime); + info!( + "spawned producer bridge for {} at {}", + T::table_name(), + sp.to_longest_path() + ); Ok(spawn_producer_bridge(edge_runtime.clone(), sp, pst)) } } diff --git a/crates/hamgrd/src/actors/dpu.rs b/crates/hamgrd/src/actors/dpu.rs index ff31d7de..f1d42701 100644 --- a/crates/hamgrd/src/actors/dpu.rs +++ b/crates/hamgrd/src/actors/dpu.rs @@ -1,19 +1,17 @@ -use crate::actors::{spawn_consumer_bridge_for_actor, spawn_zmq_producer_bridge, ActorCreator}; +use crate::actors::{spawn_consumer_bridge_for_actor, ActorCreator}; use crate::db_structs::{ BfdSessionTable, DashBfdProbeState, DashHaGlobalConfig, Dpu, DpuPmonStateType, DpuState, RemoteDpu, }; use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType}; use crate::ServicePath; use anyhow::{anyhow, Result}; -use serde::{Deserialize, Serialize}; -use serde_with::{formats::CommaSeparator, serde_as, StringWithSeparator}; use std::collections::HashSet; use std::sync::Arc; use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, ActorMessage, Context, State}; use swbus_edge::SwbusEdgeRuntime; use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable}; -use swss_common_bridge::consumer::{spawn_consumer_bridge, ConsumerBridge}; -use tracing::{debug, error, info}; +use swss_common_bridge::consumer::ConsumerBridge; +use tracing::{debug, error, info, instrument}; use super::spawn_consumer_bridge_for_actor_with_selector; @@ -78,7 +76,8 @@ impl DpuActor { } // DpuActor is spawned in response to swss-common-bridge message for DPU and REMOTE_DPU table - pub async fn start_actor_creator(edge_runtime: Arc) -> Result<()> { + pub async fn start_actor_creator(edge_runtime: Arc) -> Result> { + let mut bridges = Vec::new(); let dpu_ac = ActorCreator::new( edge_runtime.new_sp(Self::name(), ""), edge_runtime.clone(), @@ -93,7 +92,7 @@ impl DpuActor { let sst = SubscriberStateTable::new_async(config_db, Self::dpu_table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - spawn_consumer_bridge( + bridges.push(ConsumerBridge::spawn( edge_runtime.clone(), addr, sst, @@ -104,13 +103,13 @@ impl DpuActor { (addr, Self::dpu_table_name().to_owned()) }, |_| true, - ); + )); let config_db = crate::db_named(RemoteDpu::db_name()).await?; let sst = SubscriberStateTable::new_async(config_db, Self::remote_dpu_table_name(), None, None).await?; let addr = crate::common_bridge_sp::(&edge_runtime); let base_addr = edge_runtime.get_base_sp(); - spawn_consumer_bridge( + bridges.push(ConsumerBridge::spawn( edge_runtime.clone(), addr, sst, @@ -121,8 +120,8 @@ impl DpuActor { (addr, Self::remote_dpu_table_name().to_owned()) }, |_| true, - ); - Ok(()) + )); + Ok(bridges) } async fn handle_dpu_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { @@ -140,7 +139,6 @@ impl DpuActor { let npu_ipv6: Option = crate::get_npu_ipv6(context.get_edge_runtime()).map(|ip| ip.to_string()); let dpu_id = dpu.dpu_id; let is_managed = dpu.dpu_id == crate::get_slot_id(context.get_edge_runtime()); - let zmq_endpoint = format!("{}:{}", dpu.midplane_ipv4, dpu.orchagent_zmq_port); let first_time = self.dpu.is_none(); self.dpu = Some(DpuData::LocalDpu { @@ -263,7 +261,6 @@ impl DpuActor { outgoing: &mut Outgoing, target_actor: Option, ) -> Result<()> { - let mut up = false; let Some(ref dpu_data) = self.dpu else { return Ok(()); }; @@ -421,8 +418,7 @@ impl DpuActor { } } - fn handle_dash_ha_global_config(&mut self, state: &mut State, key: &str) -> Result<()> { - let incoming = state.incoming(); + fn handle_dash_ha_global_config(&mut self, state: &mut State) -> Result<()> { self.update_bfd_sessions(state)?; Ok(()) } @@ -436,6 +432,7 @@ impl DpuActor { } impl Actor for DpuActor { + #[instrument(name="handle_message", level="info", skip_all, fields(actor=format!("dpu/{}", self.id), key=key))] async fn handle_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { let (_internal, incoming, outgoing) = state.get_all(); if key == Self::dpu_table_name() { @@ -456,7 +453,7 @@ impl Actor for DpuActor { if !self.is_local_managed() { return Ok(()); } else if key == DashHaGlobalConfig::table_name() { - return self.handle_dash_ha_global_config(state, key); + return self.handle_dash_ha_global_config(state); } else if key == DpuState::table_name() || key == DashBfdProbeState::table_name() { return self.update_dpu_state(incoming, outgoing, None); } else { diff --git a/crates/hamgrd/src/actors/ha_scope.rs b/crates/hamgrd/src/actors/ha_scope.rs index b93f7d54..267ae212 100644 --- a/crates/hamgrd/src/actors/ha_scope.rs +++ b/crates/hamgrd/src/actors/ha_scope.rs @@ -1 +1,797 @@ -// struct HaScopeActor {} +use crate::actors::{spawn_consumer_bridge_for_actor, DbBasedActor}; +use crate::db_structs::*; +use crate::ha_actor_messages::{ActorRegistration, HaSetActorState, RegistrationType, VDpuActorState}; +use crate::{HaSetActor, VDpuActor}; +use anyhow::Result; +use std::collections::HashMap; +use swbus_actor::{ + state::{incoming::Incoming, internal::Internal, outgoing::Outgoing}, + Actor, ActorMessage, Context, State, +}; +use swss_common::Table; +use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use swss_common_bridge::consumer::ConsumerBridge; +use tracing::{debug, error, info, instrument}; +use uuid::Uuid; + +pub struct HaScopeActor { + id: String, + ha_scope_id: String, + vdpu_id: String, + dash_ha_scope_config: Option, + bridges: Vec, + // we need to keep track the previous dpu_ha_scope_state to detect state change + dpu_ha_scope_state: Option, +} + +impl DbBasedActor for HaScopeActor { + fn new(key: String) -> Result { + if let Some((vdpu_id, ha_scope_id)) = key.split_once(DashHaScopeConfigTable::key_separator()) { + Ok(HaScopeActor { + id: key.to_string(), + vdpu_id: vdpu_id.to_string(), + ha_scope_id: ha_scope_id.to_string(), + dash_ha_scope_config: None, + bridges: Vec::new(), + dpu_ha_scope_state: None, + }) + } else { + Err(anyhow::anyhow!("Invalid key format for HA scope actor: {}", key)) + } + } + + fn table_name() -> &'static str { + DashHaScopeConfigTable::table_name() + } + + fn name() -> &'static str { + "ha-scope" + } +} + +// Implements getter helper functions for HaScopeActor +impl HaScopeActor { + // get vdpu data received via vdpu udpate + fn get_vdpu(&self, incoming: &Incoming) -> Option { + let key = VDpuActorState::msg_key(&self.vdpu_id); + let Ok(msg) = incoming.get(&key) else { + return None; + }; + msg.deserialize_data().ok() + } + + fn get_haset(&self, incoming: &Incoming) -> Option { + let key = HaSetActorState::msg_key(&self.ha_scope_id); + let Ok(msg) = incoming.get(&key) else { + return None; + }; + msg.deserialize_data().ok() + } + + fn get_dpu_ha_scope_state(&self, incoming: &Incoming) -> Option { + let Ok(msg) = incoming.get(DpuDashHaScopeState::table_name()) else { + return None; + }; + let kfv = match msg.deserialize_data::() { + Ok(data) => data, + Err(e) => { + error!("Failed to deserialize DASH_HA_SCOPE_STATE KeyOpFieldValues: {}", e); + return None; + } + }; + + match swss_serde::from_field_values(&kfv.field_values) { + Ok(state) => Some(state), + Err(e) => { + error!("Failed to deserialize DASH_HA_SCOPE_STATE from field values: {}", e); + None + } + } + } + + fn get_npu_ha_scope_state(&self, internal: &Internal) -> Option { + // check dash_ha_scope_config is not none to make sure DASH_HA_SCOPE_CONFIG_TABLE has received + self.dash_ha_scope_config.as_ref()?; + + let fvs = internal.get(NpuDashHaScopeState::table_name()); + + if fvs.is_empty() { + // not created yet + return None; + } + + match swss_serde::from_field_values(fvs) { + Ok(state) => Some(state), + Err(e) => { + error!("Failed to deserialize DASH_HA_SCOPE_STATE from field values: {}", e); + None + } + } + } + + fn get_pending_operations( + &self, + internal: &Internal, + npu_ha_scope_state: Option<&NpuDashHaScopeState>, + ) -> Result> { + let npu_ha_scope_state = match npu_ha_scope_state { + Some(state) => state, + None => { + let fvs = internal.get(NpuDashHaScopeState::table_name()); + if fvs.is_empty() { + // not created yet + return Ok(HashMap::new()); + } + &swss_serde::from_field_values(fvs)? + } + }; + + let pending_operation_ids = &npu_ha_scope_state.pending_operation_ids; + let pending_operation_types = &npu_ha_scope_state.pending_operation_types; + if let (Some(pending_operation_ids), Some(pending_operation_types)) = + (pending_operation_ids, pending_operation_types) + { + if pending_operation_ids.len() != pending_operation_types.len() { + return Err(anyhow::anyhow!( + "pending_operation_ids and pending_operation_types have different lengths" + )); + } + let operations = pending_operation_ids + .iter() + .zip(pending_operation_types.iter()) + .map(|(id, op)| (id.clone(), op.clone())) + .collect(); + Ok(operations) + } else { + Ok(HashMap::new()) + } + } + + fn vdpu_is_managed(&self, incoming: &Incoming) -> bool { + let Some(vdpu) = self.get_vdpu(incoming) else { + return false; + }; + vdpu.dpu.is_managed + } +} + +// Implements internal action functions for HaScopeActor +impl HaScopeActor { + fn register_to_vdpu_actor(&self, outgoing: &mut Outgoing, active: bool) -> Result<()> { + if self.dash_ha_scope_config.is_none() { + return Ok(()); + }; + + let msg = ActorRegistration::new_actor_msg(active, RegistrationType::VDPUState, &self.id)?; + outgoing.send(outgoing.from_my_sp(VDpuActor::name(), &self.vdpu_id), msg); + Ok(()) + } + + fn register_to_haset_actor(&self, outgoing: &mut Outgoing, active: bool) -> Result<()> { + if self.dash_ha_scope_config.is_none() { + return Ok(()); + }; + + let msg = ActorRegistration::new_actor_msg(active, RegistrationType::HaSetState, &self.id)?; + outgoing.send(outgoing.from_my_sp(HaSetActor::name(), &self.ha_scope_id), msg); + Ok(()) + } + + fn update_dpu_ha_scope_table(&self, state: &mut State) -> Result<()> { + let Some(dash_ha_scope_config) = self.dash_ha_scope_config.as_ref() else { + return Ok(()); + }; + let (internal, _incoming, outgoing) = state.get_all(); + + let mut activate_role_requested = false; + let mut flow_reconcile_requested = false; + if let Some(approved_ops) = dash_ha_scope_config.approved_pending_operation_ids.as_ref() { + if !approved_ops.is_empty() { + let pending_operations = self.get_pending_operations(internal, None)?; + for op_id in approved_ops { + let Some(op) = pending_operations.get(op_id) else { + // has been removed from pending list + continue; + }; + match op.as_str() { + "switchover" => { + // todo: this is for switch driven ha + } + "activate_role" => { + activate_role_requested = true; + } + "flow_reconcile" => { + flow_reconcile_requested = true; + } + "brainsplit_recover" => { + // todo: what's the action here? + } + _ => { + error!("Unknown operation type {}", op); + } + } + } + } + } + + let dash_ha_scope = DashHaScopeTable { + version: dash_ha_scope_config.version, + disable: dash_ha_scope_config.disable, + ha_role: dash_ha_scope_config.desired_ha_state.clone(), /*todo, how switching_to_active is derived. Is it relevant to dpu driven mode */ + flow_reconcile_requested, + activate_role_requested, + }; + + let fv = swss_serde::to_field_values(&dash_ha_scope)?; + let kfv = KeyOpFieldValues { + key: self.ha_scope_id.clone(), + operation: KeyOperation::Set, + field_values: fv, + }; + + let msg = ActorMessage::new(self.ha_scope_id.clone(), &kfv)?; + outgoing.send(outgoing.common_bridge_sp::(), msg); + + Ok(()) + } + + fn update_npu_ha_scope_state_base(&self, state: &mut State) -> Result<()> { + if self.dash_ha_scope_config.is_none() { + return Ok(()); + }; + + let (internal, incoming, _outgoing) = state.get_all(); + + let Some(vdpu) = self.get_vdpu(incoming) else { + debug!( + "vDPU {} has not been received. Skip DASH_HA_SCOPE_STATE update", + &self.vdpu_id + ); + return Ok(()); + }; + + if !vdpu.dpu.is_managed { + debug!("vDPU {} is unmanaged. Skip DASH_HA_SCOPE_STATE update", &self.vdpu_id); + return Ok(()); + } + + let pmon_state = vdpu.dpu.dpu_pmon_state.unwrap_or_default(); + let bfd_state = vdpu.dpu.dpu_bfd_state.unwrap_or_default(); + let Some(haset) = self.get_haset(incoming) else { + debug!( + "HA-SET {} has not been received. Skip DASH_HA_SCOPE_STATE update", + &self.ha_scope_id + ); + return Ok(()); + }; + + let mut npu_ha_scope_state = self.get_npu_ha_scope_state(internal).unwrap_or_default(); + + npu_ha_scope_state.creation_time_in_ms = 0; /*todo */ + npu_ha_scope_state.last_heartbeat_time_in_ms = 0; /* todo: wait until heartbeat is implemented */ + npu_ha_scope_state.vip_v4 = haset.ha_set.vip_v4.clone(); + npu_ha_scope_state.vip_v6 = haset.ha_set.vip_v6.clone(); + npu_ha_scope_state.local_ip = haset.ha_set.local_ip.clone(); + npu_ha_scope_state.peer_ip = haset.ha_set.peer_ip.clone(); + + // The state of local vDPU midplane. The value can be "unknown", "up", "down". + npu_ha_scope_state.local_vdpu_midplane_state = pmon_state.dpu_midplane_link_state; + + // Local vDPU midplane state last updated time in milliseconds. + npu_ha_scope_state.local_vdpu_midplane_state_last_updated_time_in_ms = pmon_state.dpu_midplane_link_time; + // The state of local vDPU control plane, which includes DPU OS and certain required firmware. The value can be "unknown", "up", "down". + npu_ha_scope_state.local_vdpu_control_plane_state = pmon_state.dpu_control_plane_state; + // Local vDPU control plane state last updated time in milliseconds. + npu_ha_scope_state.local_vdpu_control_plane_state_last_updated_time_in_ms = pmon_state.dpu_control_plane_time; + // The state of local vDPU data plane, which includes DPU hardware / ASIC and certain required firmware. The value can be "unknown", "up", "down". + npu_ha_scope_state.local_vdpu_data_plane_state = pmon_state.dpu_data_plane_state; + // Local vDPU data plane state last updated time in milliseconds. + npu_ha_scope_state.local_vdpu_data_plane_state_last_updated_time_in_ms = pmon_state.dpu_data_plane_time; + // The list of IPv4 peer IPs (NPU IP) of the BFD sessions in up state. + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v4 = bfd_state.v4_bfd_up_sessions.clone(); + // Local vDPU BFD sessions v4 last updated time in milliseconds. + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v4_update_time_in_ms = bfd_state.v4_bfd_up_sessions_timestamp; + // The list of IPv6 peer IPs (NPU IP) of the BFD sessions in up state. + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v6 = bfd_state.v6_bfd_up_sessions.clone(); + // Local vDPU BFD sessions v6 last updated time in milliseconds. + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v6_update_time_in_ms = bfd_state.v6_bfd_up_sessions_timestamp; + + let fvs = swss_serde::to_field_values(&npu_ha_scope_state)?; + internal.get_mut(NpuDashHaScopeState::table_name()).clone_from(&fvs); + + Ok(()) + } + + fn update_npu_ha_scope_state_pending_operations( + &self, + state: &mut State, + new_operations: Vec<(String, String)>, + approved_operations: Vec, + ) -> Result<()> { + info!( + "Update pending operation list from DPU. New operations: {:?}, Approved operations: {:?}", + new_operations, approved_operations + ); + let internal = state.internal(); + + let Some(mut npu_ha_scope_state) = self.get_npu_ha_scope_state(internal) else { + error!("Cannot update STATE_DB/DASH_HA_SCOPE_STATE until it is populated with basic information",); + return Ok(()); + }; + let mut pending_operations = self.get_pending_operations(internal, Some(&npu_ha_scope_state))?; + let old_pending_operations = pending_operations.clone(); + + for op_id in approved_operations { + pending_operations.remove(&op_id); + } + for (op_id, op_type) in new_operations { + pending_operations.insert(op_id, op_type); + } + + if old_pending_operations == pending_operations { + // no change + return Ok(()); + } + let mut pending_operation_ids = Vec::new(); + let mut pending_operation_types = Vec::new(); + + for (op_id, op_type) in pending_operations { + pending_operation_ids.push(op_id); + pending_operation_types.push(op_type); + } + npu_ha_scope_state.pending_operation_ids = Some(pending_operation_ids); + npu_ha_scope_state.pending_operation_types = Some(pending_operation_types); + npu_ha_scope_state.pending_operation_list_last_updated_time_in_ms = Some(now_in_millis()); + + let fvs = swss_serde::to_field_values(&npu_ha_scope_state)?; + internal.get_mut(NpuDashHaScopeState::table_name()).clone_from(&fvs); + + Ok(()) + } + + fn update_npu_ha_scope_state_ha_state(&self, state: &mut State) -> Result<()> { + let Some(ref dash_ha_scope_config) = self.dash_ha_scope_config else { + return Ok(()); + }; + let (internal, incoming, _outgoing) = state.get_all(); + + let Some(mut npu_ha_scope_state) = self.get_npu_ha_scope_state(internal) else { + info!("Cannot update STATE_DB/DASH_HA_SCOPE_STATE until it is populated with basic information",); + return Ok(()); + }; + + let Some(dpu_ha_scope_state) = self.get_dpu_ha_scope_state(incoming) else { + debug!( + "DPU HA-SCOPE STATE {} is corrupted or has not been received. Skip DASH_HA_SCOPE_STATE update", + &self.id + ); + return Ok(()); + }; + + // in dpu driven mode, local_ha_state is same as dpu acked ha state + npu_ha_scope_state.local_ha_state = Some(dpu_ha_scope_state.ha_role.clone()); + npu_ha_scope_state.local_ha_state_last_updated_time_in_ms = Some(dpu_ha_scope_state.ha_role_start_time); + // The reason of the last HA state change. + npu_ha_scope_state.local_ha_state_last_updated_reason = Some("dpu initiated".to_string()); + + // The target HA state in ASIC. This is the state that hamgrd generates and asking DPU to move to. + npu_ha_scope_state.local_target_asic_ha_state = Some(dash_ha_scope_config.desired_ha_state.clone()); + // The HA state that ASIC acked. + npu_ha_scope_state.local_acked_asic_ha_state = Some(dpu_ha_scope_state.ha_role.clone()); + + // The current target term of the HA state machine. in dpu-driven mode, use the term acked by asic + npu_ha_scope_state.local_target_term = Some(dpu_ha_scope_state.ha_term.clone()); + npu_ha_scope_state.local_acked_term = Some(dpu_ha_scope_state.ha_term); + + let fvs = swss_serde::to_field_values(&npu_ha_scope_state)?; + internal.get_mut(NpuDashHaScopeState::table_name()).clone_from(&fvs); + + Ok(()) + } +} + +// Implements messages handlers for HaScopeActor +impl HaScopeActor { + /// Handles updates to the DASH_HA_SCOPE_CONFIG_TABLE. + /// Updates the actor's internal config and performs any necessary initialization or subscriptions. + /// Update DPU DASH_HA_SCOPE_TABLE + /// Update NPU DASH_HA_SCOPE_STATE if approved_pending_operation_ids is not empty + fn handle_dash_ha_scope_config_table_message( + &mut self, + state: &mut State, + key: &str, + context: &mut Context, + ) -> Result<()> { + let (_internal, incoming, outgoing) = state.get_all(); + + // Retrieve the config update from the incoming message + let kfv: KeyOpFieldValues = incoming.get(key)?.deserialize_data()?; + + if kfv.operation == KeyOperation::Del { + // unregister from the vDPU Actor and ha-set actor + self.register_to_vdpu_actor(outgoing, false)?; + self.register_to_haset_actor(outgoing, false)?; + context.stop(); + return Ok(()); + } + let first_time = self.dash_ha_scope_config.is_none(); + let dash_ha_scope_config: DashHaScopeConfigTable = swss_serde::from_field_values(&kfv.field_values)?; + + // Update internal config + self.dash_ha_scope_config = Some(dash_ha_scope_config); + + if first_time { + // Subscribe to the vDPU Actor for state updates. + self.register_to_vdpu_actor(outgoing, true)?; + // Subscribe to the ha-set Actor for state updates. + self.register_to_haset_actor(outgoing, true)?; + } + + // this is not a ha_scope for the target vDPU. Skip + if !self.vdpu_is_managed(incoming) { + return Ok(()); + } + + // update the DASH_HA_SCOPE_TABLE in DPU + self.update_dpu_ha_scope_table(state)?; + + // update the NPU DASH_HA_SCOPE_STATE because some fields are derived from dash_ha_scope_config + self.update_npu_ha_scope_state_ha_state(state)?; + + // need to update operation list if approved_pending_operation_ids is not empty + let approved_pending_operation_ids = match self + .dash_ha_scope_config + .as_ref() + .unwrap() + .approved_pending_operation_ids + { + Some(ref ids) => ids.clone(), + None => Vec::new(), + }; + if !approved_pending_operation_ids.is_empty() { + self.update_npu_ha_scope_state_pending_operations(state, Vec::new(), approved_pending_operation_ids)?; + } + + Ok(()) + } + + /// Handles VDPU state update messages for this HA scope. + /// If the vdpu is unmanaged, the actor is put in dormant state. Otherwise, the actor subscribes to the + /// DASH_HA_SCOPE_STATE table and updates the NPU HA scope state. + async fn handle_vdpu_state_update(&mut self, state: &mut State, context: &mut Context) -> Result<()> { + let (internal, incoming, _outgoing) = state.get_all(); + let Some(vdpu) = self.get_vdpu(incoming) else { + error!("Failed to retrieve vDPU {} from incoming state", &self.vdpu_id); + return Ok(()); + }; + + if !vdpu.dpu.is_managed { + debug!("vDPU {} is unmanaged. Put actor in dormant state", &self.vdpu_id); + return Ok(()); + } + + // create an internal entry for npu STATE_DB/DASH_HA_SCOPE_STATE, which will be the + // notification channel to SDN controller + let swss_key = format!( + "{}{}{}", + self.vdpu_id, + NpuDashHaScopeState::key_separator(), + self.ha_scope_id + ); + if !internal.has_entry(NpuDashHaScopeState::table_name(), &swss_key) { + let db = crate::db_named(NpuDashHaScopeState::db_name()).await?; + let table = Table::new_async(db, NpuDashHaScopeState::table_name()).await?; + internal.add(NpuDashHaScopeState::table_name(), table, swss_key).await; + } + + if self.bridges.is_empty() { + // subscribe to dpu DASH_HA_SCOPE_STATE + self.bridges.push( + spawn_consumer_bridge_for_actor::( + context.get_edge_runtime().clone(), + Self::name(), + Some(&self.id), + true, + ) + .await?, + ); + } + // ha_scope_table in dpu has no info derived from vDPU but it won't be programed until we receive vDPU which confirms the vDPU is managed + self.update_dpu_ha_scope_table(state)?; + self.update_npu_ha_scope_state_base(state)?; + Ok(()) + } + + /// Handles HaSet state update messages for this HA scope. + /// Update NPU DASH_HA_SCOPE_STATE + fn handle_haset_state_update(&mut self, state: &mut State) -> Result<()> { + self.update_npu_ha_scope_state_base(state)?; + Ok(()) + } + + /// Handles DPU DASH_HA_SCOPE_STATE update messages for this HA scope. + /// Update NPU DASH_HA_SCOPE_STATE ha_state related fields + /// Update NPU DASH_HA_SCOPE_STATE pending operation list if there are new operations requested by DPU + fn handle_dpu_ha_scope_state_update(&mut self, state: &mut State) -> Result<()> { + let (_internal, incoming, _) = state.get_all(); + // calculate operation requested by dpu + let Some(new_dpu_ha_scope_state) = self.get_dpu_ha_scope_state(incoming) else { + // no valid state received from dpu, skip + return Ok(()); + }; + let mut operations: Vec<(String, String)> = Vec::new(); + + // if hamgrd is restarted, we will lose the cached old state. In this case, we will treat + // all pending operations as new and request the sdn controller via npu dash_ha_scope_state + // to take action. If these have been notified to sdn controller prior to hamgrd restart, + // they will be no change to dash_ha_scope_state and no action will be taken by sdn controller. + let old_dpu_ha_scope_state = self.dpu_ha_scope_state.as_ref().cloned().unwrap_or_default(); + if new_dpu_ha_scope_state.activate_role_pending && !old_dpu_ha_scope_state.activate_role_pending { + operations.push((Uuid::new_v4().to_string(), "activate_role".to_string())); + } + + if new_dpu_ha_scope_state.brainsplit_recover_pending && !old_dpu_ha_scope_state.brainsplit_recover_pending { + operations.push((Uuid::new_v4().to_string(), "brainsplit_recover".to_string())); + } + + if new_dpu_ha_scope_state.flow_reconcile_pending && !old_dpu_ha_scope_state.flow_reconcile_pending { + operations.push((Uuid::new_v4().to_string(), "flow_reconcile".to_string())); + } + + self.dpu_ha_scope_state = Some(new_dpu_ha_scope_state); + + self.update_npu_ha_scope_state_ha_state(state)?; + + if !operations.is_empty() { + self.update_npu_ha_scope_state_pending_operations(state, operations, Vec::new())?; + } + + Ok(()) + } +} + +impl Actor for HaScopeActor { + #[instrument(name="handle_message", level="info", skip_all, fields(actor=format!("ha-scope/{}", self.id), key=key))] + async fn handle_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { + if key == Self::table_name() { + if let Err(e) = self.handle_dash_ha_scope_config_table_message(state, key, context) { + error!("handle_dash_ha_scope_config_table_message failed: {e}"); + } + return Ok(()); + } + + if self.dash_ha_scope_config.is_none() { + return Ok(()); + } + + if VDpuActorState::is_my_msg(key) { + return self.handle_vdpu_state_update(state, context).await; + } + if HaSetActorState::is_my_msg(key) { + return self.handle_haset_state_update(state); + } + if key.starts_with(DpuDashHaScopeState::table_name()) { + // dpu ha scope state update + return self.handle_dpu_ha_scope_state_update(state); + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::{ + actors::{ + ha_scope::HaScopeActor, + ha_set::HaSetActor, + test::{self, *}, + vdpu::VDpuActor, + DbBasedActor, + }, + db_structs::{ + now_in_millis, DashHaScopeConfigTable, DashHaScopeTable, DpuDashHaScopeState, NpuDashHaScopeState, + }, + ha_actor_messages::*, + }; + use std::time::Duration; + use swss_common::{SonicDbTable, Table}; + use swss_common_testing::*; + use swss_serde::to_field_values; + + #[tokio::test] + async fn ha_scope_planned_up_then_down() { + // To enable trace, set ENABLE_TRACE=1 to run test + sonic_common::log::init_logger_for_test(); + let _redis = Redis::start_config_db(); + let runtime = test::create_actor_runtime(0, "10.0.0.0", "10::").await; + + // prepare test data + let (ha_set_id, ha_set_obj) = make_dpu_scope_ha_set_obj(0, 0); + let dpu_mon = make_dpu_pmon_state(true); + let bfd_state = make_dpu_bfd_state(Vec::new(), Vec::new()); + let dpu0 = make_local_dpu_actor_state(0, 0, true, Some(dpu_mon.clone()), Some(bfd_state)); + let (vdpu0_id, vdpu0_state_obj) = make_vdpu_actor_state(true, &dpu0); + + // Initial state of NPU DASH_HA_SCOPE_STATE + let npu_ha_scope_state1 = make_npu_ha_scope_state(&vdpu0_state_obj, &ha_set_obj); + let npu_ha_scope_state_fvs1 = to_field_values(&npu_ha_scope_state1).unwrap(); + + // NPU DASH_HA_SCOPE_STATE after DPU DASH_HA_SCOPE_STATE update + let dpu_ha_state_state2 = make_dpu_ha_scope_state("dead"); + let mut npu_ha_scope_state2 = npu_ha_scope_state1.clone(); + update_npu_ha_scope_state_by_dpu_scope_state(&mut npu_ha_scope_state2, &dpu_ha_state_state2, "active"); + let npu_ha_scope_state_fvs2 = to_field_values(&npu_ha_scope_state2).unwrap(); + + // NPU DASH_HA_SCOPE_STATE after DPU DASH_HA_SCOPE_STATE role activation requestion + let mut dpu_ha_state_state3 = dpu_ha_state_state2.clone(); + dpu_ha_state_state3.activate_role_pending = true; + dpu_ha_state_state3.last_updated_time = now_in_millis(); + let mut npu_ha_scope_state3 = npu_ha_scope_state2.clone(); + update_npu_ha_scope_state_by_dpu_scope_state(&mut npu_ha_scope_state3, &dpu_ha_state_state3, "active"); + update_npu_ha_scope_state_pending_ops( + &mut npu_ha_scope_state3, + vec![("1".to_string(), "activate_role".to_string())], + ); + let npu_ha_scope_state_fvs3 = to_field_values(&npu_ha_scope_state3).unwrap(); + + let scope_id = format!("{vdpu0_id}:{ha_set_id}"); + let scope_id_in_state = format!("{vdpu0_id}|{ha_set_id}"); + let ha_scope_actor = HaScopeActor::new(scope_id.clone()).unwrap(); + + let handle = runtime.spawn(ha_scope_actor, HaScopeActor::name(), &scope_id); + + #[rustfmt::skip] + let commands = [ + // Send DASH_HA_SCOPE_CONFIG_TABLE to actor with admin state disabled + send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"version": "1", "disable": "true", "desired_ha_state": "active", "approved_pending_operation_ids": "" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Recv registration to vDPU and ha-set + recv! { key: ActorRegistration::msg_key(RegistrationType::VDPUState, &scope_id), data: { "active": true }, addr: runtime.sp(VDpuActor::name(), &vdpu0_id) }, + recv! { key: ActorRegistration::msg_key(RegistrationType::HaSetState, &scope_id), data: { "active": true }, addr: runtime.sp(HaSetActor::name(), &ha_set_id) }, + + // Send ha-set state to actor + send! { key: HaSetActorState::msg_key(&ha_set_id), data: { "up": true, "ha_set": &ha_set_obj }, addr: runtime.sp(HaSetActor::name(), &ha_set_id) }, + + // Send vDPU state to actor + send! { key: VDpuActorState::msg_key(&vdpu0_id), data: vdpu0_state_obj, addr: runtime.sp("vdpu", &vdpu0_id) }, + + // Recv update to DPU DASH_HA_SCOPE_TABLE with ha_role = active + recv! { key: &ha_set_id, data: { "key": &ha_set_id, "operation": "Set", + "field_values": {"version": "1", "ha_role": "active", "disable": "true", "activate_role_requested": "false", "flow_reconcile_requested": "false" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs1 }, + + // Send DPU DASH_HA_SCOPE_STATE to actor to simulate response from DPU + send! { key: DpuDashHaScopeState::table_name(), data: {"key": DpuDashHaScopeState::table_name(), "operation": "Set", + "field_values": serde_json::to_value(to_field_values(&dpu_ha_state_state2).unwrap()).unwrap() + }}, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs2 }, + + // Send DASH_HA_SCOPE_CONFIG_TABLE to actor with admin state enabled + send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"version": "2", "disable": "false", "desired_ha_state": "active", "approved_pending_operation_ids": "" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Recv update to DPU DASH_HA_SCOPE_TABLE with disabled = false + recv! { key: &ha_set_id, data: { "key": &ha_set_id, "operation": "Set", + "field_values": {"version": "2", "ha_role": "active", "disable": "false", "activate_role_requested": "false", "flow_reconcile_requested": "false" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs2 }, + + // Send DPU DASH_HA_SCOPE_STATE with role activation request to the actor + send! { key: DpuDashHaScopeState::table_name(), data: {"key": DpuDashHaScopeState::table_name(), "operation": "Set", + "field_values": serde_json::to_value(to_field_values(&dpu_ha_state_state3).unwrap()).unwrap() + }}, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state with pending activation + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs3, + exclude: "pending_operation_ids,pending_operation_list_last_updated_time_in_ms" }, + ]; + + test::run_commands(&runtime, runtime.sp(HaScopeActor::name(), &scope_id), &commands).await; + + // get GUID from DASH_HA_SCOPE_STATE pending_operation_ids + let db = crate::db_named(NpuDashHaScopeState::db_name()).await.unwrap(); + let table = Table::new(db, NpuDashHaScopeState::table_name()).unwrap(); + let npu_ha_scope_state: NpuDashHaScopeState = swss_serde::from_table(&table, &scope_id_in_state).unwrap(); + let op_id = npu_ha_scope_state.pending_operation_ids.unwrap().pop().unwrap(); + + // continue the test to activate the role + let mut npu_ha_scope_state4 = npu_ha_scope_state3.clone(); + update_npu_ha_scope_state_pending_ops(&mut npu_ha_scope_state4, vec![]); + let npu_ha_scope_state_fvs4 = to_field_values(&npu_ha_scope_state4).unwrap(); + + let mut dpu_ha_state_state5 = make_dpu_ha_scope_state("active"); + dpu_ha_state_state5.ha_term = "2".to_string(); + let mut npu_ha_scope_state5: NpuDashHaScopeState = npu_ha_scope_state4.clone(); + update_npu_ha_scope_state_by_dpu_scope_state(&mut npu_ha_scope_state5, &dpu_ha_state_state5, "active"); + let npu_ha_scope_state_fvs5 = to_field_values(&npu_ha_scope_state5).unwrap(); + + let bfd_state = make_dpu_bfd_state(vec!["10.0.0.0", "10.0.1.0"], Vec::new()); + let dpu0 = make_local_dpu_actor_state(0, 0, true, Some(dpu_mon), Some(bfd_state)); + let (vdpu0_id, vdpu0_state_obj) = make_vdpu_actor_state(true, &dpu0); + let mut npu_ha_scope_state6 = npu_ha_scope_state5.clone(); + update_npu_ha_scope_state_by_vdpu(&mut npu_ha_scope_state6, &vdpu0_state_obj); + let npu_ha_scope_state_fvs6 = to_field_values(&npu_ha_scope_state6).unwrap(); + + #[rustfmt::skip] + let commands = [ + // Send DASH_HA_SCOPE_CONFIG_TABLE with activation approved + send! { key: HaScopeActor::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"version": "3", "disable": "false", "desired_ha_state": "active", "approved_pending_operation_ids": &op_id }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Recv update to DPU DASH_HA_SCOPE_TABLE with activate_role_requested=true + recv! { key: &ha_set_id, data: { "key": &ha_set_id, "operation": "Set", + "field_values": {"version": "3", "ha_role": "active", "disable": "false", "activate_role_requested": "true", "flow_reconcile_requested": "false" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state with no pending activation + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs4, + exclude: "pending_operation_list_last_updated_time_in_ms" }, + + // Send DPU DASH_HA_SCOPE_STATE with ha_role = active and activate_role_requested = false + send! { key: DpuDashHaScopeState::table_name(), data: {"key": DpuDashHaScopeState::table_name(), "operation": "Set", + "field_values": serde_json::to_value(to_field_values(&dpu_ha_state_state5).unwrap()).unwrap() + }}, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state with ha_role = active + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs5, + exclude: "pending_operation_list_last_updated_time_in_ms" }, + + // Send vdpu state update after bfd session up + send! { key: VDpuActorState::msg_key(&vdpu0_id), data: vdpu0_state_obj, addr: runtime.sp("vdpu", &vdpu0_id) }, + + // Write to NPU DASH_HA_SCOPE_STATE through internal state with bfd session up + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs6, + exclude: "pending_operation_list_last_updated_time_in_ms" }, + ]; + + test::run_commands(&runtime, runtime.sp(HaScopeActor::name(), &scope_id), &commands).await; + + // execute planned shutdown + let mut npu_ha_scope_state7 = npu_ha_scope_state6.clone(); + npu_ha_scope_state7.local_target_asic_ha_state = Some("dead".to_string()); + let npu_ha_scope_state_fvs7 = to_field_values(&npu_ha_scope_state7).unwrap(); + #[rustfmt::skip] + let commands = [ + // Send DASH_HA_SCOPE_CONFIG_TABLE with desired_ha_state = dead + send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Set", + "field_values": {"version": "2", "disable": "false", "desired_ha_state": "dead", "approved_pending_operation_ids": "" }, + }, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + + // Check NPU DASH_HA_SCOPE_STATE is updated with desired_ha_state = dead + chkdb! { db: NpuDashHaScopeState::db_name(), table: NpuDashHaScopeState::table_name(), key: &scope_id_in_state, data: npu_ha_scope_state_fvs7, + exclude: "pending_operation_list_last_updated_time_in_ms" }, + + // simulate delete of ha-scope entry + send! { key: DashHaScopeConfigTable::table_name(), data: { "key": &scope_id, "operation": "Del", + "field_values": {"version": "2", "disable": "false", "desired_ha_state": "dead", "approved_pending_operation_ids": "" }}, + addr: crate::common_bridge_sp::(&runtime.get_swbus_edge()) }, + ]; + + test::run_commands(&runtime, runtime.sp(HaScopeActor::name(), &scope_id), &commands).await; + if tokio::time::timeout(Duration::from_secs(3), handle).await.is_err() { + panic!("timeout waiting for actor to terminate"); + } + } + + #[tokio::test] + async fn ha_scope_planned_down() {} +} diff --git a/crates/hamgrd/src/actors/ha_set.rs b/crates/hamgrd/src/actors/ha_set.rs index 6fa943de..6cf2830f 100644 --- a/crates/hamgrd/src/actors/ha_set.rs +++ b/crates/hamgrd/src/actors/ha_set.rs @@ -1,22 +1,16 @@ use crate::actors::vdpu::VDpuActor; -use crate::actors::{ha_set, spawn_consumer_bridge_for_actor, ActorCreator, DbBasedActor}; +use crate::actors::{spawn_consumer_bridge_for_actor, DbBasedActor}; use crate::db_structs::*; use crate::ha_actor_messages::{ActorRegistration, HaSetActorState, RegistrationType, VDpuActorState}; use anyhow::Result; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use std::time::SystemTime; -use swbus_actor::state::incoming; -use swbus_actor::state::internal::{self, Internal}; -use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, ActorMessage, Context, State}; -use swbus_edge::SwbusEdgeRuntime; -use swss_common::Table; -use swss_common::{ - KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable, ZmqClient, ZmqProducerStateTable, +use swbus_actor::{ + state::{incoming::Incoming, internal::Internal, outgoing::Outgoing}, + Actor, ActorMessage, Context, State, }; -use swss_common_bridge::{consumer::spawn_consumer_bridge, consumer::ConsumerBridge, producer::spawn_producer_bridge}; -use tokio::time::error::Elapsed; -use tracing::{debug, error, info}; +use swss_common::Table; +use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use swss_common_bridge::consumer::ConsumerBridge; +use tracing::{debug, error, info, instrument}; pub struct HaSetActor { id: String, @@ -198,7 +192,7 @@ impl HaSetActor { /// Get vdpu data received via vdpu update and return them in a list with primary DPUs first. /// All preferred_vdpu_ids are considered primary, followed by backups. - fn get_vdpus(&self, incoming: &Incoming) -> Vec<(Option)> { + fn get_vdpus(&self, incoming: &Incoming) -> Vec> { let Some(ref ha_set_cfg) = self.dash_ha_set_config else { return Vec::new(); }; @@ -265,7 +259,7 @@ impl HaSetActor { self.dash_ha_set_config = Some(swss_serde::from_field_values(&dpu_kfv.field_values)?); let swss_key = format!("default:{}", self.dash_ha_set_config.as_ref().unwrap().vip_v4); - if !internal.has_entry(key, &swss_key) { + if !internal.has_entry(VnetRouteTunnelTable::table_name(), &swss_key) { let db = crate::db_named(VnetRouteTunnelTable::db_name()).await?; let table = Table::new_async(db, VnetRouteTunnelTable::table_name()).await?; internal.add(VnetRouteTunnelTable::table_name(), table, swss_key).await; @@ -294,7 +288,7 @@ impl HaSetActor { Ok(()) } - fn handle_dash_ha_global_config(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { + fn handle_dash_ha_global_config(&mut self, state: &mut State) -> Result<()> { let (internal, incoming, outgoing) = state.get_all(); let Some(vdpus) = self.get_vdpus_if_ready(incoming) else { return Ok(()); @@ -305,7 +299,7 @@ impl HaSetActor { Ok(()) } - fn handle_vdpu_state_update(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { + fn handle_vdpu_state_update(&mut self, state: &mut State) -> Result<()> { let (internal, incoming, outgoing) = state.get_all(); // vdpu update affects dash-ha-set in DPU and vxlan tunnel let Some(vdpus) = self.get_vdpus_if_ready(incoming) else { @@ -330,7 +324,6 @@ impl HaSetActor { }; let msg = HaSetActorState::new_actor_msg(true, &self.id, dash_ha_set).unwrap(); - let peer_actors = ActorRegistration::get_registered_actors(incoming, RegistrationType::HaSetState); outgoing.send(entry.source.clone(), msg); } @@ -339,6 +332,7 @@ impl HaSetActor { } impl Actor for HaSetActor { + #[instrument(name="handle_message", level="info", skip_all, fields(actor=format!("ha-set/{}", self.id), key=key))] async fn handle_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { if key == Self::table_name() { if let Err(e) = self.handle_dash_ha_set_config_table_message(state, key, context).await { @@ -352,9 +346,9 @@ impl Actor for HaSetActor { } if VDpuActorState::is_my_msg(key) { - return self.handle_vdpu_state_update(state, key, context); + return self.handle_vdpu_state_update(state); } else if key == DashHaGlobalConfig::table_name() { - return self.handle_dash_ha_global_config(state, key, context); + return self.handle_dash_ha_global_config(state); } else if ActorRegistration::is_my_msg(key, RegistrationType::HaSetState) { return self.handle_haset_state_registration(state, key).await; } @@ -366,16 +360,16 @@ impl Actor for HaSetActor { mod test { use crate::{ actors::{ - ha_set::{self, HaSetActor}, + ha_set::HaSetActor, test::{self, *}, - vdpu::{self, VDpuActor}, + vdpu::VDpuActor, DbBasedActor, }, db_structs::{DashHaGlobalConfig, DashHaSetConfigTable, DashHaSetTable, VnetRouteTunnelTable}, ha_actor_messages::*, }; use std::time::Duration; - use swss_common::{DbConnector, SonicDbTable, Table}; + use swss_common::SonicDbTable; use swss_common_testing::*; #[tokio::test] @@ -383,7 +377,7 @@ mod test { // To enable trace, set ENABLE_TRACE=1 to run test sonic_common::log::init_logger_for_test(); - let redis = Redis::start_config_db(); + let _redis = Redis::start_config_db(); let runtime = test::create_actor_runtime(0, "10.0.0.0", "10::").await; //prepare test data @@ -468,7 +462,7 @@ mod test { // To enable trace, set ENABLE_TRACE=1 to run test sonic_common::log::init_logger_for_test(); - let redis = Redis::start_config_db(); + let _redis = Redis::start_config_db(); let runtime = test::create_actor_runtime(0, "10.0.0.0", "10::").await; //prepare test data diff --git a/crates/hamgrd/src/actors/test.rs b/crates/hamgrd/src/actors/test.rs index 1b9d7f36..84ec6140 100644 --- a/crates/hamgrd/src/actors/test.rs +++ b/crates/hamgrd/src/actors/test.rs @@ -1,9 +1,8 @@ -use crate::actors::dpu; use crate::db_structs::*; use crate::ha_actor_messages::*; use crate::RuntimeData; use anyhow::Result; -use serde_json::{json, Value}; +use serde_json::Value; use std::{collections::HashMap, future::Future, time::Duration}; use std::{net::Ipv4Addr, net::Ipv6Addr, sync::Arc}; use swbus_actor::{ActorMessage, ActorRuntime}; @@ -13,7 +12,6 @@ use swbus_edge::{ SwbusEdgeRuntime, }; use swss_common::{FieldValues, Table}; -use tracing::field::Field; async fn timeout>(fut: Fut) -> Result { const TIMEOUT: Duration = Duration::from_millis(5000); @@ -204,15 +202,19 @@ pub async fn run_commands(runtime: &ActorRuntime, aut: ServicePath, commands: &[ } ChkDb { - db, - table, + db: db_name, + table: table_name, key, data, exclude, } => { - let db = crate::db_named(db).await.unwrap(); - let mut table = Table::new(db, table).unwrap(); - let mut actual_data = table.get_async(key).await.unwrap().unwrap(); + let db = crate::db_named(db_name).await.unwrap(); + let mut table = Table::new(db, table_name).unwrap(); + + let mut actual_data = table.get_async(key).await.unwrap(); + let Some(actual_data) = actual_data.as_mut() else { + panic!("Key {key} not found in {table_name}/{db_name}"); + }; let mut fvs: FieldValues = serde_json::from_value(data.clone()).unwrap(); exclude @@ -223,7 +225,7 @@ pub async fn run_commands(runtime: &ActorRuntime, aut: ServicePath, commands: &[ fvs.remove(id); actual_data.remove(id); }); - assert_eq!(actual_data, fvs); + assert_eq!(actual_data, &fvs); } } } @@ -364,20 +366,6 @@ pub fn to_local_dpu(dpu_actor_state: &DpuActorState) -> Dpu { } } -pub fn to_remote_dpu(dpu_actor_state: &DpuActorState) -> RemoteDpu { - if dpu_actor_state.remote_dpu { - panic!("Cannot convert remote DPU to local DPU"); - } - RemoteDpu { - pa_ipv4: dpu_actor_state.pa_ipv4.clone(), - pa_ipv6: dpu_actor_state.pa_ipv6.clone(), - dpu_id: dpu_actor_state.dpu_id, - swbus_port: dpu_actor_state.swbus_port, - npu_ipv4: dpu_actor_state.npu_ipv4.clone(), - npu_ipv6: dpu_actor_state.npu_ipv6.clone(), - } -} - pub fn make_vdpu_actor_state(up: bool, dpu_state: &DpuActorState) -> (String, VDpuActorState) { let parts: Vec<&str> = dpu_state .dpu_name @@ -444,3 +432,102 @@ pub fn make_dpu_scope_ha_set_obj(switch: u16, dpu: u16) -> (String, DashHaSetTab }; (format!("haset{switch_pair_id}-{dpu}"), ha_set) } + +pub fn make_npu_ha_scope_state(vdpu_state_obj: &VDpuActorState, ha_set_obj: &DashHaSetTable) -> NpuDashHaScopeState { + let mut scope_state = NpuDashHaScopeState::default(); + + let pmon_state = match vdpu_state_obj.dpu.dpu_pmon_state { + Some(ref state) => state.clone(), + None => DpuState::default(), + }; + let bfd_state = match vdpu_state_obj.dpu.dpu_bfd_state { + Some(ref state) => state.clone(), + None => DashBfdProbeState::default(), + }; + + scope_state.vip_v4 = ha_set_obj.vip_v4.clone(); + scope_state.vip_v6 = ha_set_obj.vip_v6.clone(); + scope_state.local_ip = ha_set_obj.local_ip.clone(); + scope_state.peer_ip = ha_set_obj.peer_ip.clone(); + scope_state.local_vdpu_midplane_state = pmon_state.dpu_midplane_link_state.clone(); + scope_state.local_vdpu_midplane_state_last_updated_time_in_ms = pmon_state.dpu_midplane_link_time; + scope_state.local_vdpu_control_plane_state = pmon_state.dpu_control_plane_state.clone(); + scope_state.local_vdpu_control_plane_state_last_updated_time_in_ms = pmon_state.dpu_control_plane_time; + scope_state.local_vdpu_data_plane_state = pmon_state.dpu_data_plane_state.clone(); + scope_state.local_vdpu_data_plane_state_last_updated_time_in_ms = pmon_state.dpu_data_plane_time; + scope_state.local_vdpu_up_bfd_sessions_v4 = bfd_state.v4_bfd_up_sessions.clone(); + scope_state.local_vdpu_up_bfd_sessions_v4_update_time_in_ms = bfd_state.v4_bfd_up_sessions_timestamp; + scope_state.local_vdpu_up_bfd_sessions_v6 = bfd_state.v6_bfd_up_sessions.clone(); + scope_state.local_vdpu_up_bfd_sessions_v6_update_time_in_ms = bfd_state.v6_bfd_up_sessions_timestamp; + + scope_state +} + +pub fn update_npu_ha_scope_state_by_vdpu( + npu_ha_scope_state: &mut NpuDashHaScopeState, + vdpu_state_obj: &VDpuActorState, +) { + let pmon_state = match vdpu_state_obj.dpu.dpu_pmon_state { + Some(ref state) => state.clone(), + None => DpuState::default(), + }; + let bfd_state = match vdpu_state_obj.dpu.dpu_bfd_state { + Some(ref state) => state.clone(), + None => DashBfdProbeState::default(), + }; + + npu_ha_scope_state.local_vdpu_midplane_state = pmon_state.dpu_midplane_link_state.clone(); + npu_ha_scope_state.local_vdpu_midplane_state_last_updated_time_in_ms = pmon_state.dpu_midplane_link_time; + npu_ha_scope_state.local_vdpu_control_plane_state = pmon_state.dpu_control_plane_state.clone(); + npu_ha_scope_state.local_vdpu_control_plane_state_last_updated_time_in_ms = pmon_state.dpu_control_plane_time; + npu_ha_scope_state.local_vdpu_data_plane_state = pmon_state.dpu_data_plane_state.clone(); + npu_ha_scope_state.local_vdpu_data_plane_state_last_updated_time_in_ms = pmon_state.dpu_data_plane_time; + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v4 = bfd_state.v4_bfd_up_sessions.clone(); + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v4_update_time_in_ms = bfd_state.v4_bfd_up_sessions_timestamp; + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v6 = bfd_state.v6_bfd_up_sessions.clone(); + npu_ha_scope_state.local_vdpu_up_bfd_sessions_v6_update_time_in_ms = bfd_state.v6_bfd_up_sessions_timestamp; +} + +pub fn update_npu_ha_scope_state_by_dpu_scope_state( + npu_ha_scope_state: &mut NpuDashHaScopeState, + dpu_ha_scope_state: &DpuDashHaScopeState, + target_ha_state: &str, +) { + npu_ha_scope_state.local_ha_state = Some(dpu_ha_scope_state.ha_role.clone()); + npu_ha_scope_state.local_ha_state_last_updated_time_in_ms = Some(dpu_ha_scope_state.ha_role_start_time); + npu_ha_scope_state.local_ha_state_last_updated_reason = Some("dpu initiated".to_string()); + npu_ha_scope_state.local_target_asic_ha_state = Some(target_ha_state.to_string()); + npu_ha_scope_state.local_acked_asic_ha_state = Some(dpu_ha_scope_state.ha_role.clone()); + npu_ha_scope_state.local_target_term = Some(dpu_ha_scope_state.ha_term.clone()); + npu_ha_scope_state.local_acked_term = Some(dpu_ha_scope_state.ha_term.clone()); +} + +pub fn update_npu_ha_scope_state_pending_ops( + npu_ha_scope_state: &mut NpuDashHaScopeState, + pending_ops: Vec<(String, String)>, +) { + let mut pending_operation_ids = vec![]; + let mut pending_operation_types = vec![]; + for (op_id, op_type) in pending_ops { + pending_operation_ids.push(op_id); + pending_operation_types.push(op_type); + } + npu_ha_scope_state.pending_operation_ids = Some(pending_operation_ids); + npu_ha_scope_state.pending_operation_types = Some(pending_operation_types); + npu_ha_scope_state.pending_operation_list_last_updated_time_in_ms = Some(now_in_millis()); +} + +pub fn make_dpu_ha_scope_state(role: &str) -> DpuDashHaScopeState { + DpuDashHaScopeState { + last_updated_time: now_in_millis(), + // The current HA role confirmed by ASIC. Please refer to the HA states defined in HA HLD. + ha_role: role.to_string(), + // The time when HA role is moved into current one in milliseconds. + ha_role_start_time: now_in_millis(), + // The current term confirmed by ASIC. + ha_term: "1".to_string(), + activate_role_pending: false, + flow_reconcile_pending: false, + brainsplit_recover_pending: false, + } +} diff --git a/crates/hamgrd/src/actors/vdpu.rs b/crates/hamgrd/src/actors/vdpu.rs index 34282ff8..6db44102 100644 --- a/crates/hamgrd/src/actors/vdpu.rs +++ b/crates/hamgrd/src/actors/vdpu.rs @@ -1,17 +1,12 @@ use crate::actors::dpu::DpuActor; -use crate::actors::{ActorCreator, DbBasedActor}; +use crate::actors::DbBasedActor; use crate::db_structs::VDpu; use crate::ha_actor_messages::{ActorRegistration, DpuActorState, RegistrationType, VDpuActorState}; use anyhow::Result; -use serde::Deserialize; -use std::collections::HashMap; -use std::sync::Arc; use swbus_actor::Context; use swbus_actor::{state::incoming::Incoming, state::outgoing::Outgoing, Actor, State}; -use swbus_edge::SwbusEdgeRuntime; -use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable}; -use swss_common_bridge::consumer::spawn_consumer_bridge; -use tracing::error; +use swss_common::{KeyOpFieldValues, KeyOperation, SonicDbTable}; +use tracing::{error, instrument}; pub struct VDpuActor { /// The id of this vdpu @@ -91,13 +86,11 @@ impl VDpuActor { if let Ok(dpu) = msg.deserialize_data::() { let vdpu = VDpuActorState { up: dpu.up, dpu }; - return Some(vdpu); + Some(vdpu) } else { error!("Failed to deserialize DpuActorState from the message"); - return None; + None } - - None } async fn handle_vdpu_state_registration( @@ -121,6 +114,7 @@ impl VDpuActor { } impl Actor for VDpuActor { + #[instrument(name="handle_message", level="info", skip_all, fields(actor=format!("vdpu/{}", self.id), key=key))] async fn handle_message(&mut self, state: &mut State, key: &str, context: &mut Context) -> Result<()> { if key == Self::table_name() { return self.handle_vdpu_message(state, key, context).await; diff --git a/crates/hamgrd/src/db_structs.rs b/crates/hamgrd/src/db_structs.rs index 525e4e1e..eff1fb46 100644 --- a/crates/hamgrd/src/db_structs.rs +++ b/crates/hamgrd/src/db_structs.rs @@ -1,11 +1,8 @@ -// temporarily disable unused warning until vdpu/ha-set actors are implemented -#![allow(unused)] use anyhow::{Context, Result}; use chrono::DateTime; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; use serde_with::{formats::CommaSeparator, serde_as, skip_serializing_none, StringWithSeparator}; use sonicdb_derive::SonicDb; -use std::fmt; use swss_common::{DbConnector, Table}; use swss_serde::from_table; @@ -278,6 +275,143 @@ pub struct VnetRouteTunnelTable { pub check_directly_connected: Option, } +/// +#[skip_serializing_none] +#[serde_as] +#[derive(Debug, Deserialize, Serialize, PartialEq, SonicDb)] +#[sonicdb(table_name = "DASH_HA_SCOPE_CONFIG_TABLE", key_separator = ":", db_name = "APPL_DB")] +pub struct DashHaScopeConfigTable { + pub version: u32, + pub disable: bool, + pub desired_ha_state: String, + #[serde_as(as = "Option>")] + pub approved_pending_operation_ids: Option>, +} + +/// +#[skip_serializing_none] +#[serde_as] +#[derive(Debug, Deserialize, Serialize, PartialEq, SonicDb)] +#[sonicdb(table_name = "DASH_HA_SCOPE_TABLE", key_separator = ":", db_name = "DPU_APPL_DB")] +pub struct DashHaScopeTable { + pub version: u32, + pub disable: bool, + pub ha_role: String, + pub flow_reconcile_requested: bool, + pub activate_role_requested: bool, +} + +/// +#[derive(Debug, Deserialize, Serialize, PartialEq, Default, Clone, SonicDb)] +#[sonicdb(table_name = "DASH_HA_SCOPE_STATE", key_separator = "|", db_name = "DPU_STATE_DB")] +pub struct DpuDashHaScopeState { + // The last update time of this state in milliseconds. + pub last_updated_time: i64, + // The current HA role confirmed by ASIC. Please refer to the HA states defined in HA HLD. + pub ha_role: String, + // The time when HA role is moved into current one in milliseconds. + pub ha_role_start_time: i64, + // The current term confirmed by ASIC. + pub ha_term: String, + // DPU is pending on role activation. + pub activate_role_pending: bool, + // Flow reconcile is requested and pending approval. + pub flow_reconcile_pending: bool, + // Brainsplit is detected, and DPU is pending on recovery. + pub brainsplit_recover_pending: bool, +} + +/// +#[skip_serializing_none] +#[serde_as] +#[derive(Debug, Deserialize, Serialize, PartialEq, Default, Clone, SonicDb)] +#[sonicdb(table_name = "DASH_HA_SCOPE_STATE", key_separator = "|", db_name = "STATE_DB")] +pub struct NpuDashHaScopeState { + // HA scope creation time in milliseconds. + pub creation_time_in_ms: i64, /*todo: where is this from */ + // Last heartbeat time in milliseconds. This is used for leak detection. + // Heartbeat time happens once per minute and will not change the last state updated time. + pub last_heartbeat_time_in_ms: i64, /*todo: what is heartbeat */ + // Data path VIP of the DPU or ENI + pub vip_v4: String, + // Data path VIP of the DPU or ENI + pub vip_v6: Option, + // The IP address of the DPU. + pub local_ip: String, + // The IP address of the peer DPU. + pub peer_ip: String, + + // The state of the HA state machine. This is the state in NPU hamgrd. + // The state of the HA state machine. This is the state in NPU hamgrd. + pub local_ha_state: Option, + // The time when local target HA state is set. + pub local_ha_state_last_updated_time_in_ms: Option, + // The reason of the last HA state change. + pub local_ha_state_last_updated_reason: Option, + // The target HA state in ASIC. This is the state that hamgrd generates and asking DPU to move to. + pub local_target_asic_ha_state: Option, + // The HA state that ASIC acked. + pub local_acked_asic_ha_state: Option, + // The current target term of the HA state machine. + pub local_target_term: Option, + // The current term that acked by ASIC. + pub local_acked_term: Option, + // The state of the HA state machine in peer DPU. + pub peer_ha_state: Option, /*todo: we don't know peer dpu state */ + // The current term in peer DPU. + pub peer_term: Option, + + // The state of local vDPU midplane. The value can be "unknown", "up", "down". + pub local_vdpu_midplane_state: DpuPmonStateType, + // Local vDPU midplane state last updated time in milliseconds. + pub local_vdpu_midplane_state_last_updated_time_in_ms: i64, + // The state of local vDPU control plane, which includes DPU OS and certain required firmware. The value can be "unknown", "up", "down". + pub local_vdpu_control_plane_state: DpuPmonStateType, + // Local vDPU control plane state last updated time in milliseconds. + pub local_vdpu_control_plane_state_last_updated_time_in_ms: i64, + // The state of local vDPU data plane, which includes DPU hardware / ASIC and certain required firmware. The value can be "unknown", "up", "down". + pub local_vdpu_data_plane_state: DpuPmonStateType, + // Local vDPU data plane state last updated time in milliseconds. + pub local_vdpu_data_plane_state_last_updated_time_in_ms: i64, + // The list of IPv4 peer IPs (NPU IP) of the BFD sessions in up state. + #[serde_as(as = "StringWithSeparator::")] + pub local_vdpu_up_bfd_sessions_v4: Vec, + // Local vDPU BFD sessions v4 last updated time in milliseconds. + pub local_vdpu_up_bfd_sessions_v4_update_time_in_ms: i64, + // The list of IPv6 peer IPs (NPU IP) of the BFD sessions in up state. + #[serde_as(as = "StringWithSeparator::")] + pub local_vdpu_up_bfd_sessions_v6: Vec, + // Local vDPU BFD sessions v6 last updated time in milliseconds. + pub local_vdpu_up_bfd_sessions_v6_update_time_in_ms: i64, + + // GUIDs of pending operation IDs, connected by "," + #[serde_as(as = "Option>")] + pub pending_operation_ids: Option>, + // Type of pending operations, e.g. "switchover", "activate_role", "flow_reconcile", "brainsplit_recover". Connected by "," + #[serde_as(as = "Option>")] + pub pending_operation_types: Option>, + // Last updated time of the pending operation list. + pub pending_operation_list_last_updated_time_in_ms: Option, + // Switchover ID (GUID). + pub switchover_id: Option, + // Switchover state. It can be "pending_approval", "approved", "in_progress", "completed", "failed" + pub switchover_state: Option, + // The time when operation is created. + pub switchover_start_time_in_ms: Option, + // The time when operation is ended. + pub switchover_end_time_in_ms: Option, + // The time when operation is approved. + pub switchover_approved_time_in_ms: Option, + // Flow sync session ID. + pub flow_sync_session_id: Option, + // Flow sync session state. It can be "in_progress", "completed", "failed" + pub flow_sync_session_state: Option, + // Flow sync start time in milliseconds. + pub flow_sync_session_start_time_in_ms: Option, + // The IP endpoint of the server that flow records are sent to. + pub flow_sync_session_target_server: Option, +} + pub fn get_dpu_config_from_db(dpu_id: u32) -> Result { let db = DbConnector::new_named("CONFIG_DB", false, 0).context("connecting config_db")?; let table = Table::new(db, "DPU").context("opening DPU table")?; @@ -336,12 +470,12 @@ mod test { }"#; let kfv: KeyOpFieldValues = serde_json::from_str(json).unwrap(); let vnet: VnetRouteTunnelTable = swss_serde::from_field_values(&kfv.field_values).unwrap(); - println!("{:?}", vnet); + println!("{vnet:?}"); assert!(vnet.endpoint == vec!["1.2.3.4", "2.2.3.4"]); assert!(vnet.endpoint_monitor == Some(vec!["1.2.3.5".into(), "2.2.3.5".into()])); assert!(vnet.monitoring.is_none()); let fvs = swss_serde::to_field_values(&vnet).unwrap(); - println!("{:?}", fvs); + println!("{fvs:?}"); assert!(fvs["endpoint"] == "1.2.3.4,2.2.3.4"); assert!(fvs["endpoint_monitor"] == "1.2.3.5,2.2.3.5"); assert!(!fvs.contains_key("monitoring")); @@ -385,7 +519,7 @@ mod test { ("orchagent_zmq_port".to_string(), "8100".to_string()), ("swbus_port".to_string(), (23606 + d as u16).to_string()), ("midplane_ipv4".to_string(), Ipv4Addr::new(169, 254, 1, d).to_string()), - ("vdpu_id".to_string(), format!("vpdu{}", d)), + ("vdpu_id".to_string(), format!("vpdu{d}")), ]; table.set(&d.to_string(), dpu_fvs).unwrap(); } diff --git a/crates/hamgrd/src/ha_actor_messages.rs b/crates/hamgrd/src/ha_actor_messages.rs index 7c2ce37e..934da164 100644 --- a/crates/hamgrd/src/ha_actor_messages.rs +++ b/crates/hamgrd/src/ha_actor_messages.rs @@ -34,26 +34,6 @@ pub struct DpuActorState { pub dpu_bfd_state: Option, } -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] -pub struct DpuBfdState { - pub v4_bfd_up_sessions: Vec, - pub v4_bfd_up_sessions_time_in_ms: i64, - pub v6_bfd_up_sessions: Vec, - pub v6_bfd_up_sessions_time_in_ms: i64, -} - -impl Default for DpuBfdState { - fn default() -> Self { - let now = Utc::now().timestamp_millis(); - Self { - v4_bfd_up_sessions: Vec::new(), - v4_bfd_up_sessions_time_in_ms: now, - v6_bfd_up_sessions: Vec::new(), - v6_bfd_up_sessions_time_in_ms: now, - } - } -} - impl DpuActorState { pub fn from_dpu( dpu_name: &str, diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index e63b7712..57f00879 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -7,19 +7,20 @@ use swbus_actor::{set_global_runtime, ActorRuntime}; use swbus_config::swbus_config_from_db; use swbus_edge::{simple_client::SimpleSwbusEdgeClient, swbus_proto::swbus::ServicePath, RuntimeEnv, SwbusEdgeRuntime}; use swss_common::DbConnector; +use swss_common_bridge::consumer::ConsumerBridge; use tokio::{signal, task::JoinHandle, time::timeout}; use tracing::error; mod actors; mod db_structs; mod ha_actor_messages; use actors::spawn_zmq_producer_bridge; -use actors::{dpu::DpuActor, ha_set::HaSetActor, vdpu::VDpuActor, DbBasedActor}; +use actors::{dpu::DpuActor, ha_scope::HaScopeActor, ha_set::HaSetActor, vdpu::VDpuActor, DbBasedActor}; use anyhow::Result; -use db_structs::{DashHaSetConfigTable, Dpu, VDpu}; +use db_structs::{ + BfdSessionTable, DashHaScopeConfigTable, DashHaScopeTable, DashHaSetConfigTable, DashHaSetTable, Dpu, VDpu, +}; use std::any::Any; -use crate::db_structs::BfdSessionTable; - #[derive(Parser, Debug)] #[command(name = "hamgrd")] struct Args { @@ -71,7 +72,7 @@ async fn main() { } }); - start_actor_creators(&swbus_edge).await.unwrap(); + let _bridges = start_actor_creators(&swbus_edge).await.unwrap(); // Wait for Ctrl+C to exit signal::ctrl_c().await.expect("Failed to install Ctrl+C handler"); @@ -97,16 +98,28 @@ async fn spawn_producer_bridges(edge_runtime: Arc, dpu: &Dpu) let handle = spawn_zmq_producer_bridge::(edge_runtime.clone(), &zmq_endpoint).await?; handles.push(handle); + // Spawn DASH_HA_SET_TABLE zmq producer bridge for ha-set actor + // Has service path swss-common-bridge/DASH_HA_SET_TABLE. + let handle = spawn_zmq_producer_bridge::(edge_runtime.clone(), &zmq_endpoint).await?; + handles.push(handle); + + // Spawn DASH_HA_SCOPE_TABLE zmq producer bridge for ha-set actor + // Has service path swss-common-bridge/DASH_HA_SCOPE_TABLE. + let handle = spawn_zmq_producer_bridge::(edge_runtime.clone(), &zmq_endpoint).await?; + handles.push(handle); + Ok(handles) } // actor-creator creates are private swbus message handler to handle messages to actor but actor do not exist. // The creator will create the actor when it receives the first message to the actor. -async fn start_actor_creators(edge_runtime: &Arc) -> Result<()> { - DpuActor::start_actor_creator(edge_runtime.clone()).await?; - VDpuActor::start_actor_creator::(edge_runtime.clone()).await?; - HaSetActor::start_actor_creator::(edge_runtime.clone()).await?; - Ok(()) +async fn start_actor_creators(edge_runtime: &Arc) -> Result> { + let mut bridges: Vec = Vec::new(); + bridges.append(&mut DpuActor::start_actor_creator(edge_runtime.clone()).await?); + bridges.append(&mut VDpuActor::start_actor_creator::(edge_runtime.clone()).await?); + bridges.append(&mut HaSetActor::start_actor_creator::(edge_runtime.clone()).await?); + bridges.append(&mut HaScopeActor::start_actor_creator::(edge_runtime.clone()).await?); + Ok(bridges) } pub fn get_slot_id(swbus_edge: &Arc) -> u32 { @@ -176,6 +189,6 @@ where { let mut new_sp = runtime.get_base_sp(); new_sp.resource_type = "swss-common-bridge".into(); - new_sp.resource_id = format!("{}/{}", T::db_name(), T::table_name()); + new_sp.resource_id = format!("{}|{}", T::db_name(), T::table_name()); new_sp } diff --git a/crates/swbus-actor/src/state/outgoing.rs b/crates/swbus-actor/src/state/outgoing.rs index 118aaa09..3a3d87dd 100644 --- a/crates/swbus-actor/src/state/outgoing.rs +++ b/crates/swbus-actor/src/state/outgoing.rs @@ -146,7 +146,7 @@ impl Outgoing { where T: swss_common::SonicDbTable + 'static, { - let resource_id = format!("{}/{}", T::db_name(), T::table_name()); + let resource_id = format!("{}|{}", T::db_name(), T::table_name()); self.from_my_sp("swss-common-bridge", &resource_id) } diff --git a/crates/swss-common-testing/src/lib.rs b/crates/swss-common-testing/src/lib.rs index c112fb2b..703abaef 100644 --- a/crates/swss-common-testing/src/lib.rs +++ b/crates/swss-common-testing/src/lib.rs @@ -131,6 +131,16 @@ const CONFIG_DB_REDIS_CONFIG_JSON: &str = r#" "id": 1, "separator": "|", "instance": "redis" + }, + "STATE_DB": { + "id": 2, + "separator": "|", + "instance": "redis" + }, + "DPU_STATE_DB": { + "id": 3, + "separator": "|", + "instance": "redis" } } } diff --git a/test_utils/hamgrd/database_config.json b/test_utils/hamgrd/database_config.json index 84b4519d..3c776701 100644 --- a/test_utils/hamgrd/database_config.json +++ b/test_utils/hamgrd/database_config.json @@ -34,7 +34,7 @@ "instance": "redis" }, "DPU_STATE_DB": { - "id": 17, + "id": 14, "separator": "|", "instance": "redis" } diff --git a/test_utils/hamgrd/redis_data_planned_up_then_down.cmd b/test_utils/hamgrd/redis_data_planned_up_then_down.cmd index 8faa20ad..c5ccf569 100644 --- a/test_utils/hamgrd/redis_data_planned_up_then_down.cmd +++ b/test_utils/hamgrd/redis_data_planned_up_then_down.cmd @@ -1,21 +1,21 @@ # execute below commands in redis-cli to simulate the flow in planned ha up # approved_pending_operation_ids needs to be updated with the actual value from STATE_DB/DASH_HA_SCOPE_STATE -select 17 +select 14 HSET DASH_HA_SCOPE_STATE|haset0_0 last_updated_time "1718053542000" ha_role "dead" ha_role_start_time "1718053542000" ha_term "1" activate_role_pending "false" flow_reconcile_pending "false" brainsplit_recover_pending "false" select 0 HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 disable "false" -select 17 +select 14 HSET DASH_HA_SCOPE_STATE|haset0_0 activate_role_pending "true" select 0 HSET DASH_HA_SCOPE_CONFIG_TABLE:vdpu0:haset0_0 version "3" approved_pending_operation_ids "8dda84ea-fdb4-4edd-b88c-55fc96d02a6f" -select 17 +select 14 HSET DASH_HA_SCOPE_STATE|haset0_0 activate_role_pending "false" ha_role "active" ha_role_start_time "1718053552000" ha_term "2" -select 17 +select 14 HSET DASH_BFD_PROBE_STATE|dpu0 v4_bfd_up_sessions "10.1.0.3" diff --git a/test_utils/hamgrd/redis_data_set_full.cmd b/test_utils/hamgrd/redis_data_set_full.cmd index c0d5ae10..5f98abf7 100644 --- a/test_utils/hamgrd/redis_data_set_full.cmd +++ b/test_utils/hamgrd/redis_data_set_full.cmd @@ -132,6 +132,3 @@ HSET DPU_STATE|dpu6 dpu_data_plane_state up HSET DPU_STATE|dpu7 dpu_midplane_link_state up HSET DPU_STATE|dpu7 dpu_control_plane_state up HSET DPU_STATE|dpu7 dpu_data_plane_state up - -select 17 -HSET DASH_BFD_PROBE_STATE|dpu0 v4_bfd_up_sessions "10.1.0.32" \ No newline at end of file