Skip to content

Commit bcd24d8

Browse files
committed
trying out channels and setting up a slightly different drain method.
1 parent 56bee6b commit bcd24d8

File tree

3 files changed

+59
-89
lines changed

3 files changed

+59
-89
lines changed

src/Build/BackEnd/Components/Communications/NodeProviderOutOfProc.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ void NodeContextCreated(NodeContext context)
118118

119119
// Start the asynchronous read.
120120
context.BeginAsyncPacketRead();
121+
context.StartDrainingQueue();
121122

122123
// Configure the node.
123124
context.SendData(configurationFactory(nodeInfo));

src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs

Lines changed: 57 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using System.Buffers.Binary;
66
using System.Collections.Generic;
77
using System.Collections.Concurrent;
8+
using System.Threading.Channels;
9+
810
using System.Globalization;
911
using System.IO;
1012
using System.IO.Pipes;
@@ -573,16 +575,14 @@ private enum ExitPacketState
573575
private MemoryStream _writeBufferMemoryStream;
574576

575577
/// <summary>
576-
/// A queue used for enqueuing packets to write to the stream asynchronously.
577-
/// </summary>
578-
private ConcurrentQueue<INodePacket> _packetWriteQueue = new ConcurrentQueue<INodePacket>();
579-
580-
/// <summary>
581-
/// A task representing the last packet write, so we can chain packet writes one after another.
582-
/// We want to queue up writing packets on a separate thread asynchronously, but serially.
583-
/// Each task drains the <see cref="_packetWriteQueue"/>
578+
/// A channel used for enqueuing packets to write to the stream asynchronously.
584579
/// </summary>
585-
private Task _packetWriteDrainTask = Task.CompletedTask;
580+
private Channel<INodePacket> _packetChannel = Channel.CreateUnbounded<INodePacket>(new UnboundedChannelOptions()
581+
{
582+
SingleWriter = false,
583+
SingleReader = true,
584+
AllowSynchronousContinuations = false
585+
});
586586

