Skip to content

reduce the number of IMessageExtractor invocations #5217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
30 changes: 19 additions & 11 deletions src/contrib/cluster/Akka.Cluster.Sharding/DDataShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ public void EntityTerminated(IActorRef tref)
{
//Note; because we're not persisting the EntityStopped, we don't need
// to persist the EntityStarted either.
Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id);
if(Log.IsDebugEnabled)
Log.Debug("Starting entity [{0}] again, there are buffered messages for it", id);
this.SendMessageBuffer(new Shard.EntityStarted(id));
}
else
{
if (!Passivating.Contains(tref))
{
Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id);
if(Log.IsDebugEnabled)
Log.Debug("Entity [{0}] stopped without passivating, will restart after backoff", id);
Context.System.Scheduler.ScheduleTellOnce(Settings.TuningParameters.EntityRestartBackoff, Self, new Shard.RestartEntity(id), ActorRefs.NoSender);
}
else
Expand All @@ -159,7 +161,7 @@ public void EntityTerminated(IActorRef tref)
Passivating = Passivating.Remove(tref);
}

public void DeliverTo(string id, object message, object payload, IActorRef sender)
public void DeliverTo(string id, object message, IActorRef sender)
{
var name = Uri.EscapeDataString(id);
var child = Context.Child(name);
Expand All @@ -171,7 +173,7 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
{
throw new InvalidOperationException($"Message buffers contains id [{id}].");
}
this.GetOrCreateEntity(id).Tell(payload, sender);
this.GetOrCreateEntity(id).Tell(message, sender);
}
else
{
Expand All @@ -183,7 +185,7 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
else
{
this.TouchLastMessageTimestamp(id);
child.Tell(payload, sender);
child.Tell(message, sender);
}
}

Expand Down Expand Up @@ -324,13 +326,19 @@ private Receive WaitingForUpdate<TEvent>(TEvent e, Action<TEvent> afterUpdateCal
case Shard.IShardQuery sq:
this.HandleShardRegionQuery(sq);
break;
case var _ when ExtractEntityId(message).HasValue:
this.DeliverMessage(message, Context.Sender);
break;
default:
Log.Debug("Stashing unexpected message [{0}] while waiting for DDataShard update of {0}",
message.GetType(), e);
Stash.Stash();
var extracted = ExtractEntityId(message);
if (extracted.HasValue)
{
DeliverTo(extracted.Value.Item1, extracted.Value.Item2, Sender);
}
else
{
Log.Debug("Stashing unexpected message [{0}] while waiting for DDataShard update of {0}",
message.GetType(), e);
Stash.Stash();
}

break;
}
return true;
Expand Down
6 changes: 3 additions & 3 deletions src/contrib/cluster/Akka.Cluster.Sharding/PersistentShard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void EntityTerminated(IActorRef tref)
Passivating = Passivating.Remove(tref);
}

public void DeliverTo(string id, object message, object payload, IActorRef sender)
public void DeliverTo(string id, object message, IActorRef sender)
{
var name = Uri.EscapeDataString(id);
var child = Context.Child(name);
Expand All @@ -233,7 +233,7 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
{
throw new InvalidOperationException($"Message buffers contains id [{id}].");
}
this.GetOrCreateEntity(id).Tell(payload, sender);
this.GetOrCreateEntity(id).Tell(message, sender);
}
else
{
Expand All @@ -245,7 +245,7 @@ public void DeliverTo(string id, object message, object payload, IActorRef sende
else
{
this.TouchLastMessageTimestamp(id);
child.Tell(payload, sender);
child.Tell(message, sender);
}
}
}
Expand Down
49 changes: 25 additions & 24 deletions src/contrib/cluster/Akka.Cluster.Sharding/Shard.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ internal interface IShard
void Unhandled(object message);
void ProcessChange<T>(T evt, Action<T> handler) where T : Shard.StateChange;
void EntityTerminated(IActorRef tref);
void DeliverTo(string id, object message, object payload, IActorRef sender);
void DeliverTo(string id, object message, IActorRef sender);
ICancelable PassivateIdleTask { get; }

ITimerScheduler Timers { get; }
Expand Down Expand Up @@ -491,7 +491,7 @@ public void OnLeaseAcquired()
protected override bool Receive(object message) => this.HandleCommand(message);
public void ProcessChange<T>(T evt, Action<T> handler) where T : StateChange => this.BaseProcessChange(evt, handler);
public void EntityTerminated(IActorRef tref) => this.BaseEntityTerminated(tref);
public void DeliverTo(string id, object message, object payload, IActorRef sender) => this.BaseDeliverTo(id, message, payload, sender);
public void DeliverTo(string id, object message, IActorRef sender) => this.BaseDeliverTo(id, message, sender);
}

