flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/28] git commit: [streaming] Examples updated with CoFunctions
Date Fri, 29 Aug 2014 19:03:37 GMT
[streaming] Examples updated with CoFunctions


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/35835131
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/35835131
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/35835131

Branch: refs/heads/master
Commit: 35835131347ff69759282d5461ed2f62ec1eab14
Parents: c3ec1e1
Author: mbalassi <balassi.marton@gmail.com>
Authored: Fri Aug 22 14:14:00 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Aug 29 21:01:56 2014 +0200

----------------------------------------------------------------------
 .../connectors/twitter/TwitterLocal.java        |   2 +-
 .../api/function/co/RichCoFlatMapFunction.java  |  40 ++++++
 .../api/function/co/RichCoReduceFunction.java   |  40 ++++++
 .../examples/basictopology/BasicTopology.java   |  32 +++--
 .../examples/cellinfo/CellInfoLocal.java        | 122 +++++++++----------
 .../streaming/examples/join/GradeSource.java    |  45 +++++++
 .../streaming/examples/join/JoinLocal.java      |  27 ++--
 .../flink/streaming/examples/join/JoinSink.java |  35 ------
 .../streaming/examples/join/JoinSourceOne.java  |  46 -------
 .../streaming/examples/join/JoinSourceTwo.java  |  46 -------
 .../flink/streaming/examples/join/JoinTask.java |  76 +++++++-----
 .../streaming/examples/join/SalarySource.java   |  45 +++++++
 .../examples/window/join/WindowJoinLocal.java   |   1 +
 .../examples/wordcount/WordCountCounter.java    |  32 -----
 .../examples/wordcount/WordCountLocal.java      |  34 +++++-
 .../examples/wordcount/WordCountSplitter.java   |  35 ------
 16 files changed, 332 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 941bad8..53445cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.examples.function.JSONParseFlatMap;
