flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [3/4] flink git commit: [FLINK-2292][FLINK-1573] add live per-task accumulators
Date Wed, 15 Jul 2015 13:55:56 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4381fd0..29efc4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
@@ -49,6 +50,12 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final int numChannels;
 
+	/**
+	 * Counter for the number of records emitted and for the number of bytes written.
+	 * @param counter
+	 */
+	private AccumulatorRegistry.Reporter reporter;
+
 	/** {@link RecordSerializer} per outgoing channel */
 	private final RecordSerializer<T>[] serializers;
 
@@ -81,6 +88,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 			synchronized (serializer) {
 				SerializationResult result = serializer.addRecord(record);
+
 				while (result.isFullBuffer()) {
 					Buffer buffer = serializer.getCurrentBuffer();
 
@@ -90,8 +98,18 @@ public class RecordWriter<T extends IOReadableWritable> {
 					}
 
 					buffer = writer.getBufferProvider().requestBufferBlocking();
+					if (reporter != null) {
+						// increase the number of written bytes by the memory segment's size
+						reporter.reportNumBytesOut(buffer.getSize());
+					}
+
 					result = serializer.setNextBuffer(buffer);
 				}
+
+				if(reporter != null) {
+					// count number of emitted records
+					reporter.reportNumRecordsOut(1);
+				}
 			}
 		}
 	}
@@ -173,4 +191,14 @@ public class RecordWriter<T extends IOReadableWritable> {
 			}
 		}
 	}