internal static class Shards
Expand Down Expand Up @@ -654,14 +654,17 @@ public static bool HandleCommand<TShard>(this TShard shard, object message) wher
case Shard.PassivateIdleTick _:
shard.PassivateIdleEntities();
return true;

case Shard.LeaseLost ll:
shard.HandleLeaseLost(ll);
return true;

case var _ when shard.ExtractEntityId(message).HasValue:
shard.DeliverMessage(message, shard.Context.Sender);
return true;
default:
var extracted = shard.ExtractEntityId(message);
if (extracted.HasValue)
{
shard.DeliverMessage(extracted.Value.Item1, extracted.Value.Item2, shard.Context.Sender);
return true;
}
break;
}
return false;
}
Expand Down Expand Up @@ -865,50 +868,48 @@ public static void SendMessageBuffer<TShard>(this TShard shard, Shard.EntityStar

if (buffer.Count != 0)
{
shard.Log.Debug("Sending message buffer for entity [{0}] ([{1}] messages)", id, buffer.Count);
if(shard.Log.IsDebugEnabled)
shard.Log.Debug("Sending message buffer for entity [{0}] ([{1}] messages)", id, buffer.Count);

shard.GetOrCreateEntity(id);

// Now there is no deliveryBuffer we can try to redeliver
// and as the child exists, the message will be directly forwarded
foreach (var pair in buffer)
shard.DeliverMessage(pair.Item1, pair.Item2);
foreach (var (msg, actorRef) in buffer)
shard.DeliverMessage(id, msg, actorRef);
}
}
}

internal static void DeliverMessage<TShard>(this TShard shard, object message, IActorRef sender) where TShard : IShard
internal static void DeliverMessage<TShard>(this TShard shard, string entityId, object message, IActorRef sender) where TShard : IShard
{
var t = shard.ExtractEntityId(message);
var id = t.Value.Item1;
var payload = t.Value.Item2;

if (string.IsNullOrEmpty(id))
if (string.IsNullOrEmpty(entityId))
{
shard.Log.Warning("Id must not be empty, dropping message [{0}]", message.GetType());
shard.Context.System.DeadLetters.Tell(message);
}
else
{
if (payload is ShardRegion.StartEntity start)
if (message is ShardRegion.StartEntity start) // technically, this message should never make it here
shard.HandleStartEntity(start);
else
{
if (shard.MessageBuffers.TryGetValue(id, out var buffer))
if (shard.MessageBuffers.TryGetValue(entityId, out var buffer))
{
if (shard.TotalBufferSize() >= shard.Settings.TuningParameters.BufferSize)
{
shard.Log.Warning("Buffer is full, dropping message for entity [{0}]", id);
shard.Log.Warning("Buffer is full, dropping message for entity [{0}]", entityId);
shard.Context.System.DeadLetters.Tell(message);
}
else
{
shard.Log.Debug("Message for entity [{0}] buffered", id);
shard.MessageBuffers = shard.MessageBuffers.SetItem(id, buffer.Add((message, sender)));
if(shard.Log.IsDebugEnabled)
shard.Log.Debug("Message for entity [{0}] buffered", entityId);
shard.MessageBuffers = shard.MessageBuffers.SetItem(entityId, buffer.Add((message, sender)));
}
}
else
shard.DeliverTo(id, message, payload, sender);
shard.DeliverTo(entityId, message, sender);
}
}
}
Expand Down Expand Up @@ -937,10 +938,10 @@ public static void BaseEntityTerminated<TShard>(this TShard shard, IActorRef tre
shard.Passivating = shard.Passivating.Remove(tref);
}

internal static void BaseDeliverTo<TShard>(this TShard shard, string id, object message, object payload, IActorRef sender) where TShard : IShard
internal static void BaseDeliverTo<TShard>(this TShard shard, string id, object message, IActorRef sender) where TShard : IShard
{
shard.TouchLastMessageTimestamp(id);
shard.GetOrCreateEntity(id).Tell(payload, sender);
shard.GetOrCreateEntity(id).Tell(message, sender);
}

internal static IActorRef GetOrCreateEntity<TShard>(this TShard shard, string id, Action<IActorRef> onCreate = null) where TShard : IShard
Expand Down