flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/7] flink git commit: [FLINK-8965][tests] Port TimestampITCase to flip6
Date Fri, 23 Mar 2018 18:12:49 GMT
[FLINK-8965][tests] Port TimestampITCase to flip6

This closes #5728.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81d809a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81d809a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81d809a5

Branch: refs/heads/release-1.5
Commit: 81d809a5f0030b14e0b7128d298cd9904e474ebd
Parents: f0bd7b6
Author: zentol <chesnay@apache.org>
Authored: Mon Feb 26 17:19:15 2018 +0100
Committer: zentol <chesnay@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 .../test/streaming/runtime/TimestampITCase.java | 73 ++++++++++----------
 1 file changed, 38 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81d809a5/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 5e08e8a..3b46c82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,17 +45,18 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -75,34 +76,24 @@ public class TimestampITCase extends TestLogger {
 	// this is used in some tests to synchronize
 	static MultiShotLatch latch;
 
-	private static LocalFlinkMiniCluster cluster;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			NUM_TASK_MANAGERS,
+			NUM_TASK_SLOTS),
+		true);
 
-	@Before
-	public void setupLatch() {
-		// ensure that we get a fresh latch for each test
-		latch = new MultiShotLatch();
-	}
-
-	@BeforeClass
-	public static void startCluster() {
+	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-
-		cluster = new LocalFlinkMiniCluster(config, false);
-
-		cluster.start();
-
-		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+		return config;
 	}
 
-	@AfterClass
-	public static void shutdownCluster() {
-		cluster.stop();
-		cluster = null;
-
-		TestStreamEnvironment.unsetAsContext();
+	@Before
+	public void setupLatch() {
+		// ensure that we get a fresh latch for each test
+		latch = new MultiShotLatch();
 	}
 
 	/**
@@ -162,7 +153,8 @@ public class TimestampITCase extends TestLogger {
 	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
 
 		// for this test to work, we need to be sure that no other jobs are being executed
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		final ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		while (!getRunningJobs(clusterClient).isEmpty()) {
 			Thread.sleep(100);
 		}
 
@@ -185,14 +177,15 @@ public class TimestampITCase extends TestLogger {
 				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.addSink(new DiscardingSink<Integer>());
 
-		new Thread("stopper") {
+		Thread t = new Thread("stopper") {
 			@Override
 			public void run() {
 				try {
 					// try until we get the running jobs
-					List<JobID> running;
-					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
+					List<JobID> running = getRunningJobs(clusterClient);
+					while (running.isEmpty()) {
 						Thread.sleep(10);
+						running = getRunningJobs(clusterClient);
 					}
 
 					JobID id = running.get(0);
@@ -200,7 +193,7 @@ public class TimestampITCase extends TestLogger {
 					// send stop until the job is stopped
 					do {
 						try {
-							cluster.stopJob(id);
+							clusterClient.stop(id);
 						}
 						catch (Exception e) {
 							if (e.getCause() instanceof IllegalStateException) {
@@ -214,13 +207,14 @@ public class TimestampITCase extends TestLogger {
 						}
 						Thread.sleep(10);
 					}
-					while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
+					while (!getRunningJobs(clusterClient).isEmpty());
 				}
 				catch (Throwable t) {
 					t.printStackTrace();
 				}
 			}
-		}.start();
+		};
+		t.start();
 
 		env.execute();
 
@@ -246,6 +240,7 @@ public class TimestampITCase extends TestLogger {
 						subtaskWatermarks.get(subtaskWatermarks.size() - 1));
 			}
 		}
+		t.join();
 	}
 
 	/**
@@ -855,4 +850,12 @@ public class TimestampITCase extends TestLogger {
 		@Override
 		public void cancel() {}
 	}
+
+	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
 }


Mime
View raw message