flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/9] flink git commit: [tests] Add integration test for restart recovery
Date Thu, 19 Feb 2015 19:53:29 GMT
[tests] Add integration test for restart recovery


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d80711d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d80711d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d80711d

Branch: refs/heads/master
Commit: 7d80711d366c5a2127e50051e6c0d5b347b638c6
Parents: a110449
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 19 18:54:14 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  60 -----
 .../apache/flink/runtime/instance/Instance.java |   4 +-
 .../flink/runtime/instance/InstanceManager.java |  42 +---
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 .../runtime/instance/InstanceManagerTest.java   |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |   1 -
 .../test/recovery/SimpleRecoveryITCase.java     | 252 +++++++++++++++++++
 .../TaskManagerFailureRecoveryITCase.java       | 184 ++++++++++++++
 9 files changed, 456 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 42a3c9a..767648a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -56,12 +56,6 @@ public final class ConfigConstants {
 	public static final String JOB_MANAGER_IPC_PORT_KEY = "jobmanager.rpc.port";
 
 	/**
-	 * The config parameter defining the number of seconds that a task manager heartbeat may be missing before it is
-	 * marked as failed.
-	 */
-	public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs";
-
-	/**
 	 * The config parameter defining the storage directory to be used by the blob server.
 	 */
 	public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory";
@@ -136,34 +130,6 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";
 
 	/**
-	 * The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager
-	 * for the ServerBootstrap.)
-	 */
-	public static final String TASK_MANAGER_NET_NUM_IN_THREADS_KEY = "taskmanager.net.numInThreads";
-
-	/**
-	 * The number of outgoing network IO threads (e.g. outgoing connection threads used in NettyConnectionManager for
-	 * the Bootstrap.)
-	 */
-	public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads";
-
-	/**
-	 * The low water mark used in NettyConnectionManager for the Bootstrap.
-	 */
-	public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark";
-
-	/**
-	 * The high water mark used in NettyConnectionManager for the Bootstrap.
-	 */
-	public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";
-	
-	/**
-	 * Parameter for the interval in which the TaskManager sends the periodic heart beat messages
-	 * to the JobManager (in msecs).
-	 */
-	public static final String TASK_MANAGER_HEARTBEAT_INTERVAL_KEY = "taskmanager.heartbeat-interval";
-
-	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 */
 	public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
@@ -473,32 +439,6 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
 
 	/**
-	 * Default number of incoming network IO threads (e.g. number of incoming connection threads used in
-	 * NettyConnectionManager for the ServerBootstrap). If set to -1, a reasonable default depending on the number of
-	 * cores will be picked.
-	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS = -1;
-
-	/**
-	 * Default number of outgoing network IO threads (e.g. number of outgoing connection threads used in
-	 * NettyConnectionManager for the Bootstrap). If set to -1, a reasonable default depending on the number of cores
-	 * will be picked.
-	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;
-
-	/**
-	 * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
-	 * will use half of the network buffer size as the low water mark.
-	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1;
-
-	/**
-	 * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
-	 * will use the network buffer size as the high water mark.
-	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
-
-	/**
 	 * Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
 	 */
 	public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index a5a9263..4e08389 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -329,7 +329,7 @@ public class Instance {
 	
 	@Override
 	public String toString() {
-		return instanceId + " @ " + (taskManager != null ? taskManager.path() : "ActorRef.noSender") + " - " +
-				numberOfSlots + " slots" + " - " + hashCode();
+		return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(),
+				numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index b28da35..2ee41da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.Set;
 
 import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,9 +55,6 @@ public class InstanceManager {
 	
 	/** Listeners that want to be notified about availability and disappearance of instances */
 	private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
-
-	/** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
-	private final long heartbeatTimeout;
 	
 	/** The total number of task slots that the system has */
 	private int totalNumberOfAliveTaskSlots;
@@ -72,32 +67,12 @@ public class InstanceManager {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Creates an instance manager, using the global configuration value for maximum interval between heartbeats
-	 * where a task manager is still considered alive.
+	 * Creates an new instance manager.
 	 */
 	public InstanceManager() {
-		this(GlobalConfiguration.getLong(
-				ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
-				ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
-	}
-	
-	public InstanceManager(long heartbeatTimeout) {
-		this(heartbeatTimeout, heartbeatTimeout);
-	}
-	
-	public InstanceManager(long heartbeatTimeout, long cleanupInterval) {
-		if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
-			throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
-		}
-		
 		this.registeredHostsById = new HashMap<InstanceID, Instance>();
 		this.registeredHostsByConnection = new HashMap<ActorRef, Instance>();
 		this.deadHosts = new HashSet<ActorRef>();
-		this.heartbeatTimeout = heartbeatTimeout;
-	}
-	
-	public long getHeartbeatTimeout() {
-		return heartbeatTimeout;
 	}
 
 	public void shutdown() {
@@ -132,14 +107,19 @@ public class InstanceManager {
 
 			if (host == null){
 				if (LOG.isDebugEnabled()) {
-					LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() + 
-							" Possibly TaskManager was maked as dead (timed-out) earlier. " +
+					LOG.debug("Received heartbeat from unknown TaskManager with instance ID " + instanceId.toString() +
+							" Possibly TaskManager was marked as dead (timed-out) earlier. " +
 							"Reporting back that task manager is no longer known.");
 				}
 				return false;
 			}
 
 			host.reportHeartBeat();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Received heartbeat from TaskManager " + host);
+			}
+
 			return true;
 		}
 	}
@@ -164,7 +144,7 @@ public class InstanceManager {
 						" which was marked as dead earlier because of a heart-beat timeout.");
 			}
 
-			InstanceID id = null;
+			InstanceID id;
 			do {
 				id = new InstanceID();
 			} while (registeredHostsById.containsKey(id));
@@ -178,8 +158,8 @@ public class InstanceManager {
 			totalNumberOfAliveTaskSlots += numberOfSlots;
 			
 			if (LOG.isInfoEnabled()) {
-				LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
-						taskManager.path(), id, registeredHostsById.size()));
+				LOG.info(String.format("Registered TaskManager at %s (%s) as %s. Current number of registered hosts is %d.",
+						connectionInfo.getHostname(), taskManager.path(), id, registeredHostsById.size()));
 			}
 
 			host.reportHeartBeat();

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1741cdb..cb8acae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -723,9 +723,9 @@ object JobManager {
       ConfigConstants.DEFAULT_EXECUTION_RETRIES_KEY,
       ConfigConstants.DEFAULT_EXECUTION_RETRIES)
 
-    val delayBetweenRetries = 2 * configuration.getLong(
-      ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT)
+    val delayBetweenRetries = 2 * Duration(configuration.getString(
+      ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE,
+      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT)).toMillis
 
     (archiveCount, profilingEnabled, cleanupInterval, executionRetries, delayBetweenRetries)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 99d824b..c14ebca 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -433,7 +433,7 @@ import scala.collection.JavaConverters._
       if (jobConfig.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) {
         profiler match {
           case Some(profilerActorRef) => profilerActorRef ! MonitorTask(task)
-          case None => log.info("There is no profiling enabled for the task manager.")
+          case None => // no log message here - floods the log
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index dff3dd3..b9aa674 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -65,10 +65,6 @@ public class InstanceManagerTest{
 		try {
 			InstanceManager cm = new InstanceManager();
 			
-			// catches error that some parts assumed config values in seconds, others in
-			// milliseconds by verifying that the timeout is not larger than 2 minutes.
-			assertTrue(cm.getHeartbeatTimeout() < 2 * 60 * 1000);
-			
 			final int dataPort = 20000;
 
 			HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
@@ -76,7 +72,7 @@ public class InstanceManagerTest{
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			
 			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort + 0);
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
 			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 15);
 			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 30);
 
@@ -121,7 +117,7 @@ public class InstanceManagerTest{
 
 			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
 			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort + 0);
+			InstanceConnectionInfo ici = new InstanceConnectionInfo(address, dataPort);
 
 			JavaTestKit probe = new JavaTestKit(system);
 			InstanceID i = cm.registerTaskManager(probe.getRef(), ici, resources, 1);
@@ -157,7 +153,7 @@ public class InstanceManagerTest{
 			InetAddress address = InetAddress.getByName("127.0.0.1");
 			
 			// register three instances
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort + 0);
+			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, dataPort);
 			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, dataPort + 1);
 			InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, dataPort + 2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 4416ba6..c26b255 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -101,7 +101,6 @@ object TestingUtils {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs)
-    config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000)
     config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
     new TestingCluster(config)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..911edb3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class SimpleRecoveryITCase {
+
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+	}
+
+	@AfterClass
+	public static void teardownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			System.err.println("Error stopping cluster on shutdown");
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailedRunThenSuccessfulRun() {
+
+		FailOnceMapper.failuresBeforeSuccess = 1;
+
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			// attempt 1
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+						"localhost", cluster.getJobManagerRPCPort());
+
+				env.setDegreeOfParallelism(4);
+				env.setNumberOfExecutionRetries(0);
+
+				env.generateSequence(1, 10)
+						.map(new FailOnceMapper<Long>())
+						.reduce(new ReduceFunction<Long>() {
+							@Override
+							public Long reduce(Long value1, Long value2) {
+								return value1 + value2;
+							}
+						})
+						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+				try {
+					env.execute();
+					fail("The program should have failed");
+				}
+				catch (ProgramInvocationException e) {
+					// expected
+				}
+			}
+
+			// attempt 2
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+						"localhost", cluster.getJobManagerRPCPort());
+
+				env.setDegreeOfParallelism(4);
+				env.setNumberOfExecutionRetries(0);
+
+				env.generateSequence(1, 10)
+						.map(new FailOnceMapper<Long>())
+						.reduce(new ReduceFunction<Long>() {
+							@Override
+							public Long reduce(Long value1, Long value2) {
+								return value1 + value2;
+							}
+						})
+						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+				try {
+					JobExecutionResult result = env.execute();
+					assertTrue(result.getNetRuntime() >= 0);
+					assertNotNull(result.getAllAccumulatorResults());
+					assertTrue(result.getAllAccumulatorResults().isEmpty());
+				}
+				catch (JobExecutionException e) {
+					fail("The program should have succeeded on the second run");
+				}
+
+				long sum = 0;
+				for (long l : resultCollection) {
+					sum += l;
+				}
+				assertEquals(55, sum);
+			}
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testRestart() {
+
+		FailOnceMapper.failuresBeforeSuccess = 1;
+
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(4);
+			env.setNumberOfExecutionRetries(1);
+
+			env.generateSequence(1, 10)
+					.map(new FailOnceMapper<Long>())
+					.reduce(new ReduceFunction<Long>() {
+						@Override
+						public Long reduce(Long value1, Long value2) {
+							return value1 + value2;
+						}
+					})
+					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+			try {
+				JobExecutionResult result = env.execute();
+				assertTrue(result.getNetRuntime() >= 0);
+				assertNotNull(result.getAllAccumulatorResults());
+				assertTrue(result.getAllAccumulatorResults().isEmpty());
+			}
+			catch (JobExecutionException e) {
+				fail("The program should have succeeded on the second run");
+			}
+
+			long sum = 0;
+			for (long l : resultCollection) {
+				sum += l;
+			}
+			assertEquals(55, sum);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testRestartMultipleTimes() {
+
+		FailOnceMapper.failuresBeforeSuccess = 3;
+
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(4);
+			env.setNumberOfExecutionRetries(3);
+
+			env.generateSequence(1, 10)
+					.map(new FailOnceMapper<Long>())
+					.reduce(new ReduceFunction<Long>() {
+						@Override
+						public Long reduce(Long value1, Long value2) {
+							return value1 + value2;
+						}
+					})
+					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+			try {
+				JobExecutionResult result = env.execute();
+				assertTrue(result.getNetRuntime() >= 0);
+				assertNotNull(result.getAllAccumulatorResults());
+				assertTrue(result.getAllAccumulatorResults().isEmpty());
+			}
+			catch (JobExecutionException e) {
+				fail("The program should have succeeded on the second run");
+			}
+
+			long sum = 0;
+			for (long l : resultCollection) {
+				sum += l;
+			}
+			assertEquals(55, sum);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------------------
+
+	private static class FailOnceMapper<T> extends RichMapFunction<T, T> {
+
+		private static int failuresBeforeSuccess = 0;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7d80711d/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
new file mode 100644
index 0000000..85856ba
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.pattern.Patterns;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class TaskManagerFailureRecoveryITCase {
+
+	@Test
+	public void testRestartWithFailingTaskManager() {
+
+		final int PARALLELISM = 4;
+
+		ForkableFlinkMiniCluster cluster = null;
+		ActorSystem additionalSystem = null;
+
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+
+			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
+			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+			config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 2);
+
+			cluster = new ForkableFlinkMiniCluster(config, false);
+
+			// for the result
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(PARALLELISM);
+			env.setNumberOfExecutionRetries(1);
+
+			env.generateSequence(1, 10)
+					.map(new FailingMapper<Long>())
+					.reduce(new ReduceFunction<Long>() {
+						@Override
+						public Long reduce(Long value1, Long value2) {
+							return value1 + value2;
+						}
+					})
+					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+
+			// simple reference (atomic does not matter) to pass back an exception from the trigger thread
+			final AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
+
+			// trigger the execution from a separate thread, so we are available to temper with the
+			// cluster during the execution
+			Thread trigger = new Thread("program trigger") {
+				@Override
+				public void run() {
+					try {
+						env.execute();
+					}
+					catch (Throwable t) {
+						ref.set(t);
+					}
+				}
+			};
+			trigger.setDaemon(true);
+			trigger.start();
+
+			// block until all the mappers are actually deployed
+			// the mappers in turn are waiting
+			for (int i = 0; i < PARALLELISM; i++) {
+				FailingMapper.TASK_TO_COORD_QUEUE.take();
+			}
+
+			// bring up one more task manager and wait for it to appear
+			{
+				additionalSystem = cluster.startTaskManagerActorSystem(2);
+				ActorRef additionalTaskManager = cluster.startTaskManager(2, additionalSystem);
+				Object message = TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage();
+				Future<Object> future = Patterns.ask(additionalTaskManager, message, 30000);
+
+				try {
+					Await.result(future, new FiniteDuration(30000, TimeUnit.MILLISECONDS));
+				}
+				catch (TimeoutException e) {
+					fail ("The additional TaskManager did not come up within 30 seconds");
+				}
+			}
+
+			// kill the two other TaskManagers
+			for (ActorRef tm : cluster.getTaskManagersAsJava()) {
+				tm.tell(PoisonPill.getInstance(), null);
+			}
+
+			// wait for the next set of mappers (the recovery ones) to come online
+			for (int i = 0; i < PARALLELISM; i++) {
+				FailingMapper.TASK_TO_COORD_QUEUE.take();
+			}
+
+			// tell the mappers that they may continue this time
+			for (int i = 0; i < PARALLELISM; i++) {
+				FailingMapper.COORD_TO_TASK_QUEUE.add(new Object());
+			}
+
+			// wait for the program to finish
+			trigger.join();
+			if (ref.get() != null) {
+				Throwable t = ref.get();
+				t.printStackTrace();
+				fail("Program execution caused an exception: " + t.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (additionalSystem != null) {
+				additionalSystem.shutdown();
+			}
+			if (cluster != null) {
+				cluster.stop();
+			}
+		}
+	}
+
+	private static class FailingMapper<T> extends RichMapFunction<T, T> {
+
+		private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
+
+		private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			TASK_TO_COORD_QUEUE.add(new Object());
+			COORD_TO_TASK_QUEUE.take();
+		}
+
+		@Override
+		public T map(T value) throws Exception {
+			return value;
+		}
+	}
+}


Mime
View raw message