flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] git commit: Add test case for TaskManager connection loss, leading to task failure.
Date Thu, 06 Nov 2014 14:38:16 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 6ecd0f826 -> a959dd503


Add test case for TaskManager connection loss, leading to task failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/17e01861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/17e01861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/17e01861

Branch: refs/heads/master
Commit: 17e018611906dac568615d228e8a29a3918032a7
Parents: 6ecd0f8
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Nov 6 12:58:37 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Nov 6 12:58:37 2014 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/AllocatedSlot.java   |  2 +-
 .../TaskManagerLossFailsTasksTest.java          | 79 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/17e01861/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
index f105c26..b3ffbb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java
@@ -146,7 +146,7 @@ public class AllocatedSlot {
 			// kill all tasks currently running in this slot
 			Execution exec = this.executedTask;
 			if (exec != null && !exec.isFinished()) {
-				exec.fail(new Exception("The slot in which the task was scheduled has been cancelled."));
+				exec.fail(new Exception("The slot in which the task was scheduled has been killed (probably loss of TaskManager)."));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/17e01861/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
new file mode 100644
index 0000000..37bdaa3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getSimpleAcknowledgingTaskmanager;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.tasks.NoOpInvokable;
+import org.apache.flink.runtime.protocols.TaskOperationProtocol;
+import org.junit.Test;
+
+public class TaskManagerLossFailsTasksTest {
+
+	@Test
+	public void testTasksFailWhenTaskManagerLost() {
+		try {
+			TaskOperationProtocol tm1 = getSimpleAcknowledgingTaskmanager();
+			TaskOperationProtocol tm2 = getSimpleAcknowledgingTaskmanager();
+			
+			Instance instance1 = getInstance(tm1, 10);
+			Instance instance2 = getInstance(tm2, 10);
+			
+			Scheduler scheduler = new Scheduler();
+			scheduler.newInstanceAvailable(instance1);
+			scheduler.newInstanceAvailable(instance2);
+			
+			// The job:
+			
+			final AbstractJobVertex sender = new AbstractJobVertex("Task");
+			sender.setInvokableClass(NoOpInvokable.class);
+			sender.setParallelism(20);
+			
+			final JobGraph jobGraph = new JobGraph("Pointwise Job", sender);
+			
+			ExecutionGraph eg = new ExecutionGraph(new JobID(), "test job", new Configuration());
+			eg.setNumberOfRetriesLeft(0);
+			eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			
+			
+			assertEquals(JobStatus.CREATED, eg.getState());
+			
+			eg.scheduleForExecution(scheduler);
+			assertEquals(JobStatus.RUNNING, eg.getState());
+			
+			instance1.markDead();
+			assertEquals(JobStatus.FAILING, eg.getState());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		
+	}
+}


Mime
View raw message