flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [27/51] [abbrv] [streaming] Added support for simple types instead of Tuple1 in the API
Date Mon, 18 Aug 2014 17:26:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 2cea800..2a35de5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -27,181 +27,180 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.flink.api.java.functions.MapFunction;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.LogUtils;
-import org.junit.Test;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;
+import org.junit.Test;
 
 public class MapTest {
 
-	public static final class MySource extends SourceFunction<Tuple1<Integer>> {
+	public static final class MySource extends SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void invoke(Collector<Integer> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
-				collector.collect(new Tuple1<Integer>(i));
+				collector.collect(i);
 			}
 		}
 	}
-	
-	public static final class MySource1 extends SourceFunction<Tuple1<Integer>> {
+
+	public static final class MySource1 extends SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void invoke(Collector<Integer> collector) throws Exception {
 			for (int i = 0; i < 5; i++) {
-				collector.collect(new Tuple1<Integer>(i));
+				collector.collect(i);
 			}
 		}
 	}
-	
-	public static final class MySource2 extends SourceFunction<Tuple1<Integer>> {
+
+	public static final class MySource2 extends SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void invoke(Collector<Integer> collector) throws Exception {
 			for (int i = 5; i < 10; i++) {
-				collector.collect(new Tuple1<Integer>(i));
+				collector.collect(i);
 			}
 		}
 	}
-	
-	public static final class MySource3 extends SourceFunction<Tuple1<Integer>> {
+
+	public static final class MySource3 extends SourceFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void invoke(Collector<Integer> collector) throws Exception {
 			for (int i = 10; i < 15; i++) {
-				collector.collect(new Tuple1<Integer>(i));
+				collector.collect(new Integer(i));
 			}
 		}
 	}
 
-	public static final class MyMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static final class MyMap extends MapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+		public Integer map(Integer value) throws Exception {
 			map++;
-			return new Tuple1<Integer>(value.f0 * value.f0);
+			return value * value;
 		}
 	}
-	
-	public static final class MySingleJoinMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+
+	public static final class MySingleJoinMap extends MapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
-			singleJoinSetResult.add(value.f0);
-			return new Tuple1<Integer>(value.f0);
+		public Integer map(Integer value) throws Exception {
+			singleJoinSetResult.add(value);
+			return value;
 		}
 	}
-	
-	public static final class MyMultipleJoinMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+
+	public static final class MyMultipleJoinMap extends MapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
-			multipleJoinSetResult.add(value.f0);
-			return new Tuple1<Integer>(value.f0);
+		public Integer map(Integer value) throws Exception {
+			multipleJoinSetResult.add(value);
+			return value;
 		}
 	}
 
-	public static final class MyFieldsMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static final class MyFieldsMap extends MapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
 		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+		public Integer map(Integer value) throws Exception {
 			counter++;
 			if (counter == MAXSOURCE)
 				allInOne = true;
-			return new Tuple1<Integer>(value.f0 * value.f0);
+			return value * value;
 		}
 	}
-	
-	public static final class MyDiffFieldsMap extends MapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+
+	public static final class MyDiffFieldsMap extends MapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		private int counter = 0;
 
 		@Override
-		public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
+		public Integer map(Integer value) throws Exception {
 			counter++;
 			if (counter > 3)
 				threeInAll = false;
-			return new Tuple1<Integer>(value.f0 * value.f0);
+			return value*value;
 		}
 	}
 
-	public static final class MySink extends SinkFunction<Tuple1<Integer>> {
+	public static final class MySink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
-			result.add(tuple.f0);
+		public void invoke(Integer tuple) {
+			result.add(tuple);
 		}
 	}
 
-	public static final class MyBroadcastSink extends SinkFunction<Tuple1<Integer>> {
+	public static final class MyBroadcastSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 			broadcastResult++;
 		}
 	}
 
-	public static final class MyShufflesSink extends SinkFunction<Tuple1<Integer>> {
+	public static final class MyShufflesSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 			shuffleResult++;
 		}
 	}
 
-	public static final class MyFieldsSink extends SinkFunction<Tuple1<Integer>> {
+	public static final class MyFieldsSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 			fieldsResult++;
 		}
 	}
-	
-	public static final class MyDiffFieldsSink extends SinkFunction<Tuple1<Integer>> {
+
+	public static final class MyDiffFieldsSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 			diffFieldsResult++;
 		}
 	}
