flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability
Date Wed, 13 Apr 2016 18:51:24 GMT
Repository: flink
Updated Branches:
  refs/heads/master b0a7a1b81 -> 1a34f2165


[hotfix] [kafka consumer] Increase Kafka test stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924

Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Apr 13 20:50:48 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java        | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 							new Tuple2Partitioner(parallelism)))
 					.setParallelism(parallelism);
 
-			writeEnv.execute("Write sequence");
+			try {
+				writeEnv.execute("Write sequence");
+			}
+			catch (Exception e) {
+				LOG.error("Write attempt failed, trying again", e);
+				deleteTestTopic(topicName);
+				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				continue;
+			}
+			
 			LOG.info("Finished writing sequence");
 
 			// -------- Validate the Sequence --------
 			
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
+
+			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 			
 			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());


Mime
View raw message