Skip to content

Commit 59d013b

Browse files
Consolidate common IPC / named pipe code (#11546)
* Encapsulate common IPC/pipe logic * Simplify handshake timeout ifdefs * Update node impls to use common i'PC/pipe code * Delete BufferedReadStream * Split DeserializeAndRoutePacket (plumbing) * proj file changes * Port ValueTask and NET flag changes from merge
1 parent 88496b9 commit 59d013b

24 files changed

+775
-1217
lines changed

src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void UnregisterPacketHandler(NodePacketType packetType)
101101
throw new NotImplementedException();
102102
}
103103

104-
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
104+
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
105105
{
106106
throw new NotImplementedException();
107107
}

src/Build/BackEnd/Client/MSBuildClient.cs

Lines changed: 6 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Diagnostics;
88
using System.Globalization;
99
using System.IO;
10-
using System.IO.Pipes;
1110
using System.Threading;
1211
using Microsoft.Build.BackEnd;
1312
using Microsoft.Build.BackEnd.Client;
@@ -75,19 +74,9 @@ public sealed class MSBuildClient
7574
private readonly string _pipeName;
7675

7776
/// <summary>
78-
/// The named pipe stream for client-server communication.
77+
/// The named pipe client for client-server communication.
7978
/// </summary>
80-
private NamedPipeClientStream _nodeStream = null!;
81-
82-
/// <summary>
83-
/// A way to cache a byte array when writing out packets
84-
/// </summary>
85-
private readonly MemoryStream _packetMemoryStream;
86-
87-
/// <summary>
88-
/// A binary writer to help write into <see cref="_packetMemoryStream"/>
89-
/// </summary>
90-
private readonly BinaryWriter _binaryWriter;
79+
private NodePipeClient _pipeClient = null!;
9180

9281
/// <summary>
9382
/// Used to estimate the size of the build with an ETW trace.
@@ -130,26 +119,14 @@ public MSBuildClient(
130119
// Client <-> Server communication stream
131120
_handshake = GetHandshake();
132121
_pipeName = OutOfProcServerNode.GetPipeName(_handshake);
133-
_packetMemoryStream = new MemoryStream();
134-
_binaryWriter = new BinaryWriter(_packetMemoryStream);
135122

136123
CreateNodePipeStream();
137124
}
138125

139126
private void CreateNodePipeStream()
140127
{
141-
#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter
142-
_nodeStream = new NamedPipeClientStream(
143-
serverName: ".",
144-
_pipeName,
145-
PipeDirection.InOut,
146-
PipeOptions.Asynchronous
147-
#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY
148-
| PipeOptions.CurrentUserOnly
149-
#endif
150-
);
151-
#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter
152-
_packetPump = new MSBuildClientPacketPump(_nodeStream);
128+
_pipeClient = new NodePipeClient(_pipeName, _handshake);
129+
_packetPump = new MSBuildClientPacketPump(_pipeClient);
153130
}
154131

155132
/// <summary>
@@ -423,7 +400,7 @@ private bool TrySendPacket(Func<INodePacket> packetResolver)
423400
try
424401
{
425402
packet = packetResolver();
426-
WritePacket(_nodeStream, packet);
403+
_pipeClient.WritePacket(packet);
427404
CommunicationsUtilities.Trace("Command packet of type '{0}' sent...", packet.Type);
428405
}
429406
catch (Exception ex)
@@ -621,7 +598,7 @@ private bool TryConnectToServer(int timeoutMilliseconds)
621598
tryAgain = false;
622599
try
623600
{
624-
NodeProviderOutOfProcBase.ConnectToPipeStream(_nodeStream, _pipeName, _handshake, Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds));
601+
_pipeClient.ConnectToServer(Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds));
625602
}
626603
catch (Exception ex)
627604
{
@@ -644,30 +621,5 @@ private bool TryConnectToServer(int timeoutMilliseconds)
644621

645622
return true;
646623
}
647-
648-
private void WritePacket(Stream nodeStream, INodePacket packet)
649-
{
650-
MemoryStream memoryStream = _packetMemoryStream;
651-
memoryStream.SetLength(0);
652-
653-
ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(memoryStream);
654-
655-
// Write header
656-
memoryStream.WriteByte((byte)packet.Type);
657-
658-
// Pad for packet length
659-
_binaryWriter.Write(0);
660-
661-
// Reset the position in the write buffer.
662-
packet.Translate(writeTranslator);
663-
664-
int packetStreamLength = (int)memoryStream.Position;
665-
666-
// Now write in the actual packet length
667-
memoryStream.Position = 1;
668-
_binaryWriter.Write(packetStreamLength - 5);
669-
670-
nodeStream.Write(memoryStream.GetBuffer(), 0, packetStreamLength);
671-
}
672624
}
673625
}

