Return-Path:
X-Original-To: apmail-flink-commits-archive@minotaur.apache.org
Delivered-To: apmail-flink-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
	by minotaur.apache.org (Postfix) with SMTP id 3C7CE177B8
	for ; Wed, 11 Feb 2015 16:34:57 +0000 (UTC)
Received: (qmail 79761 invoked by uid 500); 11 Feb 2015 16:34:44 -0000
Delivered-To: apmail-flink-commits-archive@flink.apache.org
Received: (qmail 79732 invoked by uid 500); 11 Feb 2015 16:34:44 -0000
Mailing-List: contact commits-help@flink.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@flink.apache.org
Delivered-To: mailing list commits@flink.apache.org
Received: (qmail 79723 invoked by uid 99); 11 Feb 2015 16:34:44 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
	by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Feb 2015 16:34:44 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
	id 20A7CE03C8; Wed, 11 Feb 2015 16:34:44 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: rmetzger@apache.org
To: commits@flink.apache.org
Message-Id: <61ddcf6bc8e7442eb23578d9241712cb@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: flink git commit: [FLINK-1489] Replaces blocking scheduleOrUpdateConsumers message calls with asynchronous futures. Buffers PartitionInfos at the JobManager in case that the respective consumer has not been scheduled.
Date: Wed, 11 Feb 2015 16:34:44 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/master f577c9ef4 -> aedbacfc5


[FLINK-1489] Replaces blocking scheduleOrUpdateConsumers message calls with asynchronous futures. Buffers PartitionInfos at the JobManager in case that the respective consumer has not been scheduled.
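This commit combines three mechanisms: producers no longer block waiting for a `scheduleOrUpdateConsumers` answer, partition information for a consumer that is not yet running is buffered on its `ExecutionVertex`, and the buffered entries are flushed as one aggregated `UpdateTask` message when the consumer switches to RUNNING, with a second state check to close the race window. The plain-Java sketch below models that pattern under simplifying assumptions; it is not Flink code. `PartitionUpdate`, `ConsumerVertex` and `AsyncNotifier` are hypothetical names, a `Consumer<List<PartitionUpdate>>` stands in for the `UpdateTask` RPC, and `CompletableFuture` stands in for the Akka futures and `OnFailure` callbacks used in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical stand-in for PartialPartitionInfo: a partition whose consumer location is still unknown. */
final class PartitionUpdate {
    final String partitionId;

    PartitionUpdate(String partitionId) {
        this.partitionId = partitionId;
    }
}

/** Hypothetical consumer vertex: buffers updates until it runs, then flushes them as one message. */
final class ConsumerVertex {
    private final ConcurrentLinkedQueue<PartitionUpdate> pending = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Consumer<List<PartitionUpdate>> sender; // stands in for the UpdateTask RPC

    ConsumerVertex(Consumer<List<PartitionUpdate>> sender) {
        this.sender = sender;
    }

    /** Called when a producer's partition becomes available; never blocks on the consumer. */
    void onPartitionAvailable(PartitionUpdate update) {
        if (running.get()) {
            // Consumer already running: send a single update directly.
            sender.accept(Collections.singletonList(update));
            return;
        }
        pending.add(update);
        // Double check: the vertex may have switched to running after the first check,
        // in which case its own flush may already have run and missed this update.
        if (running.get()) {
            flush();
        }
    }

    /** Called once deployment succeeds (cf. switchToRunning in the patch). */
    void switchToRunning() {
        running.set(true);
        flush();
    }

    /** Drains everything buffered so far and sends it as a single aggregated message. */
    private void flush() {
        List<PartitionUpdate> batch = new ArrayList<>();
        PartitionUpdate next;
        while ((next = pending.poll()) != null) {
            batch.add(next);
        }
        if (!batch.isEmpty()) {
            sender.accept(batch);
        }
    }
}

/** Hypothetical helper: fire a request without blocking the caller and react only if it fails. */
final class AsyncNotifier {
    static CompletableFuture<Void> notifyAsync(Runnable request, Consumer<Throwable> onFailure) {
        return CompletableFuture.runAsync(request).whenComplete((ignored, error) -> {
            if (error != null) {
                // runAsync wraps the thrown exception in a CompletionException; unwrap it for the callback.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause() : error;
                onFailure.accept(cause);
            }
        });
    }
}
```

One deliberate difference: the patch's `sendPartitionInfos()` sends an `UpdateTask` message even when the queue is empty, whereas the sketch's `flush()` skips empty batches.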
Conflicts:
	flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala

Adds TaskUpdate message aggregation before sending the messages to the TaskManagers

This closes #378


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aedbacfc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aedbacfc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aedbacfc

Branch: refs/heads/master
Commit: aedbacfc553a7226bbf06410232fb2a2b315cf8b
Parents: f577c9e
Author: Till Rohrmann
Authored: Fri Feb 6 15:13:28 2015 +0100
Committer: Robert Metzger
Committed: Wed Feb 11 17:34:18 2015 +0100

----------------------------------------------------------------------
 .../deployment/PartialPartitionInfo.java        | 102 ++++++++++++
 .../flink/runtime/deployment/PartitionInfo.java |   3 +-
 .../flink/runtime/executiongraph/Execution.java | 157 ++++++++++++++-----
 .../runtime/executiongraph/ExecutionGraph.java  |  14 +-
 .../runtime/executiongraph/ExecutionVertex.java |  27 +++-
 .../runtime/io/network/NetworkEnvironment.java  |  16 +-
 .../partition/IntermediateResultPartition.java  |  44 +++---
 .../apache/flink/runtime/client/JobClient.scala |   5 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  16 +-
 .../runtime/messages/JobmanagerMessages.scala   |   1 -
 .../flink/runtime/messages/Messages.scala       |  25 +++
 .../runtime/messages/TaskManagerMessages.scala  |  28 +++-
 .../flink/runtime/taskmanager/TaskManager.scala |  53 ++++---
 13 files changed, 380 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
new file mode 100644
index 0000000..a27c976
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialPartitionInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+/**
+ * This class contains the partial partition info which is created if the consumer instance is not
+ * yet clear. Once the instance on which the consumer runs is known, the complete partition info
+ * can be computed.
+ */
+public class PartialPartitionInfo {
+	private final IntermediateDataSetID intermediateDataSetID;
+
+	private final IntermediateResultPartitionID partitionID;
+
+	private final ExecutionAttemptID producerExecutionID;
+
+	private final InstanceConnectionInfo producerInstanceConnectionInfo;
+
+	private final int partitionConnectionIndex;
+
+	public PartialPartitionInfo(IntermediateDataSetID intermediateDataSetID,
+								IntermediateResultPartitionID partitionID,
+								ExecutionAttemptID executionID,
+								InstanceConnectionInfo producerInstanceConnectionInfo,
+								int partitionConnectionIndex) {
+		this.intermediateDataSetID = intermediateDataSetID;
+		this.partitionID = partitionID;
+		this.producerExecutionID = executionID;
+		this.producerInstanceConnectionInfo = producerInstanceConnectionInfo;
+		this.partitionConnectionIndex = partitionConnectionIndex;
+	}
+
+	public PartitionInfo createPartitionInfo(Execution consumerExecution) throws IllegalStateException {
+		if(consumerExecution != null){
+			PartitionInfo.PartitionLocation producerLocation;
+
+			RemoteAddress resolvedProducerAddress;
+
+			if(consumerExecution.getAssignedResourceLocation().equals(
+					producerInstanceConnectionInfo)) {
+				resolvedProducerAddress = null;
+				producerLocation = PartitionInfo.PartitionLocation.LOCAL;
+			} else {
+				resolvedProducerAddress = new RemoteAddress(producerInstanceConnectionInfo,
+						partitionConnectionIndex);
+
+				producerLocation = PartitionInfo.PartitionLocation.REMOTE;
+			}
+
+			return new PartitionInfo(partitionID, producerExecutionID, producerLocation,
+					resolvedProducerAddress);
+
+		} else {
+			throw new RuntimeException("Cannot create partition info, because consumer execution " +
+					"is null.");
+		}
+	}
+
+	public IntermediateDataSetID getIntermediateDataSetID() {
+		return intermediateDataSetID;
+	}
+
+	public static PartialPartitionInfo fromEdge(final ExecutionEdge edge){
+		IntermediateResultPartition partition = edge.getSource();
+		IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
+
+		IntermediateDataSetID intermediateDataSetID = partition.getIntermediateResult().getId();
+
+		Execution producer = partition.getProducer().getCurrentExecutionAttempt();
+		ExecutionAttemptID producerExecutionID = producer.getAttemptId();
+
+		return new PartialPartitionInfo(intermediateDataSetID, partitionID, producerExecutionID,
+				producer.getAssignedResourceLocation(),
+				partition.getIntermediateResult().getConnectionIndex());
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index dd2c063..333340a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -130,7 +130,8 @@ public class PartitionInfo implements IOReadableWritable, Serializable {
 		// The producer needs to be running, otherwise the consumer might request a partition,
 		// which has not been registered yet.
-		if (producerSlot != null && producerState == ExecutionState.RUNNING) {
+		if (producerSlot != null && (producerState == ExecutionState.RUNNING ||
+				producerState == ExecutionState.FINISHED)) {
 			if (producerSlot.getInstance().equals(consumerSlot.getInstance())) {
 				producerLocation = PartitionLocation.LOCAL;
 			}
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c5c673..c4e5abf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,17 +20,23 @@ package org.apache.flink.runtime.executiongraph;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
+import static akka.dispatch.Futures.future;
+
+import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.deployment.PartitionInfo;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.io.network.RemoteAddress;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -39,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -46,7 +53,10 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -316,21 +326,26 @@ public class Execution implements Serializable {
 						markFailed(failure);
 					}
 					else {
-						TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult) success;
 						if (success == null) {
 							markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
 						}
-						else if (!result.executionID().equals(attemptId)) {
-							markFailed(new Exception("Answer execution id does not match the request execution id."));
-						}
-						else if (result.success()) {
-							switchToRunning();
-						}
-						else {
-							// deployment failed :(
-							markFailed(new Exception("Failed to deploy the task " +
-									getVertexWithAttempt() + " to slot " + slot + ": " + result
-									.description()));
+
+						if(success instanceof TaskOperationResult) {
+							TaskOperationResult result = (TaskOperationResult) success;
+
+							if (!result.executionID().equals(attemptId)) {
+								markFailed(new Exception("Answer execution id does not match the request execution id."));
+							} else if (result.success()) {
+								switchToRunning();
+							} else {
+								// deployment failed :(
+								markFailed(new Exception("Failed to deploy the task " +
+										getVertexWithAttempt() + " to slot " + slot + ": " + result
+										.description()));
+							}
+						}else {
+							markFailed(new Exception("Failed to deploy the task to slot " + slot +
+									": Response was not of type TaskOperationResult"));
 						}
 					}
 				}
@@ -401,11 +416,9 @@ public class Execution implements Serializable {
 	}
 	// TODO This leads to many unnecessary RPC calls in most cases
-	boolean scheduleOrUpdateConsumers(List> consumers) throws Exception {
-		boolean success = true;
-
+	void scheduleOrUpdateConsumers(List> consumers) {
 		if (consumers.size() != 1) {
-			throw new IllegalStateException("Only one consumer is supported currently.");
+			fail(new IllegalStateException("Only one consumer is supported currently."));
 		}
 		final List consumer = consumers.get(0);
@@ -416,32 +429,66 @@ public class Execution implements Serializable {
 			final ExecutionState consumerState = consumerVertex.getExecutionState();
 			if (consumerState == CREATED) {
-				if (state == RUNNING) {
-					if (!consumerVertex.scheduleForExecution(consumerVertex.getExecutionGraph().getScheduler(), false)) {
-						success = false;
+				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+
+				future(new Callable(){
+					@Override
+					public Boolean call() throws Exception {
+						try {
+							consumerVertex.scheduleForExecution(
+									consumerVertex.getExecutionGraph().getScheduler(), false);
+						} catch (Exception exception) {
+							fail(new IllegalStateException("Could not schedule consumer " +
+									"vertex " + consumerVertex, exception));
+						}
+
+						return true;
 					}
-				}
-				else {
-					success = false;
+				}, AkkaUtils.globalExecutionContext());
+
+				// double check to resolve race conditions
+				if(consumerVertex.getExecutionState() == RUNNING){
+					consumerVertex.sendPartitionInfos();
 				}
 			}
 			else if (consumerState == RUNNING) {
 				SimpleSlot consumerSlot = consumerVertex.getCurrentAssignedResource();
-				ExecutionAttemptID consumerExecutionId = consumerVertex.getCurrentExecutionAttempt().getAttemptId();
+				ExecutionAttemptID consumerExecutionId = consumerVertex.
+						getCurrentExecutionAttempt().getAttemptId();
+
+				IntermediateResultPartitionID partitionID = edge.getSource().getPartitionId();
+				int connectionIndex = edge.getSource().getIntermediateResult().getConnectionIndex();
+
+				PartitionInfo.PartitionLocation producerLocation;
+				RemoteAddress producerAddress = null;
+
+				if(consumerSlot.getInstance().getInstanceConnectionInfo().equals(
+						getAssignedResourceLocation())) {
+					producerLocation = PartitionInfo.PartitionLocation.LOCAL;
+				} else {
+					producerLocation = PartitionInfo.PartitionLocation.REMOTE;
+					producerAddress = new RemoteAddress(getAssignedResourceLocation(),
+							connectionIndex);
+				}
-				PartitionInfo partitionInfo = PartitionInfo.fromEdge(edge, consumerSlot);
+				PartitionInfo partitionInfo = new PartitionInfo(partitionID, attemptId,
+						producerLocation, producerAddress);
-				if (!sendUpdateTaskRpcCall(consumerSlot, consumerExecutionId, edge.getSource().getIntermediateResult().getId(), partitionInfo)) {
-					success = false;
-				}
+				TaskManagerMessages.UpdateTask updateTaskMessage =
+						new TaskManagerMessages.UpdateTaskSinglePartitionInfo(consumerExecutionId,
+								edge.getSource().getIntermediateResult().getId(), partitionInfo);
+				sendUpdateTaskRpcCall(consumerSlot, updateTaskMessage);
 			}
 			else if (consumerState == SCHEDULED || consumerState == DEPLOYING) {
-				success = false;
+				consumerVertex.cachePartitionInfo(PartialPartitionInfo.fromEdge(edge));
+
+				// double check to resolve race conditions
+				if(consumerVertex.getExecutionState() == RUNNING){
+					consumerVertex.sendPartitionInfos();
+				}
 			}
 		}
-
-		return success;
 	}
 	/**
@@ -548,6 +595,32 @@ public class Execution implements Serializable {
 		}
 	}
+	void sendPartitionInfos() {
+		ConcurrentLinkedQueue partialPartitionInfos =
+				vertex.getPartialPartitionInfos();
+
+		// check if the ExecutionVertex has already been archived and thus cleared the
+		// partial partition infos queue
+		if(partialPartitionInfos != null) {
+
+			PartialPartitionInfo partialPartitionInfo;
+
+			List resultIDs = new ArrayList();
+			List partitionInfos = new ArrayList();
+
+			while ((partialPartitionInfo = partialPartitionInfos.poll()) != null) {
+				resultIDs.add(partialPartitionInfo.getIntermediateDataSetID());
+				partitionInfos.add(partialPartitionInfo.createPartitionInfo(this));
+			}
+
+			TaskManagerMessages.UpdateTask updateTaskMessage =
+					TaskManagerMessages.createUpdateTaskMultiplePartitionInfos(attemptId, resultIDs,
+							partitionInfos);
+
+			sendUpdateTaskRpcCall(assignedResource, updateTaskMessage);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	// Internal Actions
 	// --------------------------------------------------------------------------------------------
@@ -614,6 +687,7 @@ public class Execution implements Serializable {
 	private boolean switchToRunning() {
 		if (transitionState(DEPLOYING, RUNNING)) {
+			sendPartitionInfos();
 			return true;
 		}
 		else {
@@ -671,7 +745,7 @@ public class Execution implements Serializable {
 					if(failure != null){
 						fail(new Exception("Task could not be canceled.", failure));
 					}else{
-						TaskManagerMessages.TaskOperationResult result = (TaskManagerMessages.TaskOperationResult)success;
+						TaskOperationResult result = (TaskOperationResult)success;
 						if(!result.success()){
 							LOG.debug("Cancel task call did not find task. Probably akka message call" +
 									" race.");
@@ -700,21 +774,20 @@ public class Execution implements Serializable {
 		}
 	}
-	private boolean sendUpdateTaskRpcCall(final SimpleSlot consumerSlot, final ExecutionAttemptID executionId, final IntermediateDataSetID resultId, final PartitionInfo partitionInfo) throws Exception {
+	private void sendUpdateTaskRpcCall(final SimpleSlot consumerSlot,
+									final TaskManagerMessages.UpdateTask updateTaskMsg) {
 		final Instance instance = consumerSlot.getInstance();
-		final TaskManagerMessages.TaskOperationResult result = AkkaUtils.ask(
-				instance.getTaskManager(), new TaskManagerMessages.UpdateTask(executionId, resultId, partitionInfo), timeout);
+		Future futureUpdate = Patterns.ask(instance.getTaskManager(), updateTaskMsg,
+				new Timeout(timeout));
-		if (!result.success()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Update task {} was unsuccessful (maybe an RPC race): {}", executionId, result.description());
+		futureUpdate.onFailure(new OnFailure() {
+			@Override
+			public void onFailure(Throwable failure) throws Throwable {
+				fail(new IllegalStateException("Update task on instance " + instance +
+						" failed due to:", failure));
 			}
-
-			return false;
-		}
-
-		return true;
+		}, AkkaUtils.globalExecutionContext());
 	}
 	// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3f857e5..c1f45e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -521,14 +521,20 @@ public class ExecutionGraph implements Serializable {
 		}
 	}
-	public boolean scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) throws Exception {
+	public void scheduleOrUpdateConsumers(ExecutionAttemptID executionId, int partitionIndex) {
 		Execution execution = currentExecutions.get(executionId);
+
 		if (execution == null) {
-			throw new IllegalStateException("Cannot find execution for execution ID " + executionId);
+			fail(new IllegalStateException("Cannot find execution for execution ID " +
+					executionId));
+		}
+		else if(execution.getVertex() == null){
+			fail(new IllegalStateException("Execution with execution ID " + executionId +
+				" has no vertex assigned."));
+		} else {
+			execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
 		}
-
-		return execution.getVertex().scheduleOrUpdateConsumers(partitionIndex);
 	}
 	public Map getRegisteredExecutions() {
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 86173da..58515ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.runtime.deployment.PartialPartitionInfo;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.JobException;
@@ -46,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -72,6 +74,8 @@ public class ExecutionVertex implements Serializable {
 	private IntermediateResultPartition[] resultPartitions;
 	private ExecutionEdge[][] inputEdges;
+
+	private ConcurrentLinkedQueue partialPartitionInfos;
 	private final int subTaskIndex;
@@ -109,6 +113,9 @@ public class ExecutionVertex implements Serializable {
 		}
 		this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
+
+		this.partialPartitionInfos = new ConcurrentLinkedQueue();
+
 		this.priorExecutions = new CopyOnWriteArrayList();
 		this.currentExecution = new Execution(this, 0, createTimestamp, timeout);
@@ -196,6 +203,10 @@ public class ExecutionVertex implements Serializable {
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
+
+	public ConcurrentLinkedQueue getPartialPartitionInfos() {
+		return partialPartitionInfos;
+	}
 	// --------------------------------------------------------------------------------------------
 	// Graph building
@@ -408,18 +419,18 @@ public class ExecutionVertex implements Serializable {
 	 * Schedules or updates the {@link IntermediateResultPartition} consumer
 	 * tasks of the intermediate result partition with the given index.
 	 */
-	boolean scheduleOrUpdateConsumers(int partitionIndex) throws Exception {
+	void scheduleOrUpdateConsumers(int partitionIndex) {
 		checkElementIndex(partitionIndex, resultPartitions.length);
 		IntermediateResultPartition partition = resultPartitions[partitionIndex];
-		return currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
+		currentExecution.scheduleOrUpdateConsumers(partition.getConsumers());
 	}
 	/**
 	 * This method cleans fields that are irrelevant for the archived execution attempt.
 	 */
-	public void prepareForArchiving() {
+	public void prepareForArchiving() throws IllegalStateException {
 		Execution execution = currentExecution;
 		ExecutionState state = execution.getState();
@@ -440,6 +451,16 @@ public class ExecutionVertex implements Serializable {
 		this.resultPartitions = null;
 		this.inputEdges = null;
 		this.locationConstraintInstances = null;
+		this.partialPartitionInfos.clear();
+		this.partialPartitionInfos = null;
+	}
+
+	public void cachePartitionInfo(PartialPartitionInfo partitionInfo){
+		this.partialPartitionInfos.add(partitionInfo);
+	}
+
+	void sendPartitionInfos() {
+		currentExecution.sendPartitionInfos();
 	}
 	// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index aa6c64c..74a448a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network;
 import akka.actor.ActorRef;
+import akka.util.Timeout;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
@@ -47,6 +48,8 @@ public class NetworkEnvironment {
 	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
+	private final ActorRef taskManager;
+
 	private final ActorRef jobManager;
 	private final FiniteDuration jobManagerTimeout;
@@ -64,7 +67,10 @@ public class NetworkEnvironment {
 	/**
 	 * Initializes all network I/O components.
 	 */
-	public NetworkEnvironment(ActorRef jobManager, FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException {
+	public NetworkEnvironment(ActorRef taskManager, ActorRef jobManager,
+							FiniteDuration jobManagerTimeout,
+							NetworkEnvironmentConfiguration config) throws IOException {
+		this.taskManager = checkNotNull(taskManager);
 		this.jobManager = checkNotNull(jobManager);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
@@ -96,12 +102,16 @@ public class NetworkEnvironment {
 		}
 	}
+	public ActorRef getTaskManager() {
+		return taskManager;
+	}
+
 	public ActorRef getJobManager() {
 		return jobManager;
 	}
-	public FiniteDuration getJobManagerTimeout() {
-		return jobManagerTimeout;
+	public Timeout getJobManagerTimeout() {
+		return new Timeout(jobManagerTimeout);
 	}
 	public void registerTask(Task task) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
index 6acfbce..7a987c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
@@ -18,6 +18,9 @@ package org.apache.flink.runtime.io.network.partition;
+import akka.actor.ActorRef;
+import akka.dispatch.OnFailure;
+import akka.pattern.Patterns;
 import com.google.common.base.Optional;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
@@ -36,10 +39,11 @@ import org.apache.flink.runtime.io.network.partition.queue.PipelinedPartitionQue
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import org.apache.flink.runtime.messages.TaskManagerMessages.FailTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
+import scala.concurrent.Future;
 import java.io.IOException;
@@ -243,28 +247,26 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 	}
 	private void scheduleOrUpdateConsumers() throws IOException {
-		while (!isReleased) {
-			final JobManagerMessages.ConsumerNotificationResult result = AkkaUtils.ask(
-					networkEnvironment.getJobManager(),
-					new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, producerExecutionId, partitionIndex),
+		if(!isReleased){
+			ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId,
+					producerExecutionId, partitionIndex);
+
+			Future futureResponse = Patterns.ask(networkEnvironment.getJobManager(), msg,
 					networkEnvironment.getJobManagerTimeout());
-			if (result.success()) {
-				return;
-			}
-			else {
-				Option error = result.error();
-				if (error.isDefined()) {
-					throw new IOException(error.get().getMessage(), error.get());
-				}
-			}
+			futureResponse.onFailure(new OnFailure(){
+				@Override
+				public void onFailure(Throwable failure) throws Throwable {
+					LOG.error("Could not schedule or update consumers at the JobManager.", failure);
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException e) {
-				throw new IOException("Unexpected interruption during consumer schedule or update.", e);
-			}
+					// Fail task at the TaskManager
+					FailTask failMsg = new FailTask(producerExecutionId,
+							new RuntimeException("Could not schedule or update consumers at " +
+									"the JobManager.", failure));
+
+					networkEnvironment.getTaskManager().tell(failMsg, ActorRef.noSender());
+				}
+			}, AkkaUtils.globalExecutionContext());
 		}
 	}
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 676ddda..191e11a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -171,11 +171,10 @@ object JobClient{
     var waitForAnswer = true
     var answer: JobExecutionResult = null
-    val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
-      AkkaUtils.INF_TIMEOUT).mapTo[JobExecutionResult]
-
     while(waitForAnswer) {
       try {
+        val result =(jobClient ? SubmitJobAndWait(jobGraph, listenToEvents = listenToStatusEvents))(
+          timeout).mapTo[JobExecutionResult]
         answer = Await.result(result, timeout)
         waitForAnswer = false
       } catch {
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 756cb4b..eea7cae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,14 +20,16 @@ package org.apache.flink.runtime.jobmanager
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
+import akka.actor.Status.Failure
 import akka.actor._
 import akka.pattern.ask
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.executiongraph.{Execution, ExecutionJobVertex, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.{JobException, ActorLogMessages}
@@ -295,13 +297,13 @@ Actor with ActorLogMessages with ActorLogging {
     case ScheduleOrUpdateConsumers(jobId, executionId, partitionIndex) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          sender ! ConsumerNotificationResult(
-            executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
-          )
+          sender ! Acknowledge
+          executionGraph.scheduleOrUpdateConsumers(executionId, partitionIndex)
         case None =>
-          log.error("Cannot find execution graph for job ID {}.", jobId)
-          sender ! ConsumerNotificationResult(success = false, Some(
-            new IllegalStateException("Cannot find execution graph for job ID " + jobId)))
+          log.error("Cannot find execution graph for job ID {} to schedule or update consumers",
+            jobId);
+          sender ! Failure(new IllegalStateException("Cannot find execution graph for job ID " +
+            jobId + " to schedule or update consumers."))
       }
     case ReportAccumulatorResult(accumulatorEvent) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
index 5189a02..6aacf10 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobmanagerMessages.scala
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState
  * The job manager specific messages
  */
 object JobManagerMessages {
-
   /**
    * Submits a job to the job manager. If [[registerForEvents]] is true,
    * then the sender will be registered as listener for the state change messages. If [[detached]]
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
new file mode 100644
index 0000000..4f27761
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/Messages.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+object Messages {
+
+  case object Acknowledge
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 6e3a3b1..f5a2cd4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -58,12 +58,32 @@ object TaskManagerMessages {
   case class UnregisterTask(executionID: ExecutionAttemptID)
 
   /**
-   * Updates the reader identified by [[resultId]] of the task identified by
-   * [[executionId]] from the task manager.
+   * Updates the reader of the task identified by
+   * [[executionID]] from the task manager.
    */
-  case class UpdateTask(executionId: ExecutionAttemptID,
+  sealed trait UpdateTask{
+    def executionID: ExecutionAttemptID
+  }
+
+  case class UpdateTaskSinglePartitionInfo(executionID: ExecutionAttemptID,
                         resultId: IntermediateDataSetID,
-                        partitionInfo: PartitionInfo)
+                        partitionInfo: PartitionInfo) extends UpdateTask
+
+  case class UpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
+                                              partitionInfos: Seq[(IntermediateDataSetID,
+                                                PartitionInfo)]) extends UpdateTask
+
+  def createUpdateTaskMultiplePartitionInfos(executionID: ExecutionAttemptID,
+                                             resultIDs: java.util.List[IntermediateDataSetID],
+                                             partitionInfos: java.util.List[PartitionInfo]):
+  UpdateTaskMultiplePartitionInfos = {
+    require(resultIDs.size() == partitionInfos.size(), "ResultIDs must have the same length as" +
+      "partitionInfos.")
+
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    new UpdateTaskMultiplePartitionInfos(executionID,
+      resultIDs.asScala.zip(partitionInfos.asScala))
+  }
 
   /**
    * Fails all intermediate result partitions identified by [[executionID]] from the task manager.
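The TaskManagerMessages change above turns UpdateTask into a sealed trait with a single-partition case and an aggregated multi-partition case, and createUpdateTaskMultiplePartitionInfos pairs two parallel Java lists after a require check. The following Java sketch mirrors that shape outside Flink, as an illustration only: plain Strings stand in for ExecutionAttemptID, IntermediateDataSetID and PartitionInfo, and the names UpdateTaskSketch, Pair, Single, Multiple, pairs and createMultiple are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical Java mirror of the UpdateTask message family.
 * Strings stand in for ExecutionAttemptID, IntermediateDataSetID and PartitionInfo.
 */
final class UpdateTaskSketch {

    /** Immutable (resultId, partitionInfo) pair. */
    static final class Pair {
        final String resultId;
        final String partitionInfo;

        Pair(String resultId, String partitionInfo) {
            this.resultId = resultId;
            this.partitionInfo = partitionInfo;
        }
    }

    /** Common supertype, playing the role of the sealed trait UpdateTask. */
    interface UpdateTask {
        String executionId();

        /** Every (resultId, partitionInfo) pair this message carries. */
        List<Pair> pairs();
    }

    /** One partition update, like UpdateTaskSinglePartitionInfo. */
    static final class Single implements UpdateTask {
        private final String executionId;
        private final Pair pair;

        Single(String executionId, String resultId, String partitionInfo) {
            this.executionId = executionId;
            this.pair = new Pair(resultId, partitionInfo);
        }

        public String executionId() { return executionId; }

        public List<Pair> pairs() { return Collections.singletonList(pair); }
    }

    /** Aggregated updates for one task, like UpdateTaskMultiplePartitionInfos. */
    static final class Multiple implements UpdateTask {
        private final String executionId;
        private final List<Pair> pairs;

        Multiple(String executionId, List<Pair> pairs) {
            this.executionId = executionId;
            this.pairs = Collections.unmodifiableList(new ArrayList<>(pairs));
        }

        public String executionId() { return executionId; }

        public List<Pair> pairs() { return pairs; }
    }

    /**
     * Zips two parallel lists into one aggregated message, rejecting lists of
     * different length up front (the role require(...) plays in the Scala code).
     */
    static Multiple createMultiple(String executionId, List<String> resultIds,
                                   List<String> partitionInfos) {
        if (resultIds.size() != partitionInfos.size()) {
            throw new IllegalArgumentException(
                    "resultIds must have the same length as partitionInfos.");
        }
        List<Pair> pairs = new ArrayList<>(resultIds.size());
        for (int i = 0; i < resultIds.size(); i++) {
            pairs.add(new Pair(resultIds.get(i), partitionInfos.get(i)));
        }
        return new Multiple(executionId, pairs);
    }
}
```

Like Scala's require, the length check surfaces as an IllegalArgumentException; checking up front matters because Scala's zip silently truncates to the shorter collection. Exposing both variants as a list of pairs corresponds to how the TaskManager.scala hunk routes UpdateTaskSinglePartitionInfo through the same updateTask method as the aggregated message, by wrapping it in List((resultID, partitionInfo)).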
http://git-wip-us.apache.org/repos/asf/flink/blob/aedbacfc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index de3fa7a..eba688a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages.UpdateTaskExecutionState
+import org.apache.flink.runtime.messages.Messages.Acknowledge
 import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
 RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages._
@@ -251,8 +252,13 @@ import scala.collection.JavaConverters._
     case SubmitTask(tdd) =>
       submitTask(tdd)
 
-    case UpdateTask(executionId, resultId, partitionInfo) =>
-      updateTask(executionId, resultId, partitionInfo)
+    case updateMsg:UpdateTask =>
+      updateMsg match {
+        case UpdateTaskSinglePartitionInfo(executionID, resultID, partitionInfo) =>
+          updateTask(executionID, List((resultID, partitionInfo)))
+        case UpdateTaskMultiplePartitionInfos(executionID, partitionInfos) =>
+          updateTask(executionID, partitionInfos)
+      }
 
     case CancelTask(executionID) =>
       runningTasks.get(executionID) match {
@@ -462,43 +468,45 @@ import scala.collection.JavaConverters._
     registered = false
   }
 
-  private def updateTask(executionId: ExecutionAttemptID, resultId: IntermediateDataSetID,
-                         partitionInfo: PartitionInfo): Unit = {
-
-    partitionInfo.getProducerLocation match {
-      case PartitionInfo.PartitionLocation.UNKNOWN =>
-        sender ! TaskOperationResult(executionId, success = false,
-          "Tried to update task with UNKNOWN channel.")
+  private def updateTask(executionId: ExecutionAttemptID,
+    partitionInfos: Seq[(IntermediateDataSetID, PartitionInfo)]): Unit = {
 
-      case _ =>
-        runningTasks.get(executionId) match {
-          case Some(task) =>
-            Option(task.getEnvironment.getReaderById(resultId)) match {
+    runningTasks.get(executionId) match {
+      case Some(task) =>
+        val errors = partitionInfos flatMap {
+          case (resultID, partitionInfo) =>
+            Option(task.getEnvironment.getReaderById(resultID)) match {
               case Some(reader) =>
                 Future {
                   try {
                     reader.updateInputChannel(partitionInfo)
                   } catch {
                     case t: Throwable =>
-                      log.error("Task update failure: {} Trying to cancel task.", t.getMessage)
+                      log.error(t, "Task update failure. Trying to cancel task.")
                       try {
                         task.cancelExecution()
                       } catch {
                         case t: Throwable =>
-                          log.error("Failed canceling task with execution ID {} after task" +
-                            "update failure: {}.", executionId, t.getMessage)
+                          log.error(t, "Failed canceling task with execution ID {} after task" +
+                            "update failure..", executionId)
                       }
                   }
                 }
-                sender ! TaskOperationResult(executionId, success = true)
-              case None => sender ! TaskOperationResult(executionId, success = false,
-                s"No reader with ID $resultId was found.")
+                None
+              case None => Some(s"No reader with ID $resultID for task $executionId was found.")
             }
+        }
 
-          case None => sender ! TaskOperationResult(executionId, success = false,
-            s"No task with execution ID $executionId was found.")
+        if(errors.isEmpty) {
+          sender ! Acknowledge
+        } else {
+          sender ! Failure(new IllegalStateException(errors.mkString("\n")))
         }
+      case None =>
+        log.info("Could not update task with ID {}, because it is no longer running.",
+          executionId)
+        sender ! Acknowledge
     }
   }
 
@@ -544,7 +552,8 @@ import scala.collection.JavaConverters._
     }
 
     try {
-      networkEnvironment = Some(new NetworkEnvironment(currentJobManager, timeout, networkConfig))
+      networkEnvironment = Some(new NetworkEnvironment(self, currentJobManager, timeout,
+        networkConfig))
     } catch {
       case ioe: IOException =>
         log.error(ioe, "Failed to instantiate network environment.")
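The reworked TaskManager.updateTask no longer answers with TaskOperationResult. It replies Acknowledge when every referenced reader exists (and also when the task is no longer running), and otherwise replies with a Failure wrapping an IllegalStateException whose message joins all missing-reader errors with newlines. The channel updates themselves run in a Future, and a failure there cancels the task instead of affecting the reply. The Java sketch below reproduces that control flow under stated assumptions: CompletableFuture.runAsync stands in for the Scala Future, a small Reply class stands in for the Akka reply messages, and Task, Reader and UpdateTaskHandlerSketch are hypothetical stand-ins rather than Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch of the reworked updateTask control flow.
 * Reply, Task and Reader are stand-ins, not Flink types; Strings stand in for IDs.
 */
final class UpdateTaskHandlerSketch {

    /** Either an acknowledgement or a failure carrying the collected errors. */
    static final class Reply {
        final boolean acknowledged;
        final Exception failure; // null when acknowledged

        private Reply(boolean acknowledged, Exception failure) {
            this.acknowledged = acknowledged;
            this.failure = failure;
        }

        static Reply ack() { return new Reply(true, null); }

        static Reply fail(Exception failure) { return new Reply(false, failure); }
    }

    /** Stand-in for an input reader whose channel can be updated. */
    interface Reader {
        void updateInputChannel(String partitionInfo) throws Exception;
    }

    /** Stand-in for a running task. */
    interface Task {
        /** Returns null when the task has no reader for resultId. */
        Reader getReaderById(String resultId);

        void cancelExecution();
    }

    static Reply updateTask(Map<String, Task> runningTasks, String executionId,
                            List<? extends Map.Entry<String, String>> partitionInfos) {
        Task task = runningTasks.get(executionId);
        if (task == null) {
            // Task already finished or was cancelled: nothing to update, so acknowledge.
            return Reply.ack();
        }
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> entry : partitionInfos) {
            String resultId = entry.getKey();
            String partitionInfo = entry.getValue();
            Reader reader = task.getReaderById(resultId);
            if (reader == null) {
                errors.add("No reader with ID " + resultId + " for task " + executionId
                        + " was found.");
                continue;
            }
            // The update runs asynchronously; if it throws, the task is cancelled
            // rather than the failure being reported in this reply.
            CompletableFuture.runAsync(() -> {
                try {
                    reader.updateInputChannel(partitionInfo);
                } catch (Exception e) {
                    task.cancelExecution();
                }
            });
        }
        return errors.isEmpty()
                ? Reply.ack()
                : Reply.fail(new IllegalStateException(String.join("\n", errors)));
    }
}
```

Collecting every missing reader before replying means an aggregated UpdateTaskMultiplePartitionInfos message yields a single reply that lists all problems, rather than failing on the first one.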
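The commit's central change is that consumer notification no longer blocks on a reply. The JobManager answers ScheduleOrUpdateConsumers with Acknowledge before scheduling, or with a Failure for an unknown job. On the requesting side, the Java fragment at the top of this section sends failMsg to the TaskManager from a failure callback instead of awaiting the answer. The sketch below models that request/reply shape with java.util.concurrent.CompletableFuture in place of Akka futures. The names ScheduleOrUpdateConsumersSketch, scheduleOrUpdateConsumers, notifyConsumers and the failTask callback are hypothetical, and Strings stand in for job IDs.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the non-blocking acknowledge/failure protocol.
 * CompletableFuture stands in for Akka's ask future; Strings stand in for job IDs.
 */
final class ScheduleOrUpdateConsumersSketch {

    /** Marker reply, analogous to Messages.Acknowledge. */
    enum Acknowledge { INSTANCE }

    /**
     * "JobManager" side: an unknown job yields a failed future (like replying
     * with a Failure); otherwise the acknowledgement is produced first and the
     * scheduling work runs afterwards.
     */
    static CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
            Set<String> knownJobs, String jobId, Runnable scheduleWork) {
        CompletableFuture<Acknowledge> reply = new CompletableFuture<>();
        if (!knownJobs.contains(jobId)) {
            reply.completeExceptionally(new IllegalStateException(
                    "Cannot find execution graph for job ID " + jobId
                            + " to schedule or update consumers."));
            return reply;
        }
        reply.complete(Acknowledge.INSTANCE); // acknowledge before doing the work
        scheduleWork.run();
        return reply;
    }

    /**
     * Requesting side: instead of blocking on the reply, register a callback
     * that reports any failure (e.g. by asking the TaskManager to fail the task).
     */
    static void notifyConsumers(CompletableFuture<Acknowledge> reply,
                                Consumer<Throwable> failTask) {
        reply.whenComplete((ack, failure) -> {
            if (failure != null) {
                // Dependent stages may wrap the cause in a CompletionException.
                Throwable cause = (failure instanceof CompletionException
                        && failure.getCause() != null) ? failure.getCause() : failure;
                failTask.accept(cause);
            }
        });
    }
}
```

Because the callback is attached rather than awaited, the caller's thread is never parked on the reply. A non-async whenComplete action runs on whichever thread completes the future, or immediately on the registering thread if the future is already complete.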