+
+	/**
+	 * Counter for the number of records emitted and the records processed.
+	 */
+	public void setReporter(AccumulatorRegistry.Reporter reporter) {
+		for(RecordSerializer<?> serializer : serializers) {
+			serializer.setReporter(reporter);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 9c5fdca..72434e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
@@ -52,6 +53,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
 
 /**
  * The abstract base class for all tasks able to participate in an iteration.
@@ -166,7 +169,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
 		return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig());
+				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), this.accumulatorMap);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -356,8 +359,10 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 
 	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
-		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
-			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+										ExecutionConfig executionConfig,
+										Map<String, Accumulator<?,?>> accumulatorMap) {
+			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulatorMap);
 		}
 
 		@Override
@@ -375,6 +380,14 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 		public <T extends Value> T getPreviousIterationAggregate(String name) {
 			return (T) getIterationAggregators().getPreviousGlobalAggregate(name);
 		}
+
+		@Override
+		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) {
+			// only add accumulator on first iteration
+			if (inFirstIteration()) {
+				super.addAccumulator(name, newAccumulator);
+			}
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index cf02bdf..9cb045f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.slf4j.Logger;
@@ -112,8 +113,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
 		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
 		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
+		AccumulatorRegistry.Reporter reporter = getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
 		this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
-			userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
+			userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
 
 		// sanity check the setup
 		final int writersIntoStepFunction = this.eventualOutputs.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 85dd5c5..df41672 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is the abstract base class for every task that can be executed ba a TaskManager.
+ * This is the abstract base class for every task that can be executed by a TaskManager.
  * Concrete tasks like the stream vertices of the batch tasks
  * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit from this class.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
deleted file mode 100644
index c824232..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.util.SerializedValue;
-
-/**
- * This class manages the accumulators for different jobs. Either the jobs are
- * running and new accumulator results have to be merged in, or the jobs are no
- * longer running and the results shall be still available for the client or the
- * web interface. Accumulators for older jobs are automatically removed when new
- * arrive, based on a maximum number of entries.
- * 
- * All functions are thread-safe and thus can be called directly from
- * JobManager.
- */
-public class AccumulatorManager {
-
-	/** Map of accumulators belonging to recently started jobs */
-	private final Map<JobID, JobAccumulators> jobAccumulators = new HashMap<JobID, JobAccumulators>();
-
-	private final LinkedList<JobID> lru = new LinkedList<JobID>();
-	private int maxEntries;
-
-
-	public AccumulatorManager(int maxEntries) {
-		this.maxEntries = maxEntries;
-	}
-
-	/**
-	 * Merges the new accumulators with the existing accumulators collected for
-	 * the job.
-	 */
-	public void processIncomingAccumulators(JobID jobID,
-											Map<String, Accumulator<?, ?>> newAccumulators) {
-		synchronized (this.jobAccumulators) {
-			
-			JobAccumulators jobAccumulators = this.jobAccumulators.get(jobID);
-			if (jobAccumulators == null) {
-				jobAccumulators = new JobAccumulators();
-				this.jobAccumulators.put(jobID, jobAccumulators);
-				cleanup(jobID);
-			}
-			jobAccumulators.processNew(newAccumulators);
-		}
-	}
-
-	public Map<String, Object> getJobAccumulatorResults(JobID jobID) {
-		Map<String, Object> result = new HashMap<String, Object>();
-
-		JobAccumulators acc;
-		synchronized (jobAccumulators) {
-			acc = jobAccumulators.get(jobID);
-		}
-
-		if (acc != null) {
-			for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) {
-				result.put(entry.getKey(), entry.getValue().getLocalValue());
-			}
-		}
-
-		return result;
-	}
-
-	public Map<String, SerializedValue<Object>> getJobAccumulatorResultsSerialized(JobID jobID) throws IOException {
-		JobAccumulators acc;
-		synchronized (jobAccumulators) {
-			acc = jobAccumulators.get(jobID);
-		}
-
-		if (acc == null || acc.getAccumulators().isEmpty()) {
-			return Collections.emptyMap();
-		}
-
-		Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
-		for (Map.Entry<String, Accumulator<?, ?>> entry : acc.getAccumulators().entrySet()) {
-			result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
-		}
-
-		return result;
-	}
-
-	public StringifiedAccumulatorResult[] getJobAccumulatorResultsStringified(JobID jobID) throws IOException {
-		JobAccumulators acc;
-		synchronized (jobAccumulators) {
-			acc = jobAccumulators.get(jobID);
-		}
-
-		if (acc == null || acc.getAccumulators().isEmpty()) {
-			return new StringifiedAccumulatorResult[0];
-		}
-
-		Map<String, Accumulator<?, ?>> accMap = acc.getAccumulators();
-
-		StringifiedAccumulatorResult[] result = new StringifiedAccumulatorResult[accMap.size()];
-		int i = 0;
-		for (Map.Entry<String, Accumulator<?, ?>> entry : accMap.entrySet()) {
-			String type = entry.getValue() == null ? "(null)" : entry.getValue().getClass().getSimpleName();
-			String value = entry.getValue() == null ? "(null)" : entry.getValue().toString();
-			result[i++] = new StringifiedAccumulatorResult(entry.getKey(), type, value);
-		}
-		return result;
-	}
-
-	/**
-	 * Cleanup data for the oldest jobs if the maximum number of entries is reached.
-	 *
-	 * @param jobId The (potentially new) JobId.
-	 */
-	private void cleanup(JobID jobId) {
-		if (!lru.contains(jobId)) {
-			lru.addFirst(jobId);
-		}
-		if (lru.size() > this.maxEntries) {
-			JobID toRemove = lru.removeLast();
-			this.jobAccumulators.remove(toRemove);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
deleted file mode 100644
index 970d993..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/JobAccumulators.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.accumulators;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-
-/**
- * Simple class wrapping a map of accumulators for a single job. Just for better
- * handling.
- */
-public class JobAccumulators {
-
-	private final Map<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-
-	public Map<String, Accumulator<?, ?>> getAccumulators() {
-		return this.accumulators;
-	}
-
-	public void processNew(Map<String, Accumulator<?, ?>> newAccumulators) {
-		AccumulatorHelper.mergeInto(this.accumulators, newAccumulators);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 345e1ab..b3130a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
@@ -356,6 +357,11 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 			throw new Exception("Illegal input group size in task configuration: " + groupSize);
 		}
 
+		final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+		final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+		inputReader.setReporter(reporter);
+
 		this.inputTypeSerializerFactory = this.config.getInputSerializer(0, getUserCodeClassLoader());
 		@SuppressWarnings({ "rawtypes" })
 		final MutableObjectIterator<?> iter = new ReaderIterator(inputReader, this.inputTypeSerializerFactory.getSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 0bbe4bf..3f1c642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -39,9 +40,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -187,25 +188,22 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 					format.close();
 				}
 			} // end for all input splits
-			
+
 			// close the collector. if it is a chaining task collector, it will close its chained tasks
 			this.output.close();
-			
+
 			// close all chained tasks letting them report failure
 			RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-			
-			// Merge and report accumulators
-			RegularPactTask.reportAndClearAccumulators(getEnvironment(),
-					new HashMap<String, Accumulator<?,?>>(), chainedTasks);
+
 		}
 		catch (Exception ex) {
 			// close the input, but do not report any exceptions, since we already have another root cause
 			try {
 				this.format.close();
 			} catch (Throwable ignored) {}
-			
+
 			RegularPactTask.cancelChainedTasks(this.chainedTasks);
-			
+
 			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
 
 			if (ex instanceof CancelTaskException) {
@@ -275,7 +273,7 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		catch (Throwable t) {
 			throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t);
 		}
-		
+
 		// get the factory for the type serializer
 		this.serializerFactory = this.config.getOutputSerializer(userCodeClassLoader);
 	}
@@ -287,7 +285,14 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 	private void initOutputs(ClassLoader cl) throws Exception {
 		this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
 		this.eventualOutputs = new ArrayList<RecordWriter<?>>();
-		this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig());
+
+		final AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+		final AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+		Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
+
+		this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs,
+				getExecutionConfig(), reporter, accumulatorMap);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index d0f4116..a53f5bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -74,7 +74,7 @@ public interface PactDriver<S extends Function, OT> {
 	 *                   code typically signal situations where this instance in unable to proceed, exceptions
 	 *                   from the user code should be forwarded.
 	 */
-	void run() throws Exception; 
+	void run() throws Exception;
 	
 	/**
 	 * This method is invoked in any case (clean termination and exception) at the end of the tasks operation.

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index b296506..bc23fa3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -50,7 +50,7 @@ public interface PactTaskContext<S, OT> {
 	MemoryManager getMemoryManager();
 	
 	IOManager getIOManager();
-	
+
 	<X> MutableObjectIterator<X> getInput(int index);
 	
 	<X> TypeSerializerFactory<X> getInputSerializer(int index);

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 1c3328e..78bf383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -20,18 +20,17 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
@@ -71,7 +70,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -114,7 +112,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	protected List<RecordWriter<?>> eventualOutputs;
 
 	/**
-	 * The input readers to this task.
+	 * The input readers of this task.
 	 */
 	protected MutableReader<?>[] inputReaders;
 
@@ -212,7 +210,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	protected volatile boolean running = true;
 
-	
+	/**
+	 * The accumulator map used in the RuntimeContext.
+	 */
+	protected Map<String, Accumulator<?,?>> accumulatorMap;
+
 	// --------------------------------------------------------------------------------------------
 	//                                  Task Interface
 	// --------------------------------------------------------------------------------------------
@@ -273,7 +275,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			LOG.debug(formatLogString("Start task code."));
 		}
 
-		this.runtimeUdfContext = createRuntimeContext(getEnvironment().getTaskName());
+		Environment env = getEnvironment();
+
+		this.runtimeUdfContext = createRuntimeContext(env.getTaskName());
 
 		// whatever happens in this scope, make sure that the local strategies are cleaned up!
 		// note that the initialization of the local strategies is in the try-finally block as well,
@@ -367,6 +371,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 			clearReaders(inputReaders);
 			clearWriters(eventualOutputs);
+
 		}
 
 		if (this.running) {
@@ -505,18 +510,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 			// close all chained tasks letting them report failure
 			RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-
-			// Collect the accumulators of all involved UDFs and send them to the
-			// JobManager. close() has been called earlier for all involved UDFs
-			// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
-			// modify accumulators;
-
-			// collect the counters from the udf in the core driver
-			Map<String, Accumulator<?, ?>> accumulators =
-					FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
-			
-			// collect accumulators from chained tasks and report them
-			reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
 		}
 		catch (Exception ex) {
 			// close the input, but do not report any exceptions, since we already have another root cause
@@ -557,60 +550,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		}
 	}
 