587587
/// <summary>
588588
/// Delegate called when the context terminates.
@@ -623,6 +623,52 @@ public NodeContext(int nodeId, Process process,
623623
/// </summary>
624624
public int NodeId => _nodeId;
625625

626+
public async void StartDrainingQueue()
627+
{
628+
while (await _packetChannel.Reader.WaitToReadAsync())
629+
{
630+
while (_packetChannel.Reader.TryRead(out var packet))
631+
{
632+
// NodeContext context = (NodeContext)this;
633+
MemoryStream writeStream = this._writeBufferMemoryStream;
634+
writeStream.SetLength(0);
635+
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
636+
try
637+
{
638+
writeStream.WriteByte((byte)packet.Type);
639+
// Pad for the packet length
640+
WriteInt32(writeStream, 0);
641+
packet.Translate(writeTranslator);
642+
int writeStreamLength = (int)writeStream.Position;
643+
// Now plug in the real packet length
644+
writeStream.Position = 1;
645+
WriteInt32(writeStream, writeStreamLength - 5);
646+
byte[] writeStreamBuffer = writeStream.GetBuffer();
647+
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
648+
{
649+
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
650+
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
651+
await _serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
652+
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
653+
}
654+
if (IsExitPacket(packet))
655+
{
656+
_exitPacketState = ExitPacketState.ExitPacketSent;
657+
}
658+
}
659+
catch (IOException e)
660+
{
661+
// Do nothing here because any exception will be caught by the async read handler
662+
CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in SendData: {0}", e);
663+
}
664+
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
665+
{
666+
// Do nothing here because any exception will be caught by the async read handler
667+
}
668+
}
669+
}
670+
}
671+
626672
/// <summary>
627673
/// Starts a new asynchronous read operation for this node.
628674
/// </summary>
@@ -710,85 +756,7 @@ public void SendData(INodePacket packet)
710756
{
711757
_exitPacketState = ExitPacketState.ExitPacketQueued;
712758
}
713-
_packetWriteQueue.Enqueue(packet);
714-
DrainPacketQueue();
715-
}
716-
717-
/// <summary>
718-
/// Schedule a task to drain the packet write queue. We could have had a
719-
/// dedicated thread that would pump the queue constantly, but
720-
/// we don't want to allocate a dedicated thread per node (1MB stack)
721-
/// </summary>
722-
/// <remarks>Usually there'll be a single packet in the queue, but sometimes
723-
/// a burst of SendData comes in, with 10-20 packets scheduled. In this case
724-
/// the first scheduled task will drain all of them, and subsequent tasks
725-
/// will run on an empty queue. I tried to write logic that avoids queueing
726-
/// a new task if the queue is already being drained, but it didn't show any
727-
/// improvement and made things more complicated.</remarks>
728-
private void DrainPacketQueue()
729-
{
730-
// this lock is only necessary to protect a write to _packetWriteDrainTask field
731-
lock (_packetWriteQueue)
732-
{
733-
// average latency between the moment this runs and when the delegate starts
734-
// running is about 100-200 microseconds (unless there's thread pool saturation)
735-
_packetWriteDrainTask = _packetWriteDrainTask.ContinueWith(
736-
SendDataCoreAsync,
737-
this,
738-
TaskScheduler.Default).Unwrap();
739-
740-
static async Task SendDataCoreAsync(Task _, object state)
741-
{
742-
NodeContext context = (NodeContext)state;
743-
while (context._packetWriteQueue.TryDequeue(out var packet))
744-
{
745-
MemoryStream writeStream = context._writeBufferMemoryStream;
746-
747-
// clear the buffer but keep the underlying capacity to avoid reallocations
748-
writeStream.SetLength(0);
749-
750-
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream);
751-
try
752-
{
753-
writeStream.WriteByte((byte)packet.Type);
754-
755-
// Pad for the packet length
756-
WriteInt32(writeStream, 0);
757-
packet.Translate(writeTranslator);
758-
759-
int writeStreamLength = (int)writeStream.Position;
760-
761-
// Now plug in the real packet length
762-
writeStream.Position = 1;
763-
WriteInt32(writeStream, writeStreamLength - 5);
764-
765-
byte[] writeStreamBuffer = writeStream.GetBuffer();
766-
767-
for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize)
768-
{
769-
int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize);
770-
#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
771-
await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None);
772-
#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync'
773-
}
774-
775-
if (IsExitPacket(packet))
776-
{
777-
context._exitPacketState = ExitPacketState.ExitPacketSent;
778-
}
779-
}
780-
catch (IOException e)
781-
{
782-
// Do nothing here because any exception will be caught by the async read handler
783-
CommunicationsUtilities.Trace(context._nodeId, "EXCEPTION in SendData: {0}", e);
784-
}
785-
catch (ObjectDisposedException) // This happens if a child dies unexpectedly
786-
{
787-
// Do nothing here because any exception will be caught by the async read handler
788-
}
789-
}
790-
}
791-
}
759+
_packetChannel.Writer.TryWrite(packet);
792760
}
793761

794762
private static bool IsExitPacket(INodePacket packet)
@@ -829,7 +797,7 @@ public async Task WaitForExitAsync(ILoggingService loggingService)
829797
{
830798
// Wait up to 100ms until all remaining packets are sent.
831799
// We don't need to wait long, just long enough for the Task to start running on the ThreadPool.
832-
await Task.WhenAny(_packetWriteDrainTask, Task.Delay(100));
800+
await Task.WhenAny(Task.Delay(100));
833801
}
834802
if (_exitPacketState == ExitPacketState.ExitPacketSent)
835803
{

src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcTaskHost.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ private void NodeContextCreated(NodeContext context)
574574

575575
// Start the asynchronous read.
576576
context.BeginAsyncPacketRead();
577+
context.StartDrainingQueue();
577578

578579
lock (_activeNodes)
579580
{

0 commit comments

Comments
 (0)