flink-commits mailing list archives

From tzuli...@apache.org
Subject [1/8] flink git commit: [FLINK-7011] [kafka] Remove Kafka testStartFromKafkaCommitOffsets ITCases
Date Sat, 01 Jul 2017 12:19:52 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 d02e688e7 -> 4a82e3d70


[FLINK-7011] [kafka] Remove Kafka testStartFromKafkaCommitOffsets ITCases

The testStartFromKafkaCommitOffsets ITCases cover too much within a
single test. The case verifies that whatever offset was committed to
Kafka, Flink reads it correctly and can use it as the correct
starting point for exactly-once processing.
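
As background on the behaviour under test: the consumer commits offsets
back to Kafka when checkpointing is enabled on the job. A minimal sketch
of that setup, assuming the 0.9 consumer; the broker address, group id,
and topic name below are placeholders, not values from this commit:

    import java.util.Properties;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class CommitOffsetsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // offsets are committed to Kafka on completed checkpoints
            env.enableCheckpointing(5000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "test-group");              // placeholder

            FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);
            // explicit here for clarity; this is the default when checkpointing is on
            consumer.setCommitOffsetsOnCheckpoints(true);

            env.addSource(consumer).print();
            env.execute("commit-offsets sketch");
        }
    }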

This over-engineered test was unstable because it needed to first read
some records and then wait until some offsets were committed. This wait
is hard to define.

It is in fact sufficient to have two separate tests to cover the tested
behaviour:
- test that committed Kafka offsets are correct (there is already an
  ITCase for this, i.e. `runCommitOffsetsToKafka`)
- test that committed offsets are correctly picked up and used (there is
  also a test for this, i.e. `runStartFromGroupOffsets`)

Hence, this test can be removed without harming test coverage.
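
For reference, the startup behaviour that `runStartFromGroupOffsets`
exercises is the consumer's default mode; a sketch, reusing the
placeholder consumer from the snippet above:

    // Start reading from the offsets committed for the consumer group in Kafka,
    // falling back to the auto.offset.reset property where no committed offset
    // exists. This is the default startup mode, and the behaviour that
    // runStartFromGroupOffsets covers.
    consumer.setStartFromGroupOffsets();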

This closes #4190.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87ff2890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87ff2890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87ff2890

Branch: refs/heads/release-1.3
Commit: 87ff2890ccec895f950bd9d20e4394cae75e9d5c
Parents: d02e688
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Tue Jun 27 15:53:06 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Sat Jul 1 16:18:31 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010ITCase.java        |  5 --
 .../connectors/kafka/Kafka08ITCase.java         |  5 --
 .../connectors/kafka/Kafka09ITCase.java         |  5 --
 .../connectors/kafka/KafkaConsumerTestBase.java | 88 --------------------
 4 files changed, 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87ff2890/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index add623e..dfbacc1 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -160,11 +160,6 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromKafkaCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/87ff2890/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 8cc735d..4e9bcf6 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -149,11 +149,6 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromZookeeperCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToZookeeper() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/87ff2890/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index ca9965c..a7259e5 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -140,11 +140,6 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
 	}
 
 	@Test(timeout = 60000)
-	public void testStartFromKafkaCommitOffsets() throws Exception {
-		runStartFromKafkaCommitOffsets();
-	}
-
-	@Test(timeout = 60000)
 	public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
 		runAutoOffsetRetrievalAndCommitToKafka();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/87ff2890/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ac278fb..285bcf9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -29,7 +29,6 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -264,93 +263,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	/**
-	 * This test first writes a total of 300 records to a test topic, reads the first 150 so that some offsets are
-	 * committed to Kafka, and then startup the consumer again to read the remaining records starting from the committed offsets.
-	 * The test ensures that whatever offsets were committed to Kafka, the consumer correctly picks them up
-	 * and starts at the correct position.
-	public void runStartFromKafkaCommitOffsets() throws Exception {
-		final int parallelism = 3;
-		final int recordsInEachPartition = 300;
-		final int recordsToConsume = 150;
-		final int consumePause = 50;
-
-		final String topicName = writeSequence("testStartFromKafkaCommitOffsetsTopic", recordsInEachPartition, parallelism, 1);
-
-		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
-
-		Long o1;
-		Long o2;
-		Long o3;
-		int attempt = 0;
-		// make sure that o1, o2, o3 are not all null before proceeding
-		do {
-			attempt++;
-			LOG.info("Attempt " + attempt + " to read records and commit some offsets to Kafka");
-
-			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.getConfig().disableSysoutLogging();
-			env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-			env.setParallelism(parallelism);
-			env.enableCheckpointing(20); // fast checkpoints to make sure we commit some offsets
-
-			env
-				.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps))
-				.map(new ThrottledMapper<String>(consumePause))
-				.map(new MapFunction<String, Object>() {
-					int count = 0;
-					@Override
-					public Object map(String value) throws Exception {
-						count++;
-						if (count == recordsToConsume) {
-							throw new SuccessException();
-						}
-						return null;
-					}
-				})
-				.addSink(new DiscardingSink<>());
-
-			tryExecute(env, "Read some records to commit offsets to Kafka");
-
-			o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
-			o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
-			o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
-		} while (o1 == null && o2 == null && o3 == null && attempt < 3);
-
-		if (o1 == null && o2 == null && o3 == null) {
-			throw new RuntimeException("No offsets have been committed after 3 attempts");
-		}
-
-		LOG.info("Got final committed offsets from Kafka o1={}, o2={}, o3={}", o1, o2, o3);
-
-		final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
-		env2.getConfig().disableSysoutLogging();
-		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env2.setParallelism(parallelism);
-
-		// whatever offsets were committed for each partition, the consumer should pick
-		// them up and start from the correct position so that the remaining records are all read
-		HashMap<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset = new HashMap<>();
-		partitionsToValuesCountAndStartOffset.put(0, new Tuple2<>(
-			(o1 != null) ? (int) (recordsInEachPartition - o1) : recordsInEachPartition,
-			(o1 != null) ? o1.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(1, new Tuple2<>(
-			(o2 != null) ? (int) (recordsInEachPartition - o2) : recordsInEachPartition,
-			(o2 != null) ? o2.intValue() : 0
-		));
-		partitionsToValuesCountAndStartOffset.put(2, new Tuple2<>(
-			(o3 != null) ? (int) (recordsInEachPartition - o3) : recordsInEachPartition,
-			(o3 != null) ? o3.intValue() : 0
-		));
-
-		readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
-
-		kafkaOffsetHandler.close();
-		deleteTestTopic(topicName);
-	}
-
-	/**
 	 * This test ensures that when the consumers retrieve some start offset from kafka (earliest, latest), that this offset
 	 * is committed to Kafka, even if some partitions are not read.
 	 *

