From: mxm@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 15 Jul 2015 13:55:57 -0000
In-Reply-To: <9ae074f8f76143d8baedc05b77e17e02@git.apache.org>
References: <9ae074f8f76143d8baedc05b77e17e02@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [4/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task accumulators

[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

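[Editor's illustration, not part of the commit.] The reporting scheme
described above can be sketched in plain Java. This is a minimal sketch
under stated assumptions: LiveAccumulatorSketch, TaskAccumulators, JobView
and the "attempt-N" IDs are hypothetical names invented for this example
and are not Flink classes; the sketch assumes each heartbeat carries a
cumulative snapshot per task execution, so the receiver keeps only the
latest snapshot of every execution.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these names are part of Flink's API. */
public class LiveAccumulatorSketch {

    /** Task side: counters owned by exactly one task execution. */
    static final class TaskAccumulators {
        final String attemptId;
        private final Map<String, Long> counters = new HashMap<>();

        TaskAccumulators(String attemptId) {
            this.attemptId = attemptId;
        }

        /** Adds delta to the named counter, creating it on first use. */
        void add(String name, long delta) {
            counters.merge(name, delta, Long::sum);
        }

        /** Copy of the current values, as could be attached to a heartbeat. */
        Map<String, Long> snapshot() {
            return new HashMap<>(counters);
        }
    }

    /** Job-manager side: stores the latest snapshot of each execution. */
    static final class JobView {
        private final Map<String, Map<String, Long>> perExecution = new HashMap<>();

        /** Snapshots are cumulative, so a newer one replaces the older one. */
        void report(String attemptId, Map<String, Long> snapshot) {
            perExecution.put(attemptId, snapshot);
        }

        /** Job-wide value: sum over the latest snapshot of every execution. */
        long total(String name) {
            long sum = 0;
            for (Map<String, Long> snapshot : perExecution.values()) {
                sum += snapshot.getOrDefault(name, 0L);
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        TaskAccumulators t0 = new TaskAccumulators("attempt-0");
        TaskAccumulators t1 = new TaskAccumulators("attempt-1");
        JobView job = new JobView();

        t0.add("records", 5);
        t1.add("records", 7);
        job.report(t0.attemptId, t0.snapshot()); // first heartbeat
        job.report(t1.attemptId, t1.snapshot());
        System.out.println(job.total("records")); // prints 12

        t0.add("records", 3);
        job.report(t0.attemptId, t0.snapshot()); // later heartbeat replaces the old snapshot
        System.out.println(job.total("records")); // prints 15
    }
}
```

Replacing rather than adding snapshots is the design choice that keeps
repeated heartbeats from double-counting; whether the actual commit merges
reports this way is not stated in the message above.
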
The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.

This closes #896.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8261ed54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8261ed54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8261ed54

Branch: refs/heads/master
Commit: 8261ed5438278331c9cd760463273ad94d4bd410
Parents: d592ee6
Author: Maximilian Michels
Authored: Wed Jul 8 09:23:42 2015 +0200
Committer: Maximilian Michels
Committed: Wed Jul 15 15:19:06 2015 +0200

----------------------------------------------------------------------
 .../api/common/accumulators/Accumulator.java | 1 -
 .../api/common/functions/RuntimeContext.java | 10 +-
 .../util/AbstractRuntimeUDFContext.java | 26 +-
 .../functions/util/RuntimeUDFContext.java | 11 +-
 .../common/operators/CollectionExecutor.java | 19 +-
 .../functions/util/RuntimeUDFContextTest.java | 12 +-
 .../base/FlatMapOperatorCollectionTest.java | 4 +-
 .../operators/base/JoinOperatorBaseTest.java | 7 +-
 .../common/operators/base/MapOperatorTest.java | 7 +-
 .../base/PartitionMapOperatorTest.java | 6 +-
 .../examples/java/wordcount/WordCount.java | 2 +-
 .../java/org/apache/flink/api/java/DataSet.java | 18 +-
 .../java/org/apache/flink/api/java/Utils.java | 7 +-
 .../base/CoGroupOperatorCollectionTest.java | 5 +-
 .../operators/base/GroupReduceOperatorTest.java | 6 +-
 .../operators/base/JoinOperatorBaseTest.java | 6 +-
 .../operators/base/ReduceOperatorTest.java | 6 +-
 .../org/apache/flink/optimizer/Optimizer.java | 2 +-
 .../runtime/accumulators/AccumulatorEvent.java | 49 ---
 .../accumulators/AccumulatorRegistry.java | 147 +++++++++
 .../accumulators/AccumulatorSnapshot.java | 84 ++++++
 .../flink/runtime/execution/Environment.java | 9 +-
 .../librarycache/BlobLibraryCacheManager.java | 2 +-
 .../flink/runtime/executiongraph/Execution.java | 43 +++
 .../runtime/executiongraph/ExecutionGraph.java | 100 ++++++-
 .../io/network/api/reader/AbstractReader.java | 1 +
 .../api/reader/AbstractRecordReader.java | 14 +-
 .../io/network/api/reader/BufferReader.java | 6 +
 .../io/network/api/reader/ReaderBase.java | 6 +
 .../io/network/api/reader/RecordReader.java | 1 +
 .../AdaptiveSpanningRecordDeserializer.java | 31 ++
 .../api/serialization/RecordDeserializer.java | 6 +
 .../api/serialization/RecordSerializer.java | 6 +
 .../serialization/SpanningRecordSerializer.java | 16 +-
 ...llingAdaptiveSpanningRecordDeserializer.java | 34 ++-
 .../io/network/api/writer/RecordWriter.java | 28 ++
 .../task/AbstractIterativePactTask.java | 19 +-
 .../iterative/task/IterationHeadPactTask.java | 4 +-
 .../jobgraph/tasks/AbstractInvokable.java | 2 +-
 .../accumulators/AccumulatorManager.java | 144 ---------
 .../accumulators/JobAccumulators.java | 42 ---
 .../flink/runtime/operators/DataSinkTask.java | 6 +
 .../flink/runtime/operators/DataSourceTask.java | 27 +-
 .../flink/runtime/operators/PactDriver.java | 2 +-
 .../runtime/operators/PactTaskContext.java | 2 +-
 .../runtime/operators/RegularPactTask.java | 133 +++------
 .../operators/chaining/ChainedDriver.java | 14 +-
 .../util/DistributedRuntimeUDFContext.java | 11 +-
 .../runtime/taskmanager/RuntimeEnvironment.java | 32 +-
 .../apache/flink/runtime/taskmanager/Task.java | 42 ++-
 .../runtime/taskmanager/TaskExecutionState.java | 43 ++-
 .../flink/runtime/util/SerializedValue.java | 2 +-
 .../flink/runtime/jobmanager/JobManager.scala | 118 ++++----
 .../runtime/messages/TaskManagerMessages.scala | 5 +-
 .../accumulators/AccumulatorMessages.scala | 15 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 61 ++--
 .../network/api/reader/AbstractReaderTest.java | 7 +
 .../operators/testutils/DriverTestBase.java | 6 +-
 .../operators/testutils/MockEnvironment.java | 9 +-
 .../flink/runtime/taskmanager/TaskTest.java | 1 +
 .../runtime/testingUtils/TestingCluster.scala | 2 -
 .../testingUtils/TestingJobManager.scala | 10 +
 .../TestingJobManagerMessages.scala | 10 +-
 .../streaming/runtime/io/CoRecordReader.java | 11 +
 .../io/StreamingAbstractRecordReader.java | 13 +-
 .../io/StreamingMutableRecordReader.java | 1 +
 .../runtime/tasks/OneInputStreamTask.java | 6 +
 .../streaming/runtime/tasks/OutputHandler.java | 32 +-
 .../runtime/tasks/StreamIterationHead.java | 9 +-
 .../streaming/runtime/tasks/StreamTask.java | 16 +-
 .../runtime/tasks/StreamingRuntimeContext.java | 5 +-
 .../api/state/StatefulOperatorTest.java | 4 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 6 +
 .../runtime/tasks/StreamMockEnvironment.java | 9 +-
 .../flink/streaming/util/MockCoContext.java | 4 +-
 .../flink/streaming/util/MockContext.java | 4 +-
 .../streaming/util/SourceFunctionUtil.java | 4 +-
 .../flink/tez/runtime/RegularProcessor.java | 4 +-
 .../flink/test/util/RecordAPITestBase.java | 2 +-
 .../test/util/ForkableFlinkMiniCluster.scala | 2 -
 .../test/accumulators/AccumulatorITCase.java | 3 +-
 .../AccumulatorIterativeITCase.java | 18 +-
 .../accumulators/AccumulatorLiveITCase.java | 299 +++++++++++++++++++
 .../apache/flink/yarn/ApplicationMaster.scala | 2 -
 84 files changed, 1362 insertions(+), 599 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java index 123d956..e49cc04 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java @@ -40,7 +40,6 @@ import java.io.Serializable; * client */ public interface Accumulator extends Serializable, Cloneable { - /** * @param value * The value to add to the accumulator object http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index eb84d1c..fb9d842 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -20,8 +20,8 @@ package org.apache.flink.api.common.functions; import java.io.IOException; import java.io.Serializable; -import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; @@ -100,10 +100,12 @@ public interface RuntimeContext { Accumulator getAccumulator(String name); /** - * For system internal usage only. Use getAccumulator(...) to obtain a - * accumulator. Use this as read-only. + * Returns a map of all registered accumulators for this task. + * The returned map must not be modified. + * @deprecated Use getAccumulator(..) to obtain the value of an accumulator. */ - HashMap> getAllAccumulators(); + @Deprecated + Map> getAllAccumulators(); /** * Convenience function to create a counter object for integers. 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index f48eb57..13d79e7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -21,10 +21,10 @@ package org.apache.flink.api.common.functions.util; import java.io.IOException; import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; +import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; @@ -53,32 +53,34 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { private final ExecutionConfig executionConfig; - private final HashMap> accumulators = new HashMap>(); - + private final Map> accumulators; + private final DistributedCache distributedCache; - - + + public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig) + ExecutionConfig executionConfig, + Map> accumulators) { this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, - Collections.>emptyMap()); + accumulators, Collections.>emptyMap()); } - + public AbstractRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, - Map> cpTasks) - { + 
Map> accumulators, + Map> cpTasks) { this.name = name; this.numParallelSubtasks = numParallelSubtasks; this.subtaskIndex = subtaskIndex; this.userCodeClassLoader = userCodeClassLoader; this.executionConfig = executionConfig; this.distributedCache = new DistributedCache(cpTasks); + this.accumulators = Preconditions.checkNotNull(accumulators); } @Override @@ -137,8 +139,8 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { } @Override - public HashMap> getAllAccumulators() { - return this.accumulators; + public Map> getAllAccumulators() { + return Collections.unmodifiableMap(this.accumulators); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index d116d00..1689138 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.core.fs.Path; @@ -38,12 +39,14 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap> uninitializedBroadcastVars = new HashMap>(); - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) { - super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig); + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, + ExecutionConfig executionConfig, Map> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators); } - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map> cpTasks) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks); + public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, + ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index f605113..0a9146c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -184,8 +184,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig); + ctx = superStep == 0 ? 
new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -197,9 +197,6 @@ public class CollectionExecutor { List result = typedOp.executeOnCollections(inputData, ctx, executionConfig); - if (ctx != null) { - AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); - } return result; } @@ -226,8 +223,8 @@ public class CollectionExecutor { // build the runtime context and compute broadcast variables, if necessary RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig); + ctx = superStep == 0 ? 
new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators) : + new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -239,9 +236,6 @@ public class CollectionExecutor { List result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig); - if (ctx != null) { - AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators()); - } return result; } @@ -485,8 +479,9 @@ public class CollectionExecutor { private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext { - public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) { - super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig); + public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, + ExecutionConfig executionConfig, Map> accumulators) { + super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, accumulators); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index c77a1b6..9189d5b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -22,9 +22,11 @@ import static org.junit.Assert.*; import java.util.ArrayList; import 
java.util.Arrays; +import java.util.HashMap; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.junit.Test; @@ -34,7 +36,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); try { ctx.getBroadcastVariable("some name"); @@ -64,7 +66,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -98,7 +100,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -123,7 +125,7 @@ public class RuntimeUDFContextTest { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), 
new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -146,7 +148,7 @@ public class RuntimeUDFContextTest { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig()); + RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index d231455..734324b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -33,6 +34,7 @@ import org.junit.Test; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; @SuppressWarnings("serial") @@ -72,7 +74,7 @@ public class FlatMapOperatorCollectionTest implements Serializable { } // run on collections final List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new 
RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig); + .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap>()), executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 54975b4..98f75bc 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import static org.junit.Assert.*; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.RichFlatJoinFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; @@ -33,6 +34,7 @@ import org.junit.Test; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,11 +116,12 @@ public class JoinOperatorBaseTest implements Serializable { try { + final HashMap> accumulatorMap = new HashMap>(); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 8e07f07..2c98a17 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -22,10 +22,12 @@ import static org.junit.Assert.*; import static java.util.Arrays.asList; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -102,11 +104,12 @@ public class MapOperatorTest implements java.io.Serializable { parser, new UnaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName); List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); + final 
HashMap> accumulatorMap = new HashMap>(); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 61ba359..28c6d821 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -22,10 +22,12 @@ import static org.junit.Assert.*; import static java.util.Arrays.asList; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.util.Collector; import org.apache.flink.api.common.functions.MapPartitionFunction; import 
org.apache.flink.api.common.functions.RichMapPartitionFunction; @@ -78,9 +80,9 @@ public class PartitionMapOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java index 82c3ad8..e6b8418 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java @@ -60,7 +60,7 @@ public class WordCount { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - + // get input data DataSet text = getTextDataSet(env); 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index d24a350..c628b04 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -408,14 +408,16 @@ public abstract class DataSet { JobExecutionResult res = getExecutionEnvironment().execute(); ArrayList accResult = res.getAccumulatorResult(id); - try { - return SerializedListAccumulator.deserializeList(accResult, serializer); - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find type class of collected data type.", e); - } - catch (IOException e) { - throw new RuntimeException("Serialization error while deserializing collected data", e); + if (accResult != null) { + try { + return SerializedListAccumulator.deserializeList(accResult, serializer); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find type class of collected data type.", e); + } catch (IOException e) { + throw new RuntimeException("Serialization error while deserializing collected data", e); + } + } else { + throw new RuntimeException("The call to collect() could not retrieve the DataSet."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index dd1d6d2..a1e3d25 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -114,13 +114,18 @@ public class Utils { 
@Override public void open(Configuration parameters) throws Exception { this.accumulator = new SerializedListAccumulator(); - getRuntimeContext().addAccumulator(id, accumulator); } @Override public void flatMap(T value, Collector out) throws Exception { accumulator.add(value, serializer); } + + @Override + public void close() throws Exception { + // Important: should only be added in close method to minimize traffic of accumulators + getRuntimeContext().addAccumulator(id, accumulator); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 0aa6097..a178a47 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -35,6 +36,7 @@ import org.junit.Test; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -68,7 +70,8 @@ public class CoGroupOperatorCollectionTest implements Serializable { ); ExecutionConfig executionConfig = new ExecutionConfig(); - final RuntimeContext ctx = new RuntimeUDFContext("Test 
UDF", 4, 0, null, executionConfig); + final HashMap> accumulators = new HashMap>(); + final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, accumulators); { SumCoGroup udf1 = new SumCoGroup(); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 447c8c5..08f4acd 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -31,6 +32,7 @@ import org.apache.flink.util.Collector; import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -161,9 +163,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); 
executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java index 1b38281..21fcfb3 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java @@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base; import static org.junit.Assert.*; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.BinaryOperatorInformation; @@ -33,6 +34,7 @@ import org.junit.Test; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -103,9 +105,9 @@ public class JoinOperatorBaseTest implements Serializable { try { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), 
executionConfig); + List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), executionConfig); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>()), executionConfig); assertEquals(expected, new HashSet>(resultSafe)); assertEquals(expected, new HashSet>(resultRegular)); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 4e1eebd..7cd9771 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration; import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -138,9 +140,9 @@ public class ReduceOperatorTest implements java.io.Serializable { ExecutionConfig executionConfig = new 
ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java index a2b78ed..6f41c29 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java @@ -545,7 +545,7 @@ public class Optimizer { // ------------------------------------------------------------------------ private OptimizerPostPass getPostPassFromPlan(Plan program) { - final String className = program.getPostPassClassName(); + final String className = program.getPostPassClassName(); if (className == null) { throw new CompilerException("Optimizer Post Pass class description is null"); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java deleted file mode 100644 index ad7fabd..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.accumulators; - -import java.io.IOException; -import java.util.Map; - -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.util.SerializedValue; - -/** - * This class encapsulates a map of accumulators for a single job. It is used - * for the transfer from TaskManagers to the JobManager and from the JobManager - * to the Client. 
- */ -public class AccumulatorEvent extends SerializedValue>> { - - private static final long serialVersionUID = 8965894516006882735L; - - /** JobID for the target job */ - private final JobID jobID; - - - public AccumulatorEvent(JobID jobID, Map> accumulators) throws IOException { - super(accumulators); - this.jobID = jobID; - } - - public JobID getJobID() { - return this.jobID; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java new file mode 100644 index 0000000..0ef3650 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +/** + * Main accumulator registry which encapsulates internal and user-defined accumulators. + */ +public class AccumulatorRegistry { + + protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class); + + protected final JobID jobID; + protected final ExecutionAttemptID taskID; + + /* Flink's internal Accumulator values stored for the executing task. */ + private final Map> flinkAccumulators = + new HashMap>(); + + /* User-defined Accumulator values stored for the executing task. */ + private final Map> userAccumulators = + Collections.synchronizedMap(new HashMap>()); + + /* The reporter reference that is handed to the reporting tasks. */ + private final ReadWriteReporter reporter; + + /** + * Flink metrics supported + */ + public enum Metric { + NUM_RECORDS_IN, + NUM_RECORDS_OUT, + NUM_BYTES_IN, + NUM_BYTES_OUT + } + + + public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) { + this.jobID = jobID; + this.taskID = taskID; + this.reporter = new ReadWriteReporter(flinkAccumulators); + } + + /** + * Creates a snapshot of this accumulator registry. + * @return a serialized accumulator map + */ + public AccumulatorSnapshot getSnapshot() { + try { + return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators); + } catch (IOException e) { + LOG.warn("Failed to serialize accumulators for task.", e); + return null; + } + } + + /** + * Gets the map for user-defined accumulators. 
+ */ + public Map> getUserMap() { + return userAccumulators; + } + + /** + * Gets the reporter for flink internal metrics. + */ + public Reporter getReadWriteReporter() { + return reporter; + } + + /** + * Interface for Flink's internal accumulators. + */ + public interface Reporter { + void reportNumRecordsIn(long value); + void reportNumRecordsOut(long value); + void reportNumBytesIn(long value); + void reportNumBytesOut(long value); + } + + /** + * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out) + */ + private static class ReadWriteReporter implements Reporter { + + private LongCounter numRecordsIn = new LongCounter(); + private LongCounter numRecordsOut = new LongCounter(); + private LongCounter numBytesIn = new LongCounter(); + private LongCounter numBytesOut = new LongCounter(); + + private ReadWriteReporter(Map> accumulatorMap) { + accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn); + accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut); + accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn); + accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut); + } + + @Override + public void reportNumRecordsIn(long value) { + numRecordsIn.add(value); + } + + @Override + public void reportNumRecordsOut(long value) { + numRecordsOut.add(value); + } + + @Override + public void reportNumBytesIn(long value) { + numBytesIn.add(value); + } + + @Override + public void reportNumBytesOut(long value) { + numBytesOut.add(value); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java new file mode 100644 index 0000000..a6288f0 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.accumulators; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.util.SerializedValue; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +/** + * This class encapsulates a map of accumulators for a single task. It is used + * for the transfer from TaskManagers to the JobManager and from the JobManager + * to the Client. + */ +public class AccumulatorSnapshot implements Serializable { + + private static final long serialVersionUID = 42L; + + private final JobID jobID; + private final ExecutionAttemptID executionAttemptID; + + /** + * Flink internal accumulators which can be serialized using the system class loader. + */ + private final Map> flinkAccumulators; + + /** + * Serialized user accumulators which may require the custom user class loader. 
+ */ + private final SerializedValue>> userAccumulators; + + public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID, + Map> flinkAccumulators, + Map> userAccumulators) throws IOException { + this.jobID = jobID; + this.executionAttemptID = executionAttemptID; + this.flinkAccumulators = flinkAccumulators; + this.userAccumulators = new SerializedValue>>(userAccumulators); + } + + public JobID getJobID() { + return jobID; + } + + public ExecutionAttemptID getExecutionAttemptID() { + return executionAttemptID; + } + + /** + * Gets the Flink (internal) accumulators values. + * @return the serialized map + */ + public Map> getFlinkAccumulators() { + return flinkAccumulators; + } + + /** + * Gets the user-defined accumulators values. + * @return the serialized map + */ + public Map> deserializeUserAccumulators(ClassLoader classLoader) throws IOException, ClassNotFoundException { + return userAccumulators.deserializeValue(classLoader); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 755f1ad..c561869 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.execution; -import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import 
org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -142,11 +142,10 @@ public interface Environment { BroadcastVariableManager getBroadcastVariableManager(); /** - * Reports the given set of accumulators to the JobManager. - * - * @param accumulators The accumulators to report. + * Return the registry for accumulators which are periodically sent to the job manager. + * @return the registry */ - void reportAccumulators(Map> accumulators); + AccumulatorRegistry getAccumulatorRegistry(); /** * Confirms that the invokable has successfully completed all steps it needed to http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index 848f619..88be5e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -46,7 +46,7 @@ import com.google.common.base.Preconditions; * For each job graph that is submitted to the system the library cache manager maintains * a set of libraries (typically JAR files) which the job requires to run. The library cache manager * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton - * programming pattern, so there exists at most on library manager at a time. + * programming pattern, so there exists at most one library manager at a time. 
*/ public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager { http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index af67c3f..3b836ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph; import akka.dispatch.OnComplete; import akka.dispatch.OnFailure; +import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; @@ -54,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; @@ -133,6 +136,15 @@ public class Execution implements Serializable { @SuppressWarnings("NonSerializableFieldInSerializableClass") private ExecutionContext executionContext; + /* Lock for updating the accumulators atomically. 
*/ + private final Object accumulatorLock = new Object(); + + /* Continuously updated map of user-defined accumulators */ + private volatile Map> userAccumulators; + + /* Continuously updated map of internal accumulators */ + private volatile Map> flinkAccumulators; + // -------------------------------------------------------------------------------------------- public Execution( @@ -593,6 +605,10 @@ public class Execution implements Serializable { } void markFinished() { + markFinished(null, null); + } + + void markFinished(Map> flinkAccumulators, Map> userAccumulators) { // this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!) while (true) { @@ -613,6 +629,11 @@ public class Execution implements Serializable { } } + synchronized (accumulatorLock) { + this.flinkAccumulators = flinkAccumulators; + this.userAccumulators = userAccumulators; + } + assignedResource.releaseSlot(); vertex.getExecutionGraph().deregisterExecution(this); } @@ -935,6 +956,28 @@ public class Execution implements Serializable { return vertex.getSimpleName() + " - execution #" + attemptNumber; } + /** + * Update accumulators (discarded when the Execution has already been terminated). 
+ * @param flinkAccumulators the flink internal accumulators + * @param userAccumulators the user accumulators + */ + public void setAccumulators(Map> flinkAccumulators, + Map> userAccumulators) { + synchronized (accumulatorLock) { + if (!state.isTerminal()) { + this.flinkAccumulators = flinkAccumulators; + this.userAccumulators = userAccumulators; + } + } + } + public Map> getUserAccumulators() { + return userAccumulators; + } + + public Map> getFlinkAccumulators() { + return flinkAccumulators; + } + @Override public String toString() { return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 47b7ae2..6d2262c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -22,8 +22,12 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.execution.ExecutionState; @@ -38,6 +42,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.runtime.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; @@ -47,10 +52,12 @@ import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -128,6 +135,29 @@ public class ExecutionGraph implements Serializable { /** The currently executed tasks, for callbacks */ private final ConcurrentHashMap currentExecutions; + /** + * Updates the accumulators during the runtime of a job. Final accumulator results are transferred + * through the UpdateTaskExecutionState message. + * @param accumulatorSnapshot The serialized flink and user-defined accumulators + */ + public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { + Map> flinkAccumulators = accumulatorSnapshot.getFlinkAccumulators(); + Map> userAccumulators; + try { + userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader); + + ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID(); + Execution execution = currentExecutions.get(execID); + if (execution != null) { + execution.setAccumulators(flinkAccumulators, userAccumulators); + } else { + LOG.warn("Received accumulator result for unknown execution {}.", execID); + } + } catch (Exception e) { + LOG.error("Cannot update accumulators for job " + jobID, e); + } + } + /** A list of all libraries required during the job execution. 
Libraries have to be stored * inside the BlobService and are referenced via the BLOB keys. */ private final List requiredJarFiles; @@ -485,6 +515,57 @@ public class ExecutionGraph implements Serializable { return executionContext; } + /** + * Gets the internal flink accumulator map of maps which contains some metrics. + * @return A map of accumulators for every executed task. + */ + public Map>> getFlinkAccumulators() { + Map>> flinkAccumulators = + new HashMap>>(); + + for (ExecutionVertex vertex : getAllExecutionVertices()) { + Map> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators(); + flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs); + } + + return flinkAccumulators; + } + + /** + * Merges all accumulator results from the tasks previously executed in the Executions. + * @return The accumulator map + */ + public Map> aggregateUserAccumulators() { + + Map> userAccumulators = new HashMap>(); + + for (ExecutionVertex vertex : getAllExecutionVertices()) { + Map> next = vertex.getCurrentExecutionAttempt().getUserAccumulators(); + if (next != null) { + AccumulatorHelper.mergeInto(userAccumulators, next); + } + } + + return userAccumulators; + } + + /** + * Gets a serialized accumulator map. + * @return The accumulator map with serialized accumulator values. 
+ * @throws IOException + */ + public Map> getAccumulatorsSerialized() throws IOException { + + Map> accumulatorMap = aggregateUserAccumulators(); + + Map> result = new HashMap>(); + for (Map.Entry> entry : accumulatorMap.entrySet()) { + result.put(entry.getKey(), new SerializedValue(entry.getValue().getLocalValue())); + } + + return result; + } + // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- @@ -791,14 +872,31 @@ public class ExecutionGraph implements Serializable { // Callbacks and Callback Utilities // -------------------------------------------------------------------------------------------- + /** + * Updates the state of the Task and sets the final accumulator results. + * @param state + * @return + */ public boolean updateState(TaskExecutionState state) { Execution attempt = this.currentExecutions.get(state.getID()); if (attempt != null) { + switch (state.getExecutionState()) { case RUNNING: return attempt.switchToRunning(); case FINISHED: - attempt.markFinished(); + Map> flinkAccumulators = null; + Map> userAccumulators = null; + try { + AccumulatorSnapshot accumulators = state.getAccumulators(); + flinkAccumulators = accumulators.getFlinkAccumulators(); + userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader); + } catch (Exception e) { + // Exceptions would be thrown in the future here + LOG.error("Failed to deserialize final accumulator results.", e); + } + + attempt.markFinished(flinkAccumulators, userAccumulators); return true; case CANCELED: attempt.cancelingComplete(); http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java index 96b6f99..90564a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java @@ -147,4 +147,5 @@ public abstract class AbstractReader implements ReaderBase { return ++currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels(); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 56e5d33..7aa57a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.api.reader; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; @@ -64,7 +65,9 @@ abstract class AbstractRecordReader extends Abstra DeserializationResult result = currentRecordDeserializer.getNextRecord(target); if (result.isBufferConsumed()) { - 
currentRecordDeserializer.getCurrentBuffer().recycle(); + final Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer(); + + currentBuffer.recycle(); currentRecordDeserializer = null; } @@ -89,7 +92,7 @@ abstract class AbstractRecordReader extends Abstra + "If you are using custom serialization code (Writable or Value types), check their " + "serialization routines. In the case of Kryo, check the respective Kryo serializer."); } - + if (handleEvent(bufferOrEvent.getEvent())) { if (inputGate.isFinished()) { isFinished = true; @@ -112,4 +115,11 @@ abstract class AbstractRecordReader extends Abstra } } } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + for (RecordDeserializer deserializer : recordDeserializers) { + deserializer.setReporter(reporter); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java index ca59609..debb352 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.api.reader; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; @@ -47,4 +48,9 @@ public final class BufferReader extends AbstractReader { } } } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java index 2a0a6df..9f8ae20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader; import java.io.IOException; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; @@ -51,4 +52,9 @@ public interface ReaderBase { boolean hasReachedEndOfSuperstep(); + /** + * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. 
+ */ + void setReporter(AccumulatorRegistry.Reporter reporter); + } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java index b1395e3..d45920e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java @@ -80,4 +80,5 @@ public class RecordReader extends AbstractRecordRe throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e); } } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 28bcf4a..ec9f4fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import 
org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; @@ -45,6 +46,8 @@ public class AdaptiveSpanningRecordDeserializer im private Buffer currentBuffer; + private AccumulatorRegistry.Reporter reporter; + public AdaptiveSpanningRecordDeserializer() { this.nonSpanningWrapper = new NonSpanningWrapper(); this.spanningWrapper = new SpanningWrapper(); @@ -90,10 +93,18 @@ public class AdaptiveSpanningRecordDeserializer im if (nonSpanningRemaining >= 4) { int len = this.nonSpanningWrapper.readInt(); + if (reporter != null) { + reporter.reportNumBytesIn(len); + } + if (len <= nonSpanningRemaining - 4) { // we can get a full record from here target.read(this.nonSpanningWrapper); + if (reporter != null) { + reporter.reportNumRecordsIn(1); + } + return (this.nonSpanningWrapper.remaining() == 0) ? DeserializationResult.LAST_RECORD_FROM_BUFFER : DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; @@ -117,6 +128,10 @@ public class AdaptiveSpanningRecordDeserializer im // get the full record target.read(this.spanningWrapper); + if (reporter != null) { + reporter.reportNumRecordsIn(1); + } + // move the remainder to the non-spanning wrapper // this does not copy it, only sets the memory segment this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper); @@ -144,6 +159,12 @@ public class AdaptiveSpanningRecordDeserializer im return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0; } + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + this.reporter = reporter; + this.spanningWrapper.setReporter(reporter); + } + // ----------------------------------------------------------------------------------------------------------------- private static final class NonSpanningWrapper implements DataInputView { @@ -426,6 +447,8 @@ public class AdaptiveSpanningRecordDeserializer im 
private int recordLimit; + private AccumulatorRegistry.Reporter reporter; + public SpanningWrapper() { this.lengthBuffer = ByteBuffer.allocate(4); this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); @@ -463,6 +486,10 @@ public class AdaptiveSpanningRecordDeserializer im } else { this.recordLength = this.lengthBuffer.getInt(0); + if (reporter != null) { + reporter.reportNumBytesIn(this.recordLength); + } + this.lengthBuffer.clear(); segmentPosition = toPut; } @@ -607,5 +634,9 @@ public class AdaptiveSpanningRecordDeserializer im public int read(byte[] b) throws IOException { return this.serializationReadBuffer.read(b); } + + public void setReporter(AccumulatorRegistry.Reporter reporter) { + this.reporter = reporter; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java index dd8ea06..e4c7890 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; /** @@ -64,4 +65,9 @@ public interface RecordDeserializer { void clear(); boolean hasUnfinishedData(); + + /** + * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. 
+ */ + void setReporter(AccumulatorRegistry.Reporter reporter); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index bb6db55..e9f339a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -22,6 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization; import java.io.IOException; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; /** @@ -63,4 +64,9 @@ public interface RecordSerializer { void clear(); boolean hasData(); + + /** + * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read. 
+ */ + void setReporter(AccumulatorRegistry.Reporter reporter); } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 38e130d..f163e05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -24,6 +24,7 @@ import java.nio.ByteOrder; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataOutputSerializer; @@ -50,6 +51,8 @@ public class SpanningRecordSerializer implements R /** Limit of current {@link MemorySegment} of target buffer */ private int limit; + private AccumulatorRegistry.Reporter reporter; + public SpanningRecordSerializer() { this.serializationBuffer = new DataOutputSerializer(128); @@ -75,7 +78,13 @@ public class SpanningRecordSerializer implements R // write data and length record.write(this.serializationBuffer); - this.lengthBuffer.putInt(0, this.serializationBuffer.length()); + int len = this.serializationBuffer.length(); + this.lengthBuffer.putInt(0, len); + + if (reporter != null) { + reporter.reportNumBytesOut(len); + reporter.reportNumRecordsOut(1); + } this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer(); @@ -173,4 +182,9 @@ public class SpanningRecordSerializer implements R // either data in current target 
buffer or intermediate buffers return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); } + + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + this.reporter = reporter; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 6b0d836..f3e4892 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -24,6 +24,7 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.InputViewDataInputStreamWrapper; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.util.StringUtils; @@ -61,6 +62,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer= 4) { int len = this.nonSpanningWrapper.readInt(); + if (reporter != null) { + reporter.reportNumBytesIn(len); + } + if (len <= nonSpanningRemaining - 4) { // we can get a full record from here try { target.read(this.nonSpanningWrapper); + if (reporter != null) { + reporter.reportNumRecordsIn(1); + } + int remaining = this.nonSpanningWrapper.remaining(); if (remaining > 
0) { return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; @@ -151,6 +162,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer 0 || this.spanningWrapper.getNumGatheredBytes() > 0; } + @Override + public void setReporter(AccumulatorRegistry.Reporter reporter) { + this.reporter = reporter; + this.spanningWrapper.setReporter(reporter); + } + // ----------------------------------------------------------------------------------------------------------------- private static final class NonSpanningWrapper implements DataInputView { @@ -469,7 +490,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer