flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [tests] Improve Kafka concurrent produce/consumer test by allowing retries when connecting to non-leader broker.
Date Wed, 23 Sep 2015 13:25:24 GMT
[tests] Improve Kafka concurrent produce/consumer test by allowing retries when connecting
to non-leader broker.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea3b5efd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea3b5efd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea3b5efd

Branch: refs/heads/master
Commit: ea3b5efdddd6c5cf769dddcbcfef16b75574a59f
Parents: 44fe370
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Sep 23 13:31:13 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Sep 23 14:18:11 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 36 +++++++++++++++++---
 .../connectors/kafka/KafkaTestBase.java         | 28 +++++++++++++++
 2 files changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea3b5efd/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ed1644c..5016e7e 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -64,11 +65,14 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOn
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.testutils.junit.RetryOnException;
+import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
+import org.junit.Rule;
 import scala.collection.Seq;
 
 import java.lang.reflect.Field;
@@ -93,8 +97,10 @@ import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public abstract class KafkaConsumerTestBase extends KafkaTestBase {
-
-
+	
+	@Rule
+	public RetryRule retryRule = new RetryRule();
+	
 	// ------------------------------------------------------------------------
 	//  Required methods by the abstract test base
 	// ------------------------------------------------------------------------
@@ -242,11 +248,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	 * <pre>
 	 * (generator source) --> (kafka sink)-[KAFKA-TOPIC]-(kafka source) --> (validating
sink)
 	 * </pre>
+	 * 
+	 * We need to externally retry this test. We cannot let Flink's retry mechanism do it, because
the Kafka producer
+	 * does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
+	 * cause the test to fail.
 	 */
+	@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
 	public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
 		LOG.info("Starting runSimpleConcurrentProducerConsumerTopology()");
 
-		final String topic = "concurrentProducerConsumerTopic";
+		final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
 		final int parallelism = 3;
 		final int elementsPerPartition = 100;
 		final int totalElements = parallelism * elementsPerPartition;
@@ -256,7 +267,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		final StreamExecutionEnvironment env =
 				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 		env.setParallelism(parallelism);
-		env.setNumberOfExecutionRetries(0);
 		env.getConfig().disableSysoutLogging();
 
 		TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long,
String>");
@@ -331,7 +341,23 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			}
 		}).setParallelism(1);
 
-		tryExecute(env, "runSimpleConcurrentProducerConsumerTopology");
+		try {
+			tryExecutePropagateExceptions(env, "runSimpleConcurrentProducerConsumerTopology");
+		}
+		catch (ProgramInvocationException | JobExecutionException e) {
+			// look for NotLeaderForPartitionException
+			Throwable cause = e.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+					throw (Exception) cause;
+				}
+				cause = cause.getCause();
+			}
+			throw e;
+		}
 
 		LOG.info("Finished runSimpleConcurrentProducerConsumerTopology()");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea3b5efd/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index cfcee35..cfc104b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -316,6 +316,28 @@ public abstract class KafkaTestBase extends TestLogger {
 		}
 	}
 
+	protected static void tryExecutePropagateExceptions(StreamExecutionEnvironment see, String
name) throws Exception {
+		try {
+			see.execute(name);
+		}
+		catch (ProgramInvocationException | JobExecutionException root) {
+			Throwable cause = root.getCause();
+
+			// search for nested SuccessExceptions
+			int depth = 0;
+			while (!(cause instanceof SuccessException)) {
+				if (cause == null || depth++ == 20) {
+					throw root;
+				}
+				else {
+					cause = cause.getCause();
+				}
+			}
+		}
+	}
+	
+	
+
 	protected static void createTestTopic(String topic, int numberOfPartitions, int replicationFactor)
{
 		
 		// create topic with one client
@@ -331,6 +353,12 @@ public abstract class KafkaTestBase extends TestLogger {
 		// validate that the topic has been created
 		final long deadline = System.currentTimeMillis() + 30000;
 		do {
+			try {
+				Thread.sleep(100);
+			}
+			catch (InterruptedException e) {
+				// restore interrupted state
+			}
 			List<PartitionInfo> partitions = FlinkKafkaConsumer.getPartitionsForTopic(topic,
standardProps);
 			if (partitions != null && partitions.size() > 0) {
 				return;


Mime
View raw message