Skip to content

Commit

Permalink
Standardize code
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Sep 24, 2024
1 parent ff6a165 commit 01ca67f
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -586,19 +586,6 @@ void resubscribeForAllPartitions() throws InterruptedException {
}
}

/**
* Removes partitions that are not subscribed / not assigned.
*/

public synchronized void removeUnsubscribedPartitions() {
throwIfNotRunning();
for (PartitionConsumptionState partitionConsumptionState: partitionConsumptionStateMap.values()) {
if (!partitionConsumptionState.isSubscribed()) {
partitionConsumptionStateMap.remove(partitionConsumptionState.getPartition(), partitionConsumptionState);
}
}
}

/**
* Adds an asynchronous partition subscription request for the task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2034,21 +2034,6 @@ public void testUnsubscribeConsumption(AAConfig aaConfig) throws Exception {
}, aaConfig);
}

@Test(dataProvider = "aaConfigProvider")
public void testRemoveUnsubscribedPartitions(AAConfig aaConfig) throws Exception {
localVeniceWriter.broadcastStartOfPush(new HashMap<>());
localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID);

runTest(Utils.setOf(PARTITION_FOO), () -> {
verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).started(topic, PARTITION_FOO);
// Start of push has already been consumed. Stop consumption.
storeIngestionTaskUnderTest.unSubscribePartition(fooTopicPartition);
// Unassigned partitions should be removed.
storeIngestionTaskUnderTest.removeUnsubscribedPartitions();
verify(mockLogNotifier, timeout(TEST_TIMEOUT_MS)).stopped(anyString(), anyInt(), anyLong());
}, aaConfig);
}

@Test(dataProvider = "aaConfigProvider")
public void testKillConsumption(AAConfig aaConfig) throws Exception {
final Thread writingThread = new Thread(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,27 +126,6 @@ public void testAddingAPartitionTwice() throws Exception {
Assert.fail("Adding the same partition:" + partitionId + " again did not throw any exception as expected.");
}

public void testRemovingPartition() throws Exception {

init();

// first, add partition
doAddPartition(partitionId);

if (!testStore.containsPartition(partitionId)) {
Assert.fail("Adding a new partition: " + partitionId + "failed!");
}

// remove existing partition
doRemovePartition(partitionId);

Assert.assertEquals(
testStoreEngine.containsPartition(partitionId),
false,
"Failed to remove partition: " + partitionId + " from the store engine!");

}

public void testRemovingPartitionTwice() throws Exception {

init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ public void testAddingAPartitionTwice() throws Exception {
super.testAddingAPartitionTwice();
}

@Test
public void testRemovingPartition() throws Exception {
super.testRemovingPartition();
}

@Test
public void testRemovingPartitionTwice() throws Exception {
super.testRemovingPartitionTwice();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@
import com.linkedin.venice.listener.ServerReadMetadataRepository;
import com.linkedin.venice.listener.ServerStoreAclHandler;
import com.linkedin.venice.listener.StoreValueSchemasCacheService;
import com.linkedin.venice.meta.*;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.StaticClusterInfoProvider;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
Expand All @@ -63,7 +67,12 @@
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import io.tehuti.metrics.MetricsRepository;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down

0 comments on commit 01ca67f

Please sign in to comment.