Skip to content

Commit

Permalink
Merged PR 697871: Use ClientConnectionManager for streaming calls
Browse files Browse the repository at this point in the history
Use ClientConnectionManager for streaming calls, so that we can reuse existing exception handling.
  • Loading branch information
semihokur committed Jan 22, 2023
1 parent fbfbb40 commit 6fd62a3
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 125 deletions.
8 changes: 0 additions & 8 deletions Public/Src/Engine/Dll/Distribution/DistributionCounters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,6 @@ public enum DistributionCounter : ushort
[CounterType(CounterType.Stopwatch)]
ReportPipResultsDuration,

/// <nodoc/>
[CounterType(CounterType.Stopwatch)]
FinalReportExecutionLogDuration,

/// <nodoc/>
[CounterType(CounterType.Stopwatch)]
FinalReportPipResultsDuration,

/// <nodoc/>
ForAllPipsGrpcDurationMs,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public void OnAttachmentCompleted()
}

public async Task<RpcCallResult<Unit>> CallAsync(
Func<CallOptions, AsyncUnaryCall<RpcResponse>> func,
Func<CallOptions, Task> func,
string operation,
CancellationToken cancellationToken = default(CancellationToken),
bool waitForConnection = false)
Expand Down
72 changes: 52 additions & 20 deletions Public/Src/Engine/Dll/Distribution/Grpc/GrpcOrchestratorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Distribution.Grpc;
using BuildXL.Utilities.Configuration;
using BuildXL.Utilities.Instrumentation.Common;
using BuildXL.Utilities.Tasks;
using Grpc.Core;
Expand All @@ -23,6 +24,8 @@ internal sealed class GrpcOrchestratorClient : IOrchestratorClient
private ClientConnectionManager m_connectionManager;
private readonly LoggingContext m_loggingContext;
private volatile bool m_initialized;
private AsyncClientStreamingCall<ExecutionLogInfo, RpcResponse> m_executionLogStream;
private AsyncClientStreamingCall<PipResultsInfo, RpcResponse> m_pipResultsStream;

