Skip to content

Consolidate common IPC / named pipe code #11546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void UnregisterPacketHandler(NodePacketType packetType)
throw new NotImplementedException();
}

public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
{
throw new NotImplementedException();
}
Expand Down
60 changes: 6 additions & 54 deletions src/Build/BackEnd/Client/MSBuildClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using Microsoft.Build.BackEnd;
using Microsoft.Build.BackEnd.Client;
Expand Down Expand Up @@ -75,19 +74,9 @@ public sealed class MSBuildClient
private readonly string _pipeName;

/// <summary>
/// The named pipe stream for client-server communication.
/// The named pipe client for client-server communication.
/// </summary>
private NamedPipeClientStream _nodeStream = null!;

/// <summary>
/// A way to cache a byte array when writing out packets
/// </summary>
private readonly MemoryStream _packetMemoryStream;

/// <summary>
/// A binary writer to help write into <see cref="_packetMemoryStream"/>
/// </summary>
private readonly BinaryWriter _binaryWriter;
private NodePipeClient _pipeClient = null!;

/// <summary>
/// Used to estimate the size of the build with an ETW trace.
Expand Down Expand Up @@ -130,26 +119,14 @@ public MSBuildClient(
// Client <-> Server communication stream
_handshake = GetHandshake();
_pipeName = OutOfProcServerNode.GetPipeName(_handshake);
_packetMemoryStream = new MemoryStream();
_binaryWriter = new BinaryWriter(_packetMemoryStream);

CreateNodePipeStream();
}

private void CreateNodePipeStream()
{
#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter
_nodeStream = new NamedPipeClientStream(
serverName: ".",
_pipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous
#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY
| PipeOptions.CurrentUserOnly
#endif
);
#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter
_packetPump = new MSBuildClientPacketPump(_nodeStream);
_pipeClient = new NodePipeClient(_pipeName, _handshake);
_packetPump = new MSBuildClientPacketPump(_pipeClient);
}

/// <summary>
Expand Down Expand Up @@ -423,7 +400,7 @@ private bool TrySendPacket(Func<INodePacket> packetResolver)
try
{
packet = packetResolver();
WritePacket(_nodeStream, packet);
_pipeClient.WritePacket(packet);
CommunicationsUtilities.Trace("Command packet of type '{0}' sent...", packet.Type);
}
catch (Exception ex)
Expand Down Expand Up @@ -621,7 +598,7 @@ private bool TryConnectToServer(int timeoutMilliseconds)
tryAgain = false;
try
{
NodeProviderOutOfProcBase.ConnectToPipeStream(_nodeStream, _pipeName, _handshake, Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds));
_pipeClient.ConnectToServer(Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds));
}
catch (Exception ex)
{
Expand All @@ -644,30 +621,5 @@ private bool TryConnectToServer(int timeoutMilliseconds)

return true;
}

private void WritePacket(Stream nodeStream, INodePacket packet)
{
MemoryStream memoryStream = _packetMemoryStream;
memoryStream.SetLength(0);

ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(memoryStream);

// Write header
memoryStream.WriteByte((byte)packet.Type);

// Pad for packet length
_binaryWriter.Write(0);

// Reset the position in the write buffer.
packet.Translate(writeTranslator);

int packetStreamLength = (int)memoryStream.Position;

// Now write in the actual packet length
memoryStream.Position = 1;
_binaryWriter.Write(packetStreamLength - 5);

nodeStream.Write(memoryStream.GetBuffer(), 0, packetStreamLength);
}
}
}
131 changes: 26 additions & 105 deletions src/Build/BackEnd/Client/MSBuildClientPacketPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using Microsoft.Build.Internal;
using Microsoft.Build.Shared;
#if !FEATURE_APM
using System.Threading.Tasks;
#endif

namespace Microsoft.Build.BackEnd.Client
{
Expand Down Expand Up @@ -46,25 +42,15 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
/// </summary>
private readonly NodePacketFactory _packetFactory;

/// <summary>
/// The memory stream for a read buffer.
/// </summary>
private readonly MemoryStream _readBufferMemoryStream;

/// <summary>
/// The thread which runs the asynchronous packet pump
/// </summary>
private Thread? _packetPumpThread;

/// <summary>
/// The stream from where to read packets.
/// </summary>
private readonly Stream _stream;

/// <summary>
/// The binary translator for reading packets.
/// The pipe client from where to read packets.
/// </summary>
private readonly ITranslator _binaryReadTranslator;
private readonly NodePipeClient _pipeClient;

/// <summary>
/// True if this side is gracefully disconnecting.
Expand All @@ -73,21 +59,19 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
/// </summary>
private bool _isServerDisconnecting;

public MSBuildClientPacketPump(Stream stream)
public MSBuildClientPacketPump(NodePipeClient pipeClient)
{
ErrorUtilities.VerifyThrowArgumentNull(stream);
ErrorUtilities.VerifyThrowArgumentNull(pipeClient);

_stream = stream;
_pipeClient = pipeClient;
_pipeClient.RegisterPacketFactory(this);
_isServerDisconnecting = false;
_packetFactory = new NodePacketFactory();

ReceivedPacketsQueue = new ConcurrentQueue<INodePacket>();
PacketReceivedEvent = new AutoResetEvent(false);
PacketPumpCompleted = new ManualResetEvent(false);
_packetPumpShutdownEvent = new ManualResetEvent(false);

_readBufferMemoryStream = new MemoryStream();
_binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer());
}

