flink-commits mailing list archives

From mbala...@apache.org
Subject [2/3] flink git commit: [streaming] Replaced partitionBy with groupBy + re-added global partitioning
Date Thu, 08 Jan 2015 14:54:22 GMT
[streaming] Replaced partitionBy with groupBy + re-added global partitioning


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a81862
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a81862
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a81862

Branch: refs/heads/master
Commit: 10a81862562d6e87300cdb21c7cd719cddeee083
Parents: b22406a
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Jan 7 10:24:00 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu Jan 8 13:35:28 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  6 +-
 .../connectors/twitter/TwitterLocal.java        | 98 --------------------
 .../connectors/twitter/TwitterTopology.java     | 92 ++++++++++++++++++
 .../streaming/api/datastream/DataStream.java    | 67 ++++++-------
 .../datastream/SingleOutputStreamOperator.java  | 16 ----
 .../examples/twitter/TwitterStream.java         |  2 +-
 .../windowing/MultiplePoliciesExample.java      |  9 +-
 .../flink/streaming/api/scala/DataStream.scala  | 40 ++------
 8 files changed, 137 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index c7e7060..7808b9e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -162,11 +162,11 @@ Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
  * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface.
-Usage: `dataStream.partitionBy(keys)`
+Usage: `dataStream.groupBy(keys)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
- * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
-Usage: `operator.setParallelism(1)`
+ * *Global*: All data are sent to the first instance of the next processing operator. Use this option with care to avoid serious performance bottlenecks.
+Usage: `dataStream.global()`
 
 ### Sources
 

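For reference, a minimal sketch of how the calls documented above fit together after this change. The class name, input data and the fromElements source are used here only for illustration and are not part of this commit; the remaining calls (groupBy, sum, global, print) mirror the guide text and the examples below.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// groupBy now plays the role of the removed partitionBy: it hash-partitions
		// the stream by the chosen key and makes it usable by grouped operators.
		DataStream<Tuple2<String, Integer>> counts = env
				.fromElements("ask", "and", "ask", "again")
				.map(new MapFunction<String, Tuple2<String, Integer>>() {
					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Integer> map(String word) throws Exception {
						return new Tuple2<String, Integer>(word, 1);
					}
				})
				.groupBy(0)
				.sum(1);

		// global() routes every element to the first parallel instance of the next
		// operator; a potential bottleneck, so use it sparingly.
		counts.global().print();

		env.execute();
	}
}

Unlike the old setParallelism(1) advice, global() leaves the downstream operator's parallelism untouched and only changes the routing, so all but its first instance sit idle.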
http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
deleted file mode 100644
index 3058047..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * This program demonstrate the use of TwitterSource. 
- * Its aim is to count the frequency of the languages of tweets
- */
-public class TwitterLocal {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-	private static final int NUMBEROFTWEETS = 100;
-	
-	/**
-	 * FlatMapFunction to determine the language of tweets if possible 
-	 */
-	public static class SelectLanguageFlatMap extends
-			JSONParseFlatMap<String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Select the language from the incoming JSON text
-		 */
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			try{
-				out.collect(getString(value, "lang"));
-			}
-			catch (JSONException e){
-				out.collect("");
-			}
-		}
-
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		String path = new String();
-
-		if (args != null && args.length == 1) {
-			path = args[0];
-		} else {
-			System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
-				.setParallelism(SOURCE_PARALLELISM);
-
-
-		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				.flatMap(new SelectLanguageFlatMap())
-				.partitionBy(0)
-				.map(new MapFunction<String, Tuple2<String, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-					
-					@Override
-					public Tuple2<String, Integer> map(String value) throws Exception {
-						return new Tuple2<String, Integer>(value, 1);
-					}
-				})
-				.groupBy(0)
-				.sum(1);
-
-		dataStream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..4bc6df0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * This program demonstrates the use of TwitterSource.
+ * Its aim is to count the frequency of the languages of tweets
+ */
+public class TwitterTopology {
+
+	private static final int NUMBEROFTWEETS = 100;
+	
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible 
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			try{
+				out.collect(getString(value, "lang"));
+			}
+			catch (JSONException e){
+				out.collect("");
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path = new String();
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterTopology <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
+
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				})
+				.groupBy(0)
+				.sum(1);
+
+		dataStream.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index e969647..8e21218 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -67,6 +67,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.partitioner.DistributePartitioner;
 import org.apache.flink.streaming.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -239,7 +240,9 @@ public class DataStream<OUT> {
 	/**
 	 * Groups the elements of a {@link DataStream} by the given key positions to
 	 * be used with grouped operators like
-	 * {@link GroupedDataStream#reduce(ReduceFunction)}
+	 * {@link GroupedDataStream#reduce(ReduceFunction)}</p> This operator also
+	 * affects the partitioning of the stream, by forcing values with the same
+	 * key to go to the same processing instance.
 	 * 
 	 * @param fields
 	 *            The position of the fields on which the {@link DataStream}
@@ -259,7 +262,9 @@ public class DataStream<OUT> {
 	 * is either the name of a public field or a getter method with parentheses
 	 * of the {@link DataStream}S underlying type. A dot can be used to drill
 	 * down into objects, as in {@code "field1.getInnerField2()" }. This method
-	 * returns an {@link GroupedDataStream}.
+	 * returns an {@link GroupedDataStream}.</p> This operator also affects the
+	 * partitioning of the stream, by forcing values with the same key to go to
+	 * the same processing instance.
 	 * 
 	 * @param fields
 	 *            One or more field expressions on which the DataStream will be
@@ -275,7 +280,10 @@ public class DataStream<OUT> {
 	/**
 	 * Groups the elements of a {@link DataStream} by the key extracted by the
 	 * {@link KeySelector} to be used with grouped operators like
-	 * {@link GroupedDataStream#reduce(ReduceFunction)}
+	 * {@link GroupedDataStream#reduce(ReduceFunction)}.
+	 * <p/>
+	 * This operator also affects the partitioning of the stream, by forcing
+	 * values with the same key to go to the same processing instance.
 	 * 
 	 * @param keySelector
 	 *            The {@link KeySelector} that will be used to extract keys for
@@ -293,42 +301,6 @@ public class DataStream<OUT> {
 
 	/**
 	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by the selected fields. This setting only effects the how the
-	 * outputs will be distributed between the parallel instances of the next
-	 * processing operator.
-	 * 
-	 * @param fields
-	 *            The fields to partition by.
-	 * @return The DataStream with fields partitioning set.
-	 */
-	public DataStream<OUT> partitionBy(int... fields) {
-		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return partitionBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
-		} else {
-			return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
-		}
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output is
-	 * partitioned by the given field expressions. This setting only effects the
-	 * how the outputs will be distributed between the parallel instances of the
-	 * next processing operator.
-	 * 
-	 * @param fields
-	 *            The fields expressions to partition by.
-	 * @return The DataStream with fields partitioning set.
-	 */
-	public DataStream<OUT> partitionBy(String... fields) {
-		return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
-	}
-
-	private DataStream<OUT> partitionBy(Keys<OUT> keys) {
-		return partitionBy(KeySelectorUtil.getSelectorForKeys(keys, getType()));
-	}
-
-	/**
-	 * Sets the partitioning of the {@link DataStream} so that the output is
 	 * partitioned using the given {@link KeySelector}. This setting only
 	 * effects the how the outputs will be distributed between the parallel
 	 * instances of the next processing operator.
@@ -336,7 +308,7 @@ public class DataStream<OUT> {
 	 * @param keySelector
 	 * @return
 	 */
-	public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
+	protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
 		return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
 	}
 
@@ -390,6 +362,18 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Sets the partitioning of the {@link DataStream} so that the output values
+	 * all go to the first instance of the next processing operator. Use this
+	 * setting with care since it might cause a serious performance bottleneck
+	 * in the application.
+	 * 
+	 * @return The DataStream with global partitioning set.
+	 */
+	public DataStream<OUT> global() {
+		return setConnectionType(new GlobalPartitioner<OUT>());
+	}
+
+	/**
 	 * Initiates an iterative part of the program that feeds back data streams.
 	 * The iterative part needs to be closed by calling
 	 * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
@@ -1007,7 +991,8 @@ public class DataStream<OUT> {
 
 	protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
 
-		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, true);
+		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null,
+				true);
 
 		jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism, waitTime);
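With the public partitionBy variants removed above (the KeySelector overload is now protected), code that keyed a stream purely to control its partitioning goes through groupBy instead. A rough sketch, assuming a DataStream<Event> named events already exists; the Event POJO and its field are invented for illustration:

// Hypothetical POJO, not part of this commit.
public class Event {
	public String userId;
	public int value;
}

// Grouping by a KeySelector hash-partitions the stream by the extracted key
// (what the removed public partitionBy(KeySelector) used to do) and at the
// same time prepares it for grouped operators such as reduce or sum.
DataStream<Event> byUser = events.groupBy(new KeySelector<Event, String>() {
	private static final long serialVersionUID = 1L;

	@Override
	public String getKey(Event event) throws Exception {
		return event.userId;
	}
});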

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 4b6edc0..aa85579 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
@@ -151,21 +150,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> partitionBy(int... keypositions) {
-		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keypositions);
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> partitionBy(String... fields) {
-		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(fields);
-	}
-
-	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> partitionBy(KeySelector<OUT, ?> keySelector) {
-		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keySelector);
-	}
-
-	@SuppressWarnings("unchecked")
 	public SingleOutputStreamOperator<OUT, O> broadcast() {
 		return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
index 08aa5cb..1901475 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -71,7 +71,7 @@ public class TwitterStream {
 
 		DataStream<Tuple2<String, Integer>> tweets = streamSource
 		// selecting English tweets and splitting to words
-				.flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0)
+				.flatMap(new SelectEnglishAndTokenizeFlatMap())
 				// returning (word, 1)
 				.map(new MapFunction<String, Tuple2<String, Integer>>() {
 					private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
index 6f031c3..48783f2 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.examples.windowing;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
@@ -43,7 +44,13 @@ public class MultiplePoliciesExample {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<String> stream = env.addSource(new BasicSource())
-				.groupBy(0)
+				.groupBy(new KeySelector<String, String>(){
+					private static final long serialVersionUID = 1L;
+					@Override
+					public String getKey(String value) throws Exception {
+						return value;
+					}	
+				})
 				.window(Count.of(2))
 				.every(Count.of(3), Count.of(5))
 				.reduceGroup(new Concat());
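The KeySelector introduced above is needed because BasicSource emits plain Strings, and positional keys only apply to tuple and array types (see the Field/Key entry in the guide change). For tuple-typed streams the positional form still works; for illustration, assuming a DataStream<Tuple2<String, Integer>> named pairs already exists:

// Positional key: valid because the elements are Tuple2 instances.
DataStream<Tuple2<String, Integer>> totals = pairs.groupBy(0).sum(1);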

http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index ffe91cb..dfaa316 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -127,39 +127,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy(fields: Int*): DataStream[T] =
-    javaStream.partitionBy(fields: _*)
-
-  /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the selected fields. This setting only effects the how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
-   javaStream.partitionBy(firstField +: otherFields.toArray: _*)
-
-  /**
-   * Sets the partitioning of the DataStream so that the output is
-   * partitioned by the given Key. This setting only effects the how the outputs will be
-   * distributed between the parallel instances of the next processing operator.
-   *
-   */
-  def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
-
-    val keyExtractor = new KeySelector[T, K] {
-      val cleanFun = clean(fun)
-      def getKey(in: T) = cleanFun(in)
-    }
-    javaStream.partitionBy(keyExtractor)
-  }
-
-  /**
    * Sets the partitioning of the DataStream so that the output tuples
    * are broadcasted to every parallel instance of the next component. This
    * setting only effects the how the outputs will be distributed between the
@@ -167,6 +134,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
    *
    */
   def broadcast: DataStream[T] = javaStream.broadcast()
+  
+  /**
+   * Sets the partitioning of the DataStream so that the output values all go to 
+   * the first instance of the next processing operator. Use this setting with care
+   * since it might cause a serious performance bottleneck in the application.
+   */
+  def global: DataStream[T] = javaStream.global()
 
   /**
    * Sets the partitioning of the DataStream so that the output tuples

