flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pnowoj...@apache.org
Subject [flink] branch release-1.14 updated: [FLINK-23797][tests] Wait for all task running before savepoint for all tests in SavepointITCase
Date Wed, 01 Sep 2021 14:25:46 GMT
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 856c927  [FLINK-23797][tests] Wait for all task running before savepoint for all
tests in SavepointITCase
856c927 is described below

commit 856c927ba937beb188c258490c1696555cc53f6c
Author: Anton Kalashnikov <kaa.dev@yandex.ru>
AuthorDate: Tue Aug 31 17:14:47 2021 +0200

    [FLINK-23797][tests] Wait for all task running before savepoint for all tests in SavepointITCase
---
 .../java/org/apache/flink/test/checkpointing/SavepointITCase.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index cb6426a..75d1161 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -206,6 +206,7 @@ public class SavepointITCase extends TestLogger {
             client.submitJob(jobGraph).get();
 
             BoundedPassThroughOperator.getProgressLatch().await();
+            waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
 
             client.stopWithSavepoint(jobId, drain, null).get();
 
@@ -608,6 +609,7 @@ public class SavepointITCase extends TestLogger {
                 client.submitJob(jobGraph).get();
 
                 BoundedPassThroughOperator.getProgressLatch().await();
+                waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
 
                 client.stopWithSavepoint(jobId, false, null).get();
 
@@ -873,12 +875,13 @@ public class SavepointITCase extends TestLogger {
         cluster.before();
         try {
             ClusterClient<?> client = cluster.getClusterClient();
-            client.submitJob(jobGraph).get();
+            JobID jobID = client.submitJob(jobGraph).get();
 
             // we need to wait for both pipelines to be in state RUNNING because that's the
only
             // state which allows creating a savepoint
             failingPipelineLatch.await();
             succeedingPipelineLatch.await();
+            waitForAllTaskRunning(cluster.getMiniCluster(), jobID, false);
 
             try {
                 client.stopWithSavepoint(jobGraph.getJobID(), false, savepointDir.getAbsolutePath())

Mime
View raw message