#region INodePacketFactory Members
Expand All @@ -113,14 +97,13 @@ public void UnregisterPacketHandler(NodePacketType packetType)
}

/// <summary>
/// Deserializes and routes a packer to the appropriate handler.
/// Deserializes a packet.
/// </summary>
/// <param name="nodeId">The node from which the packet was received.</param>
/// <param name="packetType">The packet type.</param>
/// <param name="translator">The translator to use as a source for packet data.</param>
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
{
_packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
return _packetFactory.DeserializePacket(packetType, translator);
}

/// <summary>
Expand Down Expand Up @@ -182,21 +165,16 @@ public void Stop()
/// </remarks>
private void PacketPumpProc()
{
RunReadLoop(_stream, _packetPumpShutdownEvent);
RunReadLoop(_pipeClient, _packetPumpShutdownEvent);
}

private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent)
private void RunReadLoop(NodePipeClient pipeClient, ManualResetEvent localPacketPumpShutdownEvent)
{
CommunicationsUtilities.Trace("Entering read loop.");

try
{
byte[] headerByte = new byte[5];
#if FEATURE_APM
IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
Task<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
#endif
Task<INodePacket> readTask = pipeClient.ReadPacketAsync();

bool continueReading = true;
do
Expand All @@ -208,11 +186,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
WaitHandle[] handles =
[
localPacketPumpShutdownEvent,
#if FEATURE_APM
result.AsyncWaitHandle
#else
((IAsyncResult)readTask).AsyncWaitHandle
#endif
];
int waitId = WaitHandle.WaitAny(handles);
switch (waitId)
Expand All @@ -224,80 +198,27 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
break;

case 1:
INodePacket packet = readTask.GetAwaiter().GetResult();

if (packet.Type == NodePacketType.NodeShutdown)
{
// Client recieved a packet header. Read the rest of it.
int headerBytesRead = 0;
#if FEATURE_APM
headerBytesRead = localStream.EndRead(result);
#else
headerBytesRead = readTask.Result;
#endif

if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0))
if (!_isServerDisconnecting)
{
// Incomplete read. Abort.
if (headerBytesRead == 0)
{
if (_isServerDisconnecting)
{
continueReading = false;
break;
}

ErrorUtilities.ThrowInternalError("Server disconnected abruptly");
}
else
{
ErrorUtilities.ThrowInternalError("Incomplete header read. {0} of {1} bytes read", headerBytesRead, headerByte.Length);
}
ErrorUtilities.ThrowInternalError("Server disconnected abruptly.");
}

NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);

int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(headerByte, 1, 4));
int packetBytesRead = 0;

_readBufferMemoryStream.Position = 0;
_readBufferMemoryStream.SetLength(packetLength);
byte[] packetData = _readBufferMemoryStream.GetBuffer();

while (packetBytesRead < packetLength)
{
int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead);
if (bytesRead == 0)
{
// Incomplete read. Abort.
ErrorUtilities.ThrowInternalError("Incomplete packet read. {0} of {1} bytes read", packetBytesRead, packetLength);
}

packetBytesRead += bytesRead;
}
continueReading = false;
break;
}

try
{
_packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator);
}
catch
{
// Error while deserializing or handling packet. Logging additional info.
CommunicationsUtilities.Trace("Packet factory failed to receive package. Exception while deserializing packet {0}.", packetType);
throw;
}
_packetFactory.RoutePacket(0, packet);

if (packetType == NodePacketType.ServerNodeBuildResult)
{
continueReading = false;
}
else
{
// Start reading the next package header.
#if FEATURE_APM
result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
#else
readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
#endif
}
continueReading = packet.Type != NodePacketType.ServerNodeBuildResult;
if (continueReading)
{
readTask = pipeClient.ReadPacketAsync();
}

break;

default:
Expand Down
12 changes: 3 additions & 9 deletions src/Build/BackEnd/Components/Communications/NodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,13 @@ public void UnregisterPacketHandler(NodePacketType packetType)
}

/// <summary>
/// Takes a serializer, deserializes the packet and routes it to the appropriate handler.
/// Takes a serializer and deserializes the packet.
/// </summary>
/// <param name="nodeId">The node from which the packet was received.</param>
/// <param name="packetType">The packet type.</param>
/// <param name="translator">The translator containing the data from which the packet should be reconstructed.</param>
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
{
if (packetType == NodePacketType.NodeShutdown)
{
RemoveNodeFromMapping(nodeId);
}

_packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
return _packetFactory.DeserializePacket(packetType, translator);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,11 @@ public void UnregisterPacketHandler(NodePacketType packetType)
/// <summary>
/// Deserializes and routes a packet. Not used in the in-proc node.
/// </summary>
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
{
// Not used
ErrorUtilities.ThrowInternalErrorUnreachable();
return null;
}

/// <summary>
Expand Down
Loading