Skip to content

Commit

Permalink
StorageService + unit test update
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Oct 8, 2024
1 parent b926839 commit 9a8b968
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Utils;
Expand Down Expand Up @@ -376,33 +377,32 @@ public synchronized AbstractStorageEngine openStore(
}

public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHelixManager manager) {
if (getStorageEngineRepository() == null || manager == null) {
if (manager == null) {
return;
}
for (AbstractStorageEngine storageEngine: getStorageEngineRepository().getAllLocalStorageEngines()) {
String storageEngineName = storageEngine.getStoreVersionName();
String storeName = Version.parseStoreFromKafkaTopicName(storageEngineName);
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();
PropertyKey.Builder propertyKeyBuilder =
new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));

if (idealState == null) {
return;
}

Set<Integer> idealStatePartitionIds = new HashSet<>();

idealState.getPartitionSet().stream().forEach(partitionId -> {
idealStatePartitionIds.add(Integer.parseInt(partitionId));
});
Set<Integer> storageEnginePartitionIds = storageEngine.getPartitionIds();

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
continue;
continue;
} else {
Set<Integer> idealStatePartitionIds = new HashSet<>();
idealState.getPartitionSet().stream().forEach(partitionDbName -> {
idealStatePartitionIds.add(RocksDBUtils.parsePartitionIdFromPartitionDbName(partitionDbName));
});

for (Integer storageEnginePartitionId: storageEnginePartitionIds) {
if (idealStatePartitionIds.contains(storageEnginePartitionId)) {
continue;
}
storageEngine.dropPartition(storageEnginePartitionId);
}
storageEngine.dropPartition(storageEnginePartitionId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,10 @@ public synchronized Set<Integer> getPartitionIds() {
return this.partitionList.values().stream().map(Partition::getPartitionId).collect(Collectors.toSet());
}

public synchronized SparseConcurrentList<Partition> getPartitionList() {
return this.partitionList;
}

public AbstractStoragePartition getPartitionOrThrow(int partitionId) {
AbstractStoragePartition partition;
ReadWriteLock readWriteLock = getRWLockForPartitionOrThrow(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFi
AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);

String resourceName = "test_store_v1";
String storeName = "test_store";
when(abstractStorageEngine.getStoreVersionName()).thenReturn(storeName);
when(abstractStorageEngine.getStoreVersionName()).thenReturn(resourceName);
abstractStorageEngine.addStoragePartition(1);
abstractStorageEngine.addStoragePartition(2);
abstractStorageEngine.addStoragePartition(3);

String clusterName = "test_cluster";
VeniceConfigLoader mockVeniceConfigLoader = mock(VeniceConfigLoader.class);
Expand All @@ -162,8 +166,10 @@ public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFi
when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
IdealState idealState = mock(IdealState.class);
when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
Set<String> helixPartitionSet = new HashSet<>();
Set<String> helixPartitionSet = new HashSet<>(Arrays.asList("test_store_v1_1", "test_store_v1_2"));
when(idealState.getPartitionSet()).thenReturn(helixPartitionSet);
Set<Integer> partitionSet = new HashSet<>(Arrays.asList(1, 2, 3));
when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet);

Field storageEngineRepositoryField = StorageService.class.getDeclaredField("storageEngineRepository");
storageEngineRepositoryField.setAccessible(true);
Expand All @@ -173,6 +179,9 @@ public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFi
Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
configLoaderField.setAccessible(true);
configLoaderField.set(mockStorageService, mockVeniceConfigLoader);
Field partitionListField = AbstractStorageEngine.class.getDeclaredField("partitionList");
partitionListField.setAccessible(true);
partitionListField.set(abstractStorageEngine, abstractStorageEngine.getPartitionList());

doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
Expand Down Expand Up @@ -72,7 +71,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -312,20 +310,14 @@ private List<AbstractVeniceService> createServices() {
? new RocksDBMemoryStats(metricsRepository, "RocksDBMemoryStats", plainTableEnabled)
: null;

boolean whetherToRestoreDataPartitions = !isIsolatedIngestion()
|| veniceConfigLoader.getVeniceServerConfig().freezeIngestionIfReadyToServeOrLocalDataExists();

// Create and add StorageService. storeRepository will be populated by StorageService
storageService = new StorageService(
veniceConfigLoader,
storageEngineStats,
rocksDBMemoryStats,
storeVersionStateSerializer,
partitionStateSerializer,
metadataRepo,
whetherToRestoreDataPartitions,
true,
functionToCheckWhetherStorageEngineShouldBeKeptOrNot());
metadataRepo);
storageEngineMetadataService =
new StorageEngineMetadataService(storageService.getStorageEngineRepository(), partitionStateSerializer);
services.add(storageEngineMetadataService);
Expand Down Expand Up @@ -706,14 +698,6 @@ protected VeniceConfigLoader getConfigLoader() {
return veniceConfigLoader;
}

protected final boolean isIsolatedIngestion() {
return veniceConfigLoader.getVeniceServerConfig().getIngestionMode().equals(IngestionMode.ISOLATED);
}

private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKeptOrNot() {
return storageEngineName -> true;
}

public MetricsRepository getMetricsRepository() {
return metricsRepository;
}
Expand Down

0 comments on commit 9a8b968

Please sign in to comment.