public GrpcOrchestratorClient(LoggingContext loggingContext, DistributedInvocationId invocationId)
{
Expand All @@ -33,7 +36,7 @@ public GrpcOrchestratorClient(LoggingContext loggingContext, DistributedInvocati
public Task<RpcCallResult<Unit>> SayHelloAsync(ServiceLocation myLocation, CancellationToken cancellationToken = default)
{
return m_connectionManager.CallAsync(
(callOptions) => m_client.HelloAsync(myLocation, options: callOptions),
async (callOptions) => await m_client.HelloAsync(myLocation, options: callOptions),
"Hello",
cancellationToken: cancellationToken,
waitForConnection: true);
Expand Down Expand Up @@ -64,7 +67,7 @@ public async Task<RpcCallResult<Unit>> AttachCompletedAsync(AttachCompletionInfo
Contract.Assert(m_initialized);

var attachmentCompletion = await m_connectionManager.CallAsync(
(callOptions) => m_client.AttachCompletedAsync(message, options: callOptions),
async (callOptions) => await m_client.AttachCompletedAsync(message, options: callOptions),
"AttachCompleted",
waitForConnection: true);

Expand All @@ -80,36 +83,65 @@ public Task<RpcCallResult<Unit>> ReportPipResultsAsync(PipResultsInfo message, s
{
Contract.Assert(m_initialized);

return m_connectionManager.CallAsync(
(callOptions) => m_client.ReportPipResultsAsync(message, options: callOptions),
description,
cancellationToken: cancellationToken);
Func<CallOptions, Task> func;

if (EngineEnvironmentSettings.GrpcStreamingEnabled)
{
if (m_pipResultsStream == null)
{
var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
m_pipResultsStream = m_client.StreamPipResults(headers: headerResult.headers, cancellationToken: cancellationToken);
}

func = async (callOptions) => await m_pipResultsStream.RequestStream.WriteAsync(message);
}
else
{
func = async (callOptions) => await m_client.ReportPipResultsAsync(message, options: callOptions);
}

return m_connectionManager.CallAsync(func, description, cancellationToken: cancellationToken);
}

public Task<RpcCallResult<Unit>> ReportExecutionLogAsync(ExecutionLogInfo message, CancellationToken cancellationToken = default)
{
Contract.Assert(m_initialized);

return m_connectionManager.CallAsync(
(callOptions) => m_client.ReportExecutionLogAsync(message, options: callOptions),
$" ReportExecutionLog: Size={message.Events.DataBlob.Count()}, SequenceNumber={message.Events.SequenceNumber}",
cancellationToken: cancellationToken);
}
Func<CallOptions, Task> func;

public AsyncClientStreamingCall<ExecutionLogInfo, RpcResponse> StreamExecutionLog(CancellationToken cancellationToken = default)
{
Contract.Assert(m_initialized);
if (EngineEnvironmentSettings.GrpcStreamingEnabled)
{
if (m_executionLogStream == null)
{
var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
m_executionLogStream = m_client.StreamExecutionLog(headers: headerResult.headers, cancellationToken: cancellationToken);
}

func = async (callOptions) => await m_executionLogStream.RequestStream.WriteAsync(message);
}
else
{
func = async (callOptions) => await m_client.ReportExecutionLogAsync(message, options: callOptions);
}

var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
return m_client.StreamExecutionLog(headers: headerResult.headers, cancellationToken: cancellationToken);
return m_connectionManager.CallAsync(func, $" ReportExecutionLog: Size={message.Events.DataBlob.Count()}, SequenceNumber={message.Events.SequenceNumber}", cancellationToken: cancellationToken);
}

public AsyncClientStreamingCall<PipResultsInfo, RpcResponse> StreamPipResults(CancellationToken cancellationToken = default)
public void FinalizeStreaming()
{
Contract.Assert(m_initialized);
if (m_executionLogStream != null)
{
m_executionLogStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
m_executionLogStream.GetAwaiter().GetResult();
m_executionLogStream.Dispose();
}

var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
return m_client.StreamPipResults(headers: headerResult.headers, cancellationToken: cancellationToken);
if (m_pipResultsStream != null)
{
m_pipResultsStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
m_pipResultsStream.GetAwaiter().GetResult();
m_pipResultsStream.Dispose();
}
}
}
}
45 changes: 31 additions & 14 deletions Public/Src/Engine/Dll/Distribution/Grpc/GrpcWorkerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Distribution.Grpc;
using BuildXL.Utilities.Configuration;
using BuildXL.Utilities.Instrumentation.Common;
using BuildXL.Utilities.Tasks;
using Grpc.Core;
using static BuildXL.Engine.Distribution.DistributionHelpers;
using static BuildXL.Engine.Distribution.Grpc.ClientConnectionManager;

namespace BuildXL.Engine.Distribution.Grpc
Expand All @@ -23,6 +22,7 @@ internal sealed class GrpcWorkerClient : IWorkerClient
private readonly DistributedInvocationId m_invocationId;
private ClientConnectionManager m_connectionManager;
private Worker.WorkerClient m_client;
private AsyncClientStreamingCall<PipBuildRequest, RpcResponse> m_pipBuildRequestStream;

public GrpcWorkerClient(LoggingContext loggingContext, DistributedInvocationId invocationId, EventHandler<ConnectionFailureEventArgs> onConnectionFailureAsync)
{
Expand Down Expand Up @@ -57,7 +57,7 @@ public async Task<RpcCallResult<Unit>> AttachAsync(BuildStartData message, Cance
Contract.Assert(m_connectionManager != null, "The worker location should be known before attaching");

var attachment = await m_connectionManager.CallAsync(
(callOptions) => m_client.AttachAsync(message, options: callOptions),
async (callOptions) => await m_client.AttachAsync(message, options: callOptions),
"Attach",
cancellationToken,
waitForConnection: true);
Expand All @@ -70,21 +70,28 @@ public async Task<RpcCallResult<Unit>> AttachAsync(BuildStartData message, Cance
return attachment;
}

public Task<RpcCallResult<Unit>> ExecutePipsAsync(PipBuildRequest message, string description)
public Task<RpcCallResult<Unit>> ExecutePipsAsync(PipBuildRequest message, string description, CancellationToken cancellationToken = default)
{
Contract.Assert(m_connectionManager != null, "The worker location should be known if calling ExecutePips");

return m_connectionManager.CallAsync(
(callOptions) => m_client.ExecutePipsAsync(message, options: callOptions),
description);
}
Func<CallOptions, Task> func;

public AsyncClientStreamingCall<PipBuildRequest, RpcResponse> StreamExecutePips(CancellationToken cancellationToken = default)
{
Contract.Assert(m_connectionManager != null, "The worker location should be known if calling ExecutePips");
if (EngineEnvironmentSettings.GrpcStreamingEnabled)
{
if (m_pipBuildRequestStream == null)
{
var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
m_pipBuildRequestStream = m_client.StreamExecutePips(headers: headerResult.headers, cancellationToken: cancellationToken);
}

func = async (callOptions) => await m_pipBuildRequestStream.RequestStream.WriteAsync(message);
}
else
{
func = async (callOptions) => await m_client.ExecutePipsAsync(message, options: callOptions);
}

var headerResult = GrpcUtils.InitializeHeaders(m_invocationId);
return m_client.StreamExecutePips(headers: headerResult.headers, cancellationToken: cancellationToken);
return m_connectionManager.CallAsync(func, description);
}

public Task<RpcCallResult<Unit>> ExitAsync(BuildEndData message, CancellationToken cancellationToken)
Expand All @@ -94,9 +101,19 @@ public Task<RpcCallResult<Unit>> ExitAsync(BuildEndData message, CancellationTok
m_connectionManager.ReadyForExit();

return m_connectionManager.CallAsync(
(callOptions) => m_client.ExitAsync(message, options: callOptions),
async (callOptions) => await m_client.ExitAsync(message, options: callOptions),
"Exit",
cancellationToken);
}

public void FinalizeStreaming()
{
if (m_pipBuildRequestStream != null)
{
m_pipBuildRequestStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
m_pipBuildRequestStream.GetAwaiter().GetResult();
m_pipBuildRequestStream.Dispose();
}
}
}
}
1 change: 1 addition & 0 deletions Public/Src/Engine/Dll/Distribution/IOrchestratorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ internal interface IOrchestratorClient
Task<RpcCallResult<Unit>> ReportPipResultsAsync(PipResultsInfo message, string description, CancellationToken cancellationToken = default);
Task<RpcCallResult<Unit>> ReportExecutionLogAsync(ExecutionLogInfo message, CancellationToken cancellationToken = default);
Task CloseAsync();
void FinalizeStreaming();
}
}
4 changes: 3 additions & 1 deletion Public/Src/Engine/Dll/Distribution/IWorkerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ internal interface IWorkerClient : IDisposable
{
Task<RpcCallResult<Unit>> AttachAsync(BuildStartData startData, CancellationToken cancellationToken);

Task<RpcCallResult<Unit>> ExecutePipsAsync(PipBuildRequest input, string description);
Task<RpcCallResult<Unit>> ExecutePipsAsync(PipBuildRequest input, string description, CancellationToken cancellationToken = default);

Task<RpcCallResult<Unit>> ExitAsync(BuildEndData buildEndData, CancellationToken cancellationToken);

void SetWorkerLocation(ServiceLocation serviceLocation);

void FinalizeStreaming();

Task CloseAsync();
}
}
26 changes: 2 additions & 24 deletions Public/Src/Engine/Dll/Distribution/RemoteWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ internal sealed class RemoteWorker : RemoteWorkerBase
private PipGraph m_pipGraph;
private CancellationTokenRegistration m_cancellationTokenRegistration;