-	/**
-	 * This method is called at the end of a task, receiving the accumulators of
-	 * the task and the chained tasks. It merges them into a single map of
-	 * accumulators and sends them to the JobManager.
-	 *
-	 * @param chainedTasks
-	 *          Each chained task might have accumulators which will be merged
-	 *          with the accumulators of the stub.
-	 */
-	protected static void reportAndClearAccumulators(Environment env,
-													Map<String, Accumulator<?, ?>> accumulators,
-													ArrayList<ChainedDriver<?, ?>> chainedTasks) {
-
-		// We can merge here the accumulators from the stub and the chained
-		// tasks. Type conflicts can occur here if counters with same name but
-		// different type were used.
-		
-		if (!chainedTasks.isEmpty()) {
-			if (accumulators == null) {
-				accumulators = new HashMap<String, Accumulator<?, ?>>();
-			}
-			
-			for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-				RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
-				if (rc != null) {
-					Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators();
-					if (chainedAccumulators != null) {
-						AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
-					}
-				}
-			}
-		}
-
-		// Don't report if the UDF didn't collect any accumulators
-		if (accumulators == null || accumulators.size() == 0) {
-			return;
-		}
-
-		// Report accumulators to JobManager
-		env.reportAccumulators(accumulators);
-
-		// We also clear the accumulators, since stub instances might be reused
-		// (e.g. in iterations) and we don't want to count twice. This may not be
-		// done before sending
-		AccumulatorHelper.resetAndClearAccumulators(accumulators);
-		
-		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
-			if (rc != null) {
-				AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
-			}
-		}
-	}
-
 	protected void closeLocalStrategiesAndCaches() {
 		
 		// make sure that all broadcast variable references held by this task are released
@@ -725,6 +664,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 		int currentReaderOffset = 0;
 
+		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+
 		for (int i = 0; i < numInputs; i++) {
 			//  ---------------- create the input readers ---------------------
 			// in case where a logical input unions multiple physical inputs, create a union reader
@@ -744,6 +686,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				throw new Exception("Illegal input group size in task configuration: " + groupSize);
 			}
 
+			inputReaders[i].setReporter(reporter);
+
 			currentReaderOffset += groupSize;
 		}
 		this.inputReaders = inputReaders;
@@ -1073,14 +1017,21 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 		ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 
-		this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs, this.getExecutionConfig());
+		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
+		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
+
+		this.accumulatorMap = accumulatorRegistry.getUserMap();
+
+		this.output = initOutputs(this, userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs,
+				this.getExecutionConfig(), reporter, this.accumulatorMap);
 	}
 
 	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
+
 		return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
 				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
-				env.getDistributedCacheEntries());
+				env.getDistributedCacheEntries(), this.accumulatorMap);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1257,7 +1208,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 * @return The OutputCollector that data produced in this task is submitted to.
 	 */
 	public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl,
-			List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs) throws Exception
+			List<RecordWriter<?>> eventualOutputs, int outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws Exception
 	{
 		if (numOutputs == 0) {
 			return null;
@@ -1286,11 +1237,15 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 					}
 					final DataDistribution distribution = config.getOutputDataDistribution(i, cl);
 					final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-					
+
 					oe = new RecordOutputEmitter(strategy, comparator, partitioner, distribution);
 				}
 
