diff --git a/example/full/instance.properties b/example/full/instance.properties
index a043f924c3..ab8d22e9ea 100644
--- a/example/full/instance.properties
+++ b/example/full/instance.properties
@@ -129,6 +129,9 @@ sleeper.table.batching.lambdas.memory=1024
# create compaction jobs, run garbage collection, perform partition splitting.
sleeper.table.batching.lambdas.timeout.seconds=60
+# This specifies whether OpenTelemetry tracing is enabled.
+sleeper.opentelemetry.tracing.enabled=true
+
## The following properties relate to standard ingest.
diff --git a/java/athena/pom.xml b/java/athena/pom.xml
index dba947c591..9fdf3aabb2 100644
--- a/java/athena/pom.xml
+++ b/java/athena/pom.xml
@@ -27,6 +27,27 @@
athena
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-athena-federation-sdk</artifactId>
+ <version>${athena.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-unsafe</artifactId>
+ </dependency>
+
sleeper
configuration
@@ -47,25 +68,6 @@
statestore
${project.parent.version}
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-athena-federation-sdk</artifactId>
- <version>${athena.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-vector</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-memory-unsafe</artifactId>
- </dependency>
org.testcontainers
diff --git a/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile b/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile
index 48c43705e5..bb99d8578f 100644
--- a/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile
+++ b/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile
@@ -17,6 +17,8 @@ ARG BUILDER_IMAGE_TAG=3.8-openjdk-8-slim
ARG BASE_IMAGE_NAME=amazoncorretto
ARG BASE_IMAGE_TAG=11
+ARG MODE=no_tracing
+
ARG SPARK_VERSION=3.4.1
ARG HADOOP_VERSION=3.3.3
ARG SPARK_DOWNLOAD_FILENAME=spark-${SPARK_VERSION}-bin-hadoop3
@@ -57,7 +59,14 @@ RUN echo "Before slimming: $(du -sh /opt/${SPARK_DIRNAME})" && \
# Add workdir
RUN mkdir /opt/${SPARK_DIRNAME}/workdir
-FROM ${BASE_IMAGE_NAME}:${BASE_IMAGE_TAG}
+FROM ${BASE_IMAGE_NAME}:${BASE_IMAGE_TAG} as base_no_tracing
+
+FROM base_no_tracing as base_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM base_${MODE}
ARG SPARK_VERSION
ARG HADOOP_VERSION
diff --git a/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile b/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile
index 7d5602d11e..995ee00e57 100644
--- a/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile
+++ b/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile
@@ -11,8 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+ARG MODE=no_tracing
-FROM apache/spark:3.4.1-scala2.12-java11-ubuntu
+FROM apache/spark:3.4.1-scala2.12-java11-ubuntu as build_no_tracing
+
+FROM build_no_tracing as build_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM build_${MODE}
ENV PATH="$PATH:/opt/spark/bin"
USER root
diff --git a/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile b/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
index da6daf77b5..01e09297ab 100644
--- a/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
+++ b/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
@@ -11,8 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+ARG MODE=no_tracing
-FROM public.ecr.aws/emr-serverless/spark/emr-6.13.0:latest
+FROM public.ecr.aws/emr-serverless/spark/emr-6.13.0:latest as build_no_tracing
+
+FROM build_no_tracing as build_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM build_${MODE}
USER root
@@ -24,4 +32,4 @@ RUN yum update -y && \
COPY ./bulk-import-runner.jar /workdir
# EMR Serverless will run the image as hadoop
-USER hadoop:hadoop
\ No newline at end of file
+USER hadoop:hadoop
diff --git a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
index 6d3fc83cfc..151e7adb2b 100644
--- a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
+++ b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
@@ -64,7 +64,6 @@
import static sleeper.configuration.properties.instance.CommonProperty.ACCOUNT;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
-import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
import static sleeper.configuration.properties.instance.CommonProperty.OPTIONAL_STACKS;
import static sleeper.configuration.properties.instance.CommonProperty.REGION;
@@ -322,7 +321,7 @@ public static void main(String[] args) {
.account(instanceProperties.get(ACCOUNT))
.region(instanceProperties.get(REGION))
.build();
- BuiltJars jars = new BuiltJars(AmazonS3ClientBuilder.defaultClient(), instanceProperties.get(JARS_BUCKET));
+ BuiltJars jars = new BuiltJars(AmazonS3ClientBuilder.defaultClient(), instanceProperties);
new SleeperCdkApp(app, id, StackProps.builder()
.stackName(id)
diff --git a/java/cdk/src/main/java/sleeper/cdk/TracingUtils.java b/java/cdk/src/main/java/sleeper/cdk/TracingUtils.java
new file mode 100644
index 0000000000..8d4cfb09d4
--- /dev/null
+++ b/java/cdk/src/main/java/sleeper/cdk/TracingUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.cdk;
+
+import software.amazon.awscdk.services.lambda.Tracing;
+
+import sleeper.configuration.properties.instance.InstanceProperties;
+
+import static sleeper.configuration.properties.instance.CommonProperty.TRACING_ENABLED;
+
+public class TracingUtils {
+
+ private TracingUtils() {
+ }
+
+ public static Tracing active(InstanceProperties properties) {
+ if (properties.getBoolean(TRACING_ENABLED)) {
+ return Tracing.ACTIVE;
+ } else {
+ return Tracing.DISABLED;
+ }
+ }
+
+ public static Tracing passThrough(InstanceProperties properties) {
+ if (properties.getBoolean(TRACING_ENABLED)) {
+ return Tracing.PASS_THROUGH;
+ } else {
+ return Tracing.DISABLED;
+ }
+ }
+}
diff --git a/java/cdk/src/main/java/sleeper/cdk/Utils.java b/java/cdk/src/main/java/sleeper/cdk/Utils.java
index 2da7c02bb0..ef10536a18 100644
--- a/java/cdk/src/main/java/sleeper/cdk/Utils.java
+++ b/java/cdk/src/main/java/sleeper/cdk/Utils.java
@@ -76,8 +76,10 @@ private Utils() {
public static Map<String, String> createDefaultEnvironment(InstanceProperties instanceProperties) {
Map<String, String> environmentVariables = new HashMap<>();
- environmentVariables.put(CONFIG_BUCKET.toEnvironmentVariable(),
- instanceProperties.get(CONFIG_BUCKET));
+ if (instanceProperties.isSet(CONFIG_BUCKET)) {
+ environmentVariables.put(CONFIG_BUCKET.toEnvironmentVariable(),
+ instanceProperties.get(CONFIG_BUCKET));
+ }
environmentVariables.put("JAVA_TOOL_OPTIONS", createToolOptions(instanceProperties,
LOGGING_LEVEL,
diff --git a/java/cdk/src/main/java/sleeper/cdk/jars/BuiltJars.java b/java/cdk/src/main/java/sleeper/cdk/jars/BuiltJars.java
index 69442b036f..dbe87d4b14 100644
--- a/java/cdk/src/main/java/sleeper/cdk/jars/BuiltJars.java
+++ b/java/cdk/src/main/java/sleeper/cdk/jars/BuiltJars.java
@@ -18,14 +18,32 @@
import com.amazonaws.services.s3.AmazonS3;
import software.amazon.awscdk.services.s3.IBucket;
+import sleeper.configuration.properties.instance.InstanceProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
+
public class BuiltJars {
private final AmazonS3 s3;
private final String bucketName;
+ private final LambdaBuilder.Configuration globalConfig;
+ private final Map<String, String> jarFilenameToVersionId = new HashMap<>();
+
+ public BuiltJars(AmazonS3 s3, InstanceProperties instanceProperties) {
+ this(s3, instanceProperties.get(JARS_BUCKET), new GlobalLambdaConfiguration(instanceProperties));
+ }
public BuiltJars(AmazonS3 s3, String bucketName) {
+ this(s3, bucketName, LambdaBuilder.Configuration.none());
+ }
+
+ private BuiltJars(AmazonS3 s3, String bucketName, LambdaBuilder.Configuration globalConfig) {
this.s3 = s3;
this.bucketName = bucketName;
+ this.globalConfig = globalConfig;
}
public String bucketName() {
@@ -33,10 +51,11 @@ public String bucketName() {
}
public LambdaCode lambdaCode(BuiltJar jar, IBucket bucketConstruct) {
- return new LambdaCode(bucketConstruct, jar.getFileName(), getLatestVersionId(jar));
+ return new LambdaCode(bucketConstruct, jar.getFileName(), getLatestVersionId(jar), globalConfig);
}
public String getLatestVersionId(BuiltJar jar) {
- return s3.getObjectMetadata(bucketName, jar.getFileName()).getVersionId();
+ return jarFilenameToVersionId.computeIfAbsent(jar.getFileName(),
+ filename -> s3.getObjectMetadata(bucketName, filename).getVersionId());
}
}
diff --git a/java/cdk/src/main/java/sleeper/cdk/jars/GlobalLambdaConfiguration.java b/java/cdk/src/main/java/sleeper/cdk/jars/GlobalLambdaConfiguration.java
new file mode 100644
index 0000000000..6409f49abe
--- /dev/null
+++ b/java/cdk/src/main/java/sleeper/cdk/jars/GlobalLambdaConfiguration.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.cdk.jars;
+
+import software.amazon.awscdk.services.lambda.LayerVersion;
+import software.constructs.Construct;
+
+import sleeper.cdk.Utils;
+import sleeper.configuration.properties.instance.InstanceProperties;
+
+import static sleeper.configuration.properties.instance.CommonProperty.REGION;
+import static sleeper.configuration.properties.instance.CommonProperty.TRACING_ENABLED;
+
+public class GlobalLambdaConfiguration implements LambdaBuilder.Configuration {
+
+ private final InstanceProperties instanceProperties;
+
+ public GlobalLambdaConfiguration(InstanceProperties instanceProperties) {
+ this.instanceProperties = instanceProperties;
+ }
+
+ @Override
+ public void apply(Construct scope, String functionId, LambdaBuilder builder) {
+ builder.environmentVariables(Utils.createDefaultEnvironment(instanceProperties));
+ if (instanceProperties.getBoolean(TRACING_ENABLED)) {
+ String region = instanceProperties.get(REGION);
+ String arn = "arn:aws:lambda:" + region + ":901920570463:layer:aws-otel-java-agent-amd64-ver-1-32-0:1";
+ builder.layer(LayerVersion.fromLayerVersionArn(scope, functionId + "Tracing", arn));
+ builder.environmentVariable("AWS_LAMBDA_EXEC_WRAPPER", "/opt/otel-handler");
+ }
+ }
+
+}
diff --git a/java/cdk/src/main/java/sleeper/cdk/jars/LambdaBuilder.java b/java/cdk/src/main/java/sleeper/cdk/jars/LambdaBuilder.java
new file mode 100644
index 0000000000..ae07e7e7e8
--- /dev/null
+++ b/java/cdk/src/main/java/sleeper/cdk/jars/LambdaBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.cdk.jars;
+
+import software.amazon.awscdk.services.lambda.Function;
+import software.amazon.awscdk.services.lambda.ILayerVersion;
+import software.amazon.awscdk.services.lambda.IVersion;
+import software.constructs.Construct;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+public class LambdaBuilder {
+ private final Function.Builder builder;
+ private final Map<String, String> environment = new HashMap<>();
+ private final List<ILayerVersion> layers = new ArrayList<>();
+
+ public LambdaBuilder(Function.Builder builder) {
+ this.builder = builder;
+ }
+
+ public LambdaBuilder config(Consumer<Function.Builder> config) {
+ config.accept(builder);
+ return this;
+ }
+
+ public LambdaBuilder environmentVariable(String key, String value) {
+ this.environment.put(key, value);
+ return this;
+ }
+
+ public LambdaBuilder environmentVariables(Map<String, String> environment) {
+ this.environment.putAll(environment);
+ return this;
+ }
+
+ public LambdaBuilder layer(ILayerVersion layer) {
+ layers.add(layer);
+ return this;
+ }
+
+ public IVersion build() {
+ Function function = builder.environment(environment).layers(layers).build();
+
+ // This is needed to tell the CDK to update the functions with new code when it changes in the jars bucket.
+ // See the following:
+ // https://www.define.run/posts/cdk-not-updating-lambda/
+ // https://awsteele.com/blog/2020/12/24/aws-lambda-latest-is-dangerous.html
+ // https://docs.aws.amazon.com/cdk/api/v1/java/software/amazon/awscdk/services/lambda/Version.html
+ return function.getCurrentVersion();
+ }
+
+ public interface Configuration {
+ void apply(Construct scope, String functionId, LambdaBuilder builder);
+
+ static Configuration none() {
+ return (scope, functionId, builder) -> {
+ };
+ }
+ }
+}
diff --git a/java/cdk/src/main/java/sleeper/cdk/jars/LambdaCode.java b/java/cdk/src/main/java/sleeper/cdk/jars/LambdaCode.java
index 672800f481..2494dbd7de 100644
--- a/java/cdk/src/main/java/sleeper/cdk/jars/LambdaCode.java
+++ b/java/cdk/src/main/java/sleeper/cdk/jars/LambdaCode.java
@@ -28,25 +28,23 @@ public class LambdaCode {
private final IBucket bucket;
private final String filename;
private final String versionId;
+ private final LambdaBuilder.Configuration globalConfig;
- public LambdaCode(IBucket bucket, String filename, String versionId) {
+ LambdaCode(IBucket bucket, String filename, String versionId, LambdaBuilder.Configuration globalConfig) {
this.bucket = bucket;
this.filename = filename;
this.versionId = versionId;
+ this.globalConfig = globalConfig;
}
public IVersion buildFunction(Construct scope, String id, Consumer<Function.Builder> config) {
+ return createFunction(scope, id).config(config).build();
+ }
- Function.Builder builder = Function.Builder.create(scope, id)
- .code(Code.fromBucket(bucket, filename, versionId));
- config.accept(builder);
- Function function = builder.build();
-
- // This is needed to tell the CDK to update the functions with new code when it changes in the jars bucket.
- // See the following:
- // https://www.define.run/posts/cdk-not-updating-lambda/
- // https://awsteele.com/blog/2020/12/24/aws-lambda-latest-is-dangerous.html
- // https://docs.aws.amazon.com/cdk/api/v1/java/software/amazon/awscdk/services/lambda/Version.html
- return function.getCurrentVersion();
+ public LambdaBuilder createFunction(Construct scope, String id) {
+ LambdaBuilder builder = new LambdaBuilder(Function.Builder.create(scope, id)
+ .code(Code.fromBucket(bucket, filename, versionId)));
+ globalConfig.apply(scope, id, builder);
+ return builder;
}
}
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
index 16b7ef2548..e441ef2991 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
@@ -34,6 +34,7 @@
import software.amazon.awscdk.services.s3.LifecycleRule;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -56,8 +57,9 @@
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
public class AthenaStack extends NestedStack {
- public AthenaStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars,
- CoreStacks coreStacks) {
+ public AthenaStack(
+ Construct scope, String id, InstanceProperties instanceProperties,
+ BuiltJars jars, CoreStacks coreStacks) {
super(scope, id);
IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName());
@@ -85,10 +87,6 @@ public AthenaStack(Construct scope, String id, InstanceProperties instanceProper
.pendingWindow(Duration.days(7))
.build();
- Map<String, String> env = Utils.createDefaultEnvironment(instanceProperties);
- env.put("spill_bucket", spillBucket.getBucketName());
- env.put("kms_key_id", spillMasterKey.getKeyId());
-
Integer memory = instanceProperties.getInt(ATHENA_COMPOSITE_HANDLER_MEMORY);
Integer timeout = instanceProperties.getInt(ATHENA_COMPOSITE_HANDLER_TIMEOUT_IN_SECONDS);
List<String> handlerClasses = instanceProperties.getList(ATHENA_COMPOSITE_HANDLER_CLASSES);
@@ -116,7 +114,8 @@ public AthenaStack(Construct scope, String id, InstanceProperties instanceProper
.build();
for (String className : handlerClasses) {
- IFunction handler = createConnector(className, instanceProperties, jarCode, env, memory, timeout);
+ IFunction handler = createConnector(
+ className, instanceProperties, jarCode, spillBucket, spillMasterKey, memory, timeout);
jarsBucket.grantRead(handler);
@@ -140,21 +139,28 @@ public AthenaStack(Construct scope, String id, InstanceProperties instanceProper
Utils.addStackTagIfSet(this, instanceProperties);
}
- private IFunction createConnector(String className, InstanceProperties instanceProperties, LambdaCode jar, Map<String, String> env, Integer memory, Integer timeout) {
+ private IFunction createConnector(
+ String className, InstanceProperties instanceProperties, LambdaCode jar,
+ Bucket spillBucket, Key spillMasterKey, Integer memory, Integer timeout) {
String instanceId = instanceProperties.get(ID);
String simpleClassName = getSimpleClassName(className);
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceId.toLowerCase(Locale.ROOT), simpleClassName, "athena-composite-handler"));
- IFunction athenaCompositeHandler = jar.buildFunction(this, simpleClassName + "AthenaCompositeHandler", builder -> builder
- .functionName(functionName)
- .memorySize(memory)
- .timeout(Duration.seconds(timeout))
- .runtime(Runtime.JAVA_11)
- .logGroup(createLambdaLogGroup(this, simpleClassName + "AthenaCompositeHandlerLogGroup", functionName, instanceProperties))
- .handler(className)
- .environment(env));
+ IFunction athenaCompositeHandler = jar.createFunction(this, simpleClassName + "AthenaCompositeHandler")
+ .environmentVariables(Map.of(
+ "spill_bucket", spillBucket.getBucketName(),
+ "kms_key_id", spillMasterKey.getKeyId()))
+ .config(builder -> builder
+ .functionName(functionName)
+ .memorySize(memory)
+ .timeout(Duration.seconds(timeout))
+ .runtime(Runtime.JAVA_11)
+ .logGroup(createLambdaLogGroup(this, simpleClassName + "AthenaCompositeHandlerLogGroup", functionName, instanceProperties))
+ .handler(className)
+ .tracing(TracingUtils.active(instanceProperties)))
+ .build();
CfnDataCatalog.Builder.create(this, simpleClassName + "AthenaDataCatalog")
.name(instanceId + simpleClassName + "SleeperConnector")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
index 2655f8978a..61972f6c2d 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
@@ -52,11 +52,11 @@
import software.amazon.awscdk.services.ecs.EcsOptimizedImage;
import software.amazon.awscdk.services.ecs.EcsOptimizedImageOptions;
import software.amazon.awscdk.services.ecs.FargateTaskDefinition;
-import software.amazon.awscdk.services.ecs.ITaskDefinition;
import software.amazon.awscdk.services.ecs.MachineImageType;
import software.amazon.awscdk.services.ecs.NetworkMode;
import software.amazon.awscdk.services.ecs.OperatingSystemFamily;
import software.amazon.awscdk.services.ecs.RuntimePlatform;
+import software.amazon.awscdk.services.ecs.TaskDefinition;
import software.amazon.awscdk.services.events.Rule;
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
@@ -76,6 +76,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -91,7 +92,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
-import java.util.function.Consumer;
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.cdk.Utils.shouldDeployPaused;
@@ -258,8 +258,6 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
CoreStacks coreStacks, IBucket jarsBucket, LambdaCode jobCreatorJar, Queue compactionJobsQueue) {
// Function to create compaction jobs
- Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);
-
String triggerFunctionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "compaction-job-creation-trigger"));
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
@@ -272,9 +270,9 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsTriggerLambda::handleRequest")
- .environment(environmentVariables)
.reservedConcurrentExecutions(1)
- .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationTriggerLogGroup", triggerFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationTriggerLogGroup", triggerFunctionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
IFunction handlerFunction = jobCreatorJar.buildFunction(this, "CompactionJobsCreationHandler", builder -> builder
.functionName(functionName)
@@ -283,8 +281,8 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
.memorySize(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsLambda::handleRequest")
- .environment(environmentVariables)
- .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationHandlerLogGroup", functionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationHandlerLogGroup", functionName, instanceProperties))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
// Send messages from the trigger function to the handler function
Queue jobCreationQueue = sqsQueueForCompactionJobCreation();
@@ -361,42 +359,32 @@ private void ecsClusterForCompactionTasks(
Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);
environmentVariables.put(Utils.AWS_REGION, instanceProperties.get(REGION));
- Consumer<ITaskDefinition> grantPermissions = taskDef -> {
- coreStacks.grantRunCompactionJobs(taskDef.getTaskRole());
- jarsBucket.grantRead(taskDef.getTaskRole());
- statusStore.grantWriteJobEvent(taskDef.getTaskRole());
- statusStore.grantWriteTaskEvent(taskDef.getTaskRole());
-
- taskDef.getTaskRole().addToPrincipalPolicy(PolicyStatement.Builder
- .create()
- .resources(Collections.singletonList("*"))
- .actions(List.of("ecs:DescribeContainerInstances"))
- .build());
-
- compactionJobsQueue.grantConsumeMessages(taskDef.getTaskRole());
- };
-
String launchType = instanceProperties.get(COMPACTION_ECS_LAUNCHTYPE);
+ TaskDefinition taskDefinition;
if (launchType.equalsIgnoreCase("FARGATE")) {
- FargateTaskDefinition fargateTaskDefinition = compactionFargateTaskDefinition();
- String fargateTaskDefinitionFamily = fargateTaskDefinition.getFamily();
- instanceProperties.set(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY, fargateTaskDefinitionFamily);
- ContainerDefinitionOptions fargateContainerDefinitionOptions = createFargateContainerDefinition(containerImage,
- environmentVariables, instanceProperties);
- fargateTaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME,
- fargateContainerDefinitionOptions);
- grantPermissions.accept(fargateTaskDefinition);
+ taskDefinition = compactionFargateTaskDefinition();
+ instanceProperties.set(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY, taskDefinition.getFamily());
+ taskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME,
+ createFargateContainerDefinition(containerImage, environmentVariables, instanceProperties));
} else {
- Ec2TaskDefinition ec2TaskDefinition = compactionEC2TaskDefinition();
- String ec2TaskDefinitionFamily = ec2TaskDefinition.getFamily();
- instanceProperties.set(COMPACTION_TASK_EC2_DEFINITION_FAMILY, ec2TaskDefinitionFamily);
- ContainerDefinitionOptions ec2ContainerDefinitionOptions = createEC2ContainerDefinition(containerImage,
- environmentVariables, instanceProperties);
- ec2TaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME, ec2ContainerDefinitionOptions);
- grantPermissions.accept(ec2TaskDefinition);
+ taskDefinition = compactionEC2TaskDefinition();
+ instanceProperties.set(COMPACTION_TASK_EC2_DEFINITION_FAMILY, taskDefinition.getFamily());
+ taskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME,
+ createEC2ContainerDefinition(containerImage, environmentVariables, instanceProperties));
addEC2CapacityProvider(cluster, vpc, coreStacks, taskCreatorJar);
}
+ coreStacks.grantRunCompactionJobs(taskDefinition.getTaskRole());
+ jarsBucket.grantRead(taskDefinition.getTaskRole());
+ statusStore.grantWriteJobEvent(taskDefinition.getTaskRole());
+ statusStore.grantWriteTaskEvent(taskDefinition.getTaskRole());
+ taskDefinition.getTaskRole().addToPrincipalPolicy(PolicyStatement.Builder
+ .create()
+ .resources(Collections.singletonList("*"))
+ .actions(List.of("ecs:DescribeContainerInstances"))
+ .build());
+ compactionJobsQueue.grantConsumeMessages(taskDefinition.getTaskRole());
+
CfnOutputProps compactionClusterProps = new CfnOutputProps.Builder()
.value(cluster.getClusterName())
.build();
@@ -516,15 +504,9 @@ private Ec2TaskDefinition compactionEC2TaskDefinition() {
private ContainerDefinitionOptions createFargateContainerDefinition(
ContainerImage image, Map environment, InstanceProperties instanceProperties) {
- String architecture = instanceProperties.get(COMPACTION_TASK_CPU_ARCHITECTURE).toUpperCase(Locale.ROOT);
- String launchType = instanceProperties.get(COMPACTION_ECS_LAUNCHTYPE);
- Pair<Integer, Integer> requirements = Requirements.getArchRequirements(architecture, launchType,
- instanceProperties);
return ContainerDefinitionOptions.builder()
.image(image)
.environment(environment)
- .cpu(requirements.getLeft())
- .memoryLimitMiB(requirements.getRight())
.logging(Utils.createECSContainerLogDriver(this, instanceProperties, "FargateCompactionTasks"))
.build();
}
@@ -551,20 +533,18 @@ private ContainerDefinitionOptions createEC2ContainerDefinition(
private IFunction lambdaForCustomTerminationPolicy(CoreStacks coreStacks, LambdaCode taskCreatorJar) {
// Run tasks function
- Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);
-
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "compaction-custom-termination"));
IFunction handler = taskCreatorJar.buildFunction(this, "CompactionTerminator", builder -> builder
.functionName(functionName)
.description("Custom termination policy for ECS auto scaling group. Only terminate empty instances.")
- .environment(environmentVariables)
.handler("sleeper.compaction.task.creation.SafeTerminationLambda::handleRequest")
.logGroup(createLambdaLogGroup(this, "CompactionTerminatorLogGroup", functionName, instanceProperties))
.memorySize(512)
.runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
- .timeout(Duration.seconds(10)));
+ .timeout(Duration.seconds(10))
+ .tracing(TracingUtils.active(instanceProperties)));
coreStacks.grantReadInstanceConfig(handler);
// Grant this function permission to query ECS for the number of tasks.
@@ -592,7 +572,6 @@ private void lambdaToCreateCompactionTasks(
.memorySize(instanceProperties.getInt(TASK_RUNNER_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TASK_RUNNER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.task.creation.RunTasksLambda::eventHandler")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
.logGroup(createLambdaLogGroup(this, "CompactionTasksCreatorLogGroup", functionName, instanceProperties)));
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
index d3956b294f..24d6bd0cb1 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
@@ -30,6 +30,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -90,10 +91,10 @@ public GarbageCollectorStack(
.description("Creates batches of Sleeper tables to perform garbage collection for and puts them on a queue to be processed")
.runtime(Runtime.JAVA_11)
.handler("sleeper.garbagecollector.GarbageCollectorTriggerLambda::handleRequest")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
- .logGroup(createLambdaLogGroup(this, "GarbageCollectorTriggerLogGroup", triggerFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "GarbageCollectorTriggerLogGroup", triggerFunctionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
IFunction handlerFunction = gcJar.buildFunction(this, "GarbageCollectorLambda", builder -> builder
.functionName(functionName)
.description("Scan the state store looking for files that need deleting and delete them")
@@ -101,8 +102,8 @@ public GarbageCollectorStack(
.memorySize(instanceProperties.getInt(GARBAGE_COLLECTOR_LAMBDA_MEMORY_IN_MB))
.timeout(handlerTimeout)
.handler("sleeper.garbagecollector.GarbageCollectorLambda::handleRequest")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
- .logGroup(createLambdaLogGroup(this, "GarbageCollectorLambdaLogGroup", functionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "GarbageCollectorLambdaLogGroup", functionName, instanceProperties))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
instanceProperties.set(GARBAGE_COLLECTOR_LAMBDA_FUNCTION, triggerFunction.getFunctionName());
// Grant this function permission delete files from the data bucket and
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
index 9cc59ca5e7..4ce0a64aec 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
@@ -34,6 +34,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -46,7 +47,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.cdk.Utils.removalPolicy;
@@ -128,8 +128,6 @@ public IngestBatcherStack(
String jobCreatorName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "batch-ingest-jobs"));
- Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);
-
IFunction submitterLambda = submitterJar.buildFunction(this, "SubmitToIngestBatcherLambda", builder -> builder
.functionName(submitterName)
.description("Triggered by an SQS event that contains a request to ingest a file")
@@ -137,9 +135,9 @@ public IngestBatcherStack(
.memorySize(instanceProperties.getInt(INGEST_BATCHER_SUBMITTER_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(INGEST_BATCHER_SUBMITTER_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.batcher.submitter.IngestBatcherSubmitterLambda::handleRequest")
- .environment(environmentVariables)
.logGroup(createLambdaLogGroup(this, "SubmitToIngestBatcherLogGroup", submitterName, instanceProperties))
- .events(List.of(new SqsEventSource(submitQueue))));
+ .events(List.of(new SqsEventSource(submitQueue)))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
instanceProperties.set(INGEST_BATCHER_SUBMIT_REQUEST_FUNCTION, submitterLambda.getFunctionName());
ingestRequestsTable.grantReadWriteData(submitterLambda);
@@ -154,9 +152,9 @@ public IngestBatcherStack(
.memorySize(instanceProperties.getInt(INGEST_BATCHER_JOB_CREATION_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(INGEST_BATCHER_JOB_CREATION_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.batcher.job.creator.IngestBatcherJobCreatorLambda::eventHandler")
- .environment(environmentVariables)
.reservedConcurrentExecutions(1)
- .logGroup(createLambdaLogGroup(this, "IngestBatcherJobCreationLogGroup", jobCreatorName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "IngestBatcherJobCreationLogGroup", jobCreatorName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
instanceProperties.set(INGEST_BATCHER_JOB_CREATION_FUNCTION, jobCreatorLambda.getFunctionName());
ingestRequestsTable.grantReadWriteData(jobCreatorLambda);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
index 238acc341c..27a813668b 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
@@ -49,6 +49,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -162,8 +163,7 @@ private Queue sqsQueueForIngestJobs(CoreStacks coreStacks, Topic topic) {
.create(this, "IngestAlarm")
.alarmDescription("Alarms if there are any messages on the dead letter queue for the ingest queue")
.metric(ingestDLQ.metricApproximateNumberOfMessagesVisible()
- .with(MetricOptions.builder().statistic("Sum").period(Duration.seconds(60)).build())
- )
+ .with(MetricOptions.builder().statistic("Sum").period(Duration.seconds(60)).build()))
.comparisonOperator(ComparisonOperator.GREATER_THAN_THRESHOLD)
.threshold(0)
.evaluationPeriods(1)
@@ -269,9 +269,9 @@ private void lambdaToCreateIngestTasks(CoreStacks coreStacks, Queue ingestJobQue
.memorySize(instanceProperties.getInt(TASK_RUNNER_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TASK_RUNNER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.starter.RunTasksLambda::eventHandler")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
- .logGroup(createLambdaLogGroup(this, "IngestTasksCreatorLogGroup", functionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "IngestTasksCreatorLogGroup", functionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
// Grant this function permission to read from the S3 bucket
coreStacks.grantReadInstanceConfig(handler);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
index 6b44e1b2aa..6c7f43de1d 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
@@ -37,6 +37,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -46,7 +47,6 @@
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.cdk.Utils.shouldDeployPaused;
@@ -100,18 +100,17 @@ public PartitionSplittingStack(Construct scope,
// Partition splitting code
LambdaCode splitterJar = jars.lambdaCode(BuiltJar.PARTITION_SPLITTER, jarsBucket);
- Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);
// Lambda to batch tables and put requests on the batch SQS queue, to be consumed by FindPartitionsToSplit
- createTriggerFunction(instanceProperties, splitterJar, coreStacks, environmentVariables);
+ createTriggerFunction(instanceProperties, splitterJar, coreStacks);
// Lambda to look for partitions that need splitting (for each partition that
// needs splitting it puts a definition of the splitting job onto a queue)
- createFindPartitionsToSplitFunction(instanceProperties, splitterJar, coreStacks, environmentVariables);
+ createFindPartitionsToSplitFunction(instanceProperties, splitterJar, coreStacks);
// Lambda to split partitions (triggered by partition splitting job
// arriving on partitionSplittingQueue)
- createSplitPartitionFunction(instanceProperties, splitterJar, coreStacks, environmentVariables);
+ createSplitPartitionFunction(instanceProperties, splitterJar, coreStacks);
Utils.addStackTagIfSet(this, instanceProperties);
}
@@ -185,7 +184,7 @@ private QueueAndDlq createJobQueues(InstanceProperties instanceProperties, Topic
return new QueueAndDlq(partitionSplittingJobQueue, partitionSplittingJobDlq);
}
- private void createTriggerFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks, Map<String, String> environmentVariables) {
+ private void createTriggerFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks) {
String triggerFunctionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "split-partition-trigger"));
IFunction triggerFunction = splitterJar.buildFunction(this, "FindPartitionsToSplitTriggerLambda", builder -> builder
@@ -195,9 +194,9 @@ private void createTriggerFunction(InstanceProperties instanceProperties, Lambda
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
.handler("sleeper.splitter.lambda.FindPartitionsToSplitTriggerLambda::handleRequest")
- .environment(environmentVariables)
.reservedConcurrentExecutions(1)
- .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitTriggerLogGroup", triggerFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitTriggerLogGroup", triggerFunctionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
// Cloudwatch rule to trigger this lambda
Rule rule = Rule.Builder
.create(this, "FindPartitionsToSplitPeriodicTrigger")
@@ -214,7 +213,7 @@ private void createTriggerFunction(InstanceProperties instanceProperties, Lambda
partitionSplittingBatchQueue.grantSendMessages(triggerFunction);
}
- private void createFindPartitionsToSplitFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks, Map<String, String> environmentVariables) {
+ private void createFindPartitionsToSplitFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks) {
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "find-partitions-to-split"));
IFunction findPartitionsToSplitLambda = splitterJar.buildFunction(this, "FindPartitionsToSplitLambda", builder -> builder
@@ -224,8 +223,8 @@ private void createFindPartitionsToSplitFunction(InstanceProperties instanceProp
.memorySize(instanceProperties.getInt(FIND_PARTITIONS_TO_SPLIT_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(FIND_PARTITIONS_TO_SPLIT_TIMEOUT_IN_SECONDS)))
.handler("sleeper.splitter.lambda.FindPartitionsToSplitLambda::handleRequest")
- .environment(environmentVariables)
- .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitLogGroup", functionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitLogGroup", functionName, instanceProperties))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
coreStacks.grantReadTablesMetadata(findPartitionsToSplitLambda);
partitionSplittingJobQueue.grantSendMessages(findPartitionsToSplitLambda);
@@ -233,7 +232,7 @@ private void createFindPartitionsToSplitFunction(InstanceProperties instanceProp
SqsEventSourceProps.builder().batchSize(1).build()));
}
- private void createSplitPartitionFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks, Map<String, String> environmentVariables) {
+ private void createSplitPartitionFunction(InstanceProperties instanceProperties, LambdaCode splitterJar, CoreStacks coreStacks) {
String splitFunctionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "split-partition"));
@@ -246,8 +245,8 @@ private void createSplitPartitionFunction(InstanceProperties instanceProperties,
.memorySize(instanceProperties.getInt(SPLIT_PARTITIONS_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(SPLIT_PARTITIONS_TIMEOUT_IN_SECONDS)))
.handler("sleeper.splitter.lambda.SplitPartitionLambda::handleRequest")
- .environment(environmentVariables)
- .logGroup(createLambdaLogGroup(this, "SplitPartitionLogGroup", splitFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "SplitPartitionLogGroup", splitFunctionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
coreStacks.grantSplitPartitions(splitPartitionLambda);
splitPartitionLambda.addEventSource(new SqsEventSource(partitionSplittingJobQueue,
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
index 14c9c16501..c2289b44ea 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
@@ -44,10 +44,10 @@
public class PropertiesStack extends NestedStack {
public PropertiesStack(Construct scope,
- String id,
- InstanceProperties instanceProperties,
- BuiltJars jars,
- CoreStacks coreStacks) {
+ String id,
+ InstanceProperties instanceProperties,
+ BuiltJars jars,
+ CoreStacks coreStacks) {
super(scope, id);
// Jars bucket
@@ -64,7 +64,6 @@ public PropertiesStack(Construct scope,
.functionName(functionName)
.handler("sleeper.cdk.custom.PropertiesWriterLambda::handleEvent")
.memorySize(2048)
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.description("Lambda for writing instance properties to S3 upon initialisation and teardown")
.logGroup(createLambdaLogGroup(this, "PropertiesWriterLambdaLogGroup", functionName, instanceProperties))
.runtime(Runtime.JAVA_11));
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
index 3e4e00f17e..44841f6ea8 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
@@ -47,6 +47,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -84,12 +85,9 @@ public class QueryStack extends NestedStack {
private IFunction queryExecutorLambda;
private IFunction leafPartitionQueryLambda;
- public QueryStack(Construct scope,
- String id,
- InstanceProperties instanceProperties,
- BuiltJars jars,
- CoreStacks coreStacks,
- QueryQueueStack queryQueueStack) {
+ public QueryStack(
+ Construct scope, String id, InstanceProperties instanceProperties,
+ BuiltJars jars, CoreStacks coreStacks, QueryQueueStack queryQueueStack) {
super(scope, id);
IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName());
@@ -131,7 +129,8 @@ public QueryStack(Construct scope,
* @param description a description for the function
* @return an IFunction
*/
- private IFunction createFunction(String id, LambdaCode queryJar, InstanceProperties instanceProperties,
+ private IFunction createFunction(
+ String id, LambdaCode queryJar, InstanceProperties instanceProperties,
String functionName, String handler, String description) {
return queryJar.buildFunction(this, id, builder -> builder
.functionName(functionName)
@@ -140,8 +139,8 @@ private IFunction createFunction(String id, LambdaCode queryJar, InstancePropert
.memorySize(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler(handler)
- .environment(Utils.createDefaultEnvironment(instanceProperties))
- .logGroup(createLambdaLogGroup(this, id + "LogGroup", functionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, id + "LogGroup", functionName, instanceProperties))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
}
/***
@@ -154,8 +153,9 @@ private IFunction createFunction(String id, LambdaCode queryJar, InstancePropert
* @param queryTrackingTable used to track a query
* @return the lambda created
*/
- private IFunction setupQueriesQueryLambda(CoreStacks coreStacks, QueryQueueStack queryQueueStack, InstanceProperties instanceProperties, LambdaCode queryJar,
- IBucket jarsBucket, ITable queryTrackingTable) {
+ private IFunction setupQueriesQueryLambda(
+ CoreStacks coreStacks, QueryQueueStack queryQueueStack, InstanceProperties instanceProperties,
+ LambdaCode queryJar, IBucket jarsBucket, ITable queryTrackingTable) {
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "query-executor"));
IFunction lambda = createFunction("QueryExecutorLambda", queryJar, instanceProperties, functionName,
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
index c4afcd1720..e48a34f7da 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
@@ -31,6 +31,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -71,19 +72,19 @@ public TableMetricsStack(
.description("Creates batches of Sleeper tables to calculate metrics for and puts them on a queue to be published")
.runtime(Runtime.JAVA_11)
.handler("sleeper.metrics.TableMetricsTriggerLambda::handleRequest")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
- .logGroup(createLambdaLogGroup(this, "MetricsTriggerLogGroup", triggerFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "MetricsTriggerLogGroup", triggerFunctionName, instanceProperties))
+ .tracing(TracingUtils.active(instanceProperties)));
IFunction tableMetricsPublisher = metricsJar.buildFunction(this, "MetricsPublisher", builder -> builder
.functionName(publishFunctionName)
.description("Generates metrics for a Sleeper table based on info in its state store, and publishes them to CloudWatch")
.runtime(Runtime.JAVA_11)
.handler("sleeper.metrics.TableMetricsLambda::handleRequest")
- .environment(Utils.createDefaultEnvironment(instanceProperties))
.memorySize(1024)
.timeout(Duration.minutes(1))
- .logGroup(createLambdaLogGroup(this, "MetricsPublisherLogGroup", publishFunctionName, instanceProperties)));
+ .logGroup(createLambdaLogGroup(this, "MetricsPublisherLogGroup", publishFunctionName, instanceProperties))
+ .tracing(TracingUtils.passThrough(instanceProperties)));
coreStacks.grantReadTablesStatus(tableMetricsTrigger);
coreStacks.grantReadTablesMetadata(tableMetricsPublisher);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
index ab093f81e5..3c735b7863 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
@@ -34,6 +34,7 @@
import software.amazon.awscdk.services.sqs.Queue;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -45,7 +46,6 @@
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.stream.Collectors;
import static sleeper.cdk.Utils.createLambdaLogGroup;
@@ -60,10 +60,9 @@ public class CommonEmrBulkImportHelper {
private final IngestStatusStoreResources statusStoreResources;
private final CoreStacks coreStacks;
- public CommonEmrBulkImportHelper(Construct scope, String shortId,
- InstanceProperties instanceProperties,
- CoreStacks coreStacks,
- IngestStatusStoreResources ingestStatusStoreResources) {
+ public CommonEmrBulkImportHelper(
+ Construct scope, String shortId, InstanceProperties instanceProperties,
+ CoreStacks coreStacks, IngestStatusStoreResources ingestStatusStoreResources) {
this.scope = scope;
this.shortId = shortId;
this.instanceProperties = instanceProperties;
@@ -86,7 +85,8 @@ public Queue createJobQueue(CdkDefinedInstanceProperty jobQueueUrl, CdkDefinedIn
.queue(queueForDLs)
.build();
- queueForDLs.metricApproximateNumberOfMessagesVisible().with(MetricOptions.builder()
+ queueForDLs.metricApproximateNumberOfMessagesVisible()
+ .with(MetricOptions.builder()
.period(Duration.seconds(60))
.statistic("Sum")
.build())
@@ -114,33 +114,36 @@ public Queue createJobQueue(CdkDefinedInstanceProperty jobQueueUrl, CdkDefinedIn
return emrBulkImportJobQueue;
}
- public IFunction createJobStarterFunction(String bulkImportPlatform, Queue jobQueue, BuiltJars jars,
- IBucket importBucket, CommonEmrBulkImportStack commonEmrStack) {
+ public IFunction createJobStarterFunction(
+ String bulkImportPlatform, Queue jobQueue, BuiltJars jars,
+ IBucket importBucket, CommonEmrBulkImportStack commonEmrStack) {
return createJobStarterFunction(bulkImportPlatform, jobQueue, jars, importBucket,
List.of(commonEmrStack.getEmrRole(), commonEmrStack.getEc2Role()));
}
- public IFunction createJobStarterFunction(String bulkImportPlatform, Queue jobQueue, BuiltJars jars,
- IBucket importBucket, List<IRole> passRoles) {
+ public IFunction createJobStarterFunction(
+ String bulkImportPlatform, Queue jobQueue, BuiltJars jars,
+ IBucket importBucket, List<IRole> passRoles) {
String instanceId = instanceProperties.get(ID);
- Map<String, String> env = Utils.createDefaultEnvironment(instanceProperties);
- env.put("BULK_IMPORT_PLATFORM", bulkImportPlatform);
IBucket jarsBucket = Bucket.fromBucketName(scope, "CodeBucketEMR", instanceProperties.get(JARS_BUCKET));
LambdaCode bulkImportStarterJar = jars.lambdaCode(BuiltJar.BULK_IMPORT_STARTER, jarsBucket);
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceId.toLowerCase(Locale.ROOT), shortId, "bulk-import-job-starter"));
- IFunction function = bulkImportStarterJar.buildFunction(scope, "BulkImport" + shortId + "JobStarter", builder -> builder
- .functionName(functionName)
- .description("Function to start " + shortId + " bulk import jobs")
- .memorySize(1024)
- .timeout(Duration.minutes(2))
- .environment(env)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
- .handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
- .logGroup(createLambdaLogGroup(scope, "BulkImport" + shortId + "JobStarterLogGroup", functionName, instanceProperties))
- .events(Lists.newArrayList(SqsEventSource.Builder.create(jobQueue).batchSize(1).build())));
+ IFunction function = bulkImportStarterJar.createFunction(scope, "BulkImport" + shortId + "JobStarter")
+ .environmentVariable("BULK_IMPORT_PLATFORM", bulkImportPlatform)
+ .config(builder -> builder
+ .functionName(functionName)
+ .description("Function to start " + shortId + " bulk import jobs")
+ .memorySize(1024)
+ .timeout(Duration.minutes(2))
+ .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
+ .logGroup(createLambdaLogGroup(scope, "BulkImport" + shortId + "JobStarterLogGroup", functionName, instanceProperties))
+ .events(Lists.newArrayList(SqsEventSource.Builder.create(jobQueue).batchSize(1).build()))
+ .tracing(TracingUtils.passThrough(instanceProperties)))
+ .build();
coreStacks.grantReadConfigAndPartitions(function);
importBucket.grantReadWrite(function);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
index 2f3a9d431c..116c6b3242 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
@@ -65,6 +65,7 @@
import software.amazon.awscdk.services.stepfunctions.tasks.SnsPublish;
import software.constructs.Construct;
+import sleeper.cdk.TracingUtils;
import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
@@ -127,7 +128,8 @@ public EksBulkImportStack(
.queue(queueForDLs)
.build();
- queueForDLs.metricApproximateNumberOfMessagesVisible().with(MetricOptions.builder()
+ queueForDLs.metricApproximateNumberOfMessagesVisible()
+ .with(MetricOptions.builder()
.period(Duration.seconds(60))
.statistic("Sum")
.build())
@@ -152,24 +154,25 @@ public EksBulkImportStack(
instanceProperties.set(BULK_IMPORT_EKS_JOB_QUEUE_ARN, bulkImportJobQueue.getQueueArn());
bulkImportJobQueue.grantSendMessages(coreStacks.getIngestPolicy());
- Map<String, String> env = Utils.createDefaultEnvironment(instanceProperties);
- env.put("BULK_IMPORT_PLATFORM", "EKS");
IBucket jarsBucket = Bucket.fromBucketName(this, "CodeBucketEKS", instanceProperties.get(JARS_BUCKET));
LambdaCode bulkImportStarterJar = jars.lambdaCode(BuiltJar.BULK_IMPORT_STARTER, jarsBucket);
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceId.toLowerCase(Locale.ROOT), "eks-bulk-import-job-starter"));
- IFunction bulkImportJobStarter = bulkImportStarterJar.buildFunction(this, "BulkImportEKSJobStarter", builder -> builder
- .functionName(functionName)
- .description("Function to start EKS bulk import jobs")
- .memorySize(1024)
- .timeout(Duration.minutes(2))
- .environment(env)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
- .handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
- .logGroup(createLambdaLogGroup(this, "BulkImportEKSJobStarterLogGroup", functionName, instanceProperties))
- .events(Lists.newArrayList(SqsEventSource.Builder.create(bulkImportJobQueue).batchSize(1).build())));
+ IFunction bulkImportJobStarter = bulkImportStarterJar.createFunction(this, "BulkImportEKSJobStarter")
+ .environmentVariable("BULK_IMPORT_PLATFORM", "EKS")
+ .config(builder -> builder
+ .functionName(functionName)
+ .description("Function to start EKS bulk import jobs")
+ .memorySize(1024)
+ .timeout(Duration.minutes(2))
+ .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
+ .logGroup(createLambdaLogGroup(this, "BulkImportEKSJobStarterLogGroup", functionName, instanceProperties))
+ .events(Lists.newArrayList(SqsEventSource.Builder.create(bulkImportJobQueue).batchSize(1).build()))
+ .tracing(TracingUtils.passThrough(instanceProperties)))
+ .build();
configureJobStarterFunction(bulkImportJobStarter);
importBucketStack.getImportBucket().grantReadWrite(bulkImportJobStarter);
@@ -251,7 +254,7 @@ private static void configureJobStarterFunction(IFunction bulkImportJobStarter)
}
private StateMachine createStateMachine(Cluster cluster, InstanceProperties instanceProperties,
- ITopic errorsTopic) {
+ ITopic errorsTopic) {
String imageName = instanceProperties.get(ACCOUNT) +
".dkr.ecr." +
instanceProperties.get(REGION) +
@@ -295,8 +298,8 @@ private StateMachine createStateMachine(Cluster cluster, InstanceProperties inst
.next(CustomState.Builder.create(this, "DeleteJob")
.stateJson(deleteJobState).build()))
.otherwise(createErrorMessage.next(publishError).next(Fail.Builder
- .create(this, "FailedJobState").cause("Spark job failed").build()))))
- ).build();
+ .create(this, "FailedJobState").cause("Spark job failed").build())))))
+ .build();
}
private KubernetesManifest createNamespace(Cluster bulkImportCluster, String bulkImportNamespace) {
@@ -314,18 +317,18 @@ private void addClusterAdminRoles(Cluster cluster, InstanceProperties properties
}
}
- private void createManifests(Cluster cluster, KubernetesManifest namespace, String namespaceName,
- IRole stateMachineRole) {
+ private void createManifests(
+ Cluster cluster, KubernetesManifest namespace, String namespaceName, IRole stateMachineRole) {
Lists.newArrayList(
- createManifestFromResource(cluster, "SparkSubmitRole", namespaceName, "/k8s/spark-submit-role.json"),
- createManifestFromResource(cluster, "SparkSubmitRoleBinding", namespaceName,
- "/k8s/spark-submit-role-binding.json"),
- createManifestFromResource(cluster, "SparkRole", namespaceName, "/k8s/spark-role.json"),
- createManifestFromResource(cluster, "SparkRoleBinding", namespaceName, "/k8s/spark-role-binding.json"),
- createManifestFromResource(cluster, "StepFunctionRole", namespaceName, "/k8s/step-function-role.json"),
- createManifestFromResource(cluster, "StepFunctionRoleBinding", namespaceName,
- "/k8s/step-function-role-binding.json",
- replacements(Map.of("user-placeholder", stateMachineRole.getRoleArn()))))
+ createManifestFromResource(cluster, "SparkSubmitRole", namespaceName, "/k8s/spark-submit-role.json"),
+ createManifestFromResource(cluster, "SparkSubmitRoleBinding", namespaceName,
+ "/k8s/spark-submit-role-binding.json"),
+ createManifestFromResource(cluster, "SparkRole", namespaceName, "/k8s/spark-role.json"),
+ createManifestFromResource(cluster, "SparkRoleBinding", namespaceName, "/k8s/spark-role-binding.json"),
+ createManifestFromResource(cluster, "StepFunctionRole", namespaceName, "/k8s/step-function-role.json"),
+ createManifestFromResource(cluster, "StepFunctionRoleBinding", namespaceName,
+ "/k8s/step-function-role-binding.json",
+ replacements(Map.of("user-placeholder", stateMachineRole.getRoleArn()))))
.forEach(manifest -> manifest.getNode().addDependency(namespace));
}
@@ -333,8 +336,8 @@ private static KubernetesManifest createManifestFromResource(Cluster cluster, St
return createManifestFromResource(cluster, id, namespace, resource, json -> json);
}
- private static KubernetesManifest createManifestFromResource(Cluster cluster, String id, String namespace, String resource,
- Function replacements) {
+ private static KubernetesManifest createManifestFromResource(
+ Cluster cluster, String id, String namespace, String resource, Function replacements) {
return cluster.addManifest(id, parseJsonWithNamespace(resource, namespace, replacements));
}
diff --git a/java/clients/src/main/java/sleeper/clients/deploy/StacksForDockerUpload.java b/java/clients/src/main/java/sleeper/clients/deploy/StacksForDockerUpload.java
index 5a968b76eb..31b7c0013d 100644
--- a/java/clients/src/main/java/sleeper/clients/deploy/StacksForDockerUpload.java
+++ b/java/clients/src/main/java/sleeper/clients/deploy/StacksForDockerUpload.java
@@ -29,12 +29,14 @@
import static sleeper.configuration.properties.instance.CommonProperty.ID;
import static sleeper.configuration.properties.instance.CommonProperty.OPTIONAL_STACKS;
import static sleeper.configuration.properties.instance.CommonProperty.REGION;
+import static sleeper.configuration.properties.instance.CommonProperty.TRACING_ENABLED;
public class StacksForDockerUpload {
private final String ecrPrefix;
private final String account;
private final String region;
private final String version;
+ private final boolean tracingEnabled;
private final List<String> stacks;
private StacksForDockerUpload(Builder builder) {
@@ -42,6 +44,7 @@ private StacksForDockerUpload(Builder builder) {
account = requireNonNull(builder.account, "account must not be null");
region = requireNonNull(builder.region, "region must not be null");
version = requireNonNull(builder.version, "version must not be null");
+ tracingEnabled = builder.tracingEnabled;
stacks = requireNonNull(builder.stacks, "stacks must not be null");
}
@@ -60,6 +63,7 @@ public static StacksForDockerUpload from(InstanceProperties properties, String v
.account(properties.get(ACCOUNT))
.region(properties.get(REGION))
.version(version)
+ .tracingEnabled(properties.getBoolean(TRACING_ENABLED))
.stacks(properties.getList(OPTIONAL_STACKS)).build();
}
@@ -79,6 +83,10 @@ public String getVersion() {
return version;
}
+ public String getMode() {
+ return tracingEnabled ? "tracing" : "no_tracing";
+ }
+
public List<String> getStacks() {
return stacks;
}
@@ -120,6 +128,7 @@ public static final class Builder {
private String account;
private String region;
private String version;
+ private boolean tracingEnabled;
private List<String> stacks;
private Builder() {
@@ -145,6 +154,11 @@ public Builder version(String version) {
return this;
}
+ public Builder tracingEnabled(boolean tracingEnabled) {
+ this.tracingEnabled = tracingEnabled;
+ return this;
+ }
+
public Builder stacks(List<String> stacks) {
this.stacks = stacks;
return this;
diff --git a/java/clients/src/main/java/sleeper/clients/deploy/UploadDockerImages.java b/java/clients/src/main/java/sleeper/clients/deploy/UploadDockerImages.java
index c23233b0b7..e747ff3f82 100644
--- a/java/clients/src/main/java/sleeper/clients/deploy/UploadDockerImages.java
+++ b/java/clients/src/main/java/sleeper/clients/deploy/UploadDockerImages.java
@@ -44,7 +44,6 @@ private UploadDockerImages(Builder builder) {
dockerImageConfig = requireNonNull(builder.dockerImageConfig, "dockerImageConfig must not be null");
}
-
public static Builder builder() {
return new Builder();
}
@@ -57,8 +56,7 @@ public void upload(CommandPipelineRunner runCommand, StacksForDockerUpload data)
upload(runCommand, data, List.of());
}
- public void upload(CommandPipelineRunner runCommand, StacksForDockerUpload data, List extraDockerImages)
- throws IOException, InterruptedException {
+ public void upload(CommandPipelineRunner runCommand, StacksForDockerUpload data, List extraDockerImages) throws IOException, InterruptedException {
String repositoryHost = String.format("%s.dkr.ecr.%s.amazonaws.com", data.getAccount(), data.getRegion());
List stacksToUpload = dockerImageConfig.getStacksToDeploy(data.getStacks(), extraDockerImages);
List stacksToBuild = stacksToUpload.stream()
@@ -94,9 +92,11 @@ public void upload(CommandPipelineRunner runCommand, StacksForDockerUpload data,
if (stackImage.isBuildx()) {
runCommand.runOrThrow("docker", "buildx", "build",
"--platform", "linux/amd64,linux/arm64",
- "-t", tag, "--push", directory);
+ "-t", tag, "--push", directory,
+ "--build-arg", "MODE=" + data.getMode());
} else {
- runCommand.runOrThrow("docker", "build", "-t", tag, directory);
+ runCommand.runOrThrow("docker", "build",
+ "-t", tag, "--build-arg", "MODE=" + data.getMode(), directory);
runCommand.runOrThrow("docker", "push", tag);
}
} catch (Exception e) {
diff --git a/java/clients/src/test/java/sleeper/clients/deploy/UploadDockerImagesTest.java b/java/clients/src/test/java/sleeper/clients/deploy/UploadDockerImagesTest.java
index e09bef294d..cb7265c32c 100644
--- a/java/clients/src/test/java/sleeper/clients/deploy/UploadDockerImagesTest.java
+++ b/java/clients/src/test/java/sleeper/clients/deploy/UploadDockerImagesTest.java
@@ -56,8 +56,7 @@ public class UploadDockerImagesTest {
"IngestStack", dockerBuildImage("ingest"),
"EksBulkImportStack", dockerBuildImage("bulk-import-runner"),
"BuildxStack", dockerBuildxImage("buildx"),
- "EmrServerlessBulkImportStack", emrServerlessImage("bulk-import-runner-emr-serverless")
- );
+ "EmrServerlessBulkImportStack", emrServerlessImage("bulk-import-runner-emr-serverless"));
private final InMemoryEcrRepositories ecrClient = new InMemoryEcrRepositories();
private final InstanceProperties properties = createTestInstanceProperties();
private final DockerImageConfiguration dockerImageConfiguration = new DockerImageConfiguration(STACK_DOCKER_IMAGES);
@@ -141,8 +140,7 @@ void shouldCreateRepositoryAndPushImageWhenEcrRepositoryPrefixIsSet() throws Exc
assertThat(commandsThatRan).containsExactly(
loginDockerCommand(),
buildImageCommand(expectedTag, "./docker/ingest"),
- pushImageCommand(expectedTag)
- );
+ pushImageCommand(expectedTag));
assertThat(ecrClient.getRepositories())
.containsExactlyInAnyOrder("custom-ecr-prefix/ingest");
}
@@ -222,8 +220,7 @@ void shouldCreateRepositoryAndPushImageWhenOnlyOneImageNeedsToBeBuiltByBuildx()
createNewBuildxBuilderInstanceCommand(),
buildImageCommand(expectedTag1, "./docker/ingest"),
pushImageCommand(expectedTag1),
- buildAndPushImageWithBuildxCommand(expectedTag2, "./docker/buildx")
- );
+ buildAndPushImageWithBuildxCommand(expectedTag2, "./docker/buildx"));
assertThat(ecrClient.getRepositories())
.containsExactlyInAnyOrder("test-instance/buildx", "test-instance/ingest");
@@ -269,9 +266,7 @@ void shouldFailWhenDockerLoginFails() {
properties.set(OPTIONAL_STACKS, "IngestStack");
// When / Then
- assertThatThrownBy(() ->
- uploader().upload(returningExitCode(123), StacksForDockerUpload.from(properties))
- ).isInstanceOfSatisfying(CommandFailedException.class, e -> {
+ assertThatThrownBy(() -> uploader().upload(returningExitCode(123), StacksForDockerUpload.from(properties))).isInstanceOfSatisfying(CommandFailedException.class, e -> {
assertThat(e.getCommand()).isEqualTo(loginDockerCommand());
assertThat(e.getExitCode()).isEqualTo(123);
});
@@ -305,13 +300,11 @@ void shouldFailWhenCreateBuildxBuilderFails() {
properties.set(OPTIONAL_STACKS, "BuildxStack");
// When / Then
- assertThatThrownBy(() ->
- uploader().upload(returningExitCodeForCommand(123, createNewBuildxBuilderInstanceCommand()),
- StacksForDockerUpload.from(properties))
- ).isInstanceOfSatisfying(CommandFailedException.class, e -> {
- assertThat(e.getCommand()).isEqualTo(createNewBuildxBuilderInstanceCommand());
- assertThat(e.getExitCode()).isEqualTo(123);
- });
+ assertThatThrownBy(() -> uploader().upload(returningExitCodeForCommand(123, createNewBuildxBuilderInstanceCommand()),
+ StacksForDockerUpload.from(properties))).isInstanceOfSatisfying(CommandFailedException.class, e -> {
+ assertThat(e.getCommand()).isEqualTo(createNewBuildxBuilderInstanceCommand());
+ assertThat(e.getExitCode()).isEqualTo(123);
+ });
assertThat(ecrClient.getRepositories()).isEmpty();
}
@@ -324,13 +317,11 @@ void shouldDeleteRepositoryWhenDockerBuildFails() {
CommandPipeline buildImageCommand = buildImageCommand(
"123.dkr.ecr.test-region.amazonaws.com/test-instance/ingest:1.0.0",
"./docker/ingest");
- assertThatThrownBy(() ->
- uploader().upload(returningExitCodeForCommand(42, buildImageCommand),
- StacksForDockerUpload.from(properties))
- ).isInstanceOfSatisfying(CommandFailedException.class, e -> {
- assertThat(e.getCommand()).isEqualTo(buildImageCommand);
- assertThat(e.getExitCode()).isEqualTo(42);
- });
+ assertThatThrownBy(() -> uploader().upload(returningExitCodeForCommand(42, buildImageCommand),
+ StacksForDockerUpload.from(properties))).isInstanceOfSatisfying(CommandFailedException.class, e -> {
+ assertThat(e.getCommand()).isEqualTo(buildImageCommand);
+ assertThat(e.getExitCode()).isEqualTo(42);
+ });
assertThat(ecrClient.getRepositories()).isEmpty();
}
}
@@ -383,7 +374,7 @@ private CommandPipeline loginDockerCommand() {
}
private CommandPipeline buildImageCommand(String tag, String dockerDirectory) {
- return pipeline(command("docker", "build", "-t", tag, dockerDirectory));
+ return pipeline(command("docker", "build", "-t", tag, "--build-arg", "MODE=tracing", dockerDirectory));
}
private CommandPipeline pushImageCommand(String tag) {
@@ -400,7 +391,7 @@ private CommandPipeline createNewBuildxBuilderInstanceCommand() {
private CommandPipeline buildAndPushImageWithBuildxCommand(String tag, String dockerDirectory) {
return pipeline(command("docker", "buildx", "build", "--platform", "linux/amd64,linux/arm64",
- "-t", tag, "--push", dockerDirectory));
+ "-t", tag, "--push", dockerDirectory, "--build-arg", "MODE=tracing"));
}
private List commandsToLoginDockerAndPushImages(String... images) {
diff --git a/java/compaction/compaction-job-execution/docker/Dockerfile b/java/compaction/compaction-job-execution/docker/Dockerfile
index 34a40e8443..0fd2e531fb 100644
--- a/java/compaction/compaction-job-execution/docker/Dockerfile
+++ b/java/compaction/compaction-job-execution/docker/Dockerfile
@@ -11,7 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM amazoncorretto:11
+ARG MODE=no_tracing
+
+FROM amazoncorretto:11 as build_no_tracing
+
+FROM build_no_tracing as build_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM build_${MODE}
COPY compaction-job-execution.jar /compaction-job-execution.jar
COPY run.sh /run.sh
diff --git a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CommonProperty.java b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CommonProperty.java
index 48694dc303..e1964fcb7c 100644
--- a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CommonProperty.java
+++ b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CommonProperty.java
@@ -230,6 +230,12 @@ public interface CommonProperty {
.validationPredicate(Utils::isPositiveInteger)
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
+ UserDefinedInstanceProperty TRACING_ENABLED = Index.propertyBuilder("sleeper.opentelemetry.tracing.enabled")
+ .description("This specifies whether OpenTelemetry tracing is enabled.")
+ .defaultValue("true")
+ .validationPredicate(Utils::isTrueOrFalse)
+ .propertyGroup(InstancePropertyGroup.COMMON)
+ .runCdkDeployWhenChanged(true).build();
static List getAll() {
return Index.INSTANCE.getAll();
diff --git a/java/ingest/ingest-runner/docker/Dockerfile b/java/ingest/ingest-runner/docker/Dockerfile
index b6b2841107..5fd7fdb5ef 100644
--- a/java/ingest/ingest-runner/docker/Dockerfile
+++ b/java/ingest/ingest-runner/docker/Dockerfile
@@ -11,7 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM amazoncorretto:11
+ARG MODE=no_tracing
+
+FROM amazoncorretto:11 as build_no_tracing
+
+FROM build_no_tracing as build_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM build_${MODE}
COPY ingest.jar /ingest.jar
COPY run.sh /run.sh
diff --git a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestApp.java b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestApp.java
index 0956ee2b87..c74dba9973 100644
--- a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestApp.java
+++ b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestApp.java
@@ -29,7 +29,6 @@
import static sleeper.configuration.properties.instance.CommonProperty.ACCOUNT;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
-import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
import static sleeper.configuration.properties.instance.CommonProperty.REGION;
import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_CLUSTER_ENABLED;
@@ -87,7 +86,7 @@ public static void main(String[] args) {
.account(systemTestProperties.get(ACCOUNT))
.region(systemTestProperties.get(REGION))
.build();
- BuiltJars jars = new BuiltJars(AmazonS3ClientBuilder.defaultClient(), systemTestProperties.get(JARS_BUCKET));
+ BuiltJars jars = new BuiltJars(AmazonS3ClientBuilder.defaultClient(), systemTestProperties);
new SystemTestApp(app, id, StackProps.builder()
.stackName(id)
diff --git a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java
index e9df499cbd..a209f2f23d 100644
--- a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java
+++ b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java
@@ -34,7 +34,6 @@
import java.util.HashMap;
import java.util.Locale;
-import java.util.Map;
import static sleeper.cdk.Utils.createLogGroupWithRetentionDays;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
@@ -43,11 +42,9 @@
public class SystemTestPropertiesStack extends NestedStack {
- public SystemTestPropertiesStack(Construct scope,
- String id,
- SystemTestStandaloneProperties systemTestProperties,
- SystemTestBucketStack bucketStack,
- BuiltJars jars) {
+ public SystemTestPropertiesStack(
+ Construct scope, String id, SystemTestStandaloneProperties systemTestProperties,
+ SystemTestBucketStack bucketStack, BuiltJars jars) {
super(scope, id);
String jarsBucketName = systemTestProperties.get(SYSTEM_TEST_JARS_BUCKET);
@@ -60,14 +57,16 @@ public SystemTestPropertiesStack(Construct scope,
String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
systemTestProperties.get(SYSTEM_TEST_ID).toLowerCase(Locale.ROOT), "properties-writer"));
- IFunction propertiesWriterLambda = jar.buildFunction(this, "PropertiesWriterLambda", builder -> builder
- .functionName(functionName)
- .handler("sleeper.cdk.custom.PropertiesWriterLambda::handleEvent")
- .memorySize(2048)
- .environment(Map.of(CONFIG_BUCKET.toEnvironmentVariable(), bucketStack.getBucket().getBucketName()))
- .description("Lambda for writing system test properties to S3 upon initialisation and teardown")
- .logGroup(createLogGroupWithRetentionDays(this, "PropertiesWriterLambdaLogGroup", 30))
- .runtime(Runtime.JAVA_11));
+ IFunction propertiesWriterLambda = jar.createFunction(this, "PropertiesWriterLambda")
+ .environmentVariable(CONFIG_BUCKET.toEnvironmentVariable(), bucketStack.getBucket().getBucketName())
+ .config(builder -> builder
+ .functionName(functionName)
+ .handler("sleeper.cdk.custom.PropertiesWriterLambda::handleEvent")
+ .memorySize(2048)
+ .description("Lambda for writing system test properties to S3 upon initialisation and teardown")
+ .logGroup(createLogGroupWithRetentionDays(this, "PropertiesWriterLambdaLogGroup", 30))
+ .runtime(Runtime.JAVA_11))
+ .build();
bucketStack.getBucket().grantWrite(propertiesWriterLambda);
diff --git a/java/system-test/system-test-data-generation/docker/Dockerfile b/java/system-test/system-test-data-generation/docker/Dockerfile
index d3902c192a..bdad39d5be 100644
--- a/java/system-test/system-test-data-generation/docker/Dockerfile
+++ b/java/system-test/system-test-data-generation/docker/Dockerfile
@@ -11,7 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM amazoncorretto:11
+ARG MODE=no_tracing
+
+FROM amazoncorretto:11 as build_no_tracing
+
+FROM build_no_tracing as build_tracing
+
+ONBUILD ADD https://github.com/aws-observability/aws-otel-java-instrumentation/releases/latest/download/aws-opentelemetry-agent.jar /opt/aws-opentelemetry-agent.jar
+ONBUILD ENV JAVA_TOOL_OPTIONS=-javaagent:/opt/aws-opentelemetry-agent.jar
+
+FROM build_${MODE}
COPY system-test.jar /system-test.jar
COPY run.sh /run.sh
diff --git a/scripts/templates/instanceproperties.template b/scripts/templates/instanceproperties.template
index 28028b6a65..2611463e89 100644
--- a/scripts/templates/instanceproperties.template
+++ b/scripts/templates/instanceproperties.template
@@ -170,6 +170,9 @@ sleeper.table.batching.lambdas.memory=1024
# create compaction jobs, run garbage collection, perform partition splitting.
sleeper.table.batching.lambdas.timeout.seconds=60
+# This specifies whether OpenTelemetry tracing is enabled.
+sleeper.opentelemetry.tracing.enabled=true
+
## The following properties relate to standard ingest.