flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/3] flink git commit: [FLINK-9094] [tests] Harden AccumulatorLiveITCase
Date Tue, 03 Apr 2018 14:34:33 GMT
[FLINK-9094] [tests] Harden AccumulatorLiveITCase

The problem was that we did not wait for the proper shut down of the job before
resetting the latches. This could lead to a deadlock.

This closes #5771.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78c3d9b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78c3d9b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78c3d9b0

Branch: refs/heads/master
Commit: 78c3d9b0c657bf06a712ce453edb02da13fa3acf
Parents: 7222745
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Mar 27 09:22:08 2018 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Apr 3 14:49:17 2018 +0200

----------------------------------------------------------------------
 .../test/accumulators/AccumulatorLiveITCase.java    | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78c3d9b0/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 302fe3e..379e8ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -140,10 +141,16 @@ public class AccumulatorLiveITCase extends TestLogger {
 	private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
 		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
 
-		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+		final ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
 
-		client.setDetached(true);
-		client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+		final CheckedThread submissionThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+			}
+		};
+
+		submissionThread.start();
 
 		try {
 			NotifyingMapper.notifyLatch.await();
@@ -167,6 +174,9 @@ public class AccumulatorLiveITCase extends TestLogger {
 			NotifyingMapper.shutdownLatch.trigger();
 		} finally {
 			NotifyingMapper.shutdownLatch.trigger();
+
+			// wait for the job to have terminated
+			submissionThread.sync();
 		}
 	}
 


Mime
View raw message