flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [4/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task accumulators
Date Wed, 15 Jul 2015 13:55:57 GMT
[FLINK-2292][FLINK-1573] add live per-task accumulators

This refactors the accumulators to accumulate per task execution. The
accumulators are reported from the task managers periodically to the job
manager via the Heartbeat message. If the execution contains chained
tasks, the accumulators are chained as well. The final accumulator
results are reported via the UpdateTaskExecutionState message.

The accumulators are now saved in the Execution within the
ExecutionGraph. This makes the AccumulatorManager obsolete. It has been
removed for now. In the future, we might introduce some caching for the
web frontend visualization.

Two types of accumulators are available:

- external (user-defined via the RuntimeContext)
- internal (flink metrics defined in the invocables)

The internal (built-in) metrics are targeted at users who want to
monitor their programs, e.g. through the job manager's web frontend.

This closes #896.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8261ed54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8261ed54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8261ed54

Branch: refs/heads/master
Commit: 8261ed5438278331c9cd760463273ad94d4bd410
Parents: d592ee6
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Jul 8 09:23:42 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jul 15 15:19:06 2015 +0200

----------------------------------------------------------------------
 .../api/common/accumulators/Accumulator.java    |   1 -
 .../api/common/functions/RuntimeContext.java    |  10 +-
 .../util/AbstractRuntimeUDFContext.java         |  26 +-
 .../functions/util/RuntimeUDFContext.java       |  11 +-
 .../common/operators/CollectionExecutor.java    |  19 +-
 .../functions/util/RuntimeUDFContextTest.java   |  12 +-
 .../base/FlatMapOperatorCollectionTest.java     |   4 +-
 .../operators/base/JoinOperatorBaseTest.java    |   7 +-
 .../common/operators/base/MapOperatorTest.java  |   7 +-
 .../base/PartitionMapOperatorTest.java          |   6 +-
 .../examples/java/wordcount/WordCount.java      |   2 +-
 .../java/org/apache/flink/api/java/DataSet.java |  18 +-
 .../java/org/apache/flink/api/java/Utils.java   |   7 +-
 .../base/CoGroupOperatorCollectionTest.java     |   5 +-
 .../operators/base/GroupReduceOperatorTest.java |   6 +-
 .../operators/base/JoinOperatorBaseTest.java    |   6 +-
 .../operators/base/ReduceOperatorTest.java      |   6 +-
 .../org/apache/flink/optimizer/Optimizer.java   |   2 +-
 .../runtime/accumulators/AccumulatorEvent.java  |  49 ---
 .../accumulators/AccumulatorRegistry.java       | 147 +++++++++
 .../accumulators/AccumulatorSnapshot.java       |  84 ++++++
 .../flink/runtime/execution/Environment.java    |   9 +-
 .../librarycache/BlobLibraryCacheManager.java   |   2 +-
 .../flink/runtime/executiongraph/Execution.java |  43 +++
 .../runtime/executiongraph/ExecutionGraph.java  | 100 ++++++-
 .../io/network/api/reader/AbstractReader.java   |   1 +
 .../api/reader/AbstractRecordReader.java        |  14 +-
 .../io/network/api/reader/BufferReader.java     |   6 +
 .../io/network/api/reader/ReaderBase.java       |   6 +
 .../io/network/api/reader/RecordReader.java     |   1 +
 .../AdaptiveSpanningRecordDeserializer.java     |  31 ++
 .../api/serialization/RecordDeserializer.java   |   6 +
 .../api/serialization/RecordSerializer.java     |   6 +
 .../serialization/SpanningRecordSerializer.java |  16 +-
 ...llingAdaptiveSpanningRecordDeserializer.java |  34 ++-
 .../io/network/api/writer/RecordWriter.java     |  28 ++
 .../task/AbstractIterativePactTask.java         |  19 +-
 .../iterative/task/IterationHeadPactTask.java   |   4 +-
 .../jobgraph/tasks/AbstractInvokable.java       |   2 +-
 .../accumulators/AccumulatorManager.java        | 144 ---------
 .../accumulators/JobAccumulators.java           |  42 ---
 .../flink/runtime/operators/DataSinkTask.java   |   6 +
 .../flink/runtime/operators/DataSourceTask.java |  27 +-
 .../flink/runtime/operators/PactDriver.java     |   2 +-
 .../runtime/operators/PactTaskContext.java      |   2 +-
 .../runtime/operators/RegularPactTask.java      | 133 +++------
 .../operators/chaining/ChainedDriver.java       |  14 +-
 .../util/DistributedRuntimeUDFContext.java      |  11 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  32 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  42 ++-
 .../runtime/taskmanager/TaskExecutionState.java |  43 ++-
 .../flink/runtime/util/SerializedValue.java     |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 118 ++++----
 .../runtime/messages/TaskManagerMessages.scala  |   5 +-
 .../accumulators/AccumulatorMessages.scala      |  15 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  61 ++--
 .../network/api/reader/AbstractReaderTest.java  |   7 +
 .../operators/testutils/DriverTestBase.java     |   6 +-
 .../operators/testutils/MockEnvironment.java    |   9 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   1 +
 .../runtime/testingUtils/TestingCluster.scala   |   2 -
 .../testingUtils/TestingJobManager.scala        |  10 +
 .../TestingJobManagerMessages.scala             |  10 +-
 .../streaming/runtime/io/CoRecordReader.java    |  11 +
 .../io/StreamingAbstractRecordReader.java       |  13 +-
 .../io/StreamingMutableRecordReader.java        |   1 +
 .../runtime/tasks/OneInputStreamTask.java       |   6 +
 .../streaming/runtime/tasks/OutputHandler.java  |  32 +-
 .../runtime/tasks/StreamIterationHead.java      |   9 +-
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |   5 +-
 .../api/state/StatefulOperatorTest.java         |   4 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   6 +
 .../runtime/tasks/StreamMockEnvironment.java    |   9 +-
 .../flink/streaming/util/MockCoContext.java     |   4 +-
 .../flink/streaming/util/MockContext.java       |   4 +-
 .../streaming/util/SourceFunctionUtil.java      |   4 +-
 .../flink/tez/runtime/RegularProcessor.java     |   4 +-
 .../flink/test/util/RecordAPITestBase.java      |   2 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 -
 .../test/accumulators/AccumulatorITCase.java    |   3 +-
 .../AccumulatorIterativeITCase.java             |  18 +-
 .../accumulators/AccumulatorLiveITCase.java     | 299 +++++++++++++++++++
 .../apache/flink/yarn/ApplicationMaster.scala   |   2 -
 84 files changed, 1362 insertions(+), 599 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
index 123d956..e49cc04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
@@ -40,7 +40,6 @@ import java.io.Serializable;
  *            client
  */
 public interface Accumulator<V, R extends Serializable> extends Serializable, Cloneable {
-
 	/**
 	 * @param value
 	 *            The value to add to the accumulator object

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index eb84d1c..fb9d842 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -20,8 +20,8 @@ package org.apache.flink.api.common.functions;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -100,10 +100,12 @@ public interface RuntimeContext {
 	<V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
 
 	/**
-	 * For system internal usage only. Use getAccumulator(...) to obtain a
-	 * accumulator. Use this as read-only.
+	 * Returns a map of all registered accumulators for this task.
+	 * The returned map must not be modified.
+	 * @deprecated Use getAccumulator(..) to obtain the value of an accumulator.
 	 */
-	HashMap<String, Accumulator<?, ?>> getAllAccumulators();
+	@Deprecated
+	Map<String, Accumulator<?, ?>> getAllAccumulators();
 
 	/**
 	 * Convenience function to create a counter object for integers.

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index f48eb57..13d79e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.common.functions.util;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -53,32 +53,34 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	private final ExecutionConfig executionConfig;
 
-	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-	
+	private final Map<String, Accumulator<?, ?>> accumulators;
+
 	private final DistributedCache distributedCache;
-	
-	
+
+
 	public AbstractRuntimeUDFContext(String name,
 										int numParallelSubtasks, int subtaskIndex,
 										ClassLoader userCodeClassLoader,
-										ExecutionConfig executionConfig)
+										ExecutionConfig executionConfig,
+										Map<String, Accumulator<?,?>> accumulators)
 	{
 		this(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig,
-				Collections.<String, Future<Path>>emptyMap());
+				accumulators, Collections.<String, Future<Path>>emptyMap());
 	}
-	
+
 	public AbstractRuntimeUDFContext(String name,
 										int numParallelSubtasks, int subtaskIndex,
 										ClassLoader userCodeClassLoader,
 										ExecutionConfig executionConfig,
-										Map<String, Future<Path>> cpTasks)
-	{
+										Map<String, Accumulator<?,?>> accumulators,
+										Map<String, Future<Path>> cpTasks) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.executionConfig = executionConfig;
 		this.distributedCache = new DistributedCache(cpTasks);
+		this.accumulators = Preconditions.checkNotNull(accumulators);
 	}
 
 	@Override
@@ -137,8 +139,8 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	}
 
 	@Override
-	public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {
-		return this.accumulators;
+	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+		return Collections.unmodifiableMap(this.accumulators);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
index d116d00..1689138 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.core.fs.Path;
@@ -38,12 +39,14 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {
 	private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();
 	
 	
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+							ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
 	}
 	
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+							ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index f605113..0a9146c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -184,8 +184,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig, accumulators) :
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -197,9 +197,6 @@ public class CollectionExecutor {
 		
 		List<OUT> result = typedOp.executeOnCollections(inputData, ctx, executionConfig);
 		
-		if (ctx != null) {
-			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
-		}
 		return result;
 	}
 	
@@ -226,8 +223,8 @@ public class CollectionExecutor {
 		// build the runtime context and compute broadcast variables, if necessary
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
-			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
+			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators) :
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig, accumulators);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -239,9 +236,6 @@ public class CollectionExecutor {
 		
 		List<OUT> result = typedOp.executeOnCollections(inputData1, inputData2, ctx, executionConfig);
 		
-		if (ctx != null) {
-			AccumulatorHelper.mergeInto(this.accumulators, ctx.getAllAccumulators());
-		}
 		return result;
 	}
 	
@@ -485,8 +479,9 @@ public class CollectionExecutor {
 	
 	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) {
-			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
+										ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, accumulators);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index c77a1b6..9189d5b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -22,9 +22,11 @@ import static org.junit.Assert.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.junit.Test;
 
@@ -34,7 +36,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableNotFound() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 			
 			try {
 				ctx.getBroadcastVariable("some name");
@@ -64,7 +66,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableSimple() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
 			ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -98,7 +100,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -123,7 +125,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testResetBroadcastVariableWithInitializer() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			
@@ -146,7 +148,7 @@ public class RuntimeUDFContextTest {
 	@Test
 	public void testBroadcastVariableWithInitializerAndMismatch() {
 		try {
-			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig());
+			RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Accumulator<?, ?>>());
 			
 			ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index d231455..734324b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 @SuppressWarnings("serial")
@@ -72,7 +74,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 		}
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
-				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig), executionConfig);
+				.executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 		Assert.assertEquals(input.size(), result.size());
 		Assert.assertEquals(input, result);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 54975b4..98f75bc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -114,11 +116,12 @@ public class JoinOperatorBaseTest implements Serializable {
 
 
 		try {
+			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 
 			assertEquals(expected, resultSafe);
 			assertEquals(expected, resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index 8e07f07..2c98a17 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.*;
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -102,11 +104,12 @@ public class MapOperatorTest implements java.io.Serializable {
 					parser, new UnaryOperatorInformation<String, Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), taskName);
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
+			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, accumulatorMap), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 61ba359..28c6d821 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.*;
 import static java.util.Arrays.asList;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
@@ -78,9 +80,9 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Integer> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe);
 			assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 82c3ad8..e6b8418 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -60,7 +60,7 @@ public class WordCount {
 		
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d24a350..c628b04 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -408,14 +408,16 @@ public abstract class DataSet<T> {
 		JobExecutionResult res = getExecutionEnvironment().execute();
 
 		ArrayList<byte[]> accResult = res.getAccumulatorResult(id);
-		try {
-			return SerializedListAccumulator.deserializeList(accResult, serializer);
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Cannot find type class of collected data type.", e);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Serialization error while deserializing collected data", e);
+		if (accResult != null) {
+			try {
+				return SerializedListAccumulator.deserializeList(accResult, serializer);
+			} catch (ClassNotFoundException e) {
+				throw new RuntimeException("Cannot find type class of collected data type.", e);
+			} catch (IOException e) {
+				throw new RuntimeException("Serialization error while deserializing collected data", e);
+			}
+		} else {
+			throw new RuntimeException("The call to collect() could not retrieve the DataSet.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index dd1d6d2..a1e3d25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -114,13 +114,18 @@ public class Utils {
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			this.accumulator = new SerializedListAccumulator<T>();
-			getRuntimeContext().addAccumulator(id, accumulator);
 		}
 
 		@Override
 		public void flatMap(T value, Collector<T> out) throws Exception {
 			accumulator.add(value, serializer);
 		}
+
+		@Override
+		public void close() throws Exception {
+			// Important: should only be added in close method to minimize traffic of accumulators
+			getRuntimeContext().addAccumulator(id, accumulator);
+		}
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 0aa6097..a178a47 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -35,6 +36,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -68,7 +70,8 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 			);
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
-			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig);
+			final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
+			final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, accumulators);
 
 			{
 				SumCoGroup udf1 = new SumCoGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index 447c8c5..08f4acd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -31,6 +32,7 @@ import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -161,9 +163,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			
 			
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
index 1b38281..21fcfb3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.operators.base;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -33,6 +34,7 @@ import org.junit.Test;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -103,9 +105,9 @@ public class JoinOperatorBaseTest implements Serializable {
 		try {
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
 			assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 4e1eebd..7cd9771 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.operators.base;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -138,9 +140,9 @@ public class ReduceOperatorTest implements java.io.Serializable {
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
-			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 			executionConfig.enableObjectReuse();
-			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig), executionConfig);
+			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap<String, Accumulator<?, ?>>()), executionConfig);
 
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index a2b78ed..6f41c29 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -545,7 +545,7 @@ public class Optimizer {
 	// ------------------------------------------------------------------------
 	
 	private OptimizerPostPass getPostPassFromPlan(Plan program) {
-		final String className =  program.getPostPassClassName();
+		final String className = program.getPostPassClassName();
 		if (className == null) {
 			throw new CompilerException("Optimizer Post Pass class description is null");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
deleted file mode 100644
index ad7fabd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.accumulators;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This class encapsulates a map of accumulators for a single job. It is used
- * for the transfer from TaskManagers to the JobManager and from the JobManager
- * to the Client.
- */
-public class AccumulatorEvent extends SerializedValue<Map<String, Accumulator<?, ?>>> {
-
-	private static final long serialVersionUID = 8965894516006882735L;
-
-	/** JobID for the target job */
-	private final JobID jobID;
-
-
-	public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) throws IOException {
-		super(accumulators);
-		this.jobID = jobID;
-	}
-
-	public JobID getJobID() {
-		return this.jobID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
new file mode 100644
index 0000000..0ef3650
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorRegistry.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Main accumulator registry which encapsulates internal and user-defined accumulators.
+ */
+public class AccumulatorRegistry {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
+
+	protected final JobID jobID;
+	protected final ExecutionAttemptID taskID;
+
+	/* Flink's internal Accumulator values stored for the executing task. */
+	private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =
+			new HashMap<Metric, Accumulator<?, ?>>();
+
+	/* User-defined Accumulator values stored for the executing task. */
+	private final Map<String, Accumulator<?, ?>> userAccumulators =
+			Collections.synchronizedMap(new HashMap<String, Accumulator<?, ?>>());
+
+	/* The reporter reference that is handed to the reporting tasks. */
+	private final ReadWriteReporter reporter;
+
+	/**
+	 * Flink metrics supported
+	 */
+	public enum Metric {
+		NUM_RECORDS_IN,
+		NUM_RECORDS_OUT,
+		NUM_BYTES_IN,
+		NUM_BYTES_OUT
+	}
+
+
+	public AccumulatorRegistry(JobID jobID, ExecutionAttemptID taskID) {
+		this.jobID = jobID;
+		this.taskID = taskID;
+		this.reporter = new ReadWriteReporter(flinkAccumulators);
+	}
+
+	/**
+	 * Creates a snapshot of this accumulator registry.
+	 * @return a serialized accumulator map
+	 */
+	public AccumulatorSnapshot getSnapshot() {
+		try {
+			return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
+		} catch (IOException e) {
+			LOG.warn("Failed to serialize accumulators for task.", e);
+			return null;
+		}
+	}
+
+	/**
+	 * Gets the map for user-defined accumulators.
+	 */
+	public Map<String, Accumulator<?, ?>> getUserMap() {
+		return userAccumulators;
+	}
+
+	/**
+	 * Gets the reporter for flink internal metrics.
+	 */
+	public Reporter getReadWriteReporter() {
+		return reporter;
+	}
+
+	/**
+	 * Interface for Flink's internal accumulators.
+	 */
+	public interface Reporter {
+		void reportNumRecordsIn(long value);
+		void reportNumRecordsOut(long value);
+		void reportNumBytesIn(long value);
+		void reportNumBytesOut(long value);
+	}
+
+	/**
+	 * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
+	 */
+	private static class ReadWriteReporter implements Reporter {
+
+		private LongCounter numRecordsIn = new LongCounter();
+		private LongCounter numRecordsOut = new LongCounter();
+		private LongCounter numBytesIn = new LongCounter();
+		private LongCounter numBytesOut = new LongCounter();
+
+		private ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
+			accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
+			accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
+			accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
+			accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
+		}
+
+		@Override
+		public void reportNumRecordsIn(long value) {
+			numRecordsIn.add(value);
+		}
+
+		@Override
+		public void reportNumRecordsOut(long value) {
+			numRecordsOut.add(value);
+		}
+
+		@Override
+		public void reportNumBytesIn(long value) {
+			numBytesIn.add(value);
+		}
+
+		@Override
+		public void reportNumBytesOut(long value) {
+			numBytesOut.add(value);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
new file mode 100644
index 0000000..a6288f0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.accumulators;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedValue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * This class encapsulates a map of accumulators for a single task. It is used
+ * for the transfer from TaskManagers to the JobManager and from the JobManager
+ * to the Client.
+ */
+public class AccumulatorSnapshot implements Serializable {
+
+	private static final long serialVersionUID = 42L;
+
+	private final JobID jobID;
+	private final ExecutionAttemptID executionAttemptID;
+
+	/**
+	 * Flink internal accumulators which can be serialized using the system class loader.
+	 */
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+
+	/**
+	 * Serialized user accumulators which may require the custom user class loader.
+	 */
+	private final SerializedValue<Map<String, Accumulator<?, ?>>> userAccumulators;
+
+	public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID,
+							Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
+							Map<String, Accumulator<?, ?>> userAccumulators) throws IOException {
+		this.jobID = jobID;
+		this.executionAttemptID = executionAttemptID;
+		this.flinkAccumulators = flinkAccumulators;
+		this.userAccumulators = new SerializedValue<Map<String, Accumulator<?, ?>>>(userAccumulators);
+	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	public ExecutionAttemptID getExecutionAttemptID() {
+		return executionAttemptID;
+	}
+
+	/**
+	 * Gets the Flink (internal) accumulators values.
+	 * @return the serialized map
+	 */
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+		return flinkAccumulators;
+	}
+
+	/**
+	 * Gets the user-defined accumulators values.
+	 * @return the serialized map
+	 */
+	public Map<String, Accumulator<?, ?>> deserializeUserAccumulators(ClassLoader classLoader) throws IOException, ClassNotFoundException {
+		return userAccumulators.deserializeValue(classLoader);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 755f1ad..c561869 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.execution;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -142,11 +142,10 @@ public interface Environment {
 	BroadcastVariableManager getBroadcastVariableManager();
 
 	/**
-	 * Reports the given set of accumulators to the JobManager.
-	 *
-	 * @param accumulators The accumulators to report.
+	 * Return the registry for accumulators which are periodically sent to the job manager.
+	 * @return the registry
 	 */
-	void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators);
+	AccumulatorRegistry getAccumulatorRegistry();
 
 	/**
 	 * Confirms that the invokable has successfully completed all steps it needed to

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 848f619..88be5e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -46,7 +46,7 @@ import com.google.common.base.Preconditions;
  * For each job graph that is submitted to the system the library cache manager maintains
  * a set of libraries (typically JAR files) which the job requires to run. The library cache manager
  * caches library files in order to avoid unnecessary retransmission of data. It is based on a singleton
- * programming pattern, so there exists at most on library manager at a time.
+ * programming pattern, so there exists at most one library manager at a time.
  */
 public final class BlobLibraryCacheManager extends TimerTask implements LibraryCacheManager {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af67c3f..3b836ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -54,6 +56,7 @@ import scala.concurrent.duration.FiniteDuration;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeoutException;
@@ -133,6 +136,15 @@ public class Execution implements Serializable {
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private ExecutionContext executionContext;
 
+	/* Lock for updating the accumulators atomically. */
+	private final Object accumulatorLock = new Object();
+
+	/* Continuously updated map of user-defined accumulators */
+	private volatile Map<String, Accumulator<?, ?>> userAccumulators;
+
+	/* Continuously updated map of internal accumulators */
+	private volatile Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+
 	// --------------------------------------------------------------------------------------------
 	
 	public Execution(
@@ -593,6 +605,10 @@ public class Execution implements Serializable {
 	}
 
 	void markFinished() {
+		markFinished(null, null);
+	}
+
+	void markFinished(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators, Map<String, Accumulator<?, ?>> userAccumulators) {
 
 		// this call usually comes during RUNNING, but may also come while still in deploying (very fast tasks!)
 		while (true) {
@@ -613,6 +629,11 @@ public class Execution implements Serializable {
 							}
 						}
 
+						synchronized (accumulatorLock) {
+							this.flinkAccumulators = flinkAccumulators;
+							this.userAccumulators = userAccumulators;
+						}
+
 						assignedResource.releaseSlot();
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
@@ -935,6 +956,28 @@ public class Execution implements Serializable {
 		return vertex.getSimpleName() + " - execution #" + attemptNumber;
 	}
 
+	/**
+	 * Update accumulators (discarded when the Execution has already been terminated).
+	 * @param flinkAccumulators the flink internal accumulators
+	 * @param userAccumulators the user accumulators
+	 */
+	public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
+								Map<String, Accumulator<?, ?>> userAccumulators) {
+		synchronized (accumulatorLock) {
+			if (!state.isTerminal()) {
+				this.flinkAccumulators = flinkAccumulators;
+				this.userAccumulators = userAccumulators;
+			}
+		}
+	}
+	public Map<String, Accumulator<?, ?>> getUserAccumulators() {
+		return userAccumulators;
+	}
+
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+		return flinkAccumulators;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 47b7ae2..6d2262c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -22,8 +22,12 @@ import akka.actor.ActorRef;
 
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -38,6 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.util.InstantiationUtil;
@@ -47,10 +52,12 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -128,6 +135,29 @@ public class ExecutionGraph implements Serializable {
 	/** The currently executed tasks, for callbacks */
 	private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;
 
+	/**
+	 * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
+	 * through the UpdateTaskExecutionState message.
+	 * @param accumulatorSnapshot The serialized flink and user-defined accumulators
+	 */
+	public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+		Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = accumulatorSnapshot.getFlinkAccumulators();
+		Map<String, Accumulator<?, ?>> userAccumulators;
+		try {
+			userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
+
+			ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
+			Execution execution = currentExecutions.get(execID);
+			if (execution != null) {
+				execution.setAccumulators(flinkAccumulators, userAccumulators);
+			} else {
+				LOG.warn("Received accumulator result for unknown execution {}.", execID);
+			}
+		} catch (Exception e) {
+			LOG.error("Cannot update accumulators for job " + jobID, e);
+		}
+	}
+
 	/** A list of all libraries required during the job execution. Libraries have to be stored
 	 * inside the BlobService and are referenced via the BLOB keys. */
 	private final List<BlobKey> requiredJarFiles;
@@ -485,6 +515,57 @@ public class ExecutionGraph implements Serializable {
 		return executionContext;
 	}
 
+	/**
+	 * Gets the internal flink accumulator map of maps which contains some metrics.
+	 * @return A map of accumulators for every executed task.
+	 */
+	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() {
+		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
+				new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
+
+		for (ExecutionVertex vertex : getAllExecutionVertices()) {
+			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+			flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs);
+		}
+
+		return flinkAccumulators;
+	}
+
+	/**
+	 * Merges all accumulator results from the tasks previously executed in the Executions.
+	 * @return The accumulator map
+	 */
+	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
+
+		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
+
+		for (ExecutionVertex vertex : getAllExecutionVertices()) {
+			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+			if (next != null) {
+				AccumulatorHelper.mergeInto(userAccumulators, next);
+			}
+		}
+
+		return userAccumulators;
+	}
+
+	/**
+	 * Gets a serialized accumulator map.
+	 * @return The accumulator map with serialized accumulator values.
+	 * @throws IOException
+	 */
+	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
+
+		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+
+		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
+		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
+		}
+
+		return result;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Actions
 	// --------------------------------------------------------------------------------------------
@@ -791,14 +872,31 @@ public class ExecutionGraph implements Serializable {
 	//  Callbacks and Callback Utilities
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Updates the state of the Task and sets the final accumulator results.
+	 * @param state
+	 * @return
+	 */
 	public boolean updateState(TaskExecutionState state) {
 		Execution attempt = this.currentExecutions.get(state.getID());
 		if (attempt != null) {
+
 			switch (state.getExecutionState()) {
 				case RUNNING:
 					return attempt.switchToRunning();
 				case FINISHED:
-					attempt.markFinished();
+					Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators = null;
+					Map<String, Accumulator<?, ?>> userAccumulators = null;
+					try {
+						AccumulatorSnapshot accumulators = state.getAccumulators();
+						flinkAccumulators = accumulators.getFlinkAccumulators();
+						userAccumulators = accumulators.deserializeUserAccumulators(userClassLoader);
+					} catch (Exception e) {
+						// Exceptions would be thrown in the future here
+						LOG.error("Failed to deserialize final accumulator results.", e);
+					}
+
+					attempt.markFinished(flinkAccumulators, userAccumulators);
 					return true;
 				case CANCELED:
 					attempt.cancelingComplete();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 96b6f99..90564a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -147,4 +147,5 @@ public abstract class AbstractReader implements ReaderBase {
 
 		return ++currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 56e5d33..7aa57a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.reader;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -64,7 +65,9 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
 
 				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
+					final Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
+
+					currentBuffer.recycle();
 					currentRecordDeserializer = null;
 				}
 
@@ -89,7 +92,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 							+ "If you are using custom serialization code (Writable or Value types), check their "
 							+ "serialization routines. In the case of Kryo, check the respective Kryo serializer.");
 				}
-				
+
 				if (handleEvent(bufferOrEvent.getEvent())) {
 					if (inputGate.isFinished()) {
 						isFinished = true;
@@ -112,4 +115,11 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 			}
 		}
 	}
+
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			deserializer.setReporter(reporter);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
index ca59609..debb352 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -47,4 +48,9 @@ public final class BufferReader extends AbstractReader {
 			}
 		}
 	}
+
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 2a0a6df..9f8ae20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api.reader;
 
 import java.io.IOException;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
@@ -51,4 +52,9 @@ public interface ReaderBase {
 
 	boolean hasReachedEndOfSuperstep();
 
+	/**
+	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
+	 */
+	void setReporter(AccumulatorRegistry.Reporter reporter);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
index b1395e3..d45920e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -80,4 +80,5 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe
 			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 28bcf4a..ec9f4fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -45,6 +46,8 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 	private Buffer currentBuffer;
 
+	private AccumulatorRegistry.Reporter reporter;
+
 	public AdaptiveSpanningRecordDeserializer() {
 		this.nonSpanningWrapper = new NonSpanningWrapper();
 		this.spanningWrapper = new SpanningWrapper();
@@ -90,10 +93,18 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
 
+			if (reporter != null) {
+				reporter.reportNumBytesIn(len);
+			}
+
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
 				target.read(this.nonSpanningWrapper);
 
+				if (reporter != null) {
+					reporter.reportNumRecordsIn(1);
+				}
+
 				return (this.nonSpanningWrapper.remaining() == 0) ?
 						DeserializationResult.LAST_RECORD_FROM_BUFFER :
 						DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -117,6 +128,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 			// get the full record
 			target.read(this.spanningWrapper);
 
+			if (reporter != null) {
+				reporter.reportNumRecordsIn(1);
+			}
+
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
 			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
@@ -144,6 +159,12 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		this.reporter = reporter;
+		this.spanningWrapper.setReporter(reporter);
+	}
+
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private static final class NonSpanningWrapper implements DataInputView {
@@ -426,6 +447,8 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 		private int recordLimit;
 
+		private AccumulatorRegistry.Reporter reporter;
+
 		public SpanningWrapper() {
 			this.lengthBuffer = ByteBuffer.allocate(4);
 			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
@@ -463,6 +486,10 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 				} else {
 					this.recordLength = this.lengthBuffer.getInt(0);
 
+					if (reporter != null) {
+						reporter.reportNumBytesIn(this.recordLength);
+					}
+
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
 				}
@@ -607,5 +634,9 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		public int read(byte[] b) throws IOException {
 			return this.serializationReadBuffer.read(b);
 		}
+
+		public void setReporter(AccumulatorRegistry.Reporter reporter) {
+			this.reporter = reporter;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index dd8ea06..e4c7890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -64,4 +65,9 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 	void clear();
 	
 	boolean hasUnfinishedData();
+
+	/**
+	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
+	 */
+	void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index bb6db55..e9f339a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 /**
@@ -63,4 +64,9 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	void clear();
 	
 	boolean hasData();
+
+	/**
+	 * Setter for the reporter, e.g. for the number of records emitted and the number of bytes read.
+	 */
+	void setReporter(AccumulatorRegistry.Reporter reporter);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 38e130d..f163e05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,6 +24,7 @@ import java.nio.ByteOrder;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
@@ -50,6 +51,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Limit of current {@link MemorySegment} of target buffer */
 	private int limit;
 
+	private AccumulatorRegistry.Reporter reporter;
+
 	public SpanningRecordSerializer() {
 		this.serializationBuffer = new DataOutputSerializer(128);
 
@@ -75,7 +78,13 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		// write data and length
 		record.write(this.serializationBuffer);
 
-		this.lengthBuffer.putInt(0, this.serializationBuffer.length());
+		int len = this.serializationBuffer.length();
+		this.lengthBuffer.putInt(0, len);
+
+		if (reporter != null) {
+			reporter.reportNumBytesOut(len);
+			reporter.reportNumRecordsOut(1);
+		}
 
 		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
 
@@ -173,4 +182,9 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		// either data in current target buffer or intermediate buffers
 		return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining());
 	}
+
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		this.reporter = reporter;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 6b0d836..f3e4892 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
@@ -61,6 +62,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 	private Buffer currentBuffer;
 
+	private AccumulatorRegistry.Reporter reporter;
+
 	public SpillingAdaptiveSpanningRecordDeserializer() {
 		
 		String tempDirString = GlobalConfiguration.getString(
@@ -112,11 +115,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
 
+			if (reporter != null) {
+				reporter.reportNumBytesIn(len);
+			}
+
 			if (len <= nonSpanningRemaining - 4) {
 				// we can get a full record from here
 				try {
 					target.read(this.nonSpanningWrapper);
 
+					if (reporter != null) {
+						reporter.reportNumRecordsIn(1);
+					}
+
 					int remaining = this.nonSpanningWrapper.remaining();
 					if (remaining > 0) {
 						return DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -151,6 +162,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		if (this.spanningWrapper.hasFullRecord()) {
 			// get the full record
 			target.read(this.spanningWrapper.getInputView());
+
+			if (reporter != null) {
+				reporter.reportNumRecordsIn(1);
+			}
 			
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
@@ -176,6 +191,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
 	}
 
+	@Override
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		this.reporter = reporter;
+		this.spanningWrapper.setReporter(reporter);
+	}
+
 	// -----------------------------------------------------------------------------------------------------------------
 	
 	private static final class NonSpanningWrapper implements DataInputView {
@@ -469,7 +490,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		private File spillFile;
 		
 		private InputViewDataInputStreamWrapper spillFileReader;
-		
+
+		private AccumulatorRegistry.Reporter reporter;
+
 		public SpanningWrapper(String[] tempDirs) {
 			this.tempDirs = tempDirs;
 			
@@ -522,6 +545,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					return;
 				} else {
 					this.recordLength = this.lengthBuffer.getInt(0);
+
+					if (reporter != null) {
+						reporter.reportNumBytesIn(recordLength);
+					}
+
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
 					
@@ -652,5 +680,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			random.nextBytes(bytes);
 			return StringUtils.byteToHexString(bytes);
 		}
+
+		public void setReporter(AccumulatorRegistry.Reporter reporter) {
+			this.reporter = reporter;
+		}
 	}
 }


Mime
View raw message