-				writers.add(new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe));
+				// setup accumulator counters
+				final RecordWriter<Record> recordWriter = new RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
+				recordWriter.setReporter(reporter);
+
+				writers.add(recordWriter);
 			}
 			if (eventualOutputs != null) {
 				eventualOutputs.addAll(writers);
@@ -1318,12 +1273,18 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				else {
 					final DataDistribution dataDist = config.getOutputDataDistribution(i, cl);
 					final Partitioner<?> partitioner = config.getOutputPartitioner(i, cl);
-					
+
 					final TypeComparator<T> comparator = compFactory.createComparator();
 					oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
 				}
 
-				writers.add(new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe));
+				final RecordWriter<SerializationDelegate<T>> recordWriter =
+						new RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset + i), oe);
+
+				// setup live accumulator counters
+				recordWriter.setReporter(reporter);
+
+				writers.add(recordWriter);
 			}
 			if (eventualOutputs != null) {
 				eventualOutputs.addAll(writers);
@@ -1338,7 +1299,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config,
-					List<ChainedDriver<?, ?>> chainedTasksTarget, List<RecordWriter<?>> eventualOutputs, ExecutionConfig executionConfig)
+										List<ChainedDriver<?, ?>> chainedTasksTarget,
+										List<RecordWriter<?>> eventualOutputs,
+										ExecutionConfig executionConfig,
+										AccumulatorRegistry.Reporter reporter,
+										Map<String, Accumulator<?,?>> accumulatorMap)
 	throws Exception
 	{
 		final int numOutputs = config.getNumOutputs();
@@ -1370,12 +1335,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				final TaskConfig chainedStubConf = config.getChainedStubConfig(i);
 				final String taskName = config.getChainedTaskName(i);
 
-				if (i == numChained -1) {
+				if (i == numChained - 1) {
 					// last in chain, instantiate the output collector for this task
-					previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs());
+					previous = getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, chainedStubConf.getNumOutputs(), reporter);
 				}
 
-				ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig);
+				ct.setup(chainedStubConf, taskName, previous, nepheleTask, cl, executionConfig, accumulatorMap);
 				chainedTasksTarget.add(0, ct);
 
 				previous = ct;
@@ -1386,7 +1351,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// else
 
 		// instantiate the output collector the default way from this configuration
-		return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs);
+		return getOutputCollector(nepheleTask , config, cl, eventualOutputs, 0, numOutputs, reporter);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index b4cfa27..ea6cfe3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.chaining;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.execution.Environment;
@@ -28,6 +29,8 @@ import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 
+import java.util.Map;
+
 /**
  * The interface to be implemented by drivers that do not run in an own pact task context, but are chained to other
  * tasks.
@@ -50,20 +53,23 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 
 	
 	public void setup(TaskConfig config, String taskName, Collector<OT> outputCollector,
-			AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig)
+			AbstractInvokable parent, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
+			Map<String, Accumulator<?,?>> accumulatorMap)
 	{
 		this.config = config;
 		this.taskName = taskName;
 		this.outputCollector = outputCollector;
 		this.userCodeClassLoader = userCodeClassLoader;
-		
+
+		Environment env = parent.getEnvironment();
+
 		if (parent instanceof RegularPactTask) {
 			this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
 		} else {
-			Environment env = parent.getEnvironment();
 			this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
 					env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(),
-					env.getDistributedCacheEntries());
+					env.getDistributedCacheEntries(), accumulatorMap
+			);
 		}
 
 		this.executionConfig = executionConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
index f4cd354..4b480ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -41,12 +42,14 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext {
 	private final HashMap<String, BroadcastVariableMaterialization<?, ?>> broadcastVars = new HashMap<String, BroadcastVariableMaterialization<?, ?>>();
 	
 	
-	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig);
+	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+										ExecutionConfig executionConfig, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators);
 	}
 	
-	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks) {
-		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks);
+	public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
+		super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 5ab0150..f166c36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.taskmanager;
 import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.accumulators.AccumulatorEvent;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -34,12 +33,10 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializedValue;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -76,7 +73,9 @@ public class RuntimeEnvironment implements Environment {
 	private final InputGate[] inputGates;
 	
 	private final ActorRef jobManagerActor;
-	
+
+	private final AccumulatorRegistry accumulatorRegistry;
+
 	// ------------------------------------------------------------------------
 
 	public RuntimeEnvironment(JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId,
@@ -86,6 +85,7 @@ public class RuntimeEnvironment implements Environment {
 								ClassLoader userCodeClassLoader,
 								MemoryManager memManager, IOManager ioManager,
 								BroadcastVariableManager bcVarManager,
+								AccumulatorRegistry accumulatorRegistry,
 								InputSplitProvider splitProvider,
 								Map<String, Future<Path>> distCacheEntries,
 								ResultPartitionWriter[] writers,
@@ -93,7 +93,7 @@ public class RuntimeEnvironment implements Environment {
 								ActorRef jobManagerActor) {
 		
 		checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism);
-		
+
 		this.jobId = checkNotNull(jobId);
 		this.jobVertexId = checkNotNull(jobVertexId);
 		this.executionId = checkNotNull(executionId);
@@ -107,6 +107,7 @@ public class RuntimeEnvironment implements Environment {
 		this.memManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
 		this.bcVarManager = checkNotNull(bcVarManager);
+		this.accumulatorRegistry = checkNotNull(accumulatorRegistry);
 		this.splitProvider = checkNotNull(splitProvider);
 		this.distCacheEntries = checkNotNull(distCacheEntries);
 		this.writers = checkNotNull(writers);
@@ -183,6 +184,11 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return accumulatorRegistry;
+	}
+
+	@Override
 	public InputSplitProvider getInputSplitProvider() {
 		return splitProvider;
 	}
@@ -213,20 +219,6 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		AccumulatorEvent evt;
-		try {
-			evt = new AccumulatorEvent(getJobID(), accumulators);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Cannot serialize accumulators to send them to JobManager", e);
-		}
-
-		ReportAccumulatorResult accResult = new ReportAccumulatorResult(jobId, executionId, evt);
-		jobManagerActor.tell(accResult, ActorRef.noSender());
-	}
-
-	@Override
 	public void acknowledgeCheckpoint(long checkpointId) {
 		acknowledgeCheckpoint(checkpointId, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 616998c..13a2ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -100,10 +101,10 @@ public class Task implements Runnable {
 
 	/** The class logger. */
 	private static final Logger LOG = LoggerFactory.getLogger(Task.class);
