diff --git a/java/bulk-import/bulk-import-common/src/main/java/sleeper/bulkimport/configuration/ConfigurationUtils.java b/java/bulk-import/bulk-import-common/src/main/java/sleeper/bulkimport/configuration/ConfigurationUtils.java
index fa3b0d5324..60de12096f 100644
--- a/java/bulk-import/bulk-import-common/src/main/java/sleeper/bulkimport/configuration/ConfigurationUtils.java
+++ b/java/bulk-import/bulk-import-common/src/main/java/sleeper/bulkimport/configuration/ConfigurationUtils.java
@@ -74,7 +74,7 @@
*/
public class ConfigurationUtils {
- private static final String JAVA_HOME = "/usr/lib/jvm/java-11-amazon-corretto.%s";
+ private static final String JAVA_HOME = "/usr/lib/jvm/java-17-amazon-corretto.%s";
private ConfigurationUtils() {
}
diff --git a/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile b/java/bulk-import/bulk-import-eks/docker/eks-native/Dockerfile
similarity index 97%
rename from java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile
rename to java/bulk-import/bulk-import-eks/docker/eks-native/Dockerfile
index dc0d36543e..dbea669327 100644
--- a/java/bulk-import/bulk-import-runner/docker/eks-native/Dockerfile
+++ b/java/bulk-import/bulk-import-eks/docker/eks-native/Dockerfile
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
ARG BUILDER_IMAGE_NAME=maven
-ARG BUILDER_IMAGE_TAG=3.8-openjdk-8-slim
+ARG BUILDER_IMAGE_TAG=3.9-amazoncorretto-17-al2023
ARG BASE_IMAGE_NAME=amazoncorretto
-ARG BASE_IMAGE_TAG=11-al2023-headless
+ARG BASE_IMAGE_TAG=17-al2023-headless
ARG SPARK_VERSION=3.5.1
ARG HADOOP_VERSION=3.3.6
diff --git a/java/bulk-import/bulk-import-runner/docker/eks-native/build/build-hadoop.sh b/java/bulk-import/bulk-import-eks/docker/eks-native/build/build-hadoop.sh
similarity index 100%
rename from java/bulk-import/bulk-import-runner/docker/eks-native/build/build-hadoop.sh
rename to java/bulk-import/bulk-import-eks/docker/eks-native/build/build-hadoop.sh
diff --git a/java/bulk-import/bulk-import-runner/docker/eks-native/build/extract-native-libs.sh b/java/bulk-import/bulk-import-eks/docker/eks-native/build/extract-native-libs.sh
similarity index 100%
rename from java/bulk-import/bulk-import-runner/docker/eks-native/build/extract-native-libs.sh
rename to java/bulk-import/bulk-import-eks/docker/eks-native/build/extract-native-libs.sh
diff --git a/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile b/java/bulk-import/bulk-import-eks/docker/eks/Dockerfile
similarity index 78%
rename from java/bulk-import/bulk-import-runner/docker/eks/Dockerfile
rename to java/bulk-import/bulk-import-eks/docker/eks/Dockerfile
index 2e1bbdfdb5..e53340069e 100644
--- a/java/bulk-import/bulk-import-runner/docker/eks/Dockerfile
+++ b/java/bulk-import/bulk-import-eks/docker/eks/Dockerfile
@@ -12,10 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-FROM apache/spark:3.5.1-scala2.12-java11-ubuntu
+FROM apache/spark:3.5.1-scala2.12-java17-ubuntu
ENV PATH="$PATH:/opt/spark/bin"
USER root
+RUN rm /opt/spark/jars/*
RUN mkdir /opt/spark/workdir
USER spark
+
+# Replace Spark jars with versions managed by Sleeper (the version of Spark must match)
+COPY ./spark/* /opt/spark/jars/
COPY ./bulk-import-runner.jar /opt/spark/workdir
diff --git a/java/bulk-import/bulk-import-eks/pom.xml b/java/bulk-import/bulk-import-eks/pom.xml
new file mode 100644
index 0000000000..5b9a526625
--- /dev/null
+++ b/java/bulk-import/bulk-import-eks/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>sleeper</groupId>
+        <artifactId>bulk-import</artifactId>
+        <version>0.26.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>bulk-import-eks</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-kubernetes_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/spark</outputDirectory>
+                            <includeScope>runtime</includeScope>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile b/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
index 4429c98734..c535b34930 100644
--- a/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
+++ b/java/bulk-import/bulk-import-runner/docker/emr-serverless/Dockerfile
@@ -15,12 +15,7 @@
FROM public.ecr.aws/emr-serverless/spark/emr-7.2.0:latest
USER root
-
-# Install JDK 11
-RUN yum update -y && \
- yum install java-11-amazon-corretto -y && \
- mkdir /workdir
-
+RUN mkdir /workdir
COPY ./bulk-import-runner.jar /workdir
# EMR Serverless will run the image as hadoop
diff --git a/java/bulk-import/bulk-import-starter/src/test/java/sleeper/bulkimport/starter/executor/EmrServerlessPlatformExecutorIT.java b/java/bulk-import/bulk-import-starter/src/test/java/sleeper/bulkimport/starter/executor/EmrServerlessPlatformExecutorIT.java
index 05ca6e1e5c..44b6f41e75 100644
--- a/java/bulk-import/bulk-import-starter/src/test/java/sleeper/bulkimport/starter/executor/EmrServerlessPlatformExecutorIT.java
+++ b/java/bulk-import/bulk-import-starter/src/test/java/sleeper/bulkimport/starter/executor/EmrServerlessPlatformExecutorIT.java
@@ -101,7 +101,7 @@ void shouldRunAServerlessJob(WireMockRuntimeInfo runtimeInfo) {
assertThatJson(body)
.inPath("$.jobDriver.sparkSubmit.sparkSubmitParameters").asString()
.startsWith("--class BulkImportClass ")
- .contains(" --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-11-amazon-corretto.x86_64 ");
+ .contains(" --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 ");
});
}
diff --git a/java/bulk-import/pom.xml b/java/bulk-import/pom.xml
index 09fd53e873..ccc175c005 100644
--- a/java/bulk-import/pom.xml
+++ b/java/bulk-import/pom.xml
@@ -30,5 +30,6 @@
        <module>bulk-import-common</module>
        <module>bulk-import-runner</module>
        <module>bulk-import-starter</module>
+        <module>bulk-import-eks</module>
diff --git a/java/cdk-environment/src/main/java/sleeper/environment/cdk/builduptime/BuildUptimeDeployment.java b/java/cdk-environment/src/main/java/sleeper/environment/cdk/builduptime/BuildUptimeDeployment.java
index 9130d8b62a..ff1324e813 100644
--- a/java/cdk-environment/src/main/java/sleeper/environment/cdk/builduptime/BuildUptimeDeployment.java
+++ b/java/cdk-environment/src/main/java/sleeper/environment/cdk/builduptime/BuildUptimeDeployment.java
@@ -21,6 +21,7 @@
import software.amazon.awscdk.services.lambda.Code;
import software.amazon.awscdk.services.lambda.Function;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.constructs.Construct;
import sleeper.environment.cdk.config.AppContext;
@@ -31,7 +32,6 @@
import java.util.Map;
import static sleeper.environment.cdk.config.AppParameters.INSTANCE_ID;
-import static software.amazon.awscdk.services.lambda.Runtime.JAVA_11;
public class BuildUptimeDeployment {
public static final OptionalStringParameter LAMBDA_JAR = AppParameters.BUILD_UPTIME_LAMBDA_JAR;
@@ -48,7 +48,7 @@ public BuildUptimeDeployment(Construct scope) {
.code(Code.fromAsset(lambdaJarPath))
.functionName("sleeper-" + context.get(INSTANCE_ID) + "-build-uptime")
.description("Start and stop EC2 instances and schedule rules")
- .runtime(JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(1024)
.timeout(Duration.minutes(10))
.handler("sleeper.build.uptime.lambda.BuildUptimeLambda::handleRequest")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
index 451b363b83..c81fbf619f 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
@@ -162,7 +162,7 @@ private IFunction createConnector(
.functionName(functionName)
.memorySize(memory)
.timeout(Duration.seconds(timeout))
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.handler(className)
.environment(env));
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
index 5a41a2b64f..34635b9718 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
@@ -65,6 +65,7 @@
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.lambda.CfnPermission;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
@@ -138,7 +139,6 @@
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_CPU_ARCHITECTURE;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_CREATION_PERIOD_IN_MINUTES;
import static sleeper.core.properties.instance.CompactionProperty.ECR_COMPACTION_REPO;
-import static software.amazon.awscdk.services.lambda.Runtime.JAVA_11;
/**
* Deploys the resources needed to perform compaction jobs. Specifically, there is:
@@ -271,7 +271,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
IFunction triggerFunction = jobCreatorJar.buildFunction(this, "CompactionJobsCreationTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Create batches of tables and send requests to create compaction jobs for those batches")
- .runtime(JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsTriggerLambda::handleRequest")
@@ -282,7 +282,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
IFunction handlerFunction = jobCreatorJar.buildFunction(this, "CompactionJobsCreationHandler", builder -> builder
.functionName(functionName)
.description("Scan the state stores of the provided tables looking for compaction jobs to create")
- .runtime(JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsLambda::handleRequest")
@@ -594,7 +594,7 @@ private IFunction lambdaForCustomTerminationPolicy(CoreStacks coreStacks, Lambda
.handler("sleeper.compaction.task.creation.SafeTerminationLambda::handleRequest")
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.memorySize(512)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.timeout(Duration.seconds(10)));
coreStacks.grantReadInstanceConfig(handler);
@@ -619,7 +619,7 @@ private void lambdaToCreateCompactionTasks(
IFunction handler = taskCreatorJar.buildFunction(this, "CompactionTasksCreator", builder -> builder
.functionName(functionName)
.description("If there are compaction jobs on queue create tasks to run them")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(TASK_RUNNER_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TASK_RUNNER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.compaction.task.creation.RunCompactionTasksLambda::eventHandler")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java b/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java
index 54d393731f..1308ae0179 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java
@@ -72,6 +72,10 @@ public ILogGroup getLogGroupByStateMachineId(String id) {
return loggingStack.getLogGroupByStateMachineId(id);
}
+ public ILogGroup getLogGroupByEksClusterName(String clusterName) {
+ return loggingStack.getLogGroupByEksClusterName(clusterName);
+ }
+
public void grantReadInstanceConfig(IGrantable grantee) {
configBucketStack.grantRead(grantee);
}
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
index 96e474d9e4..6c78c4181e 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java
@@ -85,7 +85,7 @@ public GarbageCollectorStack(
IFunction triggerFunction = gcJar.buildFunction(this, "GarbageCollectorTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to perform garbage collection for and puts them on a queue to be processed")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.garbagecollector.GarbageCollectorTriggerLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
@@ -95,7 +95,7 @@ public GarbageCollectorStack(
IFunction handlerFunction = gcJar.buildFunction(this, "GarbageCollectorLambda", builder -> builder
.functionName(functionName)
.description("Scan the state store looking for files that need deleting and delete them")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(GARBAGE_COLLECTOR_LAMBDA_MEMORY_IN_MB))
.timeout(handlerTimeout)
.handler("sleeper.garbagecollector.GarbageCollectorLambda::handleRequest")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
index 96d6565ba2..dbac67b5e9 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java
@@ -27,6 +27,7 @@
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
@@ -139,7 +140,7 @@ public IngestBatcherStack(
IFunction submitterLambda = submitterJar.buildFunction(this, "SubmitToIngestBatcherLambda", builder -> builder
.functionName(submitterName)
.description("Triggered by an SQS event that contains a request to ingest a file")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(INGEST_BATCHER_SUBMITTER_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(INGEST_BATCHER_SUBMITTER_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.batcher.submitter.IngestBatcherSubmitterLambda::handleRequest")
@@ -156,7 +157,7 @@ public IngestBatcherStack(
IFunction jobCreatorLambda = jobCreatorJar.buildFunction(this, "IngestBatcherJobCreationLambda", builder -> builder
.functionName(jobCreatorName)
.description("Create jobs by batching up submitted file ingest requests")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(INGEST_BATCHER_JOB_CREATION_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(INGEST_BATCHER_JOB_CREATION_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.batcher.job.creator.IngestBatcherJobCreatorLambda::eventHandler")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
index ad15796d94..49ae11b939 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java
@@ -38,6 +38,7 @@
import software.amazon.awscdk.services.iam.ManagedPolicy;
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.amazon.awscdk.services.sns.Topic;
@@ -250,7 +251,7 @@ private void lambdaToCreateIngestTasks(CoreStacks coreStacks, Queue ingestJobQue
IFunction handler = taskCreatorJar.buildFunction(this, "IngestTasksCreator", builder -> builder
.functionName(functionName)
.description("If there are ingest jobs on queue create tasks to run them")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(TASK_RUNNER_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TASK_RUNNER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.ingest.starter.RunIngestTasksLambda::eventHandler")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java
index e5ed38b44f..bce1689fbc 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java
@@ -69,7 +69,7 @@ public KeepLambdaWarmStack(Construct scope,
IFunction handler = queryJar.buildFunction(this, "WarmQueryExecutorLambda", builder -> builder
.functionName(functionName)
.description("Sends a message to query-executor lambda in order for it to stay warm")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.query.lambda.WarmQueryExecutorLambda::handleRequest")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java
index d8209f9759..a9fbb98d18 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java
@@ -62,6 +62,7 @@ public LoggingStack(Construct scope, String id, InstanceProperties instancePrope
createLogGroup("bulk-import-NonPersistentEMR-start");
createLogGroup("bulk-import-PersistentEMR-start");
createLogGroup("bulk-import-eks-starter");
+ createLogGroup("bulk-import-eks");
createStateMachineLogGroup("EksBulkImportStateMachine");
createLogGroup("bulk-import-autodelete");
createLogGroup("bulk-import-autodelete-provider");
@@ -108,6 +109,10 @@ public ILogGroup getLogGroupByStateMachineId(String id) {
return getLogGroupByNameWithPrefixes(addStateMachineNamePrefixes(id));
}
+ public ILogGroup getLogGroupByEksClusterName(String clusterName) {
+ return getLogGroupByNameWithPrefixes(clusterName);
+ }
+
private ILogGroup getLogGroupByNameWithPrefixes(String nameWithPrefixes) {
return Objects.requireNonNull(logGroupByName.get(nameWithPrefixes), "No log group found: " + nameWithPrefixes);
}
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
index 03628dc8e5..64327c97ed 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java
@@ -24,6 +24,7 @@
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
@@ -188,7 +189,7 @@ private void createTriggerFunction(InstanceProperties instanceProperties, Lambda
IFunction triggerFunction = splitterJar.buildFunction(this, "FindPartitionsToSplitTriggerLambda", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to perform partition splitting for and puts them on a queue to be processed")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
.handler("sleeper.splitter.lambda.FindPartitionsToSplitTriggerLambda::handleRequest")
@@ -218,7 +219,7 @@ private void createFindPartitionsToSplitFunction(InstanceProperties instanceProp
IFunction findPartitionsToSplitLambda = splitterJar.buildFunction(this, "FindPartitionsToSplitLambda", builder -> builder
.functionName(functionName)
.description("Scan the state stores of the provided tables looking for partitions that need splitting")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(FIND_PARTITIONS_TO_SPLIT_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(FIND_PARTITIONS_TO_SPLIT_TIMEOUT_IN_SECONDS)))
.handler("sleeper.splitter.lambda.FindPartitionsToSplitLambda::handleRequest")
@@ -244,7 +245,7 @@ private void createSplitPartitionFunction(InstanceProperties instanceProperties,
IFunction splitPartitionLambda = splitterJar.buildFunction(this, "SplitPartitionLambda", builder -> builder
.functionName(splitFunctionName)
.description("Triggered by an SQS event that contains a partition to split")
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(SPLIT_PARTITIONS_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(SPLIT_PARTITIONS_TIMEOUT_IN_SECONDS)))
.reservedConcurrentExecutions(concurrency)
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
index ec4855d26a..36795606fc 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java
@@ -60,7 +60,7 @@ public PropertiesStack(
.environment(Utils.createDefaultEnvironment(instanceProperties))
.description("Lambda for writing instance properties to S3 upon initialisation and teardown")
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
- .runtime(Runtime.JAVA_11));
+ .runtime(Runtime.JAVA_17));
coreStacks.grantWriteInstanceConfig(propertiesWriterLambda);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
index 38f449e1a2..cb588322b1 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
@@ -36,6 +36,7 @@
import software.amazon.awscdk.services.iam.PolicyStatementProps;
import software.amazon.awscdk.services.lambda.Function;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSourceProps;
import software.amazon.awscdk.services.s3.BlockPublicAccess;
@@ -144,7 +145,7 @@ private IFunction createFunction(
return queryJar.buildFunction(this, id, builder -> builder
.functionName(functionName)
.description(description)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler(handler)
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java
index d6b5285da4..eb65339114 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java
@@ -22,6 +22,7 @@
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.ILogGroup;
import software.amazon.awscdk.services.s3.Bucket;
@@ -54,7 +55,6 @@
import static sleeper.core.properties.instance.CommonProperty.STATESTORE_COMMITTER_LAMBDA_CONCURRENCY_RESERVED;
import static sleeper.core.properties.instance.CommonProperty.STATESTORE_COMMITTER_LAMBDA_MEMORY_IN_MB;
import static sleeper.core.properties.instance.CommonProperty.STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS;
-import static software.amazon.awscdk.services.lambda.Runtime.JAVA_11;
public class StateStoreCommitterStack extends NestedStack {
private final InstanceProperties instanceProperties;
@@ -135,7 +135,7 @@ private void lambdaToCommitStateStoreUpdates(
IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder
.functionName(functionName)
.description("Commits updates to the state store. Used to commit compaction and ingest jobs asynchronously.")
- .runtime(JAVA_11)
+ .runtime(Runtime.JAVA_17)
.memorySize(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.statestore.committer.lambda.StateStoreCommitterLambda::handleRequest")
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
index 6db39354a8..af4e2cd742 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
@@ -73,7 +73,7 @@ public TableMetricsStack(
IFunction tableMetricsTrigger = metricsJar.buildFunction(this, "MetricsTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to calculate metrics for and puts them on a queue to be published")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.metrics.TableMetricsTriggerLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
@@ -83,7 +83,7 @@ public TableMetricsStack(
IFunction tableMetricsPublisher = metricsJar.buildFunction(this, "MetricsPublisher", builder -> builder
.functionName(publishFunctionName)
.description("Generates metrics for a Sleeper table based on info in its state store, and publishes them to CloudWatch")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.metrics.TableMetricsLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(instanceProperties.getInt(METRICS_LAMBDA_CONCURRENCY_RESERVED))
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java
index 3fa0e2b305..b17d6dea66 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java
@@ -87,7 +87,7 @@ private void createSnapshotCreationLambda(InstanceProperties instanceProperties,
IFunction snapshotCreationTrigger = statestoreJar.buildFunction(this, "TransactionLogSnapshotCreationTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to create transaction log snapshots for and puts them on a queue to be processed")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.snapshot.TransactionLogSnapshotCreationTriggerLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
@@ -97,7 +97,7 @@ private void createSnapshotCreationLambda(InstanceProperties instanceProperties,
IFunction snapshotCreationLambda = statestoreJar.buildFunction(this, "TransactionLogSnapshotCreation", builder -> builder
.functionName(creationFunctionName)
.description("Creates transaction log snapshots for tables")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.snapshot.TransactionLogSnapshotCreationLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_SNAPSHOT_CREATION_LAMBDA_CONCURRENCY_RESERVED))
@@ -157,7 +157,7 @@ private void createSnapshotDeletionLambda(InstanceProperties instanceProperties,
IFunction snapshotDeletionTrigger = statestoreJar.buildFunction(this, "TransactionLogSnapshotDeletionTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to delete old transaction log snapshots for and puts them on a queue to be processed")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.snapshot.TransactionLogSnapshotDeletionTriggerLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
@@ -167,7 +167,7 @@ private void createSnapshotDeletionLambda(InstanceProperties instanceProperties,
IFunction snapshotDeletionLambda = statestoreJar.buildFunction(this, "TransactionLogSnapshotDeletion", builder -> builder
.functionName(deletionFunctionName)
.description("Deletes old transaction log snapshots for tables")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.snapshot.TransactionLogSnapshotDeletionLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_SNAPSHOT_DELETION_LAMBDA_CONCURRENCY_RESERVED))
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java
index 8be9944d68..bff6f161c4 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java
@@ -77,7 +77,7 @@ private void createTransactionDeletionLambda(InstanceProperties instanceProperti
IFunction transactionDeletionTrigger = statestoreJar.buildFunction(this, "TransactionLogTransactionDeletionTrigger", builder -> builder
.functionName(triggerFunctionName)
.description("Creates batches of Sleeper tables to delete old transaction log transactions for and puts them on a queue to be processed")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.transaction.TransactionLogTransactionDeletionTriggerLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
@@ -87,7 +87,7 @@ private void createTransactionDeletionLambda(InstanceProperties instanceProperti
IFunction transactionDeletionLambda = statestoreJar.buildFunction(this, "TransactionLogTransactionDeletion", builder -> builder
.functionName(deletionFunctionName)
.description("Deletes old transaction log transactions for tables")
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.statestore.transaction.TransactionLogTransactionDeletionLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_TRANSACTION_DELETION_LAMBDA_CONCURRENCY_RESERVED))
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java
index f7dc0f66fe..921ff522ed 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java
@@ -70,7 +70,7 @@ public VpcStack(Construct scope, String id, InstanceProperties instancePropertie
.memorySize(2048)
.description("Lambda for checking the VPC has an associated S3 endpoint")
.logGroup(logging.getLogGroupByFunctionName(functionName))
- .runtime(Runtime.JAVA_11));
+ .runtime(Runtime.JAVA_17));
vpcCheckLambda.addToRolePolicy(new PolicyStatement(new PolicyStatementProps.Builder()
.actions(Lists.newArrayList("ec2:DescribeVpcEndpoints"))
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java
index 85c2020c59..4429d95f9f 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java
@@ -34,6 +34,7 @@
import software.amazon.awscdk.services.iam.ServicePrincipal;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.Permission;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.constructs.Construct;
@@ -86,7 +87,7 @@ protected void setupWebSocketApi(InstanceProperties instanceProperties, LambdaCo
.memorySize(256)
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.timeout(Duration.seconds(29))
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11));
+ .runtime(Runtime.JAVA_17));
queryQueueStack.grantSendMessages(webSocketApiHandler);
coreStacks.grantReadTablesConfig(webSocketApiHandler);
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
index 18b72c9511..c167d1a442 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java
@@ -22,6 +22,7 @@
import software.amazon.awscdk.services.iam.IRole;
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
@@ -124,7 +125,7 @@ public IFunction createJobStarterFunction(
.memorySize(1024)
.timeout(Duration.minutes(2))
.environment(env)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.events(Lists.newArrayList(SqsEventSource.Builder.create(jobQueue).batchSize(1).build())));
diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
index 05226062f3..41eea29275 100644
--- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
+++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java
@@ -32,6 +32,7 @@
import software.amazon.awscdk.services.eks.AwsAuthMapping;
import software.amazon.awscdk.services.eks.Cluster;
import software.amazon.awscdk.services.eks.FargateCluster;
+import software.amazon.awscdk.services.eks.FargateProfile;
import software.amazon.awscdk.services.eks.FargateProfileOptions;
import software.amazon.awscdk.services.eks.KubernetesManifest;
import software.amazon.awscdk.services.eks.KubernetesVersion;
@@ -43,7 +44,9 @@
import software.amazon.awscdk.services.iam.PolicyStatement;
import software.amazon.awscdk.services.iam.Role;
import software.amazon.awscdk.services.lambda.IFunction;
+import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
+import software.amazon.awscdk.services.logs.ILogGroup;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.amazon.awscdk.services.sns.Topic;
@@ -143,7 +146,7 @@ public EksBulkImportStack(
.memorySize(1024)
.timeout(Duration.minutes(2))
.environment(env)
- .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.handler("sleeper.bulkimport.starter.BulkImportStarterLambda")
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.events(Lists.newArrayList(SqsEventSource.Builder.create(bulkImportJobQueue).batchSize(1).build())));
@@ -172,7 +175,7 @@ public EksBulkImportStack(
instanceProperties.set(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_NAMESPACE, uniqueBulkImportId);
ISubnet subnet = Subnet.fromSubnetId(this, "EksBulkImportSubnet", instanceProperties.getList(SUBNETS).get(0));
- bulkImportCluster.addFargateProfile("EksBulkImportFargateProfile", FargateProfileOptions.builder()
+ FargateProfile fargateProfile = bulkImportCluster.addFargateProfile("EksBulkImportFargateProfile", FargateProfileOptions.builder()
.fargateProfileName(uniqueBulkImportId)
.vpc(vpc)
.subnetSelection(SubnetSelection.builder()
@@ -182,6 +185,7 @@ public EksBulkImportStack(
.namespace(uniqueBulkImportId)
.build()))
.build());
+ addFluentBitLogging(bulkImportCluster, fargateProfile, instanceProperties, coreStacks.getLogGroupByEksClusterName(uniqueBulkImportId));
ServiceAccount sparkSubmitServiceAccount = bulkImportCluster.addServiceAccount("SparkSubmitServiceAccount", ServiceAccountOptions.builder()
.namespace(uniqueBulkImportId)
@@ -205,7 +209,7 @@ public EksBulkImportStack(
.build());
addClusterAdminRoles(bulkImportCluster, instanceProperties);
- createManifests(bulkImportCluster, namespace, uniqueBulkImportId, stateMachine.getRole());
+ addRoleManifests(bulkImportCluster, namespace, uniqueBulkImportId, stateMachine.getRole());
importBucketStack.getImportBucket().grantReadWrite(sparkServiceAccount);
stateMachine.grantStartExecution(bulkImportJobStarter);
@@ -271,9 +275,47 @@ private StateMachine createStateMachine(Cluster cluster, InstanceProperties inst
.build();
}
- private KubernetesManifest createNamespace(Cluster bulkImportCluster, String bulkImportNamespace) {
- return createManifestFromResource(bulkImportCluster, "EksBulkImportNamespace", bulkImportNamespace,
- "/k8s/namespace.json");
+ @SuppressWarnings("unchecked")
+ private void addFluentBitLogging(Cluster cluster, FargateProfile fargateProfile, InstanceProperties instanceProperties, ILogGroup logGroup) {
+ // Based on guide at https://docs.aws.amazon.com/eks/latest/userguide/fargate-logging.html
+
+ KubernetesManifest namespace = cluster.addManifest("LoggingNamespace", Map.of(
+ "apiVersion", "v1",
+ "kind", "Namespace",
+ "metadata", Map.of(
+ "name", "aws-observability",
+ "labels", Map.of("aws-observability", "enabled"))));
+
+ // Fluent Bit configuration
+ // See https://docs.fluentbit.io/manual/pipeline/outputs/cloudwatch
+ Function outputReplacements = replacements(Map.of(
+ "region-placeholder", instanceProperties.get(REGION),
+ "log-group-placeholder", logGroup.getLogGroupName()));
+ withDependencyOn(namespace, cluster.addManifest("LoggingConfig", Map.of(
+ "apiVersion", "v1",
+ "kind", "ConfigMap",
+ "metadata", Map.of("name", "aws-logging", "namespace", "aws-observability"),
+ "data", Map.of(
+ "flb_log_cw", "false",
+ "filters.conf", loadResource("/fluentbit/filters.conf"),
+ "output.conf", outputReplacements.apply(loadResource("/fluentbit/output.conf")),
+ "parsers.conf", loadResource("/fluentbit/parsers.conf")))));
+
+ fargateProfile.getPodExecutionRole().addToPrincipalPolicy(PolicyStatement.Builder.create()
+ .effect(Effect.ALLOW)
+ .actions(List.of(
+ "logs:CreateLogStream",
+ "logs:CreateLogGroup",
+ "logs:DescribeLogStreams",
+ "logs:PutLogEvents",
+ "logs:PutRetentionPolicy"))
+ .resources(List.of("*"))
+ .build());
+ }
+
+ @SuppressWarnings("unchecked")
+ private KubernetesManifest createNamespace(Cluster cluster, String namespaceName) {
+ return cluster.addManifest("EksBulkImportNamespace", parseJson("/k8s/namespace.json", namespaceReplacement(namespaceName)));
}
private void addClusterAdminRoles(Cluster cluster, InstanceProperties properties) {
@@ -286,28 +328,23 @@ private void addClusterAdminRoles(Cluster cluster, InstanceProperties properties
}
}
- private void createManifests(Cluster cluster, KubernetesManifest namespace, String namespaceName,
+ @SuppressWarnings("unchecked")
+ private void addRoleManifests(Cluster cluster, KubernetesManifest namespace, String namespaceName,
IRole stateMachineRole) {
- Lists.newArrayList(
- createManifestFromResource(cluster, "SparkSubmitRole", namespaceName, "/k8s/spark-submit-role.json"),
- createManifestFromResource(cluster, "SparkSubmitRoleBinding", namespaceName,
- "/k8s/spark-submit-role-binding.json"),
- createManifestFromResource(cluster, "SparkRole", namespaceName, "/k8s/spark-role.json"),
- createManifestFromResource(cluster, "SparkRoleBinding", namespaceName, "/k8s/spark-role-binding.json"),
- createManifestFromResource(cluster, "StepFunctionRole", namespaceName, "/k8s/step-function-role.json"),
- createManifestFromResource(cluster, "StepFunctionRoleBinding", namespaceName,
- "/k8s/step-function-role-binding.json",
- replacements(Map.of("user-placeholder", stateMachineRole.getRoleArn()))))
- .forEach(manifest -> manifest.getNode().addDependency(namespace));
+ withDependencyOn(namespace,
+ cluster.addManifest("SparkSubmitRole", parseJson("/k8s/spark-submit-role.json", namespaceReplacement(namespaceName))),
+ cluster.addManifest("SparkSubmitRoleBinding", parseJson("/k8s/spark-submit-role-binding.json", namespaceReplacement(namespaceName))),
+ cluster.addManifest("SparkRole", parseJson("/k8s/spark-role.json", namespaceReplacement(namespaceName))),
+ cluster.addManifest("SparkRoleBinding", parseJson("/k8s/spark-role-binding.json", namespaceReplacement(namespaceName))),
+ cluster.addManifest("StepFunctionRole", parseJson("/k8s/step-function-role.json", namespaceReplacement(namespaceName))),
+ cluster.addManifest("StepFunctionRoleBinding", parseJson("/k8s/step-function-role-binding.json",
+ namespaceReplacement(namespaceName).andThen(replacement("user-placeholder", stateMachineRole.getRoleArn())))));
}
- private static KubernetesManifest createManifestFromResource(Cluster cluster, String id, String namespace, String resource) {
- return createManifestFromResource(cluster, id, namespace, resource, json -> json);
- }
-
- private static KubernetesManifest createManifestFromResource(Cluster cluster, String id, String namespace, String resource,
- Function replacements) {
- return cluster.addManifest(id, parseJsonWithNamespace(resource, namespace, replacements));
+ private void withDependencyOn(KubernetesManifest namespace, KubernetesManifest... manifests) {
+ for (KubernetesManifest manifest : manifests) {
+ manifest.getNode().addDependency(namespace);
+ }
}
private static Map parseEksStepDefinition(String resource, InstanceProperties instanceProperties, Cluster cluster) {
@@ -316,43 +353,44 @@ private static Map parseEksStepDefinition(String resource, Insta
private static Map parseEksStepDefinition(
String resource, InstanceProperties instanceProperties, Cluster cluster, Function replacements) {
- return parseJsonWithNamespace(resource,
- instanceProperties.get(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_NAMESPACE),
- replacements(Map.of(
- "endpoint-placeholder", instanceProperties.get(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_CLUSTER_ENDPOINT),
- "cluster-placeholder", cluster.getClusterName(),
- "ca-placeholder", cluster.getClusterCertificateAuthorityData()))
+ return parseJson(resource,
+ namespaceReplacement(instanceProperties.get(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_NAMESPACE))
+ .andThen(replacements(Map.of(
+ "endpoint-placeholder", instanceProperties.get(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_CLUSTER_ENDPOINT),
+ "cluster-placeholder", cluster.getClusterName(),
+ "ca-placeholder", cluster.getClusterCertificateAuthorityData())))
.andThen(replacements));
}
- private static Map parseJsonWithNamespace(
- String resource, String namespace, Function replacements) {
- return parseJson(resource, replacement("namespace-placeholder", namespace).andThen(replacements));
- }
-
private static Map parseJson(
String resource, Function replacements) {
- String json;
+ String json = loadResource(resource);
+ String jsonWithReplacements = replacements.apply(json);
+ return new Gson().fromJson(jsonWithReplacements, new JsonTypeToken());
+ }
+
+ private static String loadResource(String resource) {
try {
- json = IOUtils.toString(Objects.requireNonNull(EksBulkImportStack.class.getResourceAsStream(resource)), StandardCharsets.UTF_8);
+ return IOUtils.toString(Objects.requireNonNull(EksBulkImportStack.class.getResourceAsStream(resource)), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
+ }
- String jsonWithReplacements = replacements.apply(json);
- return new Gson().fromJson(jsonWithReplacements, new JsonTypeToken());
+ private static Function namespaceReplacement(String namespace) {
+ return replacement("namespace-placeholder", namespace);
}
private static Function replacement(String key, String value) {
- return json -> json.replace(key, value);
+ return str -> str.replace(key, value);
}
private static Function replacements(Map replacements) {
- return json -> {
+ return str -> {
for (Map.Entry replacement : replacements.entrySet()) {
- json = json.replace(replacement.getKey(), replacement.getValue());
+ str = str.replace(replacement.getKey(), replacement.getValue());
}
- return json;
+ return str;
};
}
diff --git a/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java b/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java
index 38266020f5..0a2d3f1feb 100644
--- a/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java
+++ b/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java
@@ -89,7 +89,7 @@ public static void autoDeleteForBucket(
.environment(Utils.createDefaultEnvironmentNoConfigBucket(instanceProperties))
.description("Lambda for auto-deleting S3 objects")
.logGroup(getLogGroupByFunctionName.apply(functionName))
- .runtime(Runtime.JAVA_11)
+ .runtime(Runtime.JAVA_17)
.timeout(Duration.minutes(10)));
bucket.grantRead(lambda);
diff --git a/java/cdk/src/main/resources/fluentbit/filters.conf b/java/cdk/src/main/resources/fluentbit/filters.conf
new file mode 100644
index 0000000000..f00dd75b82
--- /dev/null
+++ b/java/cdk/src/main/resources/fluentbit/filters.conf
@@ -0,0 +1,12 @@
+[FILTER]
+ Name parser
+ Match *
+ Key_name log
+ Parser crio
+[FILTER]
+ Name kubernetes
+ Match kube.*
+ Merge_Log On
+ Keep_Log Off
+ Buffer_Size 0
+ Kube_Meta_Cache_TTL 300s
diff --git a/java/cdk/src/main/resources/fluentbit/output.conf b/java/cdk/src/main/resources/fluentbit/output.conf
new file mode 100644
index 0000000000..a57cdc6469
--- /dev/null
+++ b/java/cdk/src/main/resources/fluentbit/output.conf
@@ -0,0 +1,6 @@
+[OUTPUT]
+ Name cloudwatch_logs
+ Match kube.*
+ region region-placeholder
+ log_group_name log-group-placeholder
+ log_stream_prefix fb-
diff --git a/java/cdk/src/main/resources/fluentbit/parsers.conf b/java/cdk/src/main/resources/fluentbit/parsers.conf
new file mode 100644
index 0000000000..eaa9918bf9
--- /dev/null
+++ b/java/cdk/src/main/resources/fluentbit/parsers.conf
@@ -0,0 +1,6 @@
+[PARSER]
+ Name crio
+ Format Regex
+ Regex ^(?