|
| 1 | +--- |
| 2 | +uid: cluster-sharding-delivery |
| 3 | +title: Reliable Delivery over Akka.Cluster.Sharding |
| 4 | +--- |
| 5 | + |
| 6 | +# Reliable Delivery over Akka.Cluster.Sharding |
| 7 | + |
| 8 | +> [!TIP] |
| 9 | +> Please see "[Reliable Message Delivery with Akka.Delivery](xref:reliable-delivery)" before reading this documentation. Akka.Cluster.Sharding.Delivery builds upon all of the concepts and tools implemented in the base Akka.Delivery APIs. |
| 10 | +
|
| 11 | +If you're using [Akka.Cluster.Sharding](xref:cluster-sharding) to distribute state via one or more `ShardRegion`s across your Akka.Cluster, Akka.Cluster.Sharding.Delivery can help you guarantee delivery of messages from the rest of your `ActorSystem`s to each of your entity actors. |
| 12 | + |
| 13 | +## Point to Point Delivery |
| 14 | + |
| 15 | +Akka.Cluster.Sharding.Delivery only uses [point-to-point delivery mode from Akka.Delivery](xref:reliable-delivery) and **message chunking is not supported** in this mode. |
| 16 | + |
| 17 | +### Typed Messaging Protocol |
| 18 | + |
| 19 | +Akka.Cluster.ShardingDelivery uses a .NET generic-typed protocol and the `ShardingProducerController` and `ShardingConsumerController` are also both strongly typed. This means that end-users need to organize their messages into "protocol groups" in order to be effective, like so: |
| 20 | + |
| 21 | +[!code-csharp[Message Protocol](../../../src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs?name=MessageProtocol)] |
| 22 | + |
| 23 | +The common interface that all of the messages in this protocol implement is typically the type you'll want to use for your generic argument `T` in the Akka.Delivery or [Akka.Cluster.Sharding.Delivery](xref:cluster-sharding-delivery) method calls and types, as shown below: |
| 24 | + |
| 25 | +[!code-csharp[Starting Typed Actors](../../../src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs?name=ProducerRegistration)] |
| 26 | + |
| 27 | +### Built-in Actors and Messages |
| 28 | + |
| 29 | + |
| 30 | + |
| 31 | +The Akka.Cluster.Sharding.Delivery relationship consists of the following actors: |
| 32 | + |
| 33 | +* **`Producer`** - this is a user-defined actor that is responsible for the production of messages. It receives [`ShardingProducerController.RequestNext<T>`](xref:Akka.Cluster.Sharding.Delivery.ShardingProducerController.RequestNext`1) messages from the `ShardingProducerController` when capacity is available to deliver additional messages. |
| 34 | +* **`ShardingProducerController`** - this actor is built into Akka.Cluster.ShardingDelivery and does most of the work. **You typically only need a single `ShardingProducerController` per-`ActorSystem` / per-`ShardRegion`** (or you can use [Sharded Daemon Processes](xref:sharded-daemon-process) to host a fixed number of producers per-cluster.) The `ShardingProducerController` is responsible for spawning a `ProducerController` per-entity and delivering those messages to the `ShardRegion` `IActorRef`. |
| 35 | +* **`ShardingConsumerController`** - this actor is built into Akka.Cluster.Sharding.Delivery and typically resides on the opposite site of the network from the `ShardingProducerController`. This actor wraps around your normal Akka.Cluster.Sharding entity actors and is created directly by the `ShardRegion` each time an entity is messaged. The `ShardingConsumerController` will spawn your entity actor directly and will additionally spawn a `ConsumerController` for each unique `ProducerId` detected in the message stream. Each of the `ConsumerController`s spawned by the `ShardingConsumerController` will deliver messages via their usual [`ConsumerController.Delivery<T>`](xref:Akka.Delivery.ConsumerController.Delivery`1) to the `Consumer`. |
| 36 | +* **`Consumer`** - this is your entity actor hosted via Akka.Cluster.Sharding. The `Consumer` processes messages of type `T` and must send `ConsumerController.Confirmation` messages back to the `ConsumerController` once it has successfully processed each `ConsumerController.Delivery<T>`. The `Consumer` actor is spawned by the `ShardingConsumerController`. |
| 37 | + |
| 38 | +#### Integration with ShardRegions |
| 39 | + |
| 40 | +In order to make use of Akka.Cluster.Sharding.Delivery, we have to change the way we spawn our `ShardRegion`'s entity actors: |
| 41 | + |
| 42 | +[!code-csharp[Launching ShardRegion with ShardingConsumerController configured](../../../src/examples/Cluster/ClusterSharding/ShoppingCart/Program.cs?name=LaunchShardRegion)] |
| 43 | + |
| 44 | +1. The `ShardingConsumerController` needs to be the actor initially created by the `ShardRegion` each time an entity is spawned; |
| 45 | +2. The `ShardingConsumerController.Create` method takes an argument of type `Func<IActorRef, Props>` - this allows you to pass in the `IActorRef` of the `ShardingConsumerController` itself down to your entity actor, the `Props` of which should be returned by this function. |
| 46 | +3. The `Consumer` actor must send a `ConsumerController.Start<T>` message, typically during `PreStart`, to the `ShardingConsumerController` in order to trigger message delivery. |
| 47 | + |
| 48 | +[!code-csharp[Consumer signalling to ShardingConsumerController that it's ready to receive messages](../../../src/examples/Cluster/ClusterSharding/ShoppingCart/Customers.cs?name=ShardingConsumerRegistration)] |
| 49 | + |
| 50 | +Do this for each entity type / `ShardRegion` you wish to guarantee delivery for via the `ShardingProducerController`. |
| 51 | + |
| 52 | +Next, we have to create our `Producer` and `ShardingProducerController` instances: |
| 53 | + |
| 54 | +[!code-csharp[Launching ShardRegion with ShardingConsumerController configured](../../../src/examples/Cluster/ClusterSharding/ShoppingCart/Program.cs?name=StartSendingMessage)] |
| 55 | + |
| 56 | +1. We have to launch our `ShardingProducerController` and our `Producer` actors - each `ShardingProducerController` must have a *globally unique* `ProducerId` value (similar to a `PersistentId`). |
| 57 | +2. `ShardingProducerController`s can be optionally made persistent via the same [`EventSourcedProducerQueue`](xref:Akka.Persistence.Delivery.EventSourcedProducerQueue) that can be used by an individual `ProducerController`. |
| 58 | +3. The `ShardingProducerController` must have a reference to the `IActorRef` of the `ShardRegion` to which it will be delivering messages. |
| 59 | +4. The `ShardingProducerController` must receive a [`ShardingProducerController.Start<T>`](xref:Akka.Cluster.Sharding.Delivery.ShardingProducerController.Start`1) message that contains the `Producer`'s `IActorRef` in order to begin message production. |
| 60 | + |
| 61 | +> [!TIP] |
| 62 | +> Unlike Akka.Delivery, there is no need to have the `ProducerController` and `ConsumerController` explicitly register with the other - this will be handled automatically by the Akka.Cluster.Sharding messaging system. |
| 63 | +
|
| 64 | +### Message Production |
| 65 | + |
| 66 | +Once the `Producer` has been successfully registered with its `ProducerController`, it will begin to receive `ShardingProducerController.RequestNext<T>` messages - each time it receives one of these messages the `Producer` can send a burst of messages to the `ShardingProducerController`. |
| 67 | + |
| 68 | +[!code-csharp[Launching ShardRegion with ShardingConsumerController configured](../../../src/examples/Cluster/ClusterSharding/ShoppingCart/Producer.cs?name=MessageProduction)] |
| 69 | + |
| 70 | +> [!IMPORTANT] |
| 71 | +> It is crucial that the `Prodcuer` send its messages of type `T` wrapped inside a [`ShardingEnvelope`](xref:Akka.Cluster.Sharding.ShardingEnvelope) - otherwise the `ShardingProducerController` won't know which messages should be routed to which unique `entityId`. Additionally - your `HashCodeMessageExtractor` that you use with your `ShardRegion` must also be able to handle the `ShardingEnvelope` to ensure that this message is processed correctly on the receiving side. There is a proposal in-place to automate some of this work in a future release of Akka.NET: [#6717](https://github.com/akkadotnet/akka.net/issues/6717). |
| 72 | +
|
| 73 | +One important distinction between [`ShardingProducerController.RequestNext<T>`](xref:Akka.Cluster.Sharding.Delivery.ShardingProducerController.RequestNext`1) and [`ProducerController.RequestNext<T>`](xref:Akka.Delivery.ProducerController.RequestNext`1) - because the `ShardingProducerController` has to deliver to multiple `Consumer`s, it retains a much larger outbound delivery buffer. You can check the status of which entities are currently buffered or which ones have active demand by inspecting the `ShardingProducerController.RequestNext<T>.BufferedForEntitiesWithoutDemand` or `ShardingProducerController.RequestNext<T>.EntitiesWithDemand` properties respectively. |
| 74 | + |
| 75 | + |
| 76 | + |
| 77 | +Once the `ShardingProducerController` begins receiving messages of type `T` (wrapped in a `ShardingEnvelope`) from the `Producer`, it will spawn `ProducerController`s for each unique entity and begin routing those messages to the `ShardRegion`. |
| 78 | + |
| 79 | +### Message Consumption |
| 80 | + |
| 81 | +The `ProducerController`s will all send `ConsumerController.SequencedMessage<T>` over the wire, wrapped inside `ShardingEnvelope`s - the `ShardRegion` must be programmed to handle these types: |
| 82 | + |
| 83 | + |
| 84 | + |
| 85 | +[!code-csharp[ShardRegion message extractor](../../../src/examples/Cluster/ClusterSharding/ShoppingCart/MessageExtractor.cs?name=ExtractorClass)] |
| 86 | + |
| 87 | +As the `ConsumerController.SequencedMessage<T>`s are delivered, the `ShardRegion` will spawn the `ShardingConsumerController` for each entity, which will in turn spawn the entity actor itself (the `Consumer`) as well as one `ConsumerController` per unique `ProducerId`. These actors are all cheap and maintain a finite amount of buffer space. |
| 88 | + |
| 89 | +1. The `Consumer` receives the `ConsumerController.Delivery<T>` message from the `ShardingConsumerController`; |
| 90 | +2. The `Consumer` replies to the `IActorRef` stored inside `ConsumerController.Delivery<T>.DeliverTo` with a `ConsumerController.Confirmed` message - this marks the message as "processed;" |
| 91 | +3. The `ConsumerController` marks the message as delivered, removes it from the buffer, and requests additional messages from the `ProducerController`; and |
| 92 | +4. This in turn causes the `ShardingProducerController` to update its aggregate state and send additional `ShardingProducerController.RequestNext<T>` demand to the `Producer`. |
| 93 | + |
| 94 | +### Guarantees, Constraints, and Caveats |
| 95 | + |
| 96 | +See ["Reliable Message Delivery with Akka.Delivery - Guarantees, Constraints, and Caveats"](xref:reliable-delivery#guarantees-constraints-and-caveats) - these are the same in Akka.Cluster.Sharding.Delivery. |
| 97 | + |
| 98 | +With one notable exception: **Akka.Cluster.Sharding.Delivery does not support message chunking** and there are no plans to add it at this time. |
| 99 | + |
| 100 | +## Durable Reliable Delivery over Akka.Cluster.Sharding |
| 101 | + |
| 102 | +By default the `ShardingProducerController` will run using without any persistent storage - however, if you reference the [Akka.Persistence library](xref:persistence-architecture) in your Akka.NET application then you can make use of the [`EventSourcedProducerQueue`](xref:Akka.Persistence.Delivery.EventSourcedProducerQueue) to ensure that your `ShardingProducerController` saves and un-acknowledged messages to your Akka.Persistence Journal and SnapshotStore. |
| 103 | + |
| 104 | +[!code-csharp[Durable ShardingProducerController configuration](../../../src/contrib/cluster/Akka.Cluster.Sharding.Tests/Delivery/DurableShardingSpec.cs?name=SpawnDurableProducer)] |
| 105 | + |
| 106 | +> [!TIP] |
| 107 | +> The `EventSourcedProducerQueue` can be customized via the [`EventSourcedProducerQueue.Settings` class](xref:Akka.Persistence.Delivery.EventSourcedProducerQueue.Settings) - for instance, you can customize it to use a separate Akka.Persistence Journal and SnapshotStore. |
| 108 | +
|
| 109 | +Each time a message is sent to the `ShardingProducerController` it will persist a copy of the message to the Akka.Persistence journal. |
| 110 | + |
| 111 | + |
| 112 | + |
| 113 | +> [!TIP] |
| 114 | +> All messages for all entities are stored in the same `EventSourcedProducerQueue` instance. |
| 115 | +
|
| 116 | +### Confirmation of Outbound Messages Persisted |
| 117 | + |
| 118 | +If the `Producer` needs to confirm that all of its outbound messages have been successfully persisted, this can be accomplished via the `ShardingProducerController.RequestNext<T>.AskNextTo` method: |
| 119 | + |
| 120 | +[!code-csharp[Starting ProducerController with EventSourcedProducerQueue enabled](../../../src/core/Akka.Docs.Tests/Delivery/DeliveryDocSpecs.cs?name=ConfirmableMessages)] |
| 121 | + |
| 122 | +The `AskNextTo` method will return a `Task<long>` that will be completed once the message has been confirmed as stored inside the `EventSourcedProducerQueue` - the `long` in this case is the sequence number that has been assigned to this message via the `ShardingProducerController`'s outbound queue. |
| 123 | + |
| 124 | +In addition to outbound deliveries, confirmation messages from the `ShardingConsumerController` will also be persisted - and these will cause the `EventSourcedProducerQueue` to gradually compress its footprint in the Akka.Persistence journal by taking snapshots. |
| 125 | + |
| 126 | +> [!TIP] |
| 127 | +> By default the `EventSourcedProducerQueue` will take a new snapshot every 1000 events but this can be configured via the [`EventSourcedProducerQueue.Settings` class](xref:Akka.Persistence.Delivery.EventSourcedProducerQueue.Settings). |
0 commit comments