flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] flink git commit: [FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator
Date Wed, 19 Aug 2015 13:36:06 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.9 e8802f90a -> c15831731


[FLINK-2286] [streaming] Wrapped ParallelMerge into stream operator

Closes #994


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87119749
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87119749
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87119749

Branch: refs/heads/release-0.9
Commit: 87119749aa2e6b129d1099b2ede9ad5854b33a55
Parents: e8802f9
Author: Gábor Hermann <reckoner42@gmail.com>
Authored: Thu Aug 6 15:42:54 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Aug 18 15:36:31 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       |  8 ++--
 .../api/operators/co/CoStreamFlatMap.java       |  4 ++
 .../api/operators/windowing/ParallelMerge.java  |  3 ++
 .../windowing/ParallelMergeOperator.java        | 43 ++++++++++++++++++++
 4 files changed, 53 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index ba28fa4..e35592e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -30,11 +30,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamFilter;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
 import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
 import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
@@ -45,6 +43,7 @@ import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
 import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
 import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
 import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
+import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
@@ -147,7 +146,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 			DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
 			ReduceFunction<OUT> reduceFunction) {
 
-		CoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
+		ParallelMerge<OUT> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
 				: new ParallelMerge<OUT>(reduceFunction);
 
 		return reduced.discretizedStream
@@ -156,8 +155,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 				.transform(
 						"CoFlatMap",
 						reduced.discretizedStream.getType(),
-						new CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
-								parallelMerger));
+						new ParallelMergeOperator<OUT>(parallelMerger));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index e3662d6..777b165 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -41,4 +41,8 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 	public void processElement2(IN2 element) throws Exception {
 		userFunction.flatMap2(element, output);
 	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
index cd239fc..ce7d887 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMerge.java
@@ -139,4 +139,7 @@ public class ParallelMerge<OUT> extends
 		this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
 	}
 
+	Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> getReceivedWindows() {
+		return receivedWindows;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87119749/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
new file mode 100644
index 0000000..74df3ad
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeOperator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public class ParallelMergeOperator<OUT> extends CoStreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
+
+	private ParallelMerge<OUT> parallelMerge;
+
+	public ParallelMergeOperator(ParallelMerge<OUT> parallelMerge) {
+		super(parallelMerge);
+		this.parallelMerge = parallelMerge;
+	}
+
+	@Override
+	public void close() throws Exception {
+		// emit remaining (partial) windows
+
+		for (Tuple2<StreamWindow<OUT>, Integer> receivedWindow : parallelMerge.getReceivedWindows().values()) {
+			getCollector().collect(receivedWindow.f0);
+		}
+
+		super.close();
+	}
+}


Mime
View raw message