flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [28/51] [abbrv] [streaming] Added support for simple types instead of Tuple1 in the API
Date Mon, 18 Aug 2014 17:26:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
index a42b0bb..0a88efd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserTaskInvokable.java
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
-public abstract class UserTaskInvokable<IN extends Tuple, OUT extends Tuple> extends
+public abstract class UserTaskInvokable<IN, OUT> extends
 		StreamRecordInvokable<IN, OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 4aa540c..46e79de 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -23,9 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 
-public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+public class BatchReduceInvokable<IN, OUT> extends
 		StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private int batchSize;
@@ -53,7 +52,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 					}
 				}
 				counter++;
-				tupleBatch.add(reuse.getTuple());
+				tupleBatch.add(reuse.getObject());
 				resetReuse();
 			} while (counter < batchSize);
 			reducer.reduce(tupleBatch.iterator(), collector);
@@ -99,7 +98,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 			if (hasNext()) {
 				counter++;
 				loadedNext = false;
-				return reuse.getTuple();
+				return reuse.getObject();
 			} else {
 				counter++;
 				loadedNext = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index edeb79a..f2b2930 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -20,10 +20,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
-public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN> {
+public class FilterInvokable<IN> extends UserTaskInvokable<IN, IN> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -35,9 +34,9 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			if (filterFunction.filter(reuse.getTuple())) {
-				collector.collect(reuse.getTuple());
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			if (filterFunction.filter(reuse.getObject())) {
+				collector.collect(reuse.getObject());
 			}
 			resetReuse();
 		}
@@ -45,9 +44,9 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
 
 	@Override
 	protected void mutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			if (filterFunction.filter(reuse.getTuple())) {
-				collector.collect(reuse.getTuple());
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			if (filterFunction.filter(reuse.getObject())) {
+				collector.collect(reuse.getObject());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 279b160..11e7853 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -20,11 +20,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
-public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
-		UserTaskInvokable<IN, OUT> {
+public class FlatMapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private FlatMapFunction<IN, OUT> flatMapper;
@@ -35,16 +33,16 @@ public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			flatMapper.flatMap(reuse.getTuple(), collector);
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			flatMapper.flatMap(reuse.getObject(), collector);
 			resetReuse();
 		}
 	}
 
 	@Override
 	protected void mutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			flatMapper.flatMap(reuse.getTuple(), collector);
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			flatMapper.flatMap(reuse.getObject(), collector);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 3c56b6f..794d765 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -20,10 +20,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
-public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskInvokable<IN, OUT> {
+public class MapInvokable<IN, OUT> extends UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private MapFunction<IN, OUT> mapper;
@@ -34,16 +33,16 @@ public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskI
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			collector.collect(mapper.map(reuse.getTuple()));
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			collector.collect(mapper.map(reuse.getObject()));
 			resetReuse();
 		}
 	}
 
 	@Override
 	protected void mutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			collector.collect(mapper.map(reuse.getTuple()));
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			collector.collect(mapper.map(reuse.getObject()));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index f7ea566..a574ebc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -20,10 +20,9 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
 
-public abstract class StreamReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+public abstract class StreamReduceInvokable<IN, OUT> extends
 		UserTaskInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	protected GroupReduceFunction<IN, OUT> reducer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
index 67c15dc..7710bd8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -23,9 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 
-public class WindowReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+public class WindowReduceInvokable<IN, OUT> extends
 		StreamReduceInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 	private long windowSize;
@@ -54,7 +53,7 @@ public class WindowReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 						break;
 					}
 				}
-				tupleBatch.add(reuse.getTuple());
+				tupleBatch.add(reuse.getObject());
 				resetReuse();
 			} while (System.currentTimeMillis() - startTime < windowSize);
 			reducer.reduce(tupleBatch.iterator(), collector);