src/Build/BackEnd/Client/MSBuildClientPacketPump.cs

Lines changed: 26 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System;
5-
using System.Buffers.Binary;
65
using System.Collections.Concurrent;
7-
using System.IO;
86
using System.Threading;
97
using Microsoft.Build.Internal;
108
using Microsoft.Build.Shared;
11-
#if !FEATURE_APM
129
using System.Threading.Tasks;
13-
#endif
1410

1511
namespace Microsoft.Build.BackEnd.Client
1612
{
@@ -46,25 +42,15 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
4642
/// </summary>
4743
private readonly NodePacketFactory _packetFactory;
4844

49-
/// <summary>
50-
/// The memory stream for a read buffer.
51-
/// </summary>
52-
private readonly MemoryStream _readBufferMemoryStream;
53-
5445
/// <summary>
5546
/// The thread which runs the asynchronous packet pump
5647
/// </summary>
5748
private Thread? _packetPumpThread;
5849

5950
/// <summary>
60-
/// The stream from where to read packets.
61-
/// </summary>
62-
private readonly Stream _stream;
63-
64-
/// <summary>
65-
/// The binary translator for reading packets.
51+
/// The pipe client from where to read packets.
6652
/// </summary>
67-
private readonly ITranslator _binaryReadTranslator;
53+
private readonly NodePipeClient _pipeClient;
6854

6955
/// <summary>
7056
/// True if this side is gracefully disconnecting.
@@ -73,21 +59,19 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF
7359
/// </summary>
7460
private bool _isServerDisconnecting;
7561

76-
public MSBuildClientPacketPump(Stream stream)
62+
public MSBuildClientPacketPump(NodePipeClient pipeClient)
7763
{
78-
ErrorUtilities.VerifyThrowArgumentNull(stream);
64+
ErrorUtilities.VerifyThrowArgumentNull(pipeClient);
7965

80-
_stream = stream;
66+
_pipeClient = pipeClient;
67+
_pipeClient.RegisterPacketFactory(this);
8168
_isServerDisconnecting = false;
8269
_packetFactory = new NodePacketFactory();
8370

8471
ReceivedPacketsQueue = new ConcurrentQueue<INodePacket>();
8572
PacketReceivedEvent = new AutoResetEvent(false);
8673
PacketPumpCompleted = new ManualResetEvent(false);
8774
_packetPumpShutdownEvent = new ManualResetEvent(false);
88-
89-
_readBufferMemoryStream = new MemoryStream();
90-
_binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer());
9175
}
9276

9377
#region INodePacketFactory Members
@@ -113,14 +97,13 @@ public void UnregisterPacketHandler(NodePacketType packetType)
11397
}
11498

11599
/// <summary>
116-
/// Deserializes and routes a packer to the appropriate handler.
100+
/// Deserializes a packet.
117101
/// </summary>
118-
/// <param name="nodeId">The node from which the packet was received.</param>
119102
/// <param name="packetType">The packet type.</param>
120103
/// <param name="translator">The translator to use as a source for packet data.</param>
121-
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
104+
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
122105
{
123-
_packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
106+
return _packetFactory.DeserializePacket(packetType, translator);
124107
}
125108

126109
/// <summary>
@@ -182,21 +165,16 @@ public void Stop()
182165
/// </remarks>
183166
private void PacketPumpProc()
184167
{
185-
RunReadLoop(_stream, _packetPumpShutdownEvent);
168+
RunReadLoop(_pipeClient, _packetPumpShutdownEvent);
186169
}
187170

