Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CEBA718DBF for ; Mon, 13 Jul 2015 17:26:51 +0000 (UTC) Received: (qmail 67936 invoked by uid 500); 13 Jul 2015 16:59:24 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 67239 invoked by uid 500); 13 Jul 2015 16:59:23 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 66409 invoked by uid 99); 13 Jul 2015 16:54:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Jul 2015 16:54:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5095AE0974; Mon, 13 Jul 2015 16:54:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Mon, 13 Jul 2015 16:54:10 -0000 Message-Id: <446a556a1f5f4eeaadaedd7701adbbc3@git.apache.org> In-Reply-To: <9eeae8a9673a42ca9b0a541017431b18@git.apache.org> References: <9eeae8a9673a42ca9b0a541017431b18@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/7] flink git commit: [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager. [FLINK-2329] [runtime] Introduces InstanceGateway as an abstraction to communicate with the TaskManager. Replaces AkkaUtils.globalExecutionContext with instance dependent ExecutionContext. 
This closes #893 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ccb5fdb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ccb5fdb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ccb5fdb Branch: refs/heads/master Commit: 2ccb5fdb47aa0e3766fd7fbd17a41feaca29fcbc Parents: aa5e5b3 Author: Till Rohrmann Authored: Tue Jul 7 11:41:44 2015 +0200 Committer: Stephan Ewen Committed: Mon Jul 13 17:54:31 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 89 ++- .../runtime/executiongraph/ExecutionGraph.java | 55 +- .../runtime/executiongraph/ExecutionVertex.java | 40 +- .../runtime/instance/AkkaInstanceGateway.java | 111 ++++ .../apache/flink/runtime/instance/Instance.java | 34 +- .../flink/runtime/instance/InstanceGateway.java | 82 +++ .../flink/runtime/instance/InstanceManager.java | 3 +- .../runtime/io/network/NetworkEnvironment.java | 34 +- .../runtime/jobmanager/scheduler/Scheduler.java | 11 +- .../jobmanager/web/SetupInfoServlet.java | 3 +- .../web-docs-infoserver/js/taskmanager.js | 2 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 2 - .../flink/runtime/jobmanager/JobManager.scala | 95 +++- .../runtime/minicluster/FlinkMiniCluster.scala | 20 +- .../flink/runtime/taskmanager/TaskManager.scala | 34 +- .../executiongraph/AllVerticesIteratorTest.java | 2 + .../ExecutionGraphConstructionTest.java | 54 +- .../ExecutionGraphDeploymentTest.java | 59 +- .../executiongraph/ExecutionGraphTestUtils.java | 121 ++-- .../ExecutionStateProgressTest.java | 31 +- .../ExecutionVertexCancelTest.java | 563 ++++++++----------- .../ExecutionVertexDeploymentTest.java | 115 +--- .../ExecutionVertexSchedulingTest.java | 35 +- .../executiongraph/LocalInputSplitsTest.java | 53 +- .../executiongraph/PointwisePatternTest.java | 50 +- .../TerminalStateDeadlockTest.java | 10 +- .../VertexLocationConstraintTest.java | 91 +-- 
.../executiongraph/VertexSlotSharingTest.java | 7 +- .../instance/BaseTestingInstanceGateway.java | 94 ++++ .../runtime/instance/DummyInstanceGateway.java | 57 ++ .../flink/runtime/instance/InstanceTest.java | 7 +- .../flink/runtime/instance/SimpleSlotTest.java | 4 +- .../io/network/NetworkEnvironmentTest.java | 6 +- .../ScheduleWithCoLocationHintTest.java | 37 +- .../scheduler/SchedulerIsolatedTasksTest.java | 31 +- .../scheduler/SchedulerSlotSharingTest.java | 51 +- .../scheduler/SchedulerTestUtils.java | 4 +- ...askManagerComponentsStartupShutdownTest.java | 6 +- .../ExecutionGraphRestartTest.scala | 40 +- .../TaskManagerLossFailsTasksTest.scala | 33 +- .../runtime/jobmanager/RecoveryITCase.scala | 8 +- .../runtime/testingUtils/TestingCluster.scala | 30 +- .../testingUtils/TestingJobManager.scala | 15 +- .../TestingJobManagerMessages.scala | 3 +- .../runtime/testingUtils/TestingUtils.scala | 62 +- .../apache/flink/test/util/TestBaseUtils.java | 6 +- .../test/util/ForkableFlinkMiniCluster.scala | 36 +- .../taskmanager/TaskManagerFailsITCase.scala | 12 +- .../apache/flink/yarn/ApplicationMaster.scala | 30 +- .../flink/yarn/ApplicationMasterActor.scala | 2 +- 50 files changed, 1399 insertions(+), 981 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 76a58e8..af67c3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -18,13 +18,9 @@ package org.apache.flink.runtime.executiongraph; -import 
akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.dispatch.OnFailure; -import akka.pattern.Patterns; -import akka.util.Timeout; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -32,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -50,6 +47,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -131,9 +129,20 @@ public class Execution implements Serializable { private SerializedValue> operatorState; + /** The execution context which is used to execute futures. 
*/ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private ExecutionContext executionContext; + // -------------------------------------------------------------------------------------------- - public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp, FiniteDuration timeout) { + public Execution( + ExecutionContext executionContext, + ExecutionVertex vertex, + int attemptNumber, + long startTimestamp, + FiniteDuration timeout) { + this.executionContext = checkNotNull(executionContext); + this.vertex = checkNotNull(vertex); this.attemptId = new ExecutionAttemptID(); @@ -200,6 +209,8 @@ public class Execution implements Serializable { } assignedResource = null; + executionContext = null; + partialInputChannelDeploymentDescriptors.clear(); partialInputChannelDeploymentDescriptors = null; } @@ -338,8 +349,9 @@ public class Execution implements Serializable { vertex.getExecutionGraph().registerExecution(this); final Instance instance = slot.getInstance(); - final Future deployAction = Patterns.ask(instance.getTaskManager(), - new SubmitTask(deployment), new Timeout(timeout)); + final InstanceGateway gateway = instance.getInstanceGateway(); + + final Future deployAction = gateway.ask(new SubmitTask(deployment), timeout); deployAction.onComplete(new OnComplete(){ @@ -366,7 +378,7 @@ public class Execution implements Serializable { } } } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); } catch (Throwable t) { markFailed(t); @@ -402,7 +414,7 @@ public class Execution implements Serializable { else if (current == FINISHED || current == FAILED) { // nothing to do any more. finished failed before it could be cancelled. 
// in any case, the task is removed from the TaskManager already - sendFailIntermediateResultPartitionsRPCCall(); + sendFailIntermediateResultPartitionsRpcCall(); return; } @@ -485,7 +497,7 @@ public class Execution implements Serializable { return true; } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); // double check to resolve race conditions if(consumerVertex.getExecutionState() == RUNNING){ @@ -533,7 +545,7 @@ public class Execution implements Serializable { final UpdatePartitionInfo updateTaskMessage = new UpdateTaskSinglePartitionInfo( consumer.getAttemptId(), partition.getIntermediateResult().getId(), descriptor); - sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage); + sendUpdatePartitionInfoRpcCall(consumerSlot, updateTaskMessage); } // ---------------------------------------------------------------- // Consumer is scheduled or deploying => cache input channel @@ -689,11 +701,12 @@ public class Execution implements Serializable { inputChannelDeploymentDescriptors.add(partialInputChannelDeploymentDescriptor.createInputChannelDeploymentDescriptor(this)); } - UpdatePartitionInfo updateTaskMessage = - createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs, - inputChannelDeploymentDescriptors); + UpdatePartitionInfo updateTaskMessage = createUpdateTaskMultiplePartitionInfos( + attemptId, + resultIDs, + inputChannelDeploymentDescriptors); - sendUpdateTaskRpcCall(assignedResource, updateTaskMessage); + sendUpdatePartitionInfoRpcCall(assignedResource, updateTaskMessage); } } @@ -804,14 +817,23 @@ public class Execution implements Serializable { } } + /** + * This method sends a CancelTask message to the instance of the assigned slot. + * + * The sending is tried up to NUM_CANCEL_CALL_TRIES times. 
+ */ private void sendCancelRpcCall() { final SimpleSlot slot = this.assignedResource; if (slot != null) { - Future cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new - CancelTask(attemptId), NUM_CANCEL_CALL_TRIES, - AkkaUtils.globalExecutionContext(), timeout); + final InstanceGateway gateway = slot.getInstance().getInstanceGateway(); + + Future cancelResult = gateway.retry( + new CancelTask(attemptId), + NUM_CANCEL_CALL_TRIES, + timeout, + executionContext); cancelResult.onComplete(new OnComplete() { @@ -827,35 +849,40 @@ public class Execution implements Serializable { } } } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); } } - private void sendFailIntermediateResultPartitionsRPCCall() { + private void sendFailIntermediateResultPartitionsRpcCall() { final SimpleSlot slot = this.assignedResource; if (slot != null) { final Instance instance = slot.getInstance(); if (instance.isAlive()) { - try { - // TODO For some tests this could be a problem when querying too early if all resources were released - instance.getTaskManager().tell(new FailIntermediateResultPartitions(attemptId), ActorRef.noSender()); - } catch (Throwable t) { - fail(new Exception("Intermediate result partition could not be failed.", t)); - } + final InstanceGateway gateway = instance.getInstanceGateway(); + + // TODO For some tests this could be a problem when querying too early if all resources were released + gateway.tell(new FailIntermediateResultPartitions(attemptId)); } } } - private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, - final UpdatePartitionInfo updateTaskMsg) { + /** + * Sends an UpdatePartitionInfo message to the instance of the consumerSlot. 
+ * + * @param consumerSlot Slot to whose instance the message will be sent + * @param updatePartitionInfo UpdatePartitionInfo message + */ + private void sendUpdatePartitionInfoRpcCall( + final SimpleSlot consumerSlot, + final UpdatePartitionInfo updatePartitionInfo) { if (consumerSlot != null) { final Instance instance = consumerSlot.getInstance(); + final InstanceGateway gateway = instance.getInstanceGateway(); - Future futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg, - new Timeout(timeout)); + Future futureUpdate = gateway.ask(updatePartitionInfo, timeout); futureUpdate.onFailure(new OnFailure() { @Override @@ -863,7 +890,7 @@ public class Execution implements Serializable { fail(new IllegalStateException("Update task on instance " + instance + " failed due to:", failure)); } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 84cbab7..47b7ae2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -24,7 +24,6 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.execution.ExecutionState; @@ -45,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; @@ -197,6 +197,10 @@ public class ExecutionGraph implements Serializable { @SuppressWarnings("NonSerializableFieldInSerializableClass") private CheckpointCoordinator checkpointCoordinator; + /** The execution context which is used to execute futures. */ + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private ExecutionContext executionContext; + // ------ Fields that are only relevant for archived execution graphs ------------ private ExecutionConfig executionConfig; @@ -207,17 +211,38 @@ public class ExecutionGraph implements Serializable { /** * This constructor is for tests only, because it does not include class loading information. */ - ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) { - this(jobId, jobName, jobConfig, timeout, new ArrayList(), ExecutionGraph.class.getClassLoader()); - } - - public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout, - List requiredJarFiles, ClassLoader userClassLoader) { - - if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) { + ExecutionGraph( + ExecutionContext executionContext, + JobID jobId, + String jobName, + Configuration jobConfig, + FiniteDuration timeout) { + this( + executionContext, + jobId, + jobName, + jobConfig, + timeout, + new ArrayList(), + ExecutionGraph.class.getClassLoader() + ); + } + + public ExecutionGraph( + ExecutionContext executionContext, + JobID jobId, + String jobName, + Configuration jobConfig, + FiniteDuration timeout, + List requiredJarFiles, + ClassLoader userClassLoader) { + + if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) { throw new NullPointerException(); } + this.executionContext = executionContext; + this.jobID 
= jobId; this.jobName = jobName; this.jobConfiguration = jobConfig; @@ -451,6 +476,15 @@ public class ExecutionGraph implements Serializable { return this.stateTimestamps[status.ordinal()]; } + /** + * Returns the ExecutionContext associated with this ExecutionGraph. + * + * @return ExecutionContext associated with this ExecutionGraph + */ + public ExecutionContext getExecutionContext() { + return executionContext; + } + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- @@ -629,6 +663,7 @@ public class ExecutionGraph implements Serializable { userClassLoader = null; scheduler = null; checkpointCoordinator = null; + executionContext = null; for (ExecutionJobVertex vertex : verticesInCreationOrder) { vertex.prepareForArchiving(); @@ -719,7 +754,7 @@ public class ExecutionGraph implements Serializable { restart(); return null; } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); break; } else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) { http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index a70fa7d..f9001cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.executiongraph; -import akka.actor.ActorRef; - import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; import 
org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; @@ -30,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -101,14 +100,20 @@ public class ExecutionVertex implements Serializable { // -------------------------------------------------------------------------------------------- - public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, - IntermediateResult[] producedDataSets, FiniteDuration timeout) { + public ExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + FiniteDuration timeout) { this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis()); } - public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, - IntermediateResult[] producedDataSets, FiniteDuration timeout, - long createTimestamp) { + public ExecutionVertex( + ExecutionJobVertex jobVertex, + int subTaskIndex, + IntermediateResult[] producedDataSets, + FiniteDuration timeout, + long createTimestamp) { this.jobVertex = jobVertex; this.subTaskIndex = subTaskIndex; @@ -125,7 +130,12 @@ public class ExecutionVertex implements Serializable { this.priorExecutions = new CopyOnWriteArrayList(); - this.currentExecution = new Execution(this, 0, createTimestamp, timeout); + this.currentExecution = new Execution( + getExecutionGraph().getExecutionContext(), + this, + 0, + createTimestamp, + timeout); // create a co-location scheduling hint, if necessary CoLocationGroup clg = jobVertex.getCoLocationGroup(); @@ -416,8 +426,12 @@ public class 
ExecutionVertex implements Serializable { if (state == FINISHED || state == CANCELED || state == FAILED) { priorExecutions.add(execution); - currentExecution = new Execution(this, execution.getAttemptNumber()+1, - System.currentTimeMillis(), timeout); + currentExecution = new Execution( + getExecutionGraph().getExecutionContext(), + this, + execution.getAttemptNumber()+1, + System.currentTimeMillis(), + timeout); CoLocationGroup grp = jobVertex.getCoLocationGroup(); if (grp != null) { @@ -455,9 +469,9 @@ public class ExecutionVertex implements Serializable { // send only if we actually have a target if (slot != null) { - ActorRef taskManager = slot.getInstance().getTaskManager(); - if (taskManager != null) { - taskManager.tell(message, ActorRef.noSender()); + InstanceGateway gateway = slot.getInstance().getInstanceGateway(); + if (gateway != null) { + gateway.tell(message); } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java new file mode 100644 index 0000000..b7d60c5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaInstanceGateway.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * InstanceGateway implementation which uses Akka to communicate with remote instances. + */ +public class AkkaInstanceGateway implements InstanceGateway { + + /** ActorRef of the remote instance */ + private final ActorRef taskManager; + + public AkkaInstanceGateway(ActorRef taskManager) { + this.taskManager = taskManager; + } + + /** + * Sends a message asynchronously and returns its response. The response to the message is + * returned as a future. + * + * @param message Message to be sent + * @param timeout Timeout until the Future is completed with an AskTimeoutException + * @return Future which contains the response to the sent message + */ + @Override + public Future ask(Object message, FiniteDuration timeout) { + return Patterns.ask(taskManager, message, new Timeout(timeout)); + } + + /** + * Sends a message asynchronously without a result. + * + * @param message Message to be sent + */ + @Override + public void tell(Object message) { + taskManager.tell(message, ActorRef.noSender()); + } + + /** + * Forwards a message. For the receiver of this message it looks as if sender has sent the + * message. 
+ * + * @param message Message to be sent + * @param sender Sender of the forwarded message + */ + @Override + public void forward(Object message, ActorRef sender) { + taskManager.tell(message, sender); + } + + /** + * Retries to send asynchronously a message up to numberRetries times. The response to this + * message is returned as a future. The message is re-sent if the number of retries is not yet + * exceeded and if an exception occurred while sending it. + * + * @param message Message to be sent + * @param numberRetries Number of times to retry sending the message + * @param timeout Timeout for each sending attempt + * @param executionContext ExecutionContext which is used to send the message multiple times + * @return Future of the response to the sent message + */ + @Override + public Future retry( + Object message, + int numberRetries, + FiniteDuration timeout, + ExecutionContext executionContext) { + + return AkkaUtils.retry( + taskManager, + message, + numberRetries, + executionContext, + timeout); + } + + /** + * Returns the ActorPath of the remote instance. + * + * @return ActorPath of the remote instance. 
+ */ + @Override + public String path() { + return taskManager.path().toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index 39caf08..1c44b5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Queue; import java.util.Set; -import akka.actor.ActorRef; - import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener; import org.slf4j.Logger; @@ -43,8 +41,8 @@ public class Instance { /** The lock on which to synchronize allocations and failure state changes */ private final Object instanceLock = new Object(); - /** The actor ref to the task manager represented by this taskManager. */ - private final ActorRef taskManager; + /** The instance gateway to communicate with the instance */ + private final InstanceGateway instanceGateway; /** The instance connection information for the data transfer. */ private final InstanceConnectionInfo connectionInfo; @@ -81,15 +79,19 @@ public class Instance { /** * Constructs an instance reflecting a registered TaskManager. * - * @param taskManager The actor reference of the represented task manager. + * @param instanceGateway The instance gateway to communicate with the remote instance * @param connectionInfo The remote connection where the task manager receives requests. * @param id The id under which the taskManager is registered. * @param resources The resources available on the machine. * @param numberOfSlots The number of task slots offered by this taskManager. 
*/ - public Instance(ActorRef taskManager, InstanceConnectionInfo connectionInfo, InstanceID id, - HardwareDescription resources, int numberOfSlots) { - this.taskManager = taskManager; + public Instance( + InstanceGateway instanceGateway, + InstanceConnectionInfo connectionInfo, + InstanceID id, + HardwareDescription resources, + int numberOfSlots) { + this.instanceGateway = instanceGateway; this.connectionInfo = connectionInfo; this.instanceId = id; this.resources = resources; @@ -327,12 +329,14 @@ public class Instance { } } - public ActorRef getTaskManager() { - return taskManager; - } - - public String getPath(){ - return taskManager.path().toString(); + /** + * Returns the InstanceGateway of this Instance. This gateway can be used to communicate with + * it. + * + * @return InstanceGateway associated with this instance + */ + public InstanceGateway getInstanceGateway() { + return instanceGateway; } public InstanceConnectionInfo getInstanceConnectionInfo() { @@ -386,6 +390,6 @@ public class Instance { @Override public String toString() { return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(), - numberOfSlots, (taskManager != null ? taskManager.path() : "ActorRef.noSender")); + numberOfSlots, (instanceGateway != null ? 
instanceGateway.path() : "No instance gateway")); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java new file mode 100644 index 0000000..a30b2f6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceGateway.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorRef; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Interface to abstract the communication with an Instance. + * + * It allows to avoid direct interaction with an ActorRef. + */ +public interface InstanceGateway { + + /** + * Sends a message asynchronously and returns its response. The response to the message is + * returned as a future. 
+ * + * @param message Message to be sent + * @param timeout Timeout until the Future is completed with an AskTimeoutException + * @return Future which contains the response to the sent message + */ + Future ask(Object message, FiniteDuration timeout); + + /** + * Sends a message asynchronously without a result. + * + * @param message Message to be sent + */ + void tell(Object message); + + /** + * Forwards a message. For the receiver of this message it looks as if sender has sent the + * message. + * + * @param message Message to be sent + * @param sender Sender of the forwarded message + */ + void forward(Object message, ActorRef sender); + + /** + * Retries to send asynchronously a message up to numberRetries times. The response to this + * message is returned as a future. The message is re-sent if the number of retries is not yet + * exceeded and if an exception occurred while sending it. + * + * @param message Message to be sent + * @param numberRetries Number of times to retry sending the message + * @param timeout Timeout for each sending attempt + * @param executionContext ExecutionContext which is used to send the message multiple times + * @return Future of the response to the sent message + */ + Future retry( + Object message, + int numberRetries, + FiniteDuration timeout, + ExecutionContext executionContext); + + /** + * Returns the path of the remote instance. + * + * @return Path of the remote instance. 
+ */ + String path(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index c1800bd..4f6c7a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -149,8 +149,9 @@ public class InstanceManager { id = new InstanceID(); } while (registeredHostsById.containsKey(id)); + InstanceGateway instanceGateway = new AkkaInstanceGateway(taskManager); - Instance host = new Instance(taskManager, connectionInfo, id, resources, numberOfSlots); + Instance host = new Instance(instanceGateway, connectionInfo, id, resources, numberOfSlots); registeredHostsById.put(id, host); registeredHostsByConnection.put(taskManager, host); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index c082c6a..0ffc889 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -23,7 +23,6 @@ import akka.dispatch.OnFailure; import akka.pattern.Patterns; import akka.util.Timeout; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import 
org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; @@ -46,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; import scala.Tuple2; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -89,11 +89,20 @@ public class NetworkEnvironment { private boolean isShutdown; /** + * {@link ExecutionContext} which is used to execute remote calls with the + * {@link JobManagerResultPartitionConsumableNotifier}. + */ + private final ExecutionContext executionContext; + + /** * Initializes all network I/O components. */ - public NetworkEnvironment(FiniteDuration jobManagerTimeout, - NetworkEnvironmentConfiguration config) throws IOException { + public NetworkEnvironment( + ExecutionContext executionContext, + FiniteDuration jobManagerTimeout, + NetworkEnvironmentConfiguration config) throws IOException { + this.executionContext = executionContext; this.configuration = checkNotNull(config); this.jobManagerTimeout = checkNotNull(jobManagerTimeout); @@ -182,7 +191,10 @@ public class NetworkEnvironment { this.partitionManager = new ResultPartitionManager(); this.taskEventDispatcher = new TaskEventDispatcher(); this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier( - jobManagerRef, taskManagerRef, new Timeout(jobManagerTimeout)); + executionContext, + jobManagerRef, + taskManagerRef, + new Timeout(jobManagerTimeout)); this.partitionStateChecker = new JobManagerPartitionStateChecker( jobManagerRef, taskManagerRef); @@ -414,6 +426,12 @@ public class NetworkEnvironment { */ private static class JobManagerResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { + /** + * {@link ExecutionContext} which is used for the failure handler of {@link ScheduleOrUpdateConsumers} + * messages.
+ */ + private final ExecutionContext executionContext; + private final ActorRef jobManager; private final ActorRef taskManager; @@ -421,8 +439,12 @@ public class NetworkEnvironment { private final Timeout jobManagerMessageTimeout; public JobManagerResultPartitionConsumableNotifier( - ActorRef jobManager, ActorRef taskManager, Timeout jobManagerMessageTimeout) { + ExecutionContext executionContext, + ActorRef jobManager, + ActorRef taskManager, + Timeout jobManagerMessageTimeout) { + this.executionContext = executionContext; this.jobManager = jobManager; this.taskManager = taskManager; this.jobManagerMessageTimeout = jobManagerMessageTimeout; @@ -448,7 +470,7 @@ public class NetworkEnvironment { taskManager.tell(failMsg, ActorRef.noSender()); } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 940082e..cb99e52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -37,7 +37,6 @@ import akka.dispatch.Futures; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.instance.SharedSlot; @@ -50,6 +49,7 @@ import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; /** * The 
scheduler is responsible for distributing the ready-to-run tasks among instances and slots. @@ -95,12 +95,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { /** The number of slot allocations where locality could not be respected */ private int nonLocalizedAssignments; + /** The ExecutionContext which is used to execute newSlotAvailable futures. */ + private final ExecutionContext executionContext; + // ------------------------------------------------------------------------ /** * Creates a new scheduler. */ - public Scheduler() {} + public Scheduler(ExecutionContext executionContext) { + this.executionContext = executionContext; + } /** * Shuts the scheduler down. After shut down no more tasks can be added to the scheduler. @@ -519,7 +524,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { handleNewSlot(); return null; } - }, AkkaUtils.globalExecutionContext()); + }, executionContext); } private void handleNewSlot() { http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java index 4e028d4..567d15a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java @@ -146,8 +146,7 @@ public class SetupInfoServlet extends HttpServlet { long time = new Date().getTime() - instance.getLastHeartBeat(); try { - objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress()); - objInner.put("ipcPort", instance.getTaskManager().path().address().hostPort()); + objInner.put("path", 
instance.getInstanceGateway().path()); objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort()); objInner.put("timeSinceLastHeartbeat", time / 1000); objInner.put("slotsNumber", instance.getTotalNumberOfSlots()); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js index 1ea9a41..68e4278 100644 --- a/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js +++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/taskmanager.js @@ -224,7 +224,7 @@ function processTMdata(json) { ""; var content = "" + - ""+tm.inetAdress+"
IPC Port: "+tm.ipcPort+", Data Port: "+tm.dataPort+"" + // first row: TaskManager + ""+tm.path+"
Data Port: "+tm.dataPort+"" + // first row: TaskManager ""+tmMemoryBox+"" + // second row: memory statistics "Loading Information" + // Information ""; http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 7ffaddd..d38e503 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -41,8 +41,6 @@ object AkkaUtils { val INF_TIMEOUT = 21474835 seconds - var globalExecutionContext: ExecutionContext = ExecutionContext.global - /** * Creates a local actor system without remoting. * http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index dc1599a..3b4ce15 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -60,6 +60,7 @@ import org.apache.flink.util.{ExceptionUtils, InstantiationUtil} import scala.collection.JavaConverters._ import scala.concurrent._ import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ForkJoinPool import scala.language.postfixOps /** @@ -89,16 +90,18 @@ import scala.language.postfixOps * - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED, etc.) has * changed. This message is sent by the ExecutionGraph. 
*/ -class JobManager(protected val flinkConfiguration: Configuration, - protected val instanceManager: InstanceManager, - protected val scheduler: FlinkScheduler, - protected val libraryCacheManager: BlobLibraryCacheManager, - protected val archive: ActorRef, - protected val accumulatorManager: AccumulatorManager, - protected val defaultExecutionRetries: Int, - protected val delayBetweenRetries: Long, - protected val timeout: FiniteDuration, - protected val mode: StreamingMode) +class JobManager( + protected val flinkConfiguration: Configuration, + protected val executionContext: ExecutionContext, + protected val instanceManager: InstanceManager, + protected val scheduler: FlinkScheduler, + protected val libraryCacheManager: BlobLibraryCacheManager, + protected val archive: ActorRef, + protected val accumulatorManager: AccumulatorManager, + protected val defaultExecutionRetries: Int, + protected val delayBetweenRetries: Long, + protected val timeout: FiniteDuration, + protected val mode: StreamingMode) extends Actor with ActorLogMessages with ActorSynchronousLogging { /** List of current jobs running jobs */ @@ -117,7 +120,7 @@ class JobManager(protected val flinkConfiguration: Configuration, // disconnect the registered task managers instanceManager.getAllRegisteredInstances.asScala.foreach { - _.getTaskManager ! Disconnect("JobManager is shutting down") + _.getInstanceGateway().tell(Disconnect("JobManager is shutting down")) } archive ! 
PoisonPill @@ -136,7 +139,6 @@ class JobManager(protected val flinkConfiguration: Configuration, } log.debug(s"Job manager ${self.path} is completely stopped.") - } /** @@ -411,8 +413,8 @@ class JobManager(protected val flinkConfiguration: Configuration, case message: AccumulatorMessage => handleAccumulatorMessage(message) case RequestStackTrace(instanceID) => - val taskManager = instanceManager.getRegisteredInstanceById(instanceID).getTaskManager - taskManager forward SendStackTrace + val gateway = instanceManager.getRegisteredInstanceById(instanceID).getInstanceGateway + gateway.forward(SendStackTrace, sender) case Terminated(taskManager) => if (instanceManager.isRegistered(taskManager)) { @@ -480,10 +482,18 @@ class JobManager(protected val flinkConfiguration: Configuration, } // see if there already exists an ExecutionGraph for the corresponding job ID - executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID, - (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, - jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender(), System.currentTimeMillis())))._1 + executionGraph = currentJobs.getOrElseUpdate( + jobGraph.getJobID, + (new ExecutionGraph( + executionContext, + jobGraph.getJobID, + jobGraph.getName, + jobGraph.getJobConfiguration, + timeout, + jobGraph.getUserJarBlobKeys, + userCodeLoader), + JobInfo(sender(), System.currentTimeMillis())) + )._1 // configure the execution graph val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { @@ -1046,8 +1056,8 @@ object JobManager { * @param configuration The configuration from which to parse the config values. * @return The members for a default JobManager. 
*/ - def createJobManagerComponents(configuration: Configuration) : - (InstanceManager, FlinkScheduler, BlobLibraryCacheManager, + def createJobManagerComponents(configuration: Configuration) + : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager, Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = { val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration) @@ -1083,6 +1093,8 @@ object JobManager { val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount)) + val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool()) + var blobServer: BlobServer = null var instanceManager: InstanceManager = null var scheduler: FlinkScheduler = null @@ -1091,7 +1103,7 @@ object JobManager { try { blobServer = new BlobServer(configuration) instanceManager = new InstanceManager() - scheduler = new FlinkScheduler() + scheduler = new FlinkScheduler(executionContext) libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval) instanceManager.addInstanceListener(scheduler) @@ -1114,8 +1126,16 @@ object JobManager { } } - (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, - executionRetries, delayBetweenRetries, timeout, archiveCount) + (executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archiveProps, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + archiveCount) } /** @@ -1154,9 +1174,16 @@ object JobManager { archiverActorName: Option[String], streamingMode: StreamingMode): (ActorRef, ActorRef) = { - val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, - executionRetries, delayBetweenRetries, - timeout, _) = createJobManagerComponents(configuration) + val (executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archiveProps, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + _) = 
createJobManagerComponents(configuration) // start the archiver with the given name, or without (avoid name conflicts) val archiver: ActorRef = archiverActorName match { @@ -1164,9 +1191,19 @@ object JobManager { case None => actorSystem.actorOf(archiveProps) } - val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler, - libraryCacheManager, archiver, accumulatorManager, executionRetries, - delayBetweenRetries, timeout, streamingMode) + val jobManagerProps = Props( + classOf[JobManager], + configuration, + executionContext, + instanceManager, + scheduler, + libraryCacheManager, + archiver, + accumulatorManager, + executionRetries, + delayBetweenRetries, + timeout, + streamingMode) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 73a37de..49c701e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -34,7 +34,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere import org.slf4j.LoggerFactory import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Future, Await} +import scala.concurrent.{ExecutionContext, Future, Await} /** * Abstract base class for Flink's mini cluster. 
The mini cluster starts a @@ -48,9 +48,10 @@ import scala.concurrent.{Future, Await} * @param streamingMode True, if the system should be started in streaming mode, false if * in pure batch mode. */ -abstract class FlinkMiniCluster(val userConfiguration: Configuration, - val singleActorSystem: Boolean, - val streamingMode: StreamingMode) { +abstract class FlinkMiniCluster( + val userConfiguration: Configuration, + val singleActorSystem: Boolean, + val streamingMode: StreamingMode) { def this(userConfiguration: Configuration, singleActorSystem: Boolean) = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) @@ -157,7 +158,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration, val future = gracefulStop(jobManagerActor, timeout) - implicit val executionContext = AkkaUtils.globalExecutionContext + implicit val executionContext = ExecutionContext.global Await.ready(Future.sequence(future +: futures), timeout) @@ -179,7 +180,7 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration, } def waitForTaskManagersToBeRegistered(): Unit = { - implicit val executionContext = AkkaUtils.globalExecutionContext + implicit val executionContext = ExecutionContext.global val futures = taskManagerActors map { taskManager => (taskManager ? 
NotifyWhenRegisteredAtJobManager)(timeout) @@ -196,8 +197,11 @@ abstract class FlinkMiniCluster(val userConfiguration: Configuration, } @throws(classOf[JobExecutionException]) - def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean, timeout: FiniteDuration) - : SerializedJobExecutionResult = { + def submitJobAndWait( + jobGraph: JobGraph, + printUpdates: Boolean, + timeout: FiniteDuration) + : SerializedJobExecutionResult = { val clientActorSystem = if (singleActorSystem) jobManagerActorSystem else JobClient.startJobClientActorSystem(configuration) http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 520decd..44a0b04 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -65,6 +65,7 @@ import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInfor import scala.concurrent._ import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ForkJoinPool import scala.util.{Failure, Success} import scala.collection.JavaConverters._ @@ -1363,14 +1364,16 @@ object TaskManager { @throws(classOf[IllegalConfigurationException]) @throws(classOf[IOException]) @throws(classOf[Exception]) - def startTaskManagerComponentsAndActor(configuration: Configuration, - actorSystem: ActorSystem, - taskManagerHostname: String, - taskManagerActorName: Option[String], - jobManagerPath: Option[String], - localTaskManagerCommunication: Boolean, - streamingMode: StreamingMode, - taskManagerClass: Class[_ <: TaskManager]): ActorRef = { + def startTaskManagerComponentsAndActor( + 
configuration: Configuration, + actorSystem: ActorSystem, + taskManagerHostname: String, + taskManagerActorName: Option[String], + jobManagerPath: Option[String], + localTaskManagerCommunication: Boolean, + streamingMode: StreamingMode, + taskManagerClass: Class[_ <: TaskManager]) + : ActorRef = { // get and check the JobManager config val jobManagerAkkaUrl: String = jobManagerPath.getOrElse { @@ -1380,17 +1383,20 @@ object TaskManager { } val (taskManagerConfig : TaskManagerConfiguration, - netConfig: NetworkEnvironmentConfiguration, - connectionInfo: InstanceConnectionInfo) - - = parseTaskManagerConfiguration(configuration, taskManagerHostname, - localTaskManagerCommunication) + netConfig: NetworkEnvironmentConfiguration, + connectionInfo: InstanceConnectionInfo + ) = parseTaskManagerConfiguration( + configuration, + taskManagerHostname, + localTaskManagerCommunication) // pre-start checks checkTempDirs(taskManagerConfig.tmpDirPaths) + val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool()) + // we start the network first, to make sure it can allocate its buffers first - val network = new NetworkEnvironment(taskManagerConfig.timeout, netConfig) + val network = new NetworkEnvironment(executionContext, taskManagerConfig.timeout, netConfig) // computing the amount of memory to use depends on how much memory is available // it strictly needs to happen AFTER the network stack has been initialized http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java index 693e014..1e66d81 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -43,6 +44,7 @@ public class AllVerticesIteratorTest { v4.setParallelism(2); ExecutionGraph eg = Mockito.mock(ExecutionGraph.class); + Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext()); ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1, AkkaUtils.getDefaultTimeout()); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index a4bd03c..a47ea77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import org.mockito.Matchers; @@ -99,7 +100,12 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, 
AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -137,7 +143,12 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -198,7 +209,12 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -446,7 +462,12 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); } @@ -496,7 +517,12 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v5, v4)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, AkkaUtils.getDefaultTimeout()); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -551,7 +577,11 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3, v4, v5)); - ExecutionGraph eg = new 
ExecutionGraph(jobId, jobName, cfg, + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, AkkaUtils.getDefaultTimeout()); try { eg.attachJobGraph(ordered); @@ -591,7 +621,11 @@ public class ExecutionGraphConstructionTest { List ordered = new ArrayList(Arrays.asList(v1, v2, v3)); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, AkkaUtils.getDefaultTimeout()); try { @@ -657,7 +691,11 @@ public class ExecutionGraphConstructionTest { JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8); - ExecutionGraph eg = new ExecutionGraph(jobId, jobName, cfg, + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, AkkaUtils.getDefaultTimeout()); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 03a41b4..cff7146 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -30,12 +30,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import akka.actor.Actor; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -51,29 +45,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.RegularPactTask; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public class ExecutionGraphDeploymentTest { - private static ActorSystem system; - - @BeforeClass - public static void setup() { - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - system = null; - } - @Test public void testBuildDeploymentDescriptor() { try { - TestingUtils.setCallingThreadDispatcher(system); final JobID jobId = new JobID(); final JobVertexID jid1 = new JobVertexID(); @@ -100,7 +78,11 @@ public class ExecutionGraphDeploymentTest { v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); - ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(), + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + "some job", + new Configuration(), AkkaUtils.getDefaultTimeout()); List ordered = Arrays.asList(v1, v2, v3, v4); @@ -110,15 +92,9 @@ public class ExecutionGraphDeploymentTest { ExecutionJobVertex ejv = eg.getAllVertices().get(jid2); ExecutionVertex vertex = ejv.getTaskVertices()[3]; - // create synchronous task manager - final TestActorRef simpleTaskManager = TestActorRef.create(system, - Props.create(ExecutionGraphTestUtils - .SimpleAcknowledgingTaskManager.class)); - - ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager tm = (ExecutionGraphTestUtils - .SimpleAcknowledgingTaskManager) simpleTaskManager.underlyingActor(); + 
ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext()); - final Instance instance = getInstance(simpleTaskManager); + final Instance instance = getInstance(instanceGateway); final SimpleSlot slot = instance.allocateSimpleSlot(jobId); @@ -128,7 +104,7 @@ public class ExecutionGraphDeploymentTest { assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); - TaskDeploymentDescriptor descr = tm.lastTDD; + TaskDeploymentDescriptor descr = instanceGateway.lastTDD; assertNotNull(descr); assertEquals(jobId, descr.getJobID()); @@ -152,9 +128,6 @@ public class ExecutionGraphDeploymentTest { e.printStackTrace(); fail(e.getMessage()); } - finally { - TestingUtils.setGlobalExecutionContext(); - } } @Test @@ -307,19 +280,23 @@ public class ExecutionGraphDeploymentTest { v2.setInvokableClass(RegularPactTask.class); // execution graph that executes actions synchronously - ExecutionGraph eg = new ExecutionGraph(jobId, "some job", new Configuration(), + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.directExecutionContext(), + jobId, + "some job", + new Configuration(), AkkaUtils.getDefaultTimeout()); eg.setQueuedSchedulingAllowed(false); List ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - // create a mock taskmanager that accepts deployment calls - ActorRef tm = system.actorOf(Props.create(ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class)); - - Scheduler scheduler = new Scheduler(); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); for (int i = 0; i < dop1 + dop2; i++) { - scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(tm)); + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ExecutionGraphTestUtils.SimpleInstanceGateway( + TestingUtils.directExecutionContext()))); } assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index a77a09e..8a63060 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -26,17 +26,16 @@ import java.lang.reflect.Field; import java.net.InetAddress; import java.util.LinkedList; -import akka.actor.ActorRef; -import akka.actor.Status; -import akka.actor.UntypedActor; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.BaseTestingInstanceGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -49,9 +48,11 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.testingUtils.TestingUtils; import 
org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import scala.concurrent.ExecutionContext; public class ExecutionGraphTestUtils { @@ -100,103 +101,95 @@ public class ExecutionGraphTestUtils { // utility mocking methods // -------------------------------------------------------------------------------------------- - public static Instance getInstance(final ActorRef taskManager) throws - Exception { - return getInstance(taskManager, 1); + public static Instance getInstance(final InstanceGateway gateway) throws Exception { + return getInstance(gateway, 1); } - public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception { + public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception { HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); - - return new Instance(taskManager, connection, new InstanceID(), hardwareDescription, numberOfSlots); + + return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots); } - public static class SimpleAcknowledgingTaskManager extends UntypedActor { + public static class SimpleInstanceGateway extends BaseTestingInstanceGateway { public TaskDeploymentDescriptor lastTDD; + + public SimpleInstanceGateway(ExecutionContext executionContext){ + super(executionContext); + } + @Override - public void onReceive(Object msg) throws Exception { - if (msg instanceof SubmitTask) { - SubmitTask submitTask = (SubmitTask) msg; + public Object handleMessage(Object message) { + Object result = null; + if(message instanceof SubmitTask) { + SubmitTask submitTask = (SubmitTask) message; lastTDD = submitTask.tasks(); - getSender().tell(Messages.getAcknowledge(), getSelf()); - } 
else if (msg instanceof CancelTask) { - CancelTask cancelTask = (CancelTask) msg; - getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf()); - } - else if (msg instanceof FailIntermediateResultPartitions) { - getSender().tell(new Object(), getSelf()); + result = Messages.getAcknowledge(); + } else if(message instanceof CancelTask) { + CancelTask cancelTask = (CancelTask) message; + + result = new TaskOperationResult(cancelTask.attemptID(), true); + } else if(message instanceof FailIntermediateResultPartitions) { + result = new Object(); } + + return result; } } - public static final String ERROR_MESSAGE = "test_failure_error_message"; + public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway { + public SimpleFailingInstanceGateway(ExecutionContext executionContext) { + super(executionContext); + } - public static class SimpleFailingTaskManager extends UntypedActor { @Override - public void onReceive(Object msg) throws Exception { - if (msg instanceof SubmitTask) { - getSender().tell(new Status.Failure(new Exception(ERROR_MESSAGE)), getSelf()); - } else if (msg instanceof CancelTask) { - CancelTask cancelTask = (CancelTask) msg; - getSender().tell(new TaskOperationResult(cancelTask.attemptID(), true), getSelf()); + public Object handleMessage(Object message) throws Exception { + if(message instanceof SubmitTask) { + throw new Exception(ERROR_MESSAGE); + } else if (message instanceof CancelTask) { + CancelTask cancelTask = (CancelTask) message; + + return new TaskOperationResult(cancelTask.attemptID(), true); + } else { + return null; } } } - - public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException { + + public static final String ERROR_MESSAGE = "test_failure_error_message"; + + public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws JobException { JobVertex ajv = new JobVertex("TestVertex", id); 
ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); - - ExecutionGraph graph = new ExecutionGraph(new JobID(), "test job", new Configuration(), + + ExecutionGraph graph = new ExecutionGraph( + executionContext, + new JobID(), + "test job", + new Configuration(), AkkaUtils.getDefaultTimeout()); - + ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout())); - + Answer noop = new Answer() { @Override public Void answer(InvocationOnMock invocation) { return null; } }; - + doAnswer(noop).when(ejv).vertexCancelled(Matchers.anyInt()); doAnswer(noop).when(ejv).vertexFailed(Matchers.anyInt(), Matchers.any(Throwable.class)); doAnswer(noop).when(ejv).vertexFinished(Matchers.anyInt()); - + return ejv; } - // -------------------------------------------------------------------------------------------- - - public static final class ActionQueue { - - private final LinkedList runnables = new LinkedList(); - - public void triggerNextAction() { - Runnable r = runnables.remove(); - r.run(); - } - - public void triggerLatestAction(){ - Runnable r = runnables.removeLast(); - r.run(); - } - - public Runnable popNextAction() { - Runnable r = runnables.remove(); - return r; - } - - public void queueAction(Runnable r) { - this.runnables.add(r); - } - - public boolean isEmpty(){ - return runnables.isEmpty(); - } + public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException { + return getExecutionVertex(id, TestingUtils.defaultExecutionContext()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2ccb5fdb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 7787ab4..f47e92c 
100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -24,10 +24,6 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.testkit.JavaTestKit; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.SimpleSlot; @@ -37,24 +33,10 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public class ExecutionStateProgressTest { - private static ActorSystem system; - - @BeforeClass - public static void setup(){ - system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); - } - - @AfterClass - public static void teardown(){ - JavaTestKit.shutdownActorSystem(system); - } - @Test public void testAccumulatedStateFinished() { try { @@ -65,8 +47,13 @@ public class ExecutionStateProgressTest { ajv.setParallelism(3); ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); - ExecutionGraph graph = new ExecutionGraph(jid, "test job", new Configuration(), + ExecutionGraph graph = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jid, + "test job", + new Configuration(), AkkaUtils.getDefaultTimeout()); + graph.attachJobGraph(Arrays.asList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); @@ -74,9 +61,11 @@ public class ExecutionStateProgressTest { ExecutionJobVertex ejv = graph.getJobVertex(vid); // mock resources and mock taskmanager - ActorRef taskManager = 
system.actorOf(Props.create(SimpleAcknowledgingTaskManager.class)); for (ExecutionVertex ee : ejv.getTaskVertices()) { - SimpleSlot slot = getInstance(taskManager).allocateSimpleSlot(jid); + SimpleSlot slot = getInstance( + new SimpleInstanceGateway( + TestingUtils.defaultExecutionContext()) + ).allocateSimpleSlot(jid); ee.deployToSlot(slot); }