@@ -99,10 +98,10 @@ public class WindowReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		public IN next() {
 			if (hasNext()) {
 				loadedNext = false;
-				return reuse.getTuple();
+				return reuse.getObject();
 			} else {
 				loadedNext = false;
-				return reuse.getTuple();
+				return reuse.getObject();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 884e361..d854e89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -19,14 +19,13 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+public abstract class CoInvokable<IN1, IN2, OUT> extends
 		StreamComponentInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index be5c42f..e899367 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -19,11 +19,9 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 
-public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
-		CoInvokable<IN1, IN2, OUT> {
+public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private CoMapFunction<IN1, IN2, OUT> mapper;
@@ -39,14 +37,14 @@ public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tu
 		boolean noMoreRecordOnInput2 = false;
 
 		do {
-			noMoreRecordOnInput1 = recordIterator1.next(reuse1) == null;
+			noMoreRecordOnInput1 = ((reuse1 = recordIterator1.next(reuse1)) == null);
 			if (!noMoreRecordOnInput1) {
-				collector.collect(mapper.map1(reuse1.getTuple()));
+				collector.collect(mapper.map1(reuse1.getObject()));
 			}
 
-			noMoreRecordOnInput2 = recordIterator2.next(reuse2) == null;
+			noMoreRecordOnInput2 = ((reuse2 = recordIterator2.next(reuse2)) == null);
 			if (!noMoreRecordOnInput2) {
-				collector.collect(mapper.map2(reuse2.getTuple()));
+				collector.collect(mapper.map2(reuse2.getObject()));
 			}
 
 			if (!this.isMutable) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index fcf87e2..e50803f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -26,8 +26,6 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -43,21 +41,22 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
+public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 
 	protected static final String SOURCE = "source";
 
 	private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
 
-	protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
-	protected StreamRecordSerializer<OUT> outTupleSerializer = null;
+	protected TypeInformation<OUT> outTypeInfo = null;
+	protected StreamRecordSerializer<OUT> outSerializer = null;
 	protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
 	protected StreamConfig configuration;
-	protected TypeSerializerWrapper<? extends Tuple, ? extends Tuple, OUT> typeWrapper;
+	protected TypeSerializerWrapper<?, ?, OUT> typeWrapper;
 	protected StreamCollector<OUT> collector;
 	protected int instanceID;
 	protected String name;
@@ -105,9 +104,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 	}
 
 	protected void setSerializer() {
-		outTupleTypeInfo = typeWrapper.getOutputTupleTypeInfo();
-		outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
-		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
+		outTypeInfo = typeWrapper.getOutputTypeInfo();
+		outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+		outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
 	}
 
 	protected void setConfigOutputs(
@@ -171,7 +170,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
 		return (T) configuration.getUserInvokableObject();
 	}
 
-	protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
+	protected <IN> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
 			MutableReader<?> inputReader, TypeSerializer<?> serializer) {
 
 		// generic data type serialization

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 0c02c16..ede30b4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.AbstractFunction;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
@@ -38,14 +37,15 @@ import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.MutableObjectIterator;
 
 public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
 		AbstractStreamComponent<OUT> {
 	private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
 
-	protected StreamRecordSerializer<IN1> inTupleDeserializer1 = null;
-	protected StreamRecordSerializer<IN2> inTupleDeserializer2 = null;
+	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
+	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
 
 	private MutableReader<IOReadableWritable> inputs1;
 	private MutableReader<IOReadableWritable> inputs2;
@@ -83,11 +83,11 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
-		TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo<IN1>) typeWrapper.getInputTupleTypeInfo1();
-		inTupleDeserializer1 = new StreamRecordSerializer<IN1>(inTupleTypeInfo.createSerializer());
+		TypeInformation<IN1> inputTypeInfo1 = (TypeInformation<IN1>) typeWrapper.getInputTypeInfo1();
+		inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
 
-		inTupleTypeInfo = (TupleTypeInfo<IN1>) typeWrapper.getInputTupleTypeInfo2();
-		inTupleDeserializer2 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+		TypeInformation<IN2> inputTypeInfo2 = (TypeInformation<IN2>) typeWrapper.getInputTypeInfo2();
+		inputDeserializer2 = new StreamRecordSerializer(inputTypeInfo2);
 	}
 
 	@Override
@@ -95,15 +95,15 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 		setConfigOutputs(outputs);
 		setConfigInputs();
 
-		inputIter1 = createInputIterator(inputs1, inTupleDeserializer1);
-		inputIter2 = createInputIterator(inputs2, inTupleDeserializer2);
+		inputIter1 = createInputIterator(inputs1, inputDeserializer1);
+		inputIter2 = createInputIterator(inputs2, inputDeserializer2);
 	}
 
 	@Override
 	protected void setInvokable() {
 		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter1, inTupleDeserializer1, inputIter2,
-				inTupleDeserializer2, isMutable);
+		userInvokable.initialize(collector, inputIter1, inputDeserializer1, inputIter2,
+				inputDeserializer2, isMutable);
 	}
 
 	protected void setConfigInputs() throws StreamComponentException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index 8355b78..0c042bc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -19,20 +19,19 @@
 
 package org.apache.flink.streaming.api.streamcomponent;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.MutableReader;
 import org.apache.flink.runtime.io.network.api.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.TypeInformation;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple>
