Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AB1B317F82 for ; Fri, 10 Apr 2015 12:04:59 +0000 (UTC) Received: (qmail 46428 invoked by uid 500); 10 Apr 2015 12:04:43 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 46351 invoked by uid 500); 10 Apr 2015 12:04:43 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 46272 invoked by uid 99); 10 Apr 2015 12:04:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 12:04:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 93891E03D0; Fri, 10 Apr 2015 12:04:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 10 Apr 2015 12:04:46 -0000 Message-Id: <5b3e112dbac4478298c0f70cdd54dd3b@git.apache.org> In-Reply-To: <061af2cc3f9b41b691670bb3edfd86b7@git.apache.org> References: <061af2cc3f9b41b691670bb3edfd86b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] flink git commit: [runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message [runtime] Accumulators are reported through the RuntimeEnvironment, not directly sent as an actor message Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/201bea3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/201bea3f Diff: 
http://git-wip-us.apache.org/repos/asf/flink/diff/201bea3f Branch: refs/heads/master Commit: 201bea3f20bf4d96123bbb562db834297d8e9ec4 Parents: 9ffcdf3 Author: Stephan Ewen Authored: Wed Apr 8 17:11:44 2015 +0200 Committer: Stephan Ewen Committed: Fri Apr 10 12:58:39 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/execution/Environment.java | 38 ++++++++++++++++---- .../runtime/execution/RuntimeEnvironment.java | 19 ++++++++++ .../runtime/operators/RegularPactTask.java | 16 ++++----- 3 files changed, 58 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 503a0b9..1bd4f7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.execution; import akka.actor.ActorRef; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -80,7 +81,8 @@ public interface Environment { int getNumberOfSubtasks(); /** - * Returns the index of this subtask in the subtask group. + * Returns the index of this subtask in the subtask group. The index + * is between 0 and {@link #getNumberOfSubtasks()} - 1. 
* * @return the index of this subtask in the subtask group */ @@ -89,7 +91,8 @@ public interface Environment { /** * Returns the input split provider assigned to this environment. * - * @return the input split provider or null if no such provider has been assigned to this environment. + * @return The input split provider or {@code null} if no such + * provider has been assigned to this environment. */ InputSplitProvider getInputSplitProvider(); @@ -114,12 +117,15 @@ public interface Environment { */ String getTaskName(); - String getTaskNameWithSubtasks(); - /** - * Returns the proxy object for the accumulator protocol. + * Returns the name of the task running in this environment, appended + * with the subtask indicator, such as "MyTask (3/6)", where + * 3 would be ({@link #getIndexInSubtaskGroup()} + 1), and 6 would be + * {@link #getNumberOfSubtasks()}. + * + * @return The name of the task running in this environment, with subtask indicator. */ - ActorRef getJobManager(); + String getTaskNameWithSubtasks(); /** * Returns the user code class loader @@ -130,6 +136,17 @@ public interface Environment { BroadcastVariableManager getBroadcastVariableManager(); + /** + * Reports the given set of accumulators to the JobManager. + * + * @param accumulators The accumulators to report. + */ + void reportAccumulators(Map> accumulators); + + // -------------------------------------------------------------------------------------------- + // Fields relevant to the I/O system. Should go into Task + // -------------------------------------------------------------------------------------------- + ResultPartitionWriter getWriter(int index); ResultPartitionWriter[] getAllWriters(); @@ -138,4 +155,13 @@ public interface Environment { InputGate[] getAllInputGates(); + + /** + * Returns the proxy object for the accumulator protocol. + */ + // THIS DOES NOT BELONG HERE, THIS TOTALLY BREAKS COMPONENTIZATION. 
+ // THE EXECUTED TASKS HAVE BEEN KEPT INDEPENDENT OF ANY RPC OR ACTOR + // COMMUNICATION !!! + ActorRef getJobManager(); + } http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java index 3fb7493..5416f48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.execution; import akka.actor.ActorRef; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorEvent; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -38,10 +41,12 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult; import org.apache.flink.runtime.taskmanager.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -352,6 +357,20 @@ public class RuntimeEnvironment implements Environment, Runnable { } 
@Override + public void reportAccumulators(Map> accumulators) { + AccumulatorEvent evt; + try { + evt = new AccumulatorEvent(getJobID(), accumulators); + } + catch (IOException e) { + throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e); + } + + ReportAccumulatorResult accResult = new ReportAccumulatorResult(getJobID(), owner.getExecutionId(), evt); + jobManager.tell(accResult, ActorRef.noSender()); + } + + @Override public ResultPartitionWriter getWriter(int index) { checkElementIndex(index, writers.length, "Illegal environment writer request."); http://git-wip-us.apache.org/repos/asf/flink/blob/201bea3f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 71b8afc..b528f75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.operators; -import akka.actor.ActorRef; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -32,7 +31,6 @@ import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.accumulators.AccumulatorEvent; import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -45,7 
+43,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator; @@ -67,6 +64,7 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -567,15 +565,17 @@ public class RegularPactTask extends AbstractInvokable i * Each chained task might have accumulators which will be merged * with the accumulators of the stub. */ - protected static void reportAndClearAccumulators( - Environment env, Map<String, Accumulator<?, ?>> accumulators, ArrayList<ChainedDriver<?, ?>> chainedTasks) { + protected static void reportAndClearAccumulators(Environment env, + Map<String, Accumulator<?, ?>> accumulators, + ArrayList<ChainedDriver<?, ?>> chainedTasks) { // We can merge here the accumulators from the stub and the chained // tasks. Type conflicts can occur here if counters with same name but // different type were used.
for (ChainedDriver<?, ?> chainedTask : chainedTasks) { if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { - Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators(); + Map<String, Accumulator<?, ?>> chainedAccumulators = + FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators(); AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); } } @@ -586,9 +586,7 @@ public class RegularPactTask extends AbstractInvokable i } // Report accumulators to JobManager - JobManagerMessages.ReportAccumulatorResult accResult = new JobManagerMessages.ReportAccumulatorResult(new - AccumulatorEvent(env.getJobID(), AccumulatorHelper.copy(accumulators))); - env.getJobManager().tell(accResult, ActorRef.noSender()); + env.reportAccumulators(accumulators); // We also clear the accumulators, since stub instances might be reused // (e.g. in iterations) and we don't want to count twice. This may not be