flink-commits mailing list archives

From tzuli...@apache.org
Subject [1/4] flink git commit: [hotfix] [kafka] Add argument checks to rescaling tests in FlinkKafkaConsumerBase
Date Mon, 24 Jul 2017 10:44:00 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 ab28c8e84 -> 45a5f875b


[hotfix] [kafka] Add argument checks to rescaling tests in FlinkKafkaConsumerBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c94f0d56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c94f0d56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c94f0d56

Branch: refs/heads/release-1.3
Commit: c94f0d56f9e40ec65d8a3b57ca26fd66c164e333
Parents: 6abd402
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Mon Jul 24 17:20:52 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Mon Jul 24 18:39:38 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c94f0d56/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index de0df5f..04508dc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -36,7 +35,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -60,9 +58,7 @@ import java.util.Set;
 
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIn.isIn;
-import static org.hamcrest.collection.IsMapContaining.hasEntry;
 import static org.hamcrest.collection.IsMapContaining.hasKey;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertEquals;
@@ -554,7 +550,7 @@ public class FlinkKafkaConsumerBaseTest {
 
 	@Test
 	public void testScaleDown() throws Exception {
-		testRescaling(5, 10, 2, 1);
+		testRescaling(5, 10, 2, 100);
 	}
 
 	/**
@@ -570,6 +566,10 @@ public class FlinkKafkaConsumerBaseTest {
 		final int restoredParallelism,
 		final int restoredNumPartitions) throws Exception {
 
+		Preconditions.checkArgument(
+			restoredNumPartitions >= numPartitions,
+			"invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");
+
 		List<KafkaTopicPartition> mockFetchedPartitionsOnStartup = new ArrayList<>();
 		for (int i = 0; i < numPartitions; i++) {
 			mockFetchedPartitionsOnStartup.add(new KafkaTopicPartition("test-topic", i));
@@ -605,7 +605,8 @@ public class FlinkKafkaConsumerBaseTest {
 			globalSubscribedPartitions.putAll(subscribedPartitions);
 		}
 
-
+		// any new partitions after the restore should not have been picked up;
+		// global number of subscribed partitions should still equal the original number during the fresh run
 		assertThat(globalSubscribedPartitions.values(), hasSize(numPartitions));
 		assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(globalSubscribedPartitions.keySet())));
 
@@ -846,6 +847,7 @@ public class FlinkKafkaConsumerBaseTest {
 			this.isAutoCommitEnabled = isAutoCommitEnabled;
 		}
 	}
+
 	private static final class TestingListState<T> implements ListState<T> {
 
 		private final List<T> list = new ArrayList<>();
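
----------------------------------------------------------------------

A note on the fix above: the guard added to testRescaling() fails fast when a test
models a scenario Kafka cannot produce, since a topic's partition count can grow
(partitions may be added while a job is down) but can never shrink. That is also why
testScaleDown() now restores with 100 partitions instead of 1: the scenario still
scales the operator parallelism down (5 -> 2) while the partition count legally grows
(10 -> 100). Below is a minimal, self-contained sketch of the same fail-fast pattern;
it assumes Flink's org.apache.flink.util.Preconditions utility, and the
RescalingGuardSketch class, its main() driver, and the elided test body are
illustrative only, not part of this commit.

import org.apache.flink.util.Preconditions;

public class RescalingGuardSketch {

	// Mirrors the guard added in the diff above: a restored run may observe
	// more Kafka partitions than the fresh run did (partitions were added in
	// between), but never fewer, because Kafka only allows increasing the
	// number of partitions of a topic.
	static void testRescaling(
			int initialParallelism,
			int numPartitions,
			int restoredParallelism,
			int restoredNumPartitions) {

		Preconditions.checkArgument(
			restoredNumPartitions >= numPartitions,
			"invalid test case for Kafka repartitioning; Kafka only allows increasing partitions.");

		// ... the real test exercises snapshot + restore of the consumer;
		// elided here, as this sketch only demonstrates the argument check ...
	}

	public static void main(String[] args) {
		testRescaling(5, 10, 2, 100); // valid: partitions grow 10 -> 100
		testRescaling(5, 10, 2, 1);   // throws IllegalArgumentException: 1 < 10
	}
}

Preconditions.checkArgument throws IllegalArgumentException, so an invalid test
configuration surfaces immediately at the call site rather than later as a
misleading assertion failure inside the test body.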

