Skip to content

Commit

Permalink
agent: add timeout() routine and use it for data-plane operations
Browse files Browse the repository at this point in the history
Ensure we apply top-level timeouts to actions undertaken within a data-plane:
- Staring a connector proxy environment.
- Awaiting a connector unary response.
- Activating built catalog specifications into a data-plane
- Deleting built specifications from a data-plane.

We want to preserve a firm boundary that data-planes are falliable and
control-plane operations must protect themselves from an unavailability
or undesired behavior of a data-plane.
  • Loading branch information
jgraettinger committed Oct 20, 2024
1 parent 75a26e1 commit d8e1394
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 32 deletions.
17 changes: 10 additions & 7 deletions crates/agent/src/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,16 +322,19 @@ impl Status {
// resources, and then notify dependent controllers, to make
// sure that they can respond. The controller job row will be
// deleted automatically after we return.
control_plane
.data_plane_delete(
crate::timeout(
std::time::Duration::from_secs(60),
control_plane.data_plane_delete(
state.catalog_name.clone(),
catalog_type,
state.data_plane_id,
)
.await
.context("deleting from data-plane")
.with_retry(backoff_data_plane_activate(state.failures))?;
tracing::info!("deleted from data-plane");
),
|| "Timeout while deleting from data-plane",
)
.await
.context("failed to delete from data-plane")
.with_retry(backoff_data_plane_activate(state.failures))?;

control_plane
.notify_dependents(state.catalog_name.clone())
.await
Expand Down
15 changes: 10 additions & 5 deletions crates/agent/src/controllers/publication_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,16 @@ impl ActivationStatus {
if state.last_build_id > self.last_activated {
let name = state.catalog_name.clone();
let built_spec = state.built_spec.as_ref().expect("built_spec must be Some");
control_plane
.data_plane_activate(name, built_spec, state.data_plane_id)
.await
.with_retry(backoff_data_plane_activate(state.failures))
.context("failed to activate")?;

crate::timeout(
std::time::Duration::from_secs(60),
control_plane.data_plane_activate(name, built_spec, state.data_plane_id),
|| "Timeout while activating into data-plane",
)
.await
.with_retry(backoff_data_plane_activate(state.failures))
.context("failed to activate into data-plane")?;

tracing::debug!(last_activated = %state.last_build_id, "activated");
self.last_activated = state.last_build_id;
}
Expand Down
20 changes: 20 additions & 0 deletions crates/agent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,23 @@ pub fn next_name(current_name: &str) -> String {
// complexity.
format!("{current_name}_v2")
}

// timeout is a convienence for tokio::time::timeout which merges
// its error with the Future's nested anyhow::Result Output.
async fn timeout<Ok, Fut, C, WC>(
dur: std::time::Duration,
fut: Fut,
with_context: WC,
) -> anyhow::Result<Ok>
where
C: std::fmt::Display + Send + Sync + 'static,
Fut: std::future::Future<Output = anyhow::Result<Ok>>,
WC: FnOnce() -> C,
{
use anyhow::Context;

match tokio::time::timeout(dur, fut).await {
Ok(result) => result,
Err(err) => Err(anyhow::anyhow!(err)).with_context(with_context),
}
}
73 changes: 53 additions & 20 deletions crates/agent/src/proxy_connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,25 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
task: ops::ShardRef,
request: capture::Request,
) -> anyhow::Result<capture::Response> {
let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?;
let (channel, metadata, logs) = crate::timeout(
DIAL_PROXY_TIMEOUT,
self.dial_proxy(data_plane, task),
|| dial_proxy_timeout_msg(data_plane),
)
.await?;

let mut client = proto_grpc::capture::connector_client::ConnectorClient::with_interceptor(
channel, metadata,
)
.max_decoding_message_size(runtime::MAX_MESSAGE_SIZE);

Self::drive_unary_response(
client.capture(futures::stream::once(async move { request })),
logs,
crate::timeout(
CONNECTOR_TIMEOUT,
Self::drive_unary_response(
client.capture(futures::stream::once(async move { request })),
logs,
),
|| CONNECTOR_TIMEOUT_MSG,
)
.await
}
Expand All @@ -96,15 +106,25 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
task: ops::ShardRef,
request: derive::Request,
) -> anyhow::Result<derive::Response> {
let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?;
let (channel, metadata, logs) = crate::timeout(
DIAL_PROXY_TIMEOUT,
self.dial_proxy(data_plane, task),
|| dial_proxy_timeout_msg(data_plane),
)
.await?;

let mut client = proto_grpc::derive::connector_client::ConnectorClient::with_interceptor(
channel, metadata,
)
.max_decoding_message_size(runtime::MAX_MESSAGE_SIZE);

Self::drive_unary_response(
client.derive(futures::stream::once(async move { request })),
logs,
crate::timeout(
CONNECTOR_TIMEOUT,
Self::drive_unary_response(
client.derive(futures::stream::once(async move { request })),
logs,
),
|| CONNECTOR_TIMEOUT_MSG,
)
.await
}
Expand All @@ -119,16 +139,26 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
task: ops::ShardRef,
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
let (channel, metadata, logs) = self.dial_proxy(data_plane, task).await?;
let (channel, metadata, logs) = crate::timeout(
DIAL_PROXY_TIMEOUT,
self.dial_proxy(data_plane, task),
|| dial_proxy_timeout_msg(data_plane),
)
.await?;

let mut client =
proto_grpc::materialize::connector_client::ConnectorClient::with_interceptor(
channel, metadata,
)
.max_decoding_message_size(runtime::MAX_MESSAGE_SIZE);

Self::drive_unary_response(
client.materialize(futures::stream::once(async move { request })),
logs,
crate::timeout(
CONNECTOR_TIMEOUT,
Self::drive_unary_response(
client.materialize(futures::stream::once(async move { request })),
logs,
),
|| CONNECTOR_TIMEOUT_MSG,
)
.await
}
Expand Down Expand Up @@ -236,18 +266,21 @@ impl<L: runtime::LogHandler> ProxyConnectors<L> {
}
.map_err(runtime::status_to_anyhow);

let response = tokio::time::timeout(CONNECTOR_TIMEOUT, response);

match futures::join!(response, log_loop) {
(Err(_timeout), _) => Err(anyhow::anyhow!("Timeout while waiting for the connector's response. Please verify any network configuration and retry.")),
(Ok(Err(response_err)), _) => Err(response_err),
(Ok(Ok(_)), Err(log_err)) => Err(log_err),
(Ok(Ok(response)), Ok(())) => Ok(response),
}
futures::try_join!(response, log_loop).map(|(response, ())| response)
}
}

const DIAL_PROXY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

fn dial_proxy_timeout_msg(data_plane: &tables::DataPlane) -> String {
format!(
"Timeout starting remote proxy for connector in data-plane {}",
data_plane.data_plane_name
)
}

const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes.
const CONNECTOR_TIMEOUT_MSG: &'static str = "Timeout while waiting for the connector's response. Please verify any network configuration and retry.";

/*
Expand Down

0 comments on commit d8e1394

Please sign in to comment.