flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [6/7] flink git commit: [FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase
Date Fri, 28 Jul 2017 13:53:36 GMT
[FLINK-7287] [kafka, tests] Fix test instabilities in KafkaConsumerTestBase

This closes #4414.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0d4a772
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0d4a772
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0d4a772

Branch: refs/heads/master
Commit: f0d4a772f44e95b52d58284b85074cb04d2796ab
Parents: 1ded2d8
Author: Nico Kruber <nico@data-artisans.com>
Authored: Thu Jul 27 18:31:13 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Fri Jul 28 21:52:30 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java    | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0d4a772/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index dac45f7..fda6832 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -217,7 +217,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -243,8 +243,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -303,7 +304,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					env.execute();
 				}
 				catch (Throwable t) {
-					if (!(t.getCause() instanceof JobCancellationException)) {
+					if (!(t instanceof JobCancellationException)) {
 						errorRef.set(t);
 					}
 				}
@@ -328,8 +329,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 		while (System.nanoTime() < deadline);
 
-		// cancel the job
+		// cancel the job & wait for the job to finish
 		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+		runner.join();
 
 		final Throwable t = errorRef.get();
 		if (t != null) {
@@ -1607,8 +1609,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 					env1.execute("Metrics test job");
 				} catch (Throwable t) {
-					LOG.warn("Got exception during execution", t);
 					if (!(t instanceof JobCancellationException)) { // we'll cancel the job
+						LOG.warn("Got exception during execution", t);
 						error.f0 = t;
 					}
 				}
@@ -1658,11 +1660,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase
{
 		} finally {
 			// cancel
 			JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+			// wait for the job to finish (it should due to the cancel command above)
+			jobThread.join();
 		}
 
-		while (jobThread.isAlive()) {
-			Thread.sleep(50);
-		}
 		if (error.f0 != null) {
 			throw error.f0;
 		}


Mime
View raw message