private AsyncClientStreamingCall<PipBuildRequest, RpcResponse> m_pipBuildRequestStream;

/// <summary>
/// Indicates failure which should cause the worker build to fail. NOTE: This may not correspond to the
/// entire distributed build failing. Namely, connection failures for operations materialize outputs
Expand Down Expand Up @@ -271,22 +269,7 @@ private void SendBuildRequests()

using (var watch = m_orchestratorService.Environment.Counters.StartStopwatch(PipExecutorCounter.RemoteWorker_BuildRequestSendDuration))
{
if (EngineEnvironmentSettings.GrpcStreamingEnabled)
{
if (m_pipBuildRequestStream == null)
{
m_pipBuildRequestStream = ((GrpcWorkerClient)m_workerClient).StreamExecutePips();
}

m_pipBuildRequestStream.RequestStream.WriteAsync(pipRequest).GetAwaiter().GetResult();
Tracing.Logger.Log.GrpcTrace(m_appLoggingContext, $"{WorkerIpAddress}", description);
callResult = new RpcCallResult<Unit>();
}
else
{
callResult = m_workerClient.ExecutePipsAsync(pipRequest, description).GetAwaiter().GetResult();
}

callResult = m_workerClient.ExecutePipsAsync(pipRequest, description).GetAwaiter().GetResult();
sendDuration = watch.Elapsed;
}

Expand Down Expand Up @@ -638,12 +621,7 @@ private async Task DisconnectAsync(string buildFailure = null, [CallerMemberName
m_sendThread.Join();
}

if (m_pipBuildRequestStream != null)
{
m_pipBuildRequestStream.RequestStream.CompleteAsync().GetAwaiter().GetResult();
m_pipBuildRequestStream.GetAwaiter().GetResult();
m_pipBuildRequestStream.Dispose();
}
m_workerClient.FinalizeStreaming();

// If we still have a connection with the worker, we should send a message to worker to make it exit.
// We might be releasing a worker that didn't say Hello and so m_serviceLocation can be null, don't try to call exit
Expand Down
Loading

0 comments on commit 6fd62a3

Please sign in to comment.