flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [4/8] flink git commit: [FLINK-3011] [runtime, tests] Translate ExecutionGraphRestartTest to Java
Date Thu, 19 Nov 2015 17:41:47 GMT
[FLINK-3011] [runtime, tests] Translate ExecutionGraphRestartTest to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/839ae19d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/839ae19d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/839ae19d

Branch: refs/heads/release-0.10
Commit: 839ae19d65a51b532dda1657033124d5715b6f54
Parents: ffd4e2b
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Nov 17 11:40:54 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Thu Nov 19 18:27:03 2015 +0100

----------------------------------------------------------------------
 .../ExecutionGraphRestartTest.java              | 161 ++++++++++++++++++
 .../ExecutionGraphRestartTest.scala             | 162 -------------------
 2 files changed, 161 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/839ae19d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
new file mode 100644
index 0000000..57b1829
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExecutionGraphRestartTest {
+
+	private final static int NUM_TASKS = 31;
+
+	@Test
+	public void testNotRestartManually() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+		eg.setNumberOfRetriesLeft(0);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+
+		eg.restart();
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
+
+	@Test
+	public void testRestartAutomatically() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"Test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout());
+		eg.setNumberOfRetriesLeft(1);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+		// Wait for async restart
+		Deadline deadline = timeout.fromNow();
+		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Wait for deploying after async restart
+		deadline = timeout.fromNow();
+		boolean success = false;
+
+		while (deadline.hasTimeLeft() && !success) {
+			success = true;
+
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
+					success = false;
+					Thread.sleep(100);
+					break;
+				}
+			}
+		}
+
+		if (deadline.hasTimeLeft()) {
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				vertex.getCurrentExecutionAttempt().markFinished();
+			}
+
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
+		else {
+			fail("Failed to wait until all execution attempts left the state DEPLOYING.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/839ae19d/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
deleted file mode 100644
index 8fb3c4e..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.testingUtils.TestingUtils
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{Matchers, WordSpecLike}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
-
-@RunWith(classOf[JUnitRunner])
-class ExecutionGraphRestartTest extends WordSpecLike with Matchers {
-
-  val NUM_TASKS = 31
-
-  "The execution graph" must {
-    "not be manually restartable" in {
-      try {
-        val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext),
-            NUM_TASKS)
-
-        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
-        scheduler.newInstanceAvailable(instance)
-
-        val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
-        sender.setParallelism(NUM_TASKS)
-
-        val jobGraph = new JobGraph("Pointwise job", sender)
-
-        val eg = new ExecutionGraph(
-          TestingUtils.defaultExecutionContext,
-          new JobID(),
-          "test job",
-          new Configuration(),
-          AkkaUtils.getDefaultTimeout)
-        eg.setNumberOfRetriesLeft(0)
-        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
-        eg.getState should equal(JobStatus.CREATED)
-
-        eg.scheduleForExecution(scheduler)
-        eg.getState should equal(JobStatus.RUNNING)
-
-        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-
-        for (vertex <- eg.getAllExecutionVertices().asScala) {
-          vertex.getCurrentExecutionAttempt().cancelingComplete()
-        }
-
-        eg.getState should equal(JobStatus.FAILED)
-
-        eg.restart()
-
-        eg.getState should equal(JobStatus.FAILED)
-      } catch {
-        case t: Throwable =>
-          t.printStackTrace()
-          fail(t.getMessage)
-      }
-    }
-
-    "restart itself automatically" in {
-      try {
-        val instance = ExecutionGraphTestUtils.getInstance(
-          new SimpleActorGateway(TestingUtils.directExecutionContext),
-          NUM_TASKS)
-
-        val scheduler = new Scheduler(TestingUtils.defaultExecutionContext)
-        scheduler.newInstanceAvailable(instance)
-
-        val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
-        sender.setParallelism(NUM_TASKS)
-
-        val jobGraph = new JobGraph("Pointwise job", sender)
-
-        val eg = new ExecutionGraph(
-          TestingUtils.defaultExecutionContext,
-          new JobID(),
-          "Test job",
-          new Configuration(),
-          AkkaUtils.getDefaultTimeout)
-        eg.setNumberOfRetriesLeft(1)
-        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
-
-        eg.getState should equal(JobStatus.CREATED)
-
-        eg.scheduleForExecution(scheduler)
-        eg.getState should equal(JobStatus.RUNNING)
-
-        eg.getAllExecutionVertices.iterator().next().fail(new Exception("Test Exception"))
-        eg.getState should equal(JobStatus.FAILING)
-        
-        for (vertex <- eg.getAllExecutionVertices.asScala) {
-          vertex.getCurrentExecutionAttempt().cancelingComplete()
-        }
-
-        val timeout = new FiniteDuration(2, TimeUnit.MINUTES)
-
-        // Wait for async restart
-        var deadline = timeout.fromNow
-        while (deadline.hasTimeLeft() && eg.getState != JobStatus.RUNNING) {
-          Thread.sleep(100)
-        }
-
-        eg.getState should equal(JobStatus.RUNNING)
-
-        // Wait for deploying after async restart
-        deadline = timeout.fromNow
-        while (deadline.hasTimeLeft() && eg.getAllExecutionVertices.asScala.exists(
-          _.getCurrentExecutionAttempt.getAssignedResource == null)) {
-          Thread.sleep(100)
-        }
-
-        if (deadline.hasTimeLeft()) {
-          for (vertex <- eg.getAllExecutionVertices.asScala) {
-            vertex.getCurrentExecutionAttempt().markFinished()
-          }
-
-          eg.getState() should equal(JobStatus.FINISHED)
-        } else {
-          fail("Failed to wait until all execution attempts left the state DEPLOYING.")
-        }
-      } catch {
-        case t: Throwable =>
-          t.printStackTrace()
-          fail(t.getMessage)
-      }
-    }
-  }
-}


Mime
View raw message