Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Dropping unassigned partitions #1196

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cc1d7ff
Initial commit
kristyelee Sep 12, 2024
c4b6682
First test for removal of partition
kristyelee Sep 17, 2024
8093581
Write function to remove unsubscribed (unassigned) partitions
kristyelee Sep 18, 2024
c8ebe2d
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 18, 2024
3eaeafb
Write test function to test removing unsubscribed (unassigned) partit…
kristyelee Sep 18, 2024
0e24cec
Update StorageEngine intializer in VeniceServer
kristyelee Sep 23, 2024
ff6a165
StorageService arguments set to initial state in VeniceServer
kristyelee Sep 24, 2024
01ca67f
Standardize code
kristyelee Sep 24, 2024
fd14574
Standardize code
kristyelee Sep 24, 2024
56e7711
Update ideal state
kristyelee Sep 24, 2024
04c4e9e
Update initialized StorageService object [with revised import]
kristyelee Sep 25, 2024
e09f7bf
[Placeholder]
kristyelee Sep 25, 2024
6c50513
Updated StorageService constructor and initializer with functionToChe…
kristyelee Sep 27, 2024
06dd6c2
Updated StorageService constructor and initializer [modified]
kristyelee Sep 27, 2024
53d177c
Revised StorageService constructor
kristyelee Sep 27, 2024
d8bad05
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Sep 27, 2024
df1c0b9
Code restructure: verifying storage partition
kristyelee Sep 30, 2024
54d8708
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 1, 2024
14924e1
Merge branch 'linkedin:main' into kristy_lee/650
kristyelee Oct 1, 2024
eb5dd89
[Commented modified code]
kristyelee Oct 2, 2024
d55f704
Code restructure: move storage partition check to AbstractStorageEngine
kristyelee Oct 2, 2024
3ffd26d
Code restructure
kristyelee Oct 2, 2024
c63d62f
Retain relevant/used code changes
kristyelee Oct 3, 2024
3979b63
StorageService unit test
kristyelee Oct 7, 2024
b926839
Updates to StorageService unit test
kristyelee Oct 7, 2024
9a8b968
StorageService + unit test update
kristyelee Oct 8, 2024
a32b55c
Update to StorageService unit test
kristyelee Oct 10, 2024
8b27c8d
Apply review comments and code addition for hostname comparison.
kristyelee Oct 11, 2024
3431081
Apply review comments
kristyelee Oct 12, 2024
528f7a9
Apply review comments
kristyelee Oct 14, 2024
cdf99f7
[server] Remove storage partitions not assigned to host
kristyelee Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PersistenceType;
Expand All @@ -42,6 +44,8 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -371,6 +375,38 @@ public synchronized AbstractStorageEngine openStore(
return engine;
}

public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
if (getStorageEngineRepository() == null || manager == null) {
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
return;
}
for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) {
String storageEngineName = storageEngine.getStoreVersionName();
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

if (idealState == null) {
return;
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
}

Set<Integer> idealStatePartitionIds = new HashSet<>();

idealState.getPartitionSet().stream().forEach(partitionId -> {
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
idealStatePartitionIds.add(Integer.parseInt(partitionId));
});
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also isn't quite right. The ideal state is the superset of all partitions for the store. All partitions ID's in the storageEngine will be in the superset.

We need to parse more information from the ideal state. Specifically, for each partition that is on this node, we need to determine if the ideal state for that partition id contains the node name.

To better paint the picture, an ideal state is a big json document that looks like this:

"Test_Store_Migration_Demo_v1_0": {
"lor1-app56585.prod.linkedin.com_1690": "LEADER",
"lor1-app56614.prod.linkedin.com_1690": "STANDBY",
"lor1-app110448.prod.linkedin.com_1690": "STANDBY"
},
"Test_Store_Migration_Demo_v1_1": {
"lor1-app56586.prod.linkedin.com_1690": "LEADER",
"lor1-app71895.prod.linkedin.com_1690": "STANDBY",
"lor1-app111181.prod.linkedin.com_1690": "STANDBY"
},

Here, for partitions 1 and 0, we have three hosts assigned roles for that partition. What we should do is, does the current host we're working with reside in these lists?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have written the comparison and retrieved the current host from manager.getInstanceName().

continue;
}
storageEngine.dropPartition(storageEnginePartitionId);
}
}
}

