Skip to content

Commit

Permalink
Apply review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Oct 12, 2024
1 parent 3431081 commit 91f8519
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,30 +388,17 @@ public synchronized void checkWhetherStoragePartitionsShouldBeKeptOrNot(SafeHeli
new PropertyKey.Builder(configLoader.getVeniceClusterConfig().getClusterName());
SafeHelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
IdealState idealState = helixDataAccessor.getProperty(propertyKeyBuilder.idealStates(storeName));
String listenerHostName = manager.getInstanceName();
String instanceHostName = manager.getInstanceName();

if (idealState != null) {
Set<Integer> idealStatePartitionIds = new HashSet<>();
Set<String> partitionSet = idealState.getPartitionSet();
partitionSet.stream().forEach(partitionDbName -> {
idealStatePartitionIds.add(RocksDBUtils.parsePartitionIdFromPartitionDbName(partitionDbName));
});

Map<String, Map<String, String>> mapFields = idealState.getRecord().getMapFields();
for (Map.Entry<String, Map<String, String>> entry: mapFields.entrySet()) {
boolean keepPartition = false;
String partitionDbName = entry.getKey();
int partitionId = RocksDBUtils.parsePartitionIdFromPartitionDbName(partitionDbName);
if (!storageEnginePartitionIds.contains(partitionId)) {
continue;
}
for (String hostName: entry.getValue().keySet()) {
if (hostName.equals(listenerHostName)) {
keepPartition = true;
break;
}
}
if (!keepPartition) {
if (!entry.getValue().containsKey(instanceHostName)) {
storageEngine.dropPartition(partitionId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,10 +695,6 @@ public synchronized Set<Integer> getPartitionIds() {
return this.partitionList.values().stream().map(Partition::getPartitionId).collect(Collectors.toSet());
}

public synchronized SparseConcurrentList<Partition> getPartitionList() {
return this.partitionList;
}

public AbstractStoragePartition getPartitionOrThrow(int partitionId) {
AbstractStoragePartition partition;
ReadWriteLock readWriteLock = getRWLockForPartitionOrThrow(partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -142,8 +141,6 @@ public void testGetStoreAndUserPartitionsMapping() {
public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFieldException, IllegalAccessException {
StorageService mockStorageService = mock(StorageService.class);
SafeHelixManager manager = mock(SafeHelixManager.class);
HelixManager helixManager = mock(HelixManager.class);
when(manager.getOriginalManager()).thenReturn(helixManager);
StorageEngineRepository mockStorageEngineRepository = mock(StorageEngineRepository.class);
AbstractStorageEngine abstractStorageEngine = mock(AbstractStorageEngine.class);
mockStorageEngineRepository.addLocalStorageEngine(abstractStorageEngine);
Expand Down Expand Up @@ -171,23 +168,21 @@ public void testCheckWhetherStoragePartitionsShouldBeKeptOrNot() throws NoSuchFi
when(manager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
IdealState idealState = mock(IdealState.class);
when(helixDataAccessor.getProperty((PropertyKey) any())).thenReturn(idealState);
Set<String> helixPartitionSet = new HashSet<>(Arrays.asList("test_store_v1_0", "test_store_v1_1"));
when(idealState.getPartitionSet()).thenReturn(helixPartitionSet);
ZNRecord record = new ZNRecord("testId");
Map<String, Map<String, String>> mapFields = new HashMap<>();
Map<String, String> testPartitionZero = new HashMap<>();
Map<String, String> testPartitionOne = new HashMap<>();
testPartitionZero.put("lor1-app56585.prod.linkedin.com_1690", "LEADER");
testPartitionZero.put("lor1-app56614.prod.linkedin.com_1690", "STANDBY");
testPartitionZero.put("lor1-app110448.prod.linkedin.com_1690", "STANDBY");
testPartitionOne.put("lor1-app56586.prod.linkedin.com_1690", "LEADER");
testPartitionOne.put("lor1-app71895.prod.linkedin.com_1690", "STANDBY");
testPartitionOne.put("lor1-app111181.prod.linkedin.com_1690", "STANDBY");
testPartitionZero.put("host_1430", "LEADER");
testPartitionZero.put("host_1435", "STANDBY");
testPartitionZero.put("host_1440", "STANDBY");
testPartitionOne.put("host_1520", "LEADER");
testPartitionOne.put("host_1525", "STANDBY");
testPartitionOne.put("host_1530", "STANDBY");
mapFields.put("test_store_v1_0", testPartitionZero);
mapFields.put("test_store_v1_1", testPartitionOne);
record.setMapFields(mapFields);
when(idealState.getRecord()).thenReturn(record);
when(manager.getInstanceName()).thenReturn("lor1-app56586.prod.linkedin.com_1690");
when(manager.getInstanceName()).thenReturn("host_1520");

Set<Integer> partitionSet = new HashSet<>(Arrays.asList(0, 1));
when(abstractStorageEngine.getPartitionIds()).thenReturn(partitionSet);
Expand All @@ -208,12 +203,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Field configLoaderField = StorageService.class.getDeclaredField("configLoader");
configLoaderField.setAccessible(true);
configLoaderField.set(mockStorageService, mockVeniceConfigLoader);
Field partitionListField = AbstractStorageEngine.class.getDeclaredField("partitionList");
partitionListField.setAccessible(true);
partitionListField.set(abstractStorageEngine, abstractStorageEngine.getPartitionList());

doCallRealMethod().when(mockStorageService).checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
mockStorageService.checkWhetherStoragePartitionsShouldBeKeptOrNot(manager);
Assert.assertFalse(abstractStorageEngine.containsPartition(0));
verify(abstractStorageEngine).dropPartition(0);
}
}

0 comments on commit 91f8519

Please sign in to comment.