-		extends AbstractStreamComponent<OUT> {
+public abstract class SingleInputAbstractStreamComponent<IN, OUT> extends
+		AbstractStreamComponent<OUT> {
 
-	protected StreamRecordSerializer<IN> inTupleSerializer = null;
+	protected StreamRecordSerializer<IN> inputSerializer = null;
 	protected MutableObjectIterator<StreamRecord<IN>> inputIter;
 	protected MutableReader<IOReadableWritable> inputs;
 
@@ -46,17 +45,19 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
 
 	@SuppressWarnings("unchecked")
 	private void setDeserializer() {
-		TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo<IN>) typeWrapper
-				.getInputTupleTypeInfo1();
-		inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
+		TypeInformation<IN> inTupleTypeInfo = (TypeInformation<IN>) typeWrapper
+				.getInputTypeInfo1();
+		inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
 	}
 
 	@SuppressWarnings("unchecked")
 	protected void setSinkSerializer() {
-		if (outSerializationDelegate != null) {
-			TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo<IN>) outTupleTypeInfo;
-
-			inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
+		try {
+			TypeInformation<IN> inputTypeInfo = (TypeInformation<IN>) typeWrapper
+					.getOutputTypeInfo();
+			inputSerializer = new StreamRecordSerializer<IN>(inputTypeInfo);
+		} catch (RuntimeException e) {
+			// User implemented sink, nothing to do
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
index 911b550..436ebbf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSink.java
@@ -48,7 +48,7 @@ public class StreamIterationSink<IN extends Tuple> extends
 			setConfigInputs();
 			setSinkSerializer();
 
-			inputIter = createInputIterator(inputs, inTupleSerializer);
+			inputIter = createInputIterator(inputs, inputSerializer);
 
 			iterationId = configuration.getIterationId();
 			dataChannel = BlockingQueueBroker.instance().get(iterationId);
@@ -73,11 +73,11 @@ public class StreamIterationSink<IN extends Tuple> extends
 	}
 
 	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inTupleSerializer.createInstance().setId(0);
+		StreamRecord<IN> reuse = inputSerializer.createInstance().setId(0);
 		while ((reuse = inputIter.next(reuse)) != null) {
 			pushToQueue(reuse);
 			// TODO: Fix object reuse for iteration
-			reuse = inTupleSerializer.createInstance();
+			reuse = inputSerializer.createInstance();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
index 0ead3c6..2969e69 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamSink.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.api.streamcomponent;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.invokable.StreamRecordInvokable;
 
-public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamComponent<IN, IN> {
+public class StreamSink<IN> extends SingleInputAbstractStreamComponent<IN, IN> {
 
 	private static final Log LOG = LogFactory.getLog(StreamSink.class);
 
@@ -40,7 +39,7 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 			setConfigInputs();
 			setSinkSerializer();
 
-			inputIter = createInputIterator(inputs, inTupleSerializer);
+			inputIter = createInputIterator(inputs, inputSerializer);
 		} catch (Exception e) {
 			throw new StreamComponentException("Cannot register inputs for "
 					+ getClass().getSimpleName(), e);
@@ -50,7 +49,7 @@ public class StreamSink<IN extends Tuple> extends SingleInputAbstractStreamCompo
 	@Override
 	protected void setInvokable() {
 		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
+		userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
index 7cb1d71..3a3f9cf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamTask.java
@@ -51,13 +51,13 @@ public class StreamTask<IN extends Tuple, OUT extends Tuple> extends
 		setConfigInputs();
 		setConfigOutputs(outputs);
 
-		inputIter = createInputIterator(inputs, inTupleSerializer);
+		inputIter = createInputIterator(inputs, inputSerializer);
 	}
 
 	@Override
 	protected void setInvokable() {
 		userInvokable = getInvokable();
-		userInvokable.initialize(collector, inputIter, inTupleSerializer, isMutable);
+		userInvokable.initialize(collector, inputIter, inputSerializer, isMutable);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
index 9de2f38..bb21019 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecord.java
@@ -22,20 +22,21 @@ package org.apache.flink.streaming.api.streamrecord;
 import java.io.Serializable;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 
 /**
- * Object for wrapping a tuple with ID used for sending records between
- * streaming task in Apache Flink stream processing.
+ * Object for wrapping a tuple or other object with ID used for sending records
+ * between streaming task in Apache Flink stream processing.
  */
-public class StreamRecord<T extends Tuple> implements Serializable {
+public class StreamRecord<T> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	private UID uid;
-	private T tuple;
-
-	protected TupleSerializer<T> tupleSerializer;
+	private T streamObject;
+	public boolean isTuple;
 
+	/**
+	 * Creates an empty StreamRecord and initializes an empty ID
+	 */
 	public StreamRecord() {
 		uid = new UID();
 	}
@@ -60,28 +61,50 @@ public class StreamRecord<T extends Tuple> implements Serializable {
 	}
 
 	/**
+	 * Gets the wrapped object from the StreamRecord
+	 * 
+	 * @return The object wrapped
+	 */
+	public T getObject() {
+		return streamObject;
+	}
+
+	/**
+	 * Gets the field of the contained object at the given position. If a tuple
+	 * is wrapped then the getField method is invoked. If the StreamRecord
+	 * contains and object of Basic types only position 0 could be returned.
 	 * 
-	 * @return The tuple contained
+	 * @param pos
+	 *            Position of the field to get.
+	 * @return Returns the object contained in the position.
 	 */
-	public T getTuple() {
-		return tuple;
+	public Object getField(int pos) {
+		if (isTuple) {
+			return ((Tuple) streamObject).getField(pos);
+		} else {
+			if (pos == 0) {
+				return streamObject;
+			} else {
+				throw new IndexOutOfBoundsException();
+			}
+		}
 	}
 
 	/**
-	 * Sets the tuple stored
+	 * Sets the object stored
 	 * 
-	 * @param tuple
-	 *            Value to set
+	 * @param object
+	 *            Object to set
 	 * @return Returns the StreamRecord object
 	 */
-	public StreamRecord<T> setTuple(T tuple) {
-		this.tuple = tuple;
+	public StreamRecord<T> setObject(T object) {
+		this.streamObject = object;
 		return this;
 	}
 
 	@Override
 	public String toString() {
-		return tuple.toString();
+		return streamObject.toString();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 66c6751..66cb0bd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -21,19 +21,20 @@ package org.apache.flink.streaming.api.streamrecord;
 import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.TypeInformation;
 
-public final class StreamRecordSerializer<T extends Tuple> extends TypeSerializer<StreamRecord<T>> {
+public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final TupleSerializer<T> tupleSerializer;
+	private final TypeSerializer<T> typeSerializer;
+	private final boolean isTuple;
 
-	public StreamRecordSerializer(TupleSerializer<T> tupleSerializer) {
-		this.tupleSerializer = tupleSerializer;
+	public StreamRecordSerializer(TypeInformation<T> typeInfo) {
+		this.typeSerializer = typeInfo.createSerializer();
+		this.isTuple = typeInfo.isTupleType();
 	}
 
 	@Override
@@ -51,7 +52,8 @@ public final class StreamRecordSerializer<T extends Tuple> extends TypeSerialize
 		try {
 			@SuppressWarnings("unchecked")
 			StreamRecord<T> t = StreamRecord.class.newInstance();
-			t.setTuple(tupleSerializer.createInstance());
+			t.isTuple = isTuple;
+			t.setObject(typeSerializer.createInstance());
 			return t;
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
@@ -79,14 +81,14 @@ public final class StreamRecordSerializer<T extends Tuple> extends TypeSerialize
 	@Override
 	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
 		value.getId().write(target);
-		tupleSerializer.serialize(value.getTuple(), target);
+		typeSerializer.serialize(value.getObject(), target);
 	}
 
 	@Override
 	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
 			throws IOException {
 		reuse.getId().read(source);
-		tupleSerializer.deserialize(reuse.getTuple(), source);
+		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
 		return reuse;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
index 06f059d..231bab1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/BroadcastPartitioner.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.partitioner;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -29,7 +28,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class BroadcastPartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class BroadcastPartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	int[] returnArray;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
index 025106a..9f492a3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/DistributePartitioner.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.partitioner;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -30,7 +29,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class DistributePartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class DistributePartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int currentChannelIndex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
index 51bb54e..44a674d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.partitioner;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -30,7 +29,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class FieldsPartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class FieldsPartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int keyPosition;
@@ -44,7 +43,7 @@ public class FieldsPartitioner<T extends Tuple> implements StreamPartitioner<T>
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
-		returnArray[0] = Math.abs(record.getInstance().getTuple().getField(keyPosition).hashCode())
+		returnArray[0] = Math.abs(record.getInstance().getField(keyPosition).hashCode())
 				% numberOfOutputChannels;
 		return returnArray;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
index 88da063..95ede97 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ForwardPartitioner.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.partitioner;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -29,7 +28,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class ForwardPartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class ForwardPartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private int[] returnArray;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
index 649f4b7..83f7baf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/GlobalPartitioner.java
@@ -19,12 +19,11 @@
 
 package org.apache.flink.streaming.partitioner;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
 //Group to the partitioner with the lowest id
-public class GlobalPartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class GlobalPartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 	
 	private int[] returnArray;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
index 8d292e7..c56fecf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/ShufflePartitioner.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.partitioner;
 
 import java.util.Random;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 
@@ -32,7 +31,7 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public class ShufflePartitioner<T extends Tuple> implements StreamPartitioner<T> {
+public class ShufflePartitioner<T> implements StreamPartitioner<T> {
 	private static final long serialVersionUID = 1L;
 
 	private Random random = new Random();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
index c0e9c1a..18af2f6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.partitioner;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.ChannelSelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -31,6 +30,6 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
  * @param <T>
  *            Type of the Tuple
  */
-public interface StreamPartitioner<T extends Tuple> extends ChannelSelector<SerializationDelegate<StreamRecord<T>>>,
+public interface StreamPartitioner<T> extends ChannelSelector<SerializationDelegate<StreamRecord<T>>>,
 		Serializable {
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index c868e4f..02a7554 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -22,11 +22,9 @@ package org.apache.flink.streaming.util.serialization;
 import java.io.IOException;
 
 import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class FunctionTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+public class FunctionTypeWrapper<IN1, IN2, OUT> extends
 		TypeSerializerWrapper<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
@@ -44,30 +42,29 @@ public class FunctionTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT exten
 		this.inTypeParameter1 = inTypeParameter1;
 		this.inTypeParameter2 = inTypeParameter2;
 		this.outTypeParameter = outTypeParameter;
-		setTupleTypeInfo();
+		setTypeInfo();
 	}
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException,
 			ClassNotFoundException {
 		in.defaultReadObject();
-		setTupleTypeInfo();
+		setTypeInfo();
 	}
 
-	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Override
-	protected void setTupleTypeInfo() {
+	protected void setTypeInfo() {
 		if (inTypeParameter1 != -1) {
-			inTupleTypeInfo1 = (TupleTypeInfo) TypeExtractor.createTypeInfo(functionSuperClass,
+			inTypeInfo1 = TypeExtractor.createTypeInfo(functionSuperClass,
 					function.getClass(), inTypeParameter1, null, null);
 		}
 
 		if (inTypeParameter2 != -1) {
-			inTupleTypeInfo2 = (TupleTypeInfo) TypeExtractor.createTypeInfo(functionSuperClass,
+			inTypeInfo2 = TypeExtractor.createTypeInfo(functionSuperClass,
 					function.getClass(), inTypeParameter2, null, null);
 		}
 
 		if (outTypeParameter != -1) {
-			outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(functionSuperClass,
+			outTypeInfo = TypeExtractor.createTypeInfo(functionSuperClass,
 					function.getClass(), outTypeParameter, null, null);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
index 5ed2312..c1bf52c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -21,41 +21,39 @@ package org.apache.flink.streaming.util.serialization;
 
 import java.io.IOException;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-public class ObjectTypeWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
+public class ObjectTypeWrapper<IN1, IN2, OUT> extends
 		TypeSerializerWrapper<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
 
-	private Object inInstance1;
-	private Object inInstance2;
-	private Object outInstance;
+	private IN1 inInstance1;
+	private IN2 inInstance2;
+	private OUT outInstance;
 
-	public ObjectTypeWrapper(Object inInstance1, Object inInstance2, Object outInstance) {
+	public ObjectTypeWrapper(IN1 inInstance1, IN2 inInstance2, OUT outInstance) {
 		this.inInstance1 = inInstance1;
 		this.inInstance2 = inInstance2;
 		this.outInstance = outInstance;
-		setTupleTypeInfo();
+		setTypeInfo();
 	}
 
 	private void readObject(java.io.ObjectInputStream in) throws IOException,
 			ClassNotFoundException {
 		in.defaultReadObject();
-		setTupleTypeInfo();
+		setTypeInfo();
 	}
 
 	@Override
-	protected void setTupleTypeInfo() {
+	protected void setTypeInfo() {
 		if (inInstance1 != null) {
-			inTupleTypeInfo1 = new TupleTypeInfo<IN1>(TypeExtractor.getForObject(inInstance1));
+			inTypeInfo1 = TypeExtractor.getForObject(inInstance1);
 		}
 		if (inInstance2 != null) {
-			inTupleTypeInfo2 = new TupleTypeInfo<IN2>(TypeExtractor.getForObject(inInstance2));
+			inTypeInfo2 = TypeExtractor.getForObject(inInstance2);
 		}
 		if (outInstance != null) {
-			outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(outInstance));
+			outTypeInfo = TypeExtractor.getForObject(outInstance);
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
index 473ce7c..2aa50f2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
@@ -21,37 +21,36 @@ package org.apache.flink.streaming.util.serialization;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.TypeInformation;
 
-public abstract class TypeSerializerWrapper<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple>
+public abstract class TypeSerializerWrapper<IN1, IN2, OUT>
 		implements Serializable {
 	private static final long serialVersionUID = 1L;
 
-	protected transient TupleTypeInfo<IN1> inTupleTypeInfo1 = null;
-	protected transient TupleTypeInfo<IN2> inTupleTypeInfo2 = null;
-	protected transient TupleTypeInfo<OUT> outTupleTypeInfo = null;
+	protected transient TypeInformation<IN1> inTypeInfo1 = null;
+	protected transient TypeInformation<IN2> inTypeInfo2 = null;
+	protected transient TypeInformation<OUT> outTypeInfo = null;
 
-	public TupleTypeInfo<IN1> getInputTupleTypeInfo1() {
-		if (inTupleTypeInfo1 == null) {
+	public TypeInformation<IN1> getInputTypeInfo1() {
+		if (inTypeInfo1 == null) {
 			throw new RuntimeException("There is no TypeInfo for the first input");
 		}
-		return inTupleTypeInfo1;
+		return inTypeInfo1;
 	}
 
-	public TupleTypeInfo<IN2> getInputTupleTypeInfo2() {
-		if (inTupleTypeInfo1 == null) {
-			throw new RuntimeException("There is no TypeInfo for the first input");
+	public TypeInformation<IN2> getInputTypeInfo2() {
+		if (inTypeInfo2 == null) {
+			throw new RuntimeException("There is no TypeInfo for the second input");
 		}
-		return inTupleTypeInfo2;
+		return inTypeInfo2;
 	}
 
-	public TupleTypeInfo<OUT> getOutputTupleTypeInfo() {
-		if (inTupleTypeInfo1 == null) {
-			throw new RuntimeException("There is no TypeInfo for the first input");
+	public TypeInformation<OUT> getOutputTypeInfo() {
+		if (outTypeInfo == null) {
+			throw new RuntimeException("There is no TypeInfo for the output");
 		}
-		return outTupleTypeInfo;
+		return outTypeInfo;
 	}
 
-	protected abstract void setTupleTypeInfo();
+	protected abstract void setTypeInfo();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 8ba58c5..1ea165f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;
@@ -38,13 +37,13 @@ public class IterateTest {
 	private static boolean iterated = false;
 
 	public static final class IterationHead extends
-			FlatMapFunction<Tuple1<Boolean>, Tuple1<Boolean>> {
+			FlatMapFunction<Boolean, Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<Boolean> value, Collector<Tuple1<Boolean>> out) throws Exception {
-			if (value.f0) {
+		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+			if (value) {
 				iterated = true;
 			} else {
 				out.collect(value);
@@ -55,24 +54,24 @@ public class IterateTest {
 	}
 
 	public static final class IterationTail extends
-			FlatMapFunction<Tuple1<Boolean>, Tuple1<Boolean>> {
+			FlatMapFunction<Boolean,Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<Boolean> value, Collector<Tuple1<Boolean>> out) throws Exception {
-			out.collect(new Tuple1<Boolean>(true));
+		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+			out.collect(true);
 
 		}
 
 	}
 
-	public static final class MySink extends SinkFunction<Tuple1<Boolean>> {
+	public static final class MySink extends SinkFunction<Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Boolean> tuple) {
+		public void invoke(Boolean tuple) {
 		}
 
 	}
@@ -87,12 +86,12 @@ public class IterateTest {
 		for (int i = 0; i < 100000; i++) {
 			bl.add(false);
 		}
-		DataStream<Tuple1<Boolean>> source =  env
+		DataStream<Boolean> source =  env
 				.fromCollection(bl);
 
-		IterativeDataStream<Tuple1<Boolean>> iteration = source.iterate();
+		IterativeDataStream<Boolean> iteration = source.iterate();
 				
-		DataStream<Tuple1<Boolean>> increment = iteration.flatMap(new IterationHead()).flatMap(new IterationTail());
+		DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(new IterationTail());
 
 		iteration.closeWith(increment).addSink(new MySink());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 67dce9d..438887a 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -35,7 +35,7 @@ public class PrintTest {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
-		env.generateSequence(1, 10).print();
+		env.generateSequence(1, 100000).print();
 		env.executeTest(MEMORYSIZE);
 
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 8f64cb4..e8742de 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.HashSet;
 
 import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -39,45 +38,45 @@ public class DirectedOutputTest {
 	static HashSet<Long> evenSet = new HashSet<Long>();
 	static HashSet<Long> oddSet = new HashSet<Long>();
 	
-	private static class PlusTwo extends MapFunction<Tuple1<Long>, Tuple1<Long>> {
+	private static class PlusTwo extends MapFunction<Long, Long> {
 	
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<Long> map(Tuple1<Long> arg0) throws Exception {
-			arg0.f0 += 2;
+		public Long map(Long arg0) throws Exception {
+			arg0 += 2;
 			return arg0;
 		}
 	}
 
-	private static class EvenSink extends SinkFunction<Tuple1<Long>> {
+	private static class EvenSink extends SinkFunction<Long> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Long> tuple) {
-			evenSet.add(tuple.f0);
+		public void invoke(Long tuple) {
+			evenSet.add(tuple);
 		}
 	}
 	
-	private static class OddSink extends SinkFunction<Tuple1<Long>> {
+	private static class OddSink extends SinkFunction<Long> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Long> tuple) {
-			oddSet.add(tuple.f0);
+		public void invoke(Long tuple) {
+			oddSet.add(tuple);
 		}
 	}
 	
 	
-	private static class MySelector extends OutputSelector<Tuple1<Long>> {
+	private static class MySelector extends OutputSelector<Long> {
 		
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void select(Tuple1<Long> tuple, Collection<String> outputs) {
-			int mod = (int) (tuple.f0 % 2);
+		public void select(Long tuple, Collection<String> outputs) {
+			int mod = (int) (tuple % 2);
 			switch (mod) {
 				case 0:
 					outputs.add("ds1");
@@ -96,10 +95,10 @@ public class DirectedOutputTest {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		DataStream<Tuple1<Long>> s = env.generateSequence(1, 6).directTo(new MySelector());
-		DataStream<Tuple1<Long>> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
-		DataStream<Tuple1<Long>> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
-		DataStream<Tuple1<Long>> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
+		DataStream<Long> s = env.generateSequence(1, 6).directTo(new MySelector());
+		DataStream<Long> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
+		DataStream<Long> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
+		DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
 
 		env.execute();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index d3cbcbc..2c3f480 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -23,7 +23,6 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -40,28 +39,28 @@ public class CoMapTest implements Serializable {
 	private static Set<String> result;
 	private static Set<String> expected = new HashSet<String>();
 
-	private final static class EmptySink extends SinkFunction<Tuple1<Boolean>> {
+	private final static class EmptySink extends SinkFunction<Boolean> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Boolean> tuple) {
+		public void invoke(Boolean tuple) {
 		}
 	}
 
 	private final static class MyCoMap extends
-			CoMapFunction<Tuple1<String>, Tuple1<Integer>, Tuple1<Boolean>> {
+			CoMapFunction<String, Integer, Boolean> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<Boolean> map1(Tuple1<String> value) {
-			result.add(value.f0);
-			return new Tuple1<Boolean>(true);
+		public Boolean map1(String value) {
+			result.add(value);
+			return true;
 		}
 
 		@Override
-		public Tuple1<Boolean> map2(Tuple1<Integer> value) {
-			result.add(value.f0.toString());
-			return new Tuple1<Boolean>(false);
+		public Boolean map2(Integer value) {
+			result.add(value.toString());
+			return false;
 		}
 	}
 
@@ -80,14 +79,14 @@ public class CoMapTest implements Serializable {
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
-		DataStream<Tuple1<Integer>> ds1 = env.fromElements(1, 3);
+		DataStream<Integer> ds1 = env.fromElements(1, 3);
 		@SuppressWarnings("unchecked")
-		DataStream<Tuple1<Integer>> ds2 = env.fromElements(2, 4).connectWith(ds1);
+		DataStream<Integer> ds2 = env.fromElements(2, 4).connectWith(ds1);
 
-		DataStream<Tuple1<String>> ds3 = env.fromElements("a", "b");
+		DataStream<String> ds3 = env.fromElements("a", "b");
 
 		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Tuple1<Boolean>> ds4 = env.fromElements("c").connectWith(ds3)
+		DataStream<Boolean> ds4 = env.fromElements("c").connectWith(ds3)
 				.coMapWith(new MyCoMap(),
 
 				ds2).addSink(new EmptySink());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 054becc..3286ef1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
@@ -34,37 +33,37 @@ import org.junit.Test;
 
 public class FilterTest implements Serializable {
 	private static final long serialVersionUID = 1L;
-	
+
 	private static Set<Integer> set = new HashSet<Integer>();
 
-	private static class SetSink extends SinkFunction<Tuple1<Integer>> {
+	private static class MySink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
-			set.add(tuple.f0);
+		public void invoke(Integer value) {
+			set.add(value);
 		}
 	}
 
-	static class MyFilter extends FilterFunction<Tuple1<Integer>> {
+	static class MyFilter extends FilterFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public boolean filter(Tuple1<Integer> value) throws Exception {
-			return value.f0 % 2 == 0;
+		public boolean filter(Integer value) throws Exception {
+			return value % 2 == 0;
 		}
 	}
 
 	@Test
 	public void test() {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-		
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		
-		env.fromElements(1, 2, 3, 4, 5, 6, 7).filter(new MyFilter()).addSink(new SetSink());
+
+		env.fromElements(1, 2, 3, 4, 5, 6, 7).filter(new MyFilter()).addSink(new MySink());
 
 		env.execute();
-		
+
 		Assert.assertArrayEquals(new Integer[] { 2, 4, 6 }, set.toArray());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index 893ec37..b299407 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -25,91 +25,88 @@ import static org.junit.Assert.assertTrue;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.flink.api.java.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
-import org.junit.Test;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 import org.apache.log4j.Level;
+import org.junit.Test;
 
 public class FlatMapTest {
 
-	public static final class MyFlatMap extends FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static final class MyFlatMap extends FlatMapFunction<Integer, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
-			out.collect(new Tuple1<Integer>(value.f0 * value.f0));
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+			out.collect(value * value);
 
 		}
 
 	}
 
-	public static final class ParallelFlatMap extends
-			FlatMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
+	public static final class ParallelFlatMap extends FlatMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<Integer> value, Collector<Tuple1<Integer>> out) throws Exception {
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
 			numberOfElements++;
 
 		}
 
 	}
 
-	public static final class GenerateSequenceFlatMap extends
-			FlatMapFunction<Tuple1<Long>, Tuple1<Long>> {
+	public static final class GenerateSequenceFlatMap extends FlatMapFunction<Long, Long> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<Long> value, Collector<Tuple1<Long>> out) throws Exception {
-			out.collect(new Tuple1<Long>(value.f0 * value.f0));
+		public void flatMap(Long value, Collector<Long> out) throws Exception {
+			out.collect(value * value);
 
 		}
 
 	}
 
-	public static final class MySink extends SinkFunction<Tuple1<Integer>> {
+	public static final class MySink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
-			result.add(tuple.f0);
+		public void invoke(Integer tuple) {
+			result.add(tuple);
 		}
 
 	}
 
-	public static final class FromElementsSink extends SinkFunction<Tuple1<Integer>> {
+	public static final class FromElementsSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
-			fromElementsResult.add(tuple.f0);
+		public void invoke(Integer tuple) {
+			fromElementsResult.add(tuple);
 		}
 
 	}
 
-	public static final class FromCollectionSink extends SinkFunction<Tuple1<Integer>> {
+	public static final class FromCollectionSink extends SinkFunction<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Integer> tuple) {
-			fromCollectionResult.add(tuple.f0);
+		public void invoke(Integer tuple) {
+			fromCollectionResult.add(tuple);
 		}
 
 	}
 
-	public static final class GenerateSequenceSink extends SinkFunction<Tuple1<Long>> {
+	public static final class GenerateSequenceSink extends SinkFunction<Long> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(Tuple1<Long> tuple) {
-			generateSequenceResult.add(tuple.f0);
+		public void invoke(Long tuple) {
+			generateSequenceResult.add(tuple);
 		}
 
 	}
@@ -170,7 +167,7 @@ public class FlatMapTest {
 		fillFromCollectionSet();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream = env.fromCollection(fromCollectionSet)
+		DataStream<Integer> dataStream = env.fromCollection(fromCollectionSet)
 				.flatMap(new MyFlatMap()).addSink(new MySink());
 
 		fillExpectedList();
@@ -178,43 +175,38 @@ public class FlatMapTest {
 		// parallelShuffleconnectTest
 		fillFromCollectionSet();
 
-		DataStream<Tuple1<Integer>> source = env.fromCollection(fromCollectionSet);
+		DataStream<Integer> source = env.fromCollection(fromCollectionSet);
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> map = source
-				.flatMap(new ParallelFlatMap())
-				.addSink(new MySink());
+		DataStream<Integer> map = source.flatMap(new ParallelFlatMap()).addSink(
+				new MySink());
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> map2 = source
-				.flatMap(new ParallelFlatMap())
-				.addSink(new MySink());
+		DataStream<Integer> map2 = source.flatMap(new ParallelFlatMap()).addSink(
+				new MySink());
 
 		// fromElementsTest
-		DataStream<Tuple1<Integer>> fromElementsMap = env
-				.fromElements(2, 5, 9)
-				.flatMap(new MyFlatMap());
+		DataStream<Integer> fromElementsMap = env.fromElements(2, 5, 9).flatMap(
+				new MyFlatMap());
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> sink = fromElementsMap.addSink(new FromElementsSink());
+		DataStream<Integer> sink = fromElementsMap.addSink(new FromElementsSink());
 
 		fillFromElementsExpected();
 
 		// fromCollectionTest
 		fillFromCollectionSet();
 
-		DataStream<Tuple1<Integer>> fromCollectionMap = env
-				.fromCollection(fromCollectionSet)
+		DataStream<Integer> fromCollectionMap = env.fromCollection(fromCollectionSet)
 				.flatMap(new MyFlatMap());
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> fromCollectionSink = fromCollectionMap
+		DataStream<Integer> fromCollectionSink = fromCollectionMap
 				.addSink(new FromCollectionSink());
 
 		// generateSequenceTest
 		fillSequenceSet();
 
-		DataStream<Tuple1<Long>> generateSequenceMap = env
-				.generateSequence(0, 9)
-				.flatMap(new GenerateSequenceFlatMap());
+		DataStream<Long> generateSequenceMap = env.generateSequence(0, 9).flatMap(
+				new GenerateSequenceFlatMap());
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Long>> generateSequenceSink = generateSequenceMap
+		DataStream<Long> generateSequenceSink = generateSequenceMap
 				.addSink(new GenerateSequenceSink());
 
 		fillLongSequenceSet();


Mime
View raw message