flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [01/13] flink git commit: [runtime] [tests] Run ProcessFailureBatchRecoveryITCase in BATCH and PIPELINED execution mode
Date Wed, 18 Mar 2015 16:48:50 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1930678fb -> 4a49a73a7


[runtime] [tests] Run ProcessFailureBatchRecoveryITCase in BATCH and PIPELINED execution mode


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf6c63c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf6c63c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf6c63c8

Branch: refs/heads/master
Commit: cf6c63c897927d7c827fd7a22e657e2270ff82d0
Parents: 9c77f07
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Mar 17 15:40:33 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Mar 18 17:44:40 2015 +0100

----------------------------------------------------------------------
 .../partition/SpilledSubpartitionViewTest.java  |  3 +-
 .../ProcessFailureBatchRecoveryITCase.java      | 30 +++++++++++++++-----
 2 files changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf6c63c8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 11037dc..fff7bc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -76,8 +76,7 @@ public class SpilledSubpartitionViewTest {
 	public static Collection<Object[]> ioMode() {
 		return Arrays.asList(new Object[][]{
 				{IOMode.SYNC},
-				{IOMode.ASYNC}
-		});
+				{IOMode.ASYNC}});
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/cf6c63c8/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
index 7bb094b..6f5e698 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureBatchRecoveryITCase.java
@@ -20,11 +20,10 @@ package org.apache.flink.test.recovery;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
-
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -37,12 +36,11 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-
 import org.junit.Test;
-
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -54,14 +52,17 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 /**
  * This test verifies the behavior of the recovery in the case when a TaskManager
@@ -77,6 +78,7 @@ import static org.junit.Assert.*;
  * the original task managers. The recovery should restart the tasks on the new TaskManager.
  */
 @SuppressWarnings("serial")
+@RunWith(Parameterized.class)
 public class ProcessFailureBatchRecoveryITCase {
 
 	private static final String READY_MARKER_FILE_PREFIX = "ready_";
@@ -84,6 +86,19 @@ public class ProcessFailureBatchRecoveryITCase {
 
 	private static final int PARALLELISM = 4;
 
+	private ExecutionMode executionMode;
+
+	public ProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
+		this.executionMode = executionMode;
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> executionMode() {
+		return Arrays.asList(new Object[][]{
+				{ExecutionMode.PIPELINED},
+				{ExecutionMode.BATCH}});
+	}
+
 	@Test
 	public void testTaskManagerProcessFailure() {
 
@@ -161,6 +176,7 @@ public class ProcessFailureBatchRecoveryITCase {
 			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", jobManagerPort);
 			env.setDegreeOfParallelism(PARALLELISM);
 			env.setNumberOfExecutionRetries(1);
+			env.getConfig().setExecutionMode(executionMode);
 
 			final long NUM_ELEMENTS = 100000L;
 			final DataSet<Long> result = env.generateSequence(1, NUM_ELEMENTS)


Mime
View raw message