-import org.apache.flink.streaming.examples.wordcount.WordCountCounter;
+import org.apache.flink.streaming.examples.wordcount.WordCountLocal.WordCountCounter;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
new file mode 100644
index 0000000..80363d3
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoFlatMapFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoFlatMapFunction represents a FlatMap transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoFlatMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoFlatMapFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
new file mode 100644
index 0000000..1eb677d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoReduceFunction.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.co;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * A RichCoReduceFunction represents a Reduce transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
+public abstract class RichCoReduceFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
+		CoReduceFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index 5fd803b..7d8a49c 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -17,35 +17,33 @@
 
 package org.apache.flink.streaming.examples.basictopology;
 
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
 
 public class BasicTopology {
 
-	public static class BasicSource implements SourceFunction<Tuple1<String>> {
+	public static class BasicSource implements SourceFunction<String> {
 
 		private static final long serialVersionUID = 1L;
-		Tuple1<String> tuple = new Tuple1<String>("streaming");
+		String str = new String("streaming");
 
 		@Override
-		public void invoke(Collector<Tuple1<String>> out) throws Exception {
-			// continuously emit a tuple
+		public void invoke(Collector<String> out) throws Exception {
+			// continuous emit
 			while (true) {
-				out.collect(tuple);
+				out.collect(str);
 			}
 		}
 	}
 
-	public static class BasicMap implements MapFunction<Tuple1<String>, Tuple1<String>> {
+	public static class IdentityMap implements MapFunction<String, String> {
 		private static final long serialVersionUID = 1L;
-
-		// map to the same tuple
+		// map to the same value
 		@Override
-		public Tuple1<String> map(Tuple1<String> value) throws Exception {
+		public String map(String value) throws Exception {
 			return value;
 		}
 
@@ -58,8 +56,8 @@ public class BasicTopology {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALLELISM)
-				.map(new BasicMap());
+		DataStream<String> stream = env.addSource(new BasicSource(), SOURCE_PARALLELISM)
+				.map(new IdentityMap());
 
 		stream.print();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
index 910070c..69b933c 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
@@ -19,109 +19,103 @@ package org.apache.flink.streaming.examples.cellinfo;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
 public class CellInfoLocal {
 
-	private static Random rand = new Random();
 	private final static int CELL_COUNT = 10;
 	private final static int LAST_MILLIS = 1000;
 	private final static int PARALLELISM = 1;
 	private final static int SOURCE_PARALLELISM = 1;
+	private final static int QUERY_SLEEP_TIME = 1000;
+	private final static int QUERY_COUNT = 10;
+	private final static int INFO_SLEEP_TIME = 100;
+	private final static int INFO_COUNT = 100;
 
-	private final static class QuerySource implements
-			SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
+	public final static class InfoSource implements SourceFunction<Tuple3<Integer, Long, Integer>> {
 		private static final long serialVersionUID = 1L;
 
-		Tuple4<Boolean, Integer, Long, Integer> tuple = new Tuple4<Boolean, Integer, Long, Integer>(
-				true, 0, 0L, LAST_MILLIS);
+		private static Random rand = new Random();
+		private Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L, 0);
 
 		@Override
-		public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> collector)
-				throws Exception {
-			for (int i = 0; i < 100; i++) {
-				Thread.sleep(1000);
-				tuple.f1 = rand.nextInt(CELL_COUNT);
-				tuple.f2 = System.currentTimeMillis();
-				collector.collect(tuple);
+		public void invoke(Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
+			for (int i = 0; i < INFO_COUNT; i++) {
+				Thread.sleep(INFO_SLEEP_TIME);
+				tuple.f0 = rand.nextInt(CELL_COUNT);
+				tuple.f1 = System.currentTimeMillis();
+
+				out.collect(tuple);
 			}
 		}
 	}
 
-	public final static class InfoSource implements
-			SourceFunction<Tuple4<Boolean, Integer, Long, Integer>> {
+	private final static class QuerySource implements
+			SourceFunction<Tuple3<Integer, Long, Integer>> {
 		private static final long serialVersionUID = 1L;
 
-		private Tuple4<Boolean, Integer, Long, Integer> tuple = new Tuple4<Boolean, Integer, Long, Integer>(
-				false, 0, 0L, 0);
+		private static Random rand = new Random();
+		Tuple3<Integer, Long, Integer> tuple = new Tuple3<Integer, Long, Integer>(0, 0L,
+				LAST_MILLIS);
 
 		@Override
-		public void invoke(Collector<Tuple4<Boolean, Integer, Long, Integer>> out) throws Exception {
-			for (int i = 0; i < 1000; i++) {
-				Thread.sleep(100);
-
-				tuple.f1 = rand.nextInt(CELL_COUNT);
-				tuple.f2 = System.currentTimeMillis();
-
-				out.collect(tuple);
+		public void invoke(Collector<Tuple3<Integer, Long, Integer>> collector) throws Exception {
+			for (int i = 0; i < QUERY_COUNT; i++) {
+				Thread.sleep(QUERY_SLEEP_TIME);
+				tuple.f0 = rand.nextInt(CELL_COUNT);
+				tuple.f1 = System.currentTimeMillis();
+				collector.collect(tuple);
 			}
 		}
 	}
 
-	private final static class CellTask extends
-			RichFlatMapFunction<Tuple4<Boolean, Integer, Long, Integer>, Tuple1<String>> {
+	private final static class CellTask	extends
+			RichCoMapFunction<Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>, String> {
 		private static final long serialVersionUID = 1L;
 
-		private WorkerEngineExact engine = new WorkerEngineExact(10, 500,
-				System.currentTimeMillis());
-		Integer cellID;
-		Long timeStamp;
-		Integer lastMillis;
+		private WorkerEngineExact engine;
+		private Integer cellID;
+		private Long timeStamp;
+		private Integer lastMillis;
 
-		Tuple1<String> outTuple = new Tuple1<String>();
+		public CellTask() {
+			engine = new WorkerEngineExact(CELL_COUNT, LAST_MILLIS, System.currentTimeMillis());
+		}
 
-		// write information to String tuple based on the input tuple
+		// INFO
 		@Override
-		public void flatMap(Tuple4<Boolean, Integer, Long, Integer> value,
-				Collector<Tuple1<String>> out) throws Exception {
-			cellID = value.f1;
-			timeStamp = value.f2;
-
-			// QUERY
-			if (value.f0) {
-				lastMillis = value.f3;
-				outTuple.f0 = "QUERY:\t" + cellID + ": "
-						+ engine.get(timeStamp, lastMillis, cellID);
-				out.collect(outTuple);
-			}
-			// INFO
-			else {
-				engine.put(cellID, timeStamp);
-				outTuple.f0 = "INFO:\t" + cellID + " @ " + timeStamp;
-				out.collect(outTuple);
-			}
+		public String map1(Tuple3<Integer, Long, Integer> value) {
+			cellID = value.f0;
+			timeStamp = value.f1;
+			engine.put(cellID, timeStamp);
+			return "INFO:\t" + cellID + " @ " + timeStamp;
+		}
+
+		// QUERY
+		@Override
+		public String map2(Tuple3<Integer, Long, Integer> value) {
+			cellID = value.f0;
+			timeStamp = value.f1;
+			lastMillis = value.f2;
+			return "QUERY:\t" + cellID + ": " + engine.get(timeStamp, lastMillis, cellID);
 		}
 	}
 
-	// In this example two different source then connect the two stream and
-	// apply a function for the connected stream
-	// TODO add arguments
-	@SuppressWarnings("unchecked")
+	// Example for connecting data streams
 	public static void main(String[] args) {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM).setBufferTimeout(100);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+				PARALLELISM).setBufferTimeout(100);
 
-		DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
-				new QuerySource(), SOURCE_PARALLELISM);
+		DataStream<Tuple3<Integer, Long, Integer>> querySource = env.addSource(new QuerySource(),
+				SOURCE_PARALLELISM).partitionBy(0);
 
-		DataStream<Tuple1<String>> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
-				.merge(querySource).partitionBy(1).flatMap(new CellTask());
+		DataStream<String> stream = env.addSource(new InfoSource(), SOURCE_PARALLELISM)
+				.partitionBy(0).connect(querySource).map(new CellTask());
 		stream.print();
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
new file mode 100644
index 0000000..fe5c949
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+
+	private static final long serialVersionUID = -5897483980082089771L;
+
+	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+	private Random rand = new Random();
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
+
+	@Override
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		// Continuously emit tuples with random names and integers (grades).
+		while (true) {
+			outTuple.f0 = names[rand.nextInt(names.length)];
+			outTuple.f1 = rand.nextInt(5) + 1;
+
+			out.collect(outTuple);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
index a7a776a..472c8c3 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
@@ -17,11 +17,10 @@
 
 package org.apache.flink.streaming.examples.join;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
-import org.apache.flink.api.java.tuple.Tuple3;
 
 public class JoinLocal {
 
@@ -31,24 +30,24 @@ public class JoinLocal {
 	// This example will join two streams. One which emits people's grades and
 	// one which emits people's salaries.
 
-	@SuppressWarnings("unchecked")
 	public static void main(String[] args) {
 
-		LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM).setBufferTimeout(100);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+				PARALLELISM).setBufferTimeout(100);
 
-		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new JoinSourceOne(),
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource(),
+				SOURCE_PARALLELISM);
+		
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource(),
 				SOURCE_PARALLELISM);
 
-		@SuppressWarnings("unused")
-		DataStream<Tuple3<String, Integer, Integer>> source2 = env
-				.addSource(new JoinSourceTwo(), SOURCE_PARALLELISM).merge(source1)
-				.partitionBy(1).flatMap(new JoinTask()).addSink(new JoinSink());
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
+				.flatMap(new JoinTask());
+		
+		System.out.println("(NAME, GRADE, SALARY)");
+		joinedStream.print();
 
 		env.execute();
 
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSink.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSink.java
deleted file mode 100644
index 9b0ef07..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSink.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class JoinSink implements SinkFunction<Tuple3<String, Integer, Integer>> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void invoke(Tuple3<String, Integer, Integer> tuple) {
-		System.out.println("received record...");
-		System.out.println("============================================");
-
-		System.out.println("name=" + tuple.f0 + ", grade=" + tuple.f1 + ", salary=" + tuple.f2);
-
-		System.out.println("============================================");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceOne.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceOne.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceOne.java
deleted file mode 100644
index 28f665b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceOne.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-
-public class JoinSourceOne implements SourceFunction<Tuple3<String, String, Integer>> {
-
-	private static final long serialVersionUID = 6670933703432267728L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-
-			outTuple.f0 = "salary";
-			outTuple.f1 = names[rand.nextInt(names.length)];
-			outTuple.f2 = rand.nextInt(10000);
-			out.collect(outTuple);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceTwo.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceTwo.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceTwo.java
deleted file mode 100644
index fa7f242..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinSourceTwo.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-
-public class JoinSourceTwo implements SourceFunction<Tuple3<String, String, Integer>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple3<String, String, Integer> outTuple = new Tuple3<String, String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple3<String, String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = "grade";
-			outTuple.f1 = names[rand.nextInt(names.length)];
-			outTuple.f2 = rand.nextInt(5) + 1;
-
-			out.collect(outTuple);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
index 3c2f3a7..f0ee71e 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
@@ -20,55 +20,65 @@ package org.apache.flink.streaming.examples.join;
 import java.util.ArrayList;
 import java.util.HashMap;
 
-import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
 import org.apache.flink.util.Collector;
 
+//Joins the input value with the already known values. If it is a grade
+// then with the salaries, if it is a salary then with the grades. Also
+// stores the new element.
 public class JoinTask extends
-		RichFlatMapFunction<Tuple3<String, String, Integer>, Tuple3<String, Integer, Integer>> {
-	private static final long serialVersionUID = 749913336259789039L;
+		RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+	private static final long serialVersionUID = 1L;
 
 	private HashMap<String, ArrayList<Integer>> gradeHashmap;
 	private HashMap<String, ArrayList<Integer>> salaryHashmap;
+	private String name;
 
 	public JoinTask() {
 		gradeHashmap = new HashMap<String, ArrayList<Integer>>();
 		salaryHashmap = new HashMap<String, ArrayList<Integer>>();
+		name = new String();
 	}
 
-	@Override
-	public void flatMap(Tuple3<String, String, Integer> value,
-			Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
-		String streamId = value.f0;
-		String name = value.f1;
+	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
 
-		// Joins the input value with the already known values. If it is a grade
-		// then with the salaries, if it is a salary then with the grades. Also
-		// stores the new element.
-		if (streamId.equals("grade")) {
-			if (salaryHashmap.containsKey(name)) {
-				for (Integer salary : salaryHashmap.get(name)) {
-					Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
-							name, value.f2, salary);
-					out.collect(outputTuple);
-				}
-			}
-			if (!gradeHashmap.containsKey(name)) {
-				gradeHashmap.put(name, new ArrayList<Integer>());
-			}
-			gradeHashmap.get(name).add(value.f2);
-		} else {
-			if (gradeHashmap.containsKey(name)) {
-				for (Integer grade : gradeHashmap.get(name)) {
-					Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>(
-							name, grade, value.f2);
-					out.collect(outputTuple);
-				}
+	// GRADES
+	@Override
+	public void flatMap1(Tuple2<String, Integer> value,
+			Collector<Tuple3<String, Integer, Integer>> out) {
+		name = value.f0;
+		outputTuple.f0 = name;
+		outputTuple.f1 = value.f1;
+		if (salaryHashmap.containsKey(name)) {
+			for (Integer salary : salaryHashmap.get(name)) {
+				outputTuple.f2 = salary;
+				out.collect(outputTuple);
 			}
-			if (!salaryHashmap.containsKey(name)) {
-				salaryHashmap.put(name, new ArrayList<Integer>());
+		}
+		if (!gradeHashmap.containsKey(name)) {
+			gradeHashmap.put(name, new ArrayList<Integer>());
+		}
+		gradeHashmap.get(name).add(value.f1);
+	}
+
+	// SALARIES
+	@Override
+	public void flatMap2(Tuple2<String, Integer> value,
+			Collector<Tuple3<String, Integer, Integer>> out) {
+		name = value.f0;
+		outputTuple.f0 = name;
+		outputTuple.f2 = value.f1;
+		if (gradeHashmap.containsKey(name)) {
+			for (Integer grade : gradeHashmap.get(name)) {
+				outputTuple.f1 = grade;
+				out.collect(outputTuple);
 			}
-			salaryHashmap.get(name).add(value.f2);
 		}
+		if (!salaryHashmap.containsKey(name)) {
+			salaryHashmap.put(name, new ArrayList<Integer>());
+		}
+		salaryHashmap.get(name).add(value.f1);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
new file mode 100644
index 0000000..31a1e41
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
+			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
+	private Random rand = new Random();
+	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
+
+	@Override
+	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		// Continuously emit tuples with random names and integers (salaries).
+		while (true) {
+
+			outTuple.f0 = names[rand.nextInt(names.length)];
+			outTuple.f1 = rand.nextInt(10000);
+			out.collect(outTuple);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index a2a3885..e9cef1e 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -32,6 +32,7 @@ public class WindowJoinLocal {
 	// This example will join two streams with a sliding window. One which emits
 	// people's grades and one which emits people's salaries.
 
+	//TODO update and reconsider
 	public static void main(String[] args) {
 
 		LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
deleted file mode 100644
index f3015ff..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-public class WordCountCounter implements ReduceFunction<Tuple2<String, Integer>> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-			Tuple2<String, Integer> value2) throws Exception {
-		return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index a31edd3..cea90b0 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -17,26 +17,54 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestDataUtil;
+import org.apache.flink.util.Collector;
+
 
+// This example will count the occurrence of each word in the input file.
 public class WordCountLocal {
 
-	// This example will count the occurrence of each word in the input file.
+	public static class WordCountSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out) throws Exception {
+			StringTokenizer tokenizer = new StringTokenizer(inTuple);
+			while (tokenizer.hasMoreTokens()) {
+				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
+			}
+		}
+	}
+	
+	public static class WordCountCounter implements ReduceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
 
+		@Override
+		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+				Tuple2<String, Integer> value2) throws Exception {
+			return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
+		}
+
+	}
+	
 	public static void main(String[] args) {
 
 		TestDataUtil.downloadIfNotExists("hamlet.txt");
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 
 		DataStream<Tuple2<String, Integer>> dataStream = env
 				.readTextFile("src/test/resources/testdata/hamlet.txt")
 				.flatMap(new WordCountSplitter())
 				.groupBy(0)
 				.reduce(new WordCountCounter());
-
+		
 		dataStream.print();
 
 		env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/35835131/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
deleted file mode 100644
index 222fe03..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class WordCountSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
-	private static final long serialVersionUID = 1L;
-
-	// Splits the lines according on spaces
-	@Override
-	public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out) throws Exception {
-
-		for (String word : inTuple.split(" ")) {
-			out.collect(new Tuple2<String, Integer>(word, 1));
-		}
-	}
-}
\ No newline at end of file


Mime
View raw message