-	
+
 	/** The tread group that contains all task threads */
 	private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
-	
+
 	/** For atomic state updates */
 	private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
@@ -176,13 +177,16 @@ public class Task implements Runnable {
 
 	/** The library cache, from which the task can request its required JAR files */
 	private final LibraryCacheManager libraryCache;
-	
+
 	/** The cache for user-defined files that the invokable requires */
 	private final FileCache fileCache;
-	
+
 	/** The gateway to the network stack, which handles inputs and produced results */
 	private final NetworkEnvironment network;
 
+	/** The registry of this task which enables live reporting of accumulators */
+	private final AccumulatorRegistry accumulatorRegistry;
+
 	/** The thread that executes the task */
 	private final Thread executingThread;
 
@@ -194,10 +198,10 @@ public class Task implements Runnable {
 
 	/** atomic flag that makes sure the invokable is canceled exactly once upon error */
 	private final AtomicBoolean invokableHasBeenCanceled;
-	
+
 	/** The invokable of this task, if initialized */
 	private volatile AbstractInvokable invokable;
-	
+
 	/** The current execution state of the task */
 	private volatile ExecutionState executionState = ExecutionState.CREATED;
 
@@ -245,12 +249,13 @@ public class Task implements Runnable {
 
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
-		this.broadcastVariableManager =checkNotNull(bcVarManager);
+		this.broadcastVariableManager = checkNotNull(bcVarManager);
+		this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
 
 		this.jobManager = checkNotNull(jobManagerActor);
 		this.taskManager = checkNotNull(taskManagerActor);
 		this.actorAskTimeout = new Timeout(checkNotNull(actorAskTimeout));
-		
+
 		this.libraryCache = checkNotNull(libraryCache);
 		this.fileCache = checkNotNull(fileCache);
 		this.network = checkNotNull(networkEnvironment);
@@ -361,6 +366,10 @@ public class Task implements Runnable {
 		return inputGatesById.get(id);
 	}
 
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return accumulatorRegistry;
+	}
+
 	public Thread getExecutingThread() {
 		return executingThread;
 	}
@@ -499,7 +508,8 @@ public class Task implements Runnable {
 			Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
 					taskName, taskNameWithSubtask, subtaskIndex, parallelism,
 					jobConfiguration, taskConfiguration,
-					userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager,
+					userCodeClassLoader, memoryManager, ioManager,
+					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
 					writers, inputGates, jobManager);
 
@@ -518,7 +528,7 @@ public class Task implements Runnable {
 
 			// get our private reference onto the stack (be safe against concurrent changes) 
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-			
+
 			if (operatorState != null) {
 				if (invokable instanceof OperatorStateCarrier) {
 					try {
@@ -553,7 +563,7 @@ public class Task implements Runnable {
 			if (!STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
 				throw new CancelTaskException();
 			}
-			
+
 			// notify everyone that we switched to running. especially the TaskManager needs
 			// to know this!
 			notifyObservers(ExecutionState.RUNNING, null);
@@ -653,7 +663,7 @@ public class Task implements Runnable {
 		finally {
 			try {
 				LOG.info("Freeing task resources for " + taskNameWithSubtask);
-				
+
 				// stop the async dispatcher.
 				// copy dispatcher reference to stack, against concurrent release
 				ExecutorService dispatcher = this.asyncCallDispatcher;
@@ -867,15 +877,15 @@ public class Task implements Runnable {
 	 */
 	public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
 		AbstractInvokable invokable = this.invokable;
-		
+
 		if (executionState == ExecutionState.RUNNING && invokable != null) {
 			if (invokable instanceof CheckpointedOperator) {
-				
+
 				// build a local closure 
 				final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable;
 				final Logger logger = LOG;
 				final String taskName = taskNameWithSubtask;
-				
+
 				Runnable runnable = new Runnable() {
 					@Override
 					public void run() {
@@ -1038,7 +1048,7 @@ public class Task implements Runnable {
 	public String toString() {
 		return getTaskNameWithSubtasks() + " [" + executionState + ']';
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Task Names
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 6c85ab5..0637017 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import java.util.Arrays;
 
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
@@ -52,8 +53,11 @@ public class TaskExecutionState implements java.io.Serializable {
 	// class may not be part of the system class loader.
 	private transient Throwable cachedError;
 
+	/** Serialized flink and user-defined accumulators */
+	private final AccumulatorSnapshot accumulators;
+
 	/**
-	 * Creates a new task execution state update, with no attached exception.
+	 * Creates a new task execution state update, with no attached exception and no accumulators.
 	 *
 	 * @param jobID
 	 *        the ID of the job the task belongs to
@@ -63,13 +67,28 @@ public class TaskExecutionState implements java.io.Serializable {
 	 *        the execution state to be reported
 	 */
 	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) {
-		this(jobID, executionId, executionState, null);
+		this(jobID, executionId, executionState, null, null);
 	}
-	
+
+	/**
+	 * Creates a new task execution state update, with an attached exception but no accumulators.
+	 *
+	 * @param jobID
+	 *        the ID of the job the task belongs to
+	 * @param executionId
+	 *        the ID of the task execution whose state is to be reported
+	 * @param executionState
+	 *        the execution state to be reported
+	 */
+	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
+							ExecutionState executionState, Throwable error) {
+		this(jobID, executionId, executionState, error, null);
+	}
+
 	/**
 	 * Creates a new task execution state update, with an attached exception.
 	 * This constructor may never throw an exception.
-	 * 
+	 *
 	 * @param jobID
 	 *        the ID of the job the task belongs to
 	 * @param executionId
@@ -78,11 +97,15 @@ public class TaskExecutionState implements java.io.Serializable {
 	 *        the execution state to be reported
 	 * @param error
 	 *        an optional error
+	 * @param accumulators
+	 *        The flink and user-defined accumulators which may be null.
 	 */
 	public TaskExecutionState(JobID jobID, ExecutionAttemptID executionId,
-								ExecutionState executionState, Throwable error) {
+			ExecutionState executionState, Throwable error,
+			AccumulatorSnapshot accumulators) {
 
-		if (jobID == null || executionId == null || executionState == null) {
+
+			if (jobID == null || executionId == null || executionState == null) {
 			throw new NullPointerException();
 		}
 
@@ -90,6 +113,7 @@ public class TaskExecutionState implements java.io.Serializable {
 		this.executionId = executionId;
 		this.executionState = executionState;
 		this.cachedError = error;
+		this.accumulators = accumulators;
 
 		if (error != null) {
 			byte[] serializedError;
@@ -178,6 +202,13 @@ public class TaskExecutionState implements java.io.Serializable {
 		return this.jobID;
 	}
 
+	/**
+	 * Gets flink and user-defined accumulators in serialized form.
+	 */
+	public AccumulatorSnapshot getAccumulators() {
+		return accumulators;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
index f5e897b..6a5468a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
  * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
  *
  * To work around that issue, the SerializedValue serialized data immediately into a byte array.
- * When send through RPC or another service that uses serialization, the only the byte array is
+ * When send through RPC or another service that uses serialization, only the byte array is
  * transferred. The object is deserialized later (upon access) and requires the accessor to
  * provide the corresponding class loader.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3b4ce15..8823041 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -29,39 +29,39 @@ import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
-import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
-import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, ZooKeeperUtil}
-import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, StreamingMode}
+import org.apache.flink.runtime.util.ZooKeeperUtil
+import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
-import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -97,7 +97,6 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val accumulatorManager: AccumulatorManager,
     protected val defaultExecutionRetries: Int,
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
@@ -221,7 +220,6 @@ class JobManager(
               originalSender ! result
             }(context.dispatcher)
 
-            sender ! true
           case None => log.error("Cannot find execution graph for ID " +
             s"${taskExecutionState.getJobID} to change state to " +
             s"${taskExecutionState.getExecutionState}.")
@@ -298,7 +296,7 @@ class JobManager(
             newJobStatus match {
               case JobStatus.FINISHED =>
                 val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
-                  accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+                  executionGraph.getAccumulatorsSerialized
                 } catch {
                   case e: Exception =>
                     log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
@@ -402,13 +400,22 @@ class JobManager(
       import scala.collection.JavaConverters._
       sender ! RegisteredTaskManagers(instanceManager.getAllRegisteredInstances.asScala)
 
-    case Heartbeat(instanceID, metricsReport) =>
-      try {
-        log.debug(s"Received hearbeat message from $instanceID.")
-        instanceManager.reportHeartBeat(instanceID, metricsReport)
-      } catch {
-        case t: Throwable => log.error(s"Could not report heart beat from ${sender().path}.", t)
-      }
+    case Heartbeat(instanceID, metricsReport, accumulators) =>
+      log.debug(s"Received hearbeat message from $instanceID.")
+
+      Future {
+        accumulators foreach {
+          case accumulators =>
+              currentJobs.get(accumulators.getJobID) match {
+                case Some((jobGraph, jobInfo)) =>
+                  jobGraph.updateAccumulators(accumulators)
+                case None =>
+                  // ignore accumulator values for old job
+              }
+        }
+      }(context.dispatcher)
+
+      instanceManager.reportHeartBeat(instanceID, metricsReport)
 
     case message: AccumulatorMessage => handleAccumulatorMessage(message)
 
@@ -676,33 +683,18 @@ class JobManager(
    * @param message The accumulator message.
    */
   private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
-
     message match {
-      case ReportAccumulatorResult(jobId, _, accumulatorEvent) =>
-        val classLoader = try {
-          libraryCacheManager.getClassLoader(jobId)
-        } catch {
-          case e: Exception =>
-            log.error("Dropping accumulators. No class loader available for job " + jobId, e)
-            return
-        }
-
-        if (classLoader != null) {
-          try {
-            val accumulators = accumulatorEvent.deserializeValue(classLoader)
-            accumulatorManager.processIncomingAccumulators(jobId, accumulators)
-          }
-          catch {
-            case e: Exception => log.error("Cannot update accumulators for job " + jobId, e)
-          }
-        } else {
-          log.error("Dropping accumulators. No class loader available for job " + jobId)
-        }
 
       case RequestAccumulatorResults(jobID) =>
         try {
-          val accumulatorValues: java.util.Map[String, SerializedValue[Object]] =
-            accumulatorManager.getJobAccumulatorResultsSerialized(jobID)
+          val accumulatorValues: java.util.Map[String, SerializedValue[Object]] = {
+            currentJobs.get(jobID) match {
+              case Some((graph, jobInfo)) =>
+                graph.getAccumulatorsSerialized
+              case None =>
+                null // TODO check also archive
+            }
+          }
 
           sender() ! AccumulatorResultsFound(jobID, accumulatorValues)
         }
@@ -714,8 +706,31 @@ class JobManager(
 
       case RequestAccumulatorResultsStringified(jobId) =>
         try {
-          val accumulatorValues: Array[StringifiedAccumulatorResult] =
-            accumulatorManager.getJobAccumulatorResultsStringified(jobId)
+          val accumulatorValues: Array[StringifiedAccumulatorResult] = {
+            currentJobs.get(jobId) match {
+              case Some((graph, jobInfo)) =>
+                val accumulators = graph.aggregateUserAccumulators()
+
+                val result: Array[StringifiedAccumulatorResult] = new
+                    Array[StringifiedAccumulatorResult](accumulators.size)
+
+                var i = 0
+                accumulators foreach {
+                  case (name, accumulator) =>
+                    val (typeString, valueString) =
+                      if (accumulator != null) {
+                        (accumulator.getClass.getSimpleName, accumulator.toString)
+                      } else {
+                        (null, null)
+                      }
+                    result(i) = new StringifiedAccumulatorResult(name, typeString, valueString)
+                    i += 1
+                }
+                result
+              case None =>
+                null // TODO check also archive
+            }
+          }
 
           sender() ! AccumulatorResultStringsFound(jobId, accumulatorValues)
         }
@@ -1058,7 +1073,7 @@ object JobManager {
    */
   def createJobManagerComponents(configuration: Configuration)
     : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
-      Props, AccumulatorManager, Int, Long, FiniteDuration, Int) = {
+      Props, Int, Long, FiniteDuration, Int) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -1091,8 +1106,6 @@ object JobManager {
 
     val archiveProps: Props = Props(classOf[MemoryArchivist], archiveCount)
 
-    val accumulatorManager: AccumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
-
     val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
 
     var blobServer: BlobServer = null
@@ -1131,7 +1144,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -1179,7 +1191,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiveProps,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -1199,7 +1210,6 @@ object JobManager {
       scheduler,
       libraryCacheManager,
       archiver,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index b12f1b5..6cb571c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.messages
 
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.instance.InstanceID
 
 /**
@@ -52,8 +53,10 @@ object TaskManagerMessages {
    *
    * @param instanceID The instance ID of the reporting TaskManager.
    * @param metricsReport utf-8 encoded JSON metrics report from the metricRegistry.
+   * @param accumulators Accumulators of tasks serialized as Tuple2[internal, user-defined]
    */
-  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte])
+  case class Heartbeat(instanceID: InstanceID, metricsReport: Array[Byte],
+     accumulators: Seq[AccumulatorSnapshot])
 
 
   // --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
index 82c4ab6..015c96e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.messages.accumulators
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.accumulators.{StringifiedAccumulatorResult, AccumulatorEvent}
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
 import org.apache.flink.runtime.util.SerializedValue
 
 /**
@@ -38,18 +37,6 @@ sealed trait AccumulatorMessage {
 sealed trait AccumulatorResultsResponse extends AccumulatorMessage
 
 /**
- * Reports the accumulator results of the individual tasks to the job manager.
- *
- * @param jobID The ID of the job the accumulator belongs to
- * @param executionId The ID of the task execution that the accumulator belongs to.
- * @param accumulatorEvent The serialized accumulators
- */
-case class ReportAccumulatorResult(jobID: JobID,
-                                   executionId: ExecutionAttemptID,
-                                   accumulatorEvent: AccumulatorEvent)
-  extends AccumulatorMessage
-
-/**
  * Requests the accumulator results of the job identified by [[jobID]] from the job manager.
  * The result is sent back to the sender as a [[AccumulatorResultsResponse]] message.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1a35d01..f07fa0c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,6 +37,7 @@ import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
+import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry}
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
@@ -68,6 +69,7 @@ import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Success}
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 import scala.language.postfixOps
 
@@ -328,7 +330,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
               network.getPartitionManager.releasePartitionsProducedBy(executionID)
             } catch {
               case t: Throwable => killTaskManagerFatal(
-                "Fatal leak: Unable to release intermediate result partition data", t)
+              "Fatal leak: Unable to release intermediate result partition data", t)
             }
           }
 
@@ -389,7 +391,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
           } else {
             log.debug(s"Cannot find task to cancel for execution ${executionID})")
             sender ! new TaskOperationResult(executionID, false,
-              "No task with that execution ID was found.")
+            "No task with that execution ID was found.")
           }
 
         case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -400,7 +402,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
               log.debug(s"Cannot find task $taskExecutionId to respond with partition state.")
           }
       }
-    }
+      }
   }
 
   /**
@@ -793,12 +795,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
       // create the task. this does not grab any TaskManager resources or download
       // and libraries - the operation does not block
-      val execId = tdd.getExecutionId
       val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager,
                           self, jobManagerActor, config.timeout, libCache, fileCache)
 
       log.info(s"Received task ${task.getTaskNameWithSubtasks}")
-      
+
+      val execId = tdd.getExecutionId
       // add the task to the map
       val prevTask = runningTasks.put(execId, task)
       if (prevTask != null) {
@@ -898,22 +900,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
     val task = runningTasks.remove(executionID)
     if (task != null) {
-      
-        // the task must be in a terminal state
-        if (!task.getExecutionState.isTerminal) {
-          try {
-            task.failExternally(new Exception("Task is being removed from TaskManager"))
-          } catch {
-            case e: Exception => log.error("Could not properly fail task", e)
-          }
+
+      // the task must be in a terminal state
+      if (!task.getExecutionState.isTerminal) {
+        try {
+          task.failExternally(new Exception("Task is being removed from TaskManager"))
+        } catch {
+          case e: Exception => log.error("Could not properly fail task", e)
         }
+      }
+
+      log.info(s"Unregistering task and sending final execution state " +
+        s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " +
+        s"(${task.getExecutionId})")
 
-        log.info(s"Unregistering task and sending final execution state " +
-          s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " +
-          s"(${task.getExecutionId})")
+      val accumulators = {
+        val registry = task.getAccumulatorRegistry
+        registry.getSnapshot
+      }
 
-        self ! UpdateTaskExecutionState(new TaskExecutionState(
-          task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause))
+      self ! UpdateTaskExecutionState(new TaskExecutionState(
+        task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause,
+        accumulators))
     }
     else {
       log.error(s"Cannot find task with ID $executionID to unregister.")
@@ -931,9 +939,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   private def sendHeartbeatToJobManager(): Unit = {
     try {
       log.debug("Sending heartbeat to JobManager")
-      val report: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
-      currentJobManager foreach {
-        jm => jm ! Heartbeat(instanceID, report)
+      val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
+
+      val accumulatorEvents =
+        scala.collection.mutable.Buffer[AccumulatorSnapshot]()
+
+      runningTasks foreach {
+        case (execID, task) =>
+          val registry = task.getAccumulatorRegistry
+          val accumulators = registry.getSnapshot
+          accumulatorEvents.append(accumulators)
+      }
+
+       currentJobManager foreach {
+        jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents)
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index 4e5fb40..14bf022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -183,5 +185,10 @@ public class AbstractReaderTest {
 		protected MockReader(InputGate inputGate) {
 			super(inputGate);
 		}
+
+		@Override
+		public void setReporter(AccumulatorRegistry.Reporter reporter) {
+
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 9b9609b..0aab5fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -61,7 +61,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	private final IOManager ioManager;
 	
 	private final MemoryManager memManager;
-	
+
 	private final List<MutableObjectIterator<Record>> inputs;
 	
 	private final List<TypeComparator<Record>> comparators;
@@ -105,7 +105,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		this.perSortFractionMem = (double)perSortMemory/totalMem;
 		this.ioManager = new IOManagerAsync();
 		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
-		
+
 		this.inputs = new ArrayList<MutableObjectIterator<Record>>();
 		this.comparators = new ArrayList<TypeComparator<Record>>();
 		this.sorters = new ArrayList<UnilateralSortMerger<Record>>();
@@ -295,7 +295,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	public IOManager getIOManager() {
 		return this.ioManager;
 	}
-	
+
 	@Override
 	public MemoryManager getMemoryManager() {
 		return this.memManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0f62b27..b9cb416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -79,6 +80,8 @@ public class MockEnvironment implements Environment {
 
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
 
+	private final AccumulatorRegistry accumulatorRegistry;
+
 	private final int bufferSize;
 
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
@@ -91,6 +94,8 @@ public class MockEnvironment implements Environment {
 		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.bufferSize = bufferSize;
+
+		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
@@ -259,8 +264,8 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void reportAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-		// discard, this is only for testing
+	public AccumulatorRegistry getAccumulatorRegistry() {
+		return this.accumulatorRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index e9e761c..bd36dd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -25,6 +25,7 @@ import akka.actor.Props;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 219e5ae..f2535fa 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -65,7 +65,6 @@ class TestingCluster(userConfiguration: Configuration,
       scheduler,
       libraryCacheManager,
       _,
-      accumulatorManager,
       executionRetries,
       delayBetweenRetries,
       timeout,
@@ -82,7 +81,6 @@ class TestingCluster(userConfiguration: Configuration,
         scheduler,
         libraryCacheManager,
         archive,
-        accumulatorManager,
         executionRetries,
         delayBetweenRetries,
         timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 5747b7e..6d316ca 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -147,6 +147,16 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
         case None => sender ! WorkingTaskManager(None)
       }
 
+    case RequestAccumulatorValues(jobID) =>
+
+      val (flinkAccumulators, userAccumulators) = currentJobs.get(jobID) match {
+        case Some((graph, jobInfo)) =>
+          (graph.getFlinkAccumulators, graph.aggregateUserAccumulators)
+        case None => null
+      }
+
+      sender ! RequestAccumulatorValuesResponse(jobID, flinkAccumulators, userAccumulators)
+
     case NotifyWhenJobStatus(jobID, state) =>
       val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
         scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())

http://git-wip-us.apache.org/repos/asf/flink/blob/8261ed54/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 241c6c0..46e8486 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.InstanceGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
+import java.util.Map
+import org.apache.flink.api.common.accumulators.Accumulator
 
 object TestingJobManagerMessages {
 
@@ -53,4 +56,9 @@ object TestingJobManagerMessages {
 
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
+
+  case class RequestAccumulatorValues(jobID: JobID)
+  case class RequestAccumulatorValuesResponse(jobID: JobID,
+    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]],
+    userAccumulators: Map[String, Accumulator[_,_]])
 }


Mime
View raw message