-	
-	public static final class MyGraphSink extends SinkFunction<Tuple1<Integer>> {
+
+	public static final class MyGraphSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 			graphResult++;
 		}
 	}
-	
-	public static final class JoinSink extends SinkFunction<Tuple1<Integer>> {
+
+	public static final class JoinSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
+		public void invoke(Integer tuple) {
 		}
 	}
 
@@ -210,14 +209,14 @@ public class MapTest {
 	private static int broadcastResult = 0;
 	private static int shuffleResult = 0;
 	@SuppressWarnings("unused")
-  private static int fieldsResult = 0;
+	private static int fieldsResult = 0;
 	private static int diffFieldsResult = 0;
 	@SuppressWarnings("unused")
-  private static int graphResult = 0;
+	private static int graphResult = 0;
 	@SuppressWarnings("unused")
-  private static int map = 0;
+	private static int map = 0;
 	@SuppressWarnings("unused")
-  private static final int PARALLELISM = 1;
+	private static final int PARALLELISM = 1;
 	private static final long MEMORYSIZE = 32;
 	private static final int MAXSOURCE = 10;
 	private static boolean allInOne = false;
@@ -235,139 +234,110 @@ public class MapTest {
 			expected.add(i * i);
 		}
 	}
-	
+
 	private static void fillFromCollectionSet() {
-		if(fromCollectionSet.isEmpty()){
+		if (fromCollectionSet.isEmpty()) {
 			for (int i = 0; i < 10; i++) {
 				fromCollectionSet.add(i);
 			}
 		}
 	}
-	
+
 	private static void fillFromCollectionFieldsSet() {
-		if(fromCollectionFields.isEmpty()){
+		if (fromCollectionFields.isEmpty()) {
 			for (int i = 0; i < MAXSOURCE; i++) {
-				
+
 				fromCollectionFields.add(5);
 			}
 		}
 	}
-	
+
 	private static void fillFromCollectionDiffFieldsSet() {
-		if(fromCollectionDiffFieldsSet.isEmpty()){
+		if (fromCollectionDiffFieldsSet.isEmpty()) {
 			for (int i = 0; i < 9; i++) {
 				fromCollectionDiffFieldsSet.add(i);
 			}
 		}
 	}
-	
+
 	private static void fillSingleJoinSet() {
 		for (int i = 0; i < 10; i++) {
 			singleJoinSetExpected.add(i);
 		}
 	}
-	
+
 	private static void fillMultipleJoinSet() {
 		for (int i = 0; i < 15; i++) {
 			multipleJoinSetExpected.add(i);
 		}
 	}
 
-
 	@Test
 	public void mapTest() throws Exception {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		//mapTest
+		// mapTest
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(3);
 
 		fillFromCollectionSet();
-		
-		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
-				.map(new MyMap()).addSink(new MySink());
 
+		@SuppressWarnings("unused")
+		DataStream<Integer> dataStream = env.fromCollection(fromCollectionSet).map(new MyMap())
+				.addSink(new MySink());
 
 		fillExpectedList();
-		
-	
-		//broadcastSinkTest
+
+		// broadcastSinkTest
 		fillFromCollectionSet();
-		
+
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env
-				.fromCollection(fromCollectionSet)
-				.broadcast()
-				.map(new MyMap())
-				.addSink(new MyBroadcastSink());
-		
-
-		//shuffleSinkTest
+		DataStream<Integer> dataStream1 = env.fromCollection(fromCollectionSet).broadcast()
+				.map(new MyMap()).addSink(new MyBroadcastSink());
+
+		// shuffleSinkTest
 		fillFromCollectionSet();
-		
+
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env
-				.fromCollection(fromCollectionSet)
-				.map(new MyMap()).setParallelism(3)
-				.addSink(new MyShufflesSink());
+		DataStream<Integer> dataStream2 = env.fromCollection(fromCollectionSet).map(new MyMap())
+				.setParallelism(3).addSink(new MyShufflesSink());
 
-		
-		//fieldsMapTest
+		// fieldsMapTest
 		fillFromCollectionFieldsSet();
-		
+
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env
-				.fromCollection(fromCollectionFields)
-				.partitionBy(0)
-				.map(new MyFieldsMap())
-				.addSink(new MyFieldsSink());
-
-		
-		//diffFieldsMapTest
+		DataStream<Integer> dataStream3 = env.fromCollection(fromCollectionFields).partitionBy(0)
+				.map(new MyFieldsMap()).addSink(new MyFieldsSink());
+
+		// diffFieldsMapTest
 		fillFromCollectionDiffFieldsSet();
-		
+
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env
-				.fromCollection(fromCollectionDiffFieldsSet)
-				.partitionBy(0)
-				.map(new MyDiffFieldsMap())
-				.addSink(new MyDiffFieldsSink());
-	
-		
-		//singleConnectWithTest
-		DataStream<Tuple1<Integer>> source1 = env.addSource(new MySource1(),
-				1);
-		
+		DataStream<Integer> dataStream4 = env.fromCollection(fromCollectionDiffFieldsSet)
+				.partitionBy(0).map(new MyDiffFieldsMap()).addSink(new MyDiffFieldsSink());
+
+		// singleConnectWithTest
+		DataStream<Integer> source1 = env.addSource(new MySource1(), 1);
+
 		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Tuple1<Integer>> source2 = env
-				.addSource(new MySource2(), 1)
-				.connectWith(source1)
-				.partitionBy(0)
-				.map(new MySingleJoinMap()).setParallelism(1)
+		DataStream<Integer> source2 = env.addSource(new MySource2(), 1).connectWith(source1)
+				.partitionBy(0).map(new MySingleJoinMap()).setParallelism(1)
 				.addSink(new JoinSink());
 
-		
 		fillSingleJoinSet();
-		
-		
-		//multipleConnectWithTest
-		DataStream<Tuple1<Integer>> source3 = env.addSource(new MySource1(),
-				1);
-		
-		DataStream<Tuple1<Integer>> source4 = env.addSource(new MySource2(),
-				1);
-		
+
+		// multipleConnectWithTest
+		DataStream<Integer> source3 = env.addSource(new MySource1(), 1);
+
+		DataStream<Integer> source4 = env.addSource(new MySource2(), 1);
+
 		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Tuple1<Integer>> source5 = env
-				.addSource(new MySource3(), 1)
-				.connectWith(source3, source4)
-				.partitionBy(0)
-				.map(new MyMultipleJoinMap()).setParallelism(1)
-				.addSink(new JoinSink());
+		DataStream<Integer> source5 = env.addSource(new MySource3(), 1)
+				.connectWith(source3, source4).partitionBy(0).map(new MyMultipleJoinMap())
+				.setParallelism(1).addSink(new JoinSink());
+
+		env.executeTest(MEMORYSIZE);
 
-		env.executeTest(MEMORYSIZE);		
-		
 		fillMultipleJoinSet();
-		
+
 		assertTrue(expected.equals(result));
 		assertEquals(30, broadcastResult);
 		assertEquals(10, shuffleResult);
@@ -376,7 +346,7 @@ public class MapTest {
 		assertEquals(9, diffFieldsResult);
 		assertEquals(singleJoinSetExpected, singleJoinSetResult);
 		assertEquals(multipleJoinSetExpected, multipleJoinSetResult);
-		
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
index e99518c..463abc9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/MockRecordWriter.java
@@ -42,6 +42,6 @@ public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamR
 	
 	@Override
 	public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
-		emittedRecords.add(record.getInstance().getTuple().f0);
+		emittedRecords.add(record.getInstance().getObject().f0);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
index 716a869..01c2092 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
@@ -33,9 +33,9 @@ public class FieldsPartitionerTest {
 
 	private FieldsPartitioner<Tuple> fieldsPartitioner;
 	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
-			.setTuple(new Tuple2<String, Integer>("test", 0));
+			.setObject(new Tuple2<String, Integer>("test", 0));
 	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
-			.setTuple(new Tuple2<String, Integer>("test", 42));
+			.setObject(new Tuple2<String, Integer>("test", 42));
 	private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index dceaf46..4ca191e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.util.serialization;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.functions.MapFunction;
@@ -50,15 +52,20 @@ public class TypeSerializationTest {
 		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
 				.deserialize(serializedType);
 
-		assertNotNull(ser.getInputTupleTypeInfo1());
-		assertNotNull(ser2.getInputTupleTypeInfo1());
-		
-		assertNotNull(ser.getOutputTupleTypeInfo());
-		assertNotNull(ser2.getOutputTupleTypeInfo());
-		
-		assertEquals(ser.getInputTupleTypeInfo1(), ser2.getInputTupleTypeInfo1());
-		assertEquals(ser.getInputTupleTypeInfo2(), ser2.getInputTupleTypeInfo2());
-		assertEquals(ser.getOutputTupleTypeInfo(), ser2.getOutputTupleTypeInfo());
+		assertNotNull(ser.getInputTypeInfo1());
+		assertNotNull(ser2.getInputTypeInfo1());
+
+		assertNotNull(ser.getOutputTypeInfo());
+		assertNotNull(ser2.getOutputTypeInfo());
+
+		assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
+		try {
+			ser.getInputTypeInfo2();
+			fail();
+		} catch (RuntimeException e) {
+			assertTrue(true);
+		}
+		assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
 	}
 
 	@SuppressWarnings("unchecked")
@@ -68,24 +75,29 @@ public class TypeSerializationTest {
 		Integer instance2 = null;
 		Integer instance3 = new Integer(34);
 
-		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser = new ObjectTypeWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>(
+		TypeSerializerWrapper<Integer, Integer, Integer> ser = new ObjectTypeWrapper<Integer, Integer, Integer>(
 				instance1, instance2, instance3);
 
-		System.out.println(ser.getInputTupleTypeInfo1());
+		// System.out.println(ser.getInputTupleTypeInfo1());
 
 		byte[] serializedType = SerializationUtils.serialize(ser);
 
 		TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>> ser2 = (TypeSerializerWrapper<Tuple1<Integer>, Tuple, Tuple1<Integer>>) SerializationUtils
 				.deserialize(serializedType);
 
-		assertNotNull(ser.getInputTupleTypeInfo1());
-		assertNotNull(ser2.getInputTupleTypeInfo1());
+		assertNotNull(ser.getInputTypeInfo1());
+		assertNotNull(ser2.getInputTypeInfo1());
 
-		assertNotNull(ser.getOutputTupleTypeInfo());
-		assertNotNull(ser2.getOutputTupleTypeInfo());
-		
-		assertEquals(ser.getInputTupleTypeInfo1(), ser2.getInputTupleTypeInfo1());
-		assertEquals(ser.getInputTupleTypeInfo2(), ser2.getInputTupleTypeInfo2());
-		assertEquals(ser.getOutputTupleTypeInfo(), ser2.getOutputTupleTypeInfo());
+		assertNotNull(ser.getOutputTypeInfo());
+		assertNotNull(ser2.getOutputTypeInfo());
+
+		assertEquals(ser.getInputTypeInfo1(), ser2.getInputTypeInfo1());
+		try {
+			ser.getInputTypeInfo2();
+			fail();
+		} catch (RuntimeException e) {
+			assertTrue(true);
+		}
+		assertEquals(ser.getOutputTypeInfo(), ser2.getOutputTypeInfo());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
index 42c0115..53c23d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountCounter.java
@@ -23,10 +23,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String, Integer>> {
+public class WordCountCounter extends MapFunction<String, Tuple2<String, Integer>> {
 	private static final long serialVersionUID = 1L;
 
 	private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
@@ -37,8 +36,8 @@ public class WordCountCounter extends MapFunction<Tuple1<String>, Tuple2<String,
 	
 	// Increments the counter of the occurrence of the input word
 	@Override
-	public Tuple2<String, Integer> map(Tuple1<String> inTuple) throws Exception {
-		word = inTuple.f0;
+	public Tuple2<String, Integer> map(String inTuple) throws Exception {
+		word = inTuple;
 
 		if (wordCounts.containsKey(word)) {
 			count = wordCounts.get(word) + 1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index 7d13a5c..f77ab37 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -32,7 +32,7 @@ public class WordCountLocal {
 	public static void main(String[] args) {
 
 		TestDataUtil.downloadIfNotExists("hamlet.txt");
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		DataStream<Tuple2<String, Integer>> dataStream = env
 				.readTextFile("src/test/resources/testdata/hamlet.txt")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
index afe25ce..f3e9310 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountSplitter.java
@@ -20,21 +20,17 @@
 package org.apache.flink.streaming.examples.wordcount;
 
 import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
-public class WordCountSplitter extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
+public class WordCountSplitter extends FlatMapFunction<String, String> {
 	private static final long serialVersionUID = 1L;
 
-	private Tuple1<String> outTuple = new Tuple1<String>();
-
 	// Splits the lines according on spaces
 	@Override
-	public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
+	public void flatMap(String inTuple, Collector<String> out) throws Exception {
 		
-		for (String word : inTuple.f0.split(" ")) {
-			outTuple.f0 = word;
-			out.collect(outTuple);
+		for (String word : inTuple.split(" ")) {
+			out.collect(word);
 		}
 	}
 }
\ No newline at end of file


Mime
View raw message