flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject flink git commit: [FLINK-2382] fix Flink live accumulators for TwoInputStreamTask
Date Tue, 21 Jul 2015 15:42:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0f589aad8 -> 1d373a7de


[FLINK-2382] fix Flink live accumulators for TwoInputStreamTask


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d373a7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d373a7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d373a7d

Branch: refs/heads/master
Commit: 1d373a7de39ac3fc14d69025122b04d27ec9d7ba
Parents: 0f589aa
Author: Maximilian Michels <mxm@apache.org>
Authored: Tue Jul 21 17:37:21 2015 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Tue Jul 21 17:37:21 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/TwoInputStreamTask.java     | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d373a7d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 507b813..f981cd5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -66,6 +67,10 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 		inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
 
+		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+		inputProcessor.setReporter(reporter);
+
 		inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
 	}
 
@@ -107,7 +112,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 					LOG.warn("Exception while closing operator.", t);
 				}
 			}
-			
+
 			throw e;
 		}
 		finally {


Mime
View raw message