diff --git a/Public/Src/Engine/Dll/Distribution/DistributionCounters.cs b/Public/Src/Engine/Dll/Distribution/DistributionCounters.cs
index ab0c7a4b45..d4e700425f 100644
--- a/Public/Src/Engine/Dll/Distribution/DistributionCounters.cs
+++ b/Public/Src/Engine/Dll/Distribution/DistributionCounters.cs
@@ -114,14 +114,6 @@ public enum DistributionCounter : ushort
[CounterType(CounterType.Stopwatch)]
ReportPipResultsDuration,
- ///
- [CounterType(CounterType.Stopwatch)]
- FinalReportExecutionLogDuration,
-
- ///
- [CounterType(CounterType.Stopwatch)]
- FinalReportPipResultsDuration,
-
///
ForAllPipsGrpcDurationMs,
diff --git a/Public/Src/Engine/Dll/Distribution/Grpc/ClientConnectionManager.cs b/Public/Src/Engine/Dll/Distribution/Grpc/ClientConnectionManager.cs
index dffde693b6..392dcd867c 100644
--- a/Public/Src/Engine/Dll/Distribution/Grpc/ClientConnectionManager.cs
+++ b/Public/Src/Engine/Dll/Distribution/Grpc/ClientConnectionManager.cs
@@ -540,7 +540,7 @@ public void OnAttachmentCompleted()
}
public async Task> CallAsync(
- Func> func,
+ Func func,
string operation,
CancellationToken cancellationToken = default(CancellationToken),
bool waitForConnection = false)
diff --git a/Public/Src/Engine/Dll/Distribution/Grpc/GrpcOrchestratorClient.cs b/Public/Src/Engine/Dll/Distribution/Grpc/GrpcOrchestratorClient.cs
index 0dde6aa9b8..145fe15500 100644
--- a/Public/Src/Engine/Dll/Distribution/Grpc/GrpcOrchestratorClient.cs
+++ b/Public/Src/Engine/Dll/Distribution/Grpc/GrpcOrchestratorClient.cs
@@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Distribution.Grpc;
+using BuildXL.Utilities.Configuration;
using BuildXL.Utilities.Instrumentation.Common;
using BuildXL.Utilities.Tasks;
using Grpc.Core;
@@ -23,6 +24,8 @@ internal sealed class GrpcOrchestratorClient : IOrchestratorClient
private ClientConnectionManager m_connectionManager;
private readonly LoggingContext m_loggingContext;
private volatile bool m_initialized;
+ private AsyncClientStreamingCall m_executionLogStream;
+ private AsyncClientStreamingCall m_pipResultsStream;
public GrpcOrchestratorClient(LoggingContext loggingContext, DistributedInvocationId invocationId)
{
@@ -33,7 +36,7 @@ public GrpcOrchestratorClient(LoggingContext loggingContext, DistributedInvocati
public Task> SayHelloAsync(ServiceLocation myLocation, CancellationToken cancellationToken = default)
{
return m_connectionManager.CallAsync(
- (callOptions) => m_client.HelloAsync(myLocation, options: callOptions),
+ async (callOptions) => await m_client.HelloAsync(myLocation, options: callOptions),
"Hello",
cancellationToken: cancellationToken,
waitForConnection: true);
@@ -64,7 +67,7 @@ public async Task> AttachCompletedAsync(AttachCompletionInfo
Contract.Assert(m_initialized);
var attachmentCompletion = await m_connectionManager.CallAsync(
- (callOptions) => m_client.AttachCompletedAsync(message, options: callOptions),
+ async (callOptions) => await m_client.AttachCompletedAsync(message, options: callOptions),
"AttachCompleted",
waitForConnection: true);
@@ -80,36 +83,65 @@ public Task> ReportPipResultsAsync(PipResultsInfo message, s
{
Contract.Assert(m_initialized);
- return m_connectionManager.CallAsync(
- (callOptions) => m_client.ReportPipResultsAsync(message, options: callOptions),
- description,
- cancellationToken: cancellationToken);
+ Func func;
+
+ if (EngineEnvironmentSettings.GrpcStreamingEnabled)
+ {
+ if (m_pipResultsStream == null)
+ {
+ var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
+ m_pipResultsStream = m_client.StreamPipResults(headers: headerResult.headers, cancellationToken: cancellationToken);
+ }
+
+ func = async (callOptions) => await m_pipResultsStream.RequestStream.WriteAsync(message);
+ }
+ else
+ {
+ func = async (callOptions) => await m_client.ReportPipResultsAsync(message, options: callOptions);
+ }
+
+ return m_connectionManager.CallAsync(func, description, cancellationToken: cancellationToken);
}
public Task> ReportExecutionLogAsync(ExecutionLogInfo message, CancellationToken cancellationToken = default)
{
Contract.Assert(m_initialized);
- return m_connectionManager.CallAsync(
- (callOptions) => m_client.ReportExecutionLogAsync(message, options: callOptions),
- $" ReportExecutionLog: Size={message.Events.DataBlob.Count()}, SequenceNumber={message.Events.SequenceNumber}",
- cancellationToken: cancellationToken);
- }
+ Func func;
- public AsyncClientStreamingCall StreamExecutionLog(CancellationToken cancellationToken = default)
- {
- Contract.Assert(m_initialized);
+ if (EngineEnvironmentSettings.GrpcStreamingEnabled)
+ {
+ if (m_executionLogStream == null)
+ {
+ var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
+ m_executionLogStream = m_client.StreamExecutionLog(headers: headerResult.headers, cancellationToken: cancellationToken);
+ }
+
+ func = async (callOptions) => await m_executionLogStream.RequestStream.WriteAsync(message);
+ }
+ else
+ {
+ func = async (callOptions) => await m_client.ReportExecutionLogAsync(message, options: callOptions);
+ }
- var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
- return m_client.StreamExecutionLog(headers: headerResult.headers, cancellationToken: cancellationToken);
+ return m_connectionManager.CallAsync(func, $" ReportExecutionLog: Size={message.Events.DataBlob.Count()}, SequenceNumber={message.Events.SequenceNumber}", cancellationToken: cancellationToken);
}
- public AsyncClientStreamingCall StreamPipResults(CancellationToken cancellationToken = default)
+ public void FinalizeStreaming()
{
- Contract.Assert(m_initialized);
+ if (m_executionLogStream != null)
+ {
+ m_executionLogStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
+ m_executionLogStream.GetAwaiter().GetResult();
+ m_executionLogStream.Dispose();
+ }
- var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
- return m_client.StreamPipResults(headers: headerResult.headers, cancellationToken: cancellationToken);
+ if (m_pipResultsStream != null)
+ {
+ m_pipResultsStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
+ m_pipResultsStream.GetAwaiter().GetResult();
+ m_pipResultsStream.Dispose();
+ }
}
}
}
\ No newline at end of file
diff --git a/Public/Src/Engine/Dll/Distribution/Grpc/GrpcWorkerClient.cs b/Public/Src/Engine/Dll/Distribution/Grpc/GrpcWorkerClient.cs
index b41aee29a2..02f3de3535 100644
--- a/Public/Src/Engine/Dll/Distribution/Grpc/GrpcWorkerClient.cs
+++ b/Public/Src/Engine/Dll/Distribution/Grpc/GrpcWorkerClient.cs
@@ -2,15 +2,14 @@
// Licensed under the MIT License.
using System;
-using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Distribution.Grpc;
+using BuildXL.Utilities.Configuration;
using BuildXL.Utilities.Instrumentation.Common;
using BuildXL.Utilities.Tasks;
using Grpc.Core;
-using static BuildXL.Engine.Distribution.DistributionHelpers;
using static BuildXL.Engine.Distribution.Grpc.ClientConnectionManager;
namespace BuildXL.Engine.Distribution.Grpc
@@ -23,6 +22,7 @@ internal sealed class GrpcWorkerClient : IWorkerClient
private readonly DistributedInvocationId m_invocationId;
private ClientConnectionManager m_connectionManager;
private Worker.WorkerClient m_client;
+ private AsyncClientStreamingCall m_pipBuildRequestStream;
public GrpcWorkerClient(LoggingContext loggingContext, DistributedInvocationId invocationId, EventHandler onConnectionFailureAsync)
{
@@ -57,7 +57,7 @@ public async Task> AttachAsync(BuildStartData message, Cance
Contract.Assert(m_connectionManager != null, "The worker location should be known before attaching");
var attachment = await m_connectionManager.CallAsync(
- (callOptions) => m_client.AttachAsync(message, options: callOptions),
+ async (callOptions) => await m_client.AttachAsync(message, options: callOptions),
"Attach",
cancellationToken,
waitForConnection: true);
@@ -70,21 +70,28 @@ public async Task> AttachAsync(BuildStartData message, Cance
return attachment;
}
- public Task> ExecutePipsAsync(PipBuildRequest message, string description)
+ public Task> ExecutePipsAsync(PipBuildRequest message, string description, CancellationToken cancellationToken = default)
{
Contract.Assert(m_connectionManager != null, "The worker location should be known if calling ExecutePips");
- return m_connectionManager.CallAsync(
- (callOptions) => m_client.ExecutePipsAsync(message, options: callOptions),
- description);
- }
+ Func func;
- public AsyncClientStreamingCall StreamExecutePips(CancellationToken cancellationToken = default)
- {
- Contract.Assert(m_connectionManager != null, "The worker location should be known if calling ExecutePips");
+ if (EngineEnvironmentSettings.GrpcStreamingEnabled)
+ {
+ if (m_pipBuildRequestStream == null)
+ {
+ var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
+ m_pipBuildRequestStream = m_client.StreamExecutePips(headers: headerResult.headers, cancellationToken: cancellationToken);
+ }
+
+ func = async (callOptions) => await m_pipBuildRequestStream.RequestStream.WriteAsync(message);
+ }
+ else
+ {
+ func = async (callOptions) => await m_client.ExecutePipsAsync(message, options: callOptions);
+ }
- var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
- return m_client.StreamExecutePips(headers: headerResult.headers, cancellationToken: cancellationToken);
+ return m_connectionManager.CallAsync(func, description);
}
public Task> ExitAsync(BuildEndData message, CancellationToken cancellationToken)
@@ -94,9 +101,19 @@ public Task> ExitAsync(BuildEndData message, CancellationTok
m_connectionManager.ReadyForExit();
return m_connectionManager.CallAsync(
- (callOptions) => m_client.ExitAsync(message, options: callOptions),
+ async (callOptions) => await m_client.ExitAsync(message, options: callOptions),
"Exit",
cancellationToken);
}
+
+ public void FinalizeStreaming()
+ {
+ if (m_pipBuildRequestStream != null)
+ {
+ m_pipBuildRequestStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
+ m_pipBuildRequestStream.GetAwaiter().GetResult();
+ m_pipBuildRequestStream.Dispose();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/Public/Src/Engine/Dll/Distribution/IOrchestratorClient.cs b/Public/Src/Engine/Dll/Distribution/IOrchestratorClient.cs
index 3bb9510577..e9bc72ad10 100644
--- a/Public/Src/Engine/Dll/Distribution/IOrchestratorClient.cs
+++ b/Public/Src/Engine/Dll/Distribution/IOrchestratorClient.cs
@@ -19,5 +19,6 @@ internal interface IOrchestratorClient
Task> ReportPipResultsAsync(PipResultsInfo message, string description, CancellationToken cancellationToken = default);
Task> ReportExecutionLogAsync(ExecutionLogInfo message, CancellationToken cancellationToken = default);
Task CloseAsync();
+ void FinalizeStreaming();
}
}
\ No newline at end of file
diff --git a/Public/Src/Engine/Dll/Distribution/IWorkerClient.cs b/Public/Src/Engine/Dll/Distribution/IWorkerClient.cs
index d89cf2f517..fc54fd3249 100644
--- a/Public/Src/Engine/Dll/Distribution/IWorkerClient.cs
+++ b/Public/Src/Engine/Dll/Distribution/IWorkerClient.cs
@@ -14,12 +14,14 @@ internal interface IWorkerClient : IDisposable
{
Task> AttachAsync(BuildStartData startData, CancellationToken cancellationToken);
- Task> ExecutePipsAsync(PipBuildRequest input, string description);
+ Task> ExecutePipsAsync(PipBuildRequest input, string description, CancellationToken cancellationToken = default);
Task> ExitAsync(BuildEndData buildEndData, CancellationToken cancellationToken);
void SetWorkerLocation(ServiceLocation serviceLocation);
+ void FinalizeStreaming();
+
Task CloseAsync();
}
}
\ No newline at end of file
diff --git a/Public/Src/Engine/Dll/Distribution/RemoteWorker.cs b/Public/Src/Engine/Dll/Distribution/RemoteWorker.cs
index 13beea2046..b9afa5bd61 100644
--- a/Public/Src/Engine/Dll/Distribution/RemoteWorker.cs
+++ b/Public/Src/Engine/Dll/Distribution/RemoteWorker.cs
@@ -65,8 +65,6 @@ internal sealed class RemoteWorker : RemoteWorkerBase
private PipGraph m_pipGraph;
private CancellationTokenRegistration m_cancellationTokenRegistration;
- private AsyncClientStreamingCall m_pipBuildRequestStream;
-
///
/// Indicates failure which should cause the worker build to fail. NOTE: This may not correspond to the
/// entire distributed build failing. Namely, connection failures for operations materialize outputs
@@ -271,22 +269,7 @@ private void SendBuildRequests()
using (var watch = m_orchestratorService.Environment.Counters.StartStopwatch(PipExecutorCounter.RemoteWorker_BuildRequestSendDuration))
{
- if (EngineEnvironmentSettings.GrpcStreamingEnabled)
- {
- if (m_pipBuildRequestStream == null)
- {
- m_pipBuildRequestStream = ((GrpcWorkerClient)m_workerClient).StreamExecutePips();
- }
-
- m_pipBuildRequestStream.RequestStream.WriteAsync(pipRequest).GetAwaiter().GetResult();
- Tracing.Logger.Log.GrpcTrace(m_appLoggingContext, $"{WorkerIpAddress}", description);
- callResult = new RpcCallResult();
- }
- else
- {
- callResult = m_workerClient.ExecutePipsAsync(pipRequest, description).GetAwaiter().GetResult();
- }
-
+ callResult = m_workerClient.ExecutePipsAsync(pipRequest, description).GetAwaiter().GetResult();
sendDuration = watch.Elapsed;
}
@@ -638,12 +621,7 @@ private async Task DisconnectAsync(string buildFailure = null, [CallerMemberName
m_sendThread.Join();
}
- if (m_pipBuildRequestStream != null)
- {
- m_pipBuildRequestStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
- m_pipBuildRequestStream.GetAwaiter().GetResult();
- m_pipBuildRequestStream.Dispose();
- }
+ m_workerClient.FinalizeStreaming();
// If we still have a connection with the worker, we should send a message to worker to make it exit.
// We might be releasing a worker that didn't say Hello and so m_serviceLocation can be null, don't try to call exit
diff --git a/Public/Src/Engine/Dll/Distribution/WorkerNotificationManager.cs b/Public/Src/Engine/Dll/Distribution/WorkerNotificationManager.cs
index 53f775c685..5eaca23eca 100644
--- a/Public/Src/Engine/Dll/Distribution/WorkerNotificationManager.cs
+++ b/Public/Src/Engine/Dll/Distribution/WorkerNotificationManager.cs
@@ -94,9 +94,6 @@ internal partial class WorkerNotificationManager : IWorkerNotificationManager
private readonly List m_executionResults = new List();
private readonly List m_eventList = new List();
- private AsyncClientStreamingCall m_executionLogStream;
- private AsyncClientStreamingCall m_pipResultsStream;
-
///
public WorkerNotificationManager(DistributionService distributionService, IWorkerPipExecutionService executionService, LoggingContext loggingContext)
{
@@ -168,25 +165,7 @@ public void Exit(bool isClean)
m_forwardingEventListener?.Dispose();
m_sendCancellationSource.Cancel();
- if (m_executionLogStream != null)
- {
- using (DistributionService.Counters.StartStopwatch(DistributionCounter.FinalReportExecutionLogDuration))
- {
- m_executionLogStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
- m_executionLogStream.GetAwaiter().GetResult();
- m_executionLogStream.Dispose();
- }
- }
-
- if (m_pipResultsStream != null)
- {
- using (DistributionService.Counters.StartStopwatch(DistributionCounter.FinalReportPipResultsDuration))
- {
- m_pipResultsStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
- m_pipResultsStream.GetAwaiter().GetResult();
- m_pipResultsStream.Dispose();
- }
- }
+ m_orchestratorClient.FinalizeStreaming();
DistributionService.Counters.AddToCounter(DistributionCounter.ExecutionLogSentSize, m_executionLogTarget?.TotalSize ?? 0);
}
@@ -450,23 +429,9 @@ private void SendNotifications(CancellationToken cancellationToken)
using (DistributionService.Counters.StartStopwatch(DistributionCounter.ReportPipResultsDuration))
{
- if (EngineEnvironmentSettings.GrpcStreamingEnabled)
- {
- if (m_pipResultsStream == null)
- {
- m_pipResultsStream = ((GrpcOrchestratorClient)m_orchestratorClient).StreamPipResults();
- }
-
- m_pipResultsStream.RequestStream.WriteAsync(notification).GetAwaiter().GetResult();
- Tracing.Logger.Log.GrpcTrace(m_loggingContext, "Orchestrator", description);
- callResult = new RpcCallResult();
- }
- else
- {
- callResult = m_orchestratorClient.ReportPipResultsAsync(notification,
- description,
- cancellationToken).GetAwaiter().GetResult();
- }
+ callResult = m_orchestratorClient.ReportPipResultsAsync(notification,
+ description,
+ cancellationToken).GetAwaiter().GetResult();
}
if (callResult.Succeeded)
@@ -542,27 +507,11 @@ private bool ReportExecutionLog(MemoryStream memoryStream)
}
};
- bool callSuccess = true;
-
using (DistributionService.Counters.StartStopwatch(DistributionCounter.ReportExecutionLogDuration))
{
- if (EngineEnvironmentSettings.GrpcStreamingEnabled)
- {
- if (m_executionLogStream == null)
- {
- m_executionLogStream = ((GrpcOrchestratorClient)m_orchestratorClient).StreamExecutionLog();
- }
-
- m_executionLogStream.RequestStream.WriteAsync(message).GetAwaiter().GetResult();
- }
- else
- {
- // Send event data to orchestrator synchronously. This will only block the dedicated thread used by the binary logger.
- callSuccess = m_orchestratorClient.ReportExecutionLogAsync(message).GetAwaiter().GetResult().Succeeded;
- }
+ // Send event data to orchestrator synchronously. This will only block the dedicated thread used by the binary logger.
+ return m_orchestratorClient.ReportExecutionLogAsync(message).GetAwaiter().GetResult().Succeeded;
}
-
- return callSuccess;
}
public void MarkPipProcessingStarted(long semistableHash)
diff --git a/Public/Src/Engine/UnitTests/Distribution/Mocks/GrpcMocks.cs b/Public/Src/Engine/UnitTests/Distribution/Mocks/GrpcMocks.cs
index b87e4f3f09..6dfc1d3426 100644
--- a/Public/Src/Engine/UnitTests/Distribution/Mocks/GrpcMocks.cs
+++ b/Public/Src/Engine/UnitTests/Distribution/Mocks/GrpcMocks.cs
@@ -190,5 +190,10 @@ public Task> SayHelloAsync(ServiceLocation serviceLocation,
{
return Task.FromResult(SuccessResult);
}
+
+ public void FinalizeStreaming()
+ {
+
+ }
}
}
\ No newline at end of file