/**
* Drops the partition of the specified store version in the storage service. When all data partitions are dropped,
* it will also drop the storage engine of the specific store version.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.linkedin.davinci.storage;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.linkedin.davinci.config.VeniceClusterConfig;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
Expand All @@ -14,6 +17,8 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.StorageEngineFactory;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.PartitionerConfig;
Expand All @@ -23,12 +28,18 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.Utils;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.mockito.internal.util.collections.Sets;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -121,4 +132,49 @@ public void testGetStoreAndUserPartitionsMapping() {
expectedMapping.put(resourceName, partitionSet);
Assert.assertEquals(storageService.getStoreAndUserPartitionsMapping(), expectedMapping);
}

@Test
public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException {
StorageService mockStorageService = mock(StorageService.class);
SafeHelixManager manager = mock(SafeHelixManager.class);
HelixManager helixManager = mock(HelixManager.class);
when(manager.getOriginalManager()).thenReturn(helixManager);
StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);

String storeName = "test_store";
when(abstractStorageEngine.getStoreVersionName()).thenReturn(storeName);

String clusterName = "test_cluster";
VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class);
VeniceServerConfig mockServerConfig = mock(VeniceServerConfig.class);
VeniceClusterConfig mockClusterConfig = mock(VeniceClusterConfig.class);
when(mockServerConfig.getDataBasePath()).thenReturn("/tmp");
when(mockVeniceConfigLoader.getVeniceServerConfig()).thenReturn(mockServerConfig);
when(mockVeniceConfigLoader.getVeniceClusterConfig()).thenReturn(mockClusterConfig);
when(mockVeniceConfigLoader.getVeniceClusterConfig().getClusterName()).thenReturn(clusterName);

List<AbstractStorageEngine> localStorageEngines = new ArrayList<>();
localStorageEngines.add(abstractStorageEngine);

SafeHelixDataAccessor helixDataAccessor = mock(SafeHelixDataAccessor.class);
when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
IdealState idealState = mock(IdealState.class);
when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
Set<String> helixPartitionSet = new HashSet<>();
when(idealState.getPartitionSet()).thenReturn(helixPartitionSet);

Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository");
storageEngineRepositoryField.setAccessible(true);
storageEngineRepositoryField.set(mockStorageService, mockStorageEngineRepository);
when(mockStorageService.getStorageEngineRepository()).thenReturn(mockStorageEngineRepository);
when(mockStorageService.getStorageEngineRepository().getAllLocalStorageEngines()).thenReturn(localStorageEngines);
Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
configLoaderField.setAccessible(true);
configLoaderField.set(mockStorageService, mockVeniceConfigLoader);

doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -71,6 +72,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -310,14 +312,20 @@ private List<AbstractVeniceService> createServices() {
? new RocksDBMemoryStats(metricsRepository, "RocksDBMemoryStats", plainTableEnabled)
: null;

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
|| veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
veniceConfigLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
metadataRepo);
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
kristyelee marked this conversation as resolved.
Show resolved Hide resolved
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -355,6 +363,11 @@ private List<AbstractVeniceService> createServices() {
return helixData;
});

managerFuture.thenApply(manager -> {
storageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
return true;
});

heartbeatMonitoringService = new HeartbeatMonitoringService(
metricsRepository,
metadataRepo,
Expand Down Expand Up @@ -693,6 +706,14 @@ protected VeniceConfigLoader getConfigLoader() {
return veniceConfigLoader;
}

protected final boolean isIsolatedIngestion() {
return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> true;
}

public MetricsRepository getMetricsRepository() {
return metricsRepository;
}
Expand Down
Loading