188-
private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent)
171+
private void RunReadLoop(NodePipeClient pipeClient, ManualResetEvent localPacketPumpShutdownEvent)
189172
{
190173
CommunicationsUtilities.Trace("Entering read loop.");
191174

192175
try
193176
{
194-
byte[] headerByte = new byte[5];
195-
#if FEATURE_APM
196-
IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
197-
#else
198-
Task<int> readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
199-
#endif
177+
Task<INodePacket> readTask = pipeClient.ReadPacketAsync();
200178

201179
bool continueReading = true;
202180
do
@@ -208,11 +186,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
208186
WaitHandle[] handles =
209187
[
210188
localPacketPumpShutdownEvent,
211-
#if FEATURE_APM
212-
result.AsyncWaitHandle
213-
#else
214189
((IAsyncResult)readTask).AsyncWaitHandle
215-
#endif
216190
];
217191
int waitId = WaitHandle.WaitAny(handles);
218192
switch (waitId)
@@ -224,80 +198,27 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu
224198
break;
225199

226200
case 1:
201+
INodePacket packet = readTask.GetAwaiter().GetResult();
202+
203+
if (packet.Type == NodePacketType.NodeShutdown)
227204
{
228-
// Client recieved a packet header. Read the rest of it.
229-
int headerBytesRead = 0;
230-
#if FEATURE_APM
231-
headerBytesRead = localStream.EndRead(result);
232-
#else
233-
headerBytesRead = readTask.Result;
234-
#endif
235-
236-
if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0))
205+
if (!_isServerDisconnecting)
237206
{
238-
// Incomplete read. Abort.
239-
if (headerBytesRead == 0)
240-
{
241-
if (_isServerDisconnecting)
242-
{
243-
continueReading = false;
244-
break;
245-
}
246-
247-
ErrorUtilities.ThrowInternalError("Server disconnected abruptly");
248-
}
249-
else
250-
{
251-
ErrorUtilities.ThrowInternalError("Incomplete header read. {0} of {1} bytes read", headerBytesRead, headerByte.Length);
252-
}
207+
ErrorUtilities.ThrowInternalError("Server disconnected abruptly.");
253208
}
254209

255-
NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]);
256-
257-
int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span<byte>(headerByte, 1, 4));
258-
int packetBytesRead = 0;
259-
260-
_readBufferMemoryStream.Position = 0;
261-
_readBufferMemoryStream.SetLength(packetLength);
262-
byte[] packetData = _readBufferMemoryStream.GetBuffer();
263-
264-
while (packetBytesRead < packetLength)
265-
{
266-
int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead);
267-
if (bytesRead == 0)
268-
{
269-
// Incomplete read. Abort.
270-
ErrorUtilities.ThrowInternalError("Incomplete packet read. {0} of {1} bytes read", packetBytesRead, packetLength);
271-
}
272-
273-
packetBytesRead += bytesRead;
274-
}
210+
continueReading = false;
211+
break;
212+
}
275213

276-
try
277-
{
278-
_packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator);
279-
}
280-
catch
281-
{
282-
// Error while deserializing or handling packet. Logging additional info.
283-
CommunicationsUtilities.Trace("Packet factory failed to receive package. Exception while deserializing packet {0}.", packetType);
284-
throw;
285-
}
214+
_packetFactory.RoutePacket(0, packet);
286215

287-
if (packetType == NodePacketType.ServerNodeBuildResult)
288-
{
289-
continueReading = false;
290-
}
291-
else
292-
{
293-
// Start reading the next package header.
294-
#if FEATURE_APM
295-
result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null);
296-
#else
297-
readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask();
298-
#endif
299-
}
216+
continueReading = packet.Type != NodePacketType.ServerNodeBuildResult;
217+
if (continueReading)
218+
{
219+
readTask = pipeClient.ReadPacketAsync();
300220
}
221+
301222
break;
302223

303224
default:

src/Build/BackEnd/Components/Communications/NodeManager.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -240,19 +240,13 @@ public void UnregisterPacketHandler(NodePacketType packetType)
240240
}
241241

242242
/// <summary>
243-
/// Takes a serializer, deserializes the packet and routes it to the appropriate handler.
243+
/// Takes a serializer and deserializes the packet.
244244
/// </summary>
245-
/// <param name="nodeId">The node from which the packet was received.</param>
246245
/// <param name="packetType">The packet type.</param>
247246
/// <param name="translator">The translator containing the data from which the packet should be reconstructed.</param>
248-
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
247+
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
249248
{
250-
if (packetType == NodePacketType.NodeShutdown)
251-
{
252-
RemoveNodeFromMapping(nodeId);
253-
}
254-
255-
_packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator);
249+
return _packetFactory.DeserializePacket(packetType, translator);
256250
}
257251

258252
/// <summary>

src/Build/BackEnd/Components/Communications/NodeProviderInProc.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,11 @@ public void UnregisterPacketHandler(NodePacketType packetType)
285285
/// <summary>
286286
/// Deserializes and routes a packet. Not used in the in-proc node.
287287
/// </summary>
288-
public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator)
288+
public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator)
289289
{
290290
// Not used
291291
ErrorUtilities.ThrowInternalErrorUnreachable();
292+
return null;
292293
}
293294

294295
/// <summary>

0 commit comments

Comments
 (0)