flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [29/51] [abbrv] git commit: [streaming] Added support for simple types instead of Tuple1 in the API
Date Mon, 18 Aug 2014 17:26:06 GMT
[streaming] Added support for simple types instead of Tuple1 in the API


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1162caca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1162caca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1162caca

Branch: refs/heads/master
Commit: 1162caca857142f237c6c05a04e9f7f2afc89572
Parents: b3cd5fd
Author: gyfora <gyula.fora@gmail.com>
Authored: Sat Aug 2 21:19:18 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:22:48 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |   6 +-
 .../streaming/connectors/flume/FlumeSource.java |  19 +-
 .../connectors/flume/FlumeTopology.java         |  21 +-
 .../streaming/connectors/kafka/KafkaSink.java   |   3 +-
 .../streaming/connectors/kafka/KafkaSource.java |  11 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |  13 +-
 .../connectors/rabbitmq/RMQSource.java          |  16 +-
 .../connectors/rabbitmq/RMQTopology.java        |  23 +-
 .../connectors/twitter/TwitterLocal.java        |   9 +-
 .../connectors/twitter/TwitterSource.java       |  16 +-
 .../connectors/twitter/TwitterStreaming.java    |  17 +-
 .../apache/flink/streaming/api/DataStream.java  |  20 +-
 .../streaming/api/IterativeDataStream.java      |   5 +-
 .../flink/streaming/api/StreamConfig.java       |  15 +-
 .../api/StreamExecutionEnvironment.java         |  48 ++--
 .../flink/streaming/api/StreamOperator.java     |   3 +-
 .../api/collector/DirectedStreamCollector.java  |  15 +-
 .../streaming/api/collector/OutputSelector.java |  20 +-
 .../api/collector/StreamCollector.java          |  28 +--
 .../api/function/co/CoMapFunction.java          |   3 +-
 .../api/function/sink/PrintSinkFunction.java    |   3 +-
 .../api/function/sink/SinkFunction.java         |   5 +-
 .../api/function/sink/WriteFormat.java          |   4 +-
 .../api/function/sink/WriteFormatAsCsv.java     |   4 +-
 .../api/function/sink/WriteFormatAsText.java    |   4 +-
 .../api/function/sink/WriteSinkFunction.java    |   4 +-
 .../sink/WriteSinkFunctionByBatches.java        |   3 +-
 .../sink/WriteSinkFunctionByMillis.java         |   3 +-
 .../api/function/source/FileSourceFunction.java |   9 +-
 .../api/function/source/FileStreamFunction.java |   9 +-
 .../function/source/FromElementsFunction.java   |   9 +-
 .../function/source/GenSequenceFunction.java    |   7 +-
 .../api/function/source/SourceFunction.java     |   3 +-
 .../streaming/api/invokable/SinkInvokable.java  |  11 +-
 .../api/invokable/SourceInvokable.java          |   3 +-
 .../api/invokable/StreamComponentInvokable.java |   3 +-
 .../api/invokable/StreamRecordInvokable.java    |   3 +-
 .../api/invokable/UserTaskInvokable.java        |   4 +-
 .../operator/BatchReduceInvokable.java          |   7 +-
 .../api/invokable/operator/FilterInvokable.java |  15 +-
 .../invokable/operator/FlatMapInvokable.java    |  12 +-
 .../api/invokable/operator/MapInvokable.java    |  11 +-
 .../operator/StreamReduceInvokable.java         |   3 +-
 .../operator/WindowReduceInvokable.java         |   9 +-
 .../api/invokable/operator/co/CoInvokable.java  |   3 +-
 .../invokable/operator/co/CoMapInvokable.java   |  12 +-
 .../AbstractStreamComponent.java                |  19 +-
 .../api/streamcomponent/CoStreamTask.java       |  22 +-
 .../SingleInputAbstractStreamComponent.java     |  25 +-
 .../streamcomponent/StreamIterationSink.java    |   6 +-
 .../api/streamcomponent/StreamSink.java         |   7 +-
 .../api/streamcomponent/StreamTask.java         |   4 +-
 .../api/streamrecord/StreamRecord.java          |  55 ++--
 .../streamrecord/StreamRecordSerializer.java    |  20 +-
 .../partitioner/BroadcastPartitioner.java       |   3 +-
 .../partitioner/DistributePartitioner.java      |   3 +-
 .../partitioner/FieldsPartitioner.java          |   5 +-
 .../partitioner/ForwardPartitioner.java         |   3 +-
 .../partitioner/GlobalPartitioner.java          |   3 +-
 .../partitioner/ShufflePartitioner.java         |   3 +-
 .../partitioner/StreamPartitioner.java          |   3 +-
 .../util/serialization/FunctionTypeWrapper.java |  17 +-
 .../util/serialization/ObjectTypeWrapper.java   |  24 +-
 .../serialization/TypeSerializerWrapper.java    |  35 ++-
 .../apache/flink/streaming/api/IterateTest.java |  23 +-
 .../apache/flink/streaming/api/PrintTest.java   |   2 +-
 .../api/collector/DirectedOutputTest.java       |  33 ++-
 .../api/invokable/operator/CoMapTest.java       |  27 +-
 .../api/invokable/operator/FilterTest.java      |  23 +-
 .../api/invokable/operator/FlatMapTest.java     |  80 +++---
 .../api/invokable/operator/MapTest.java         | 250 ++++++++-----------
 .../api/streamcomponent/MockRecordWriter.java   |   2 +-
 .../partitioner/FieldsPartitionerTest.java      |   4 +-
 .../serialization/TypeSerializationTest.java    |  50 ++--
 .../examples/wordcount/WordCountCounter.java    |   7 +-
 .../examples/wordcount/WordCountLocal.java      |   2 +-
 .../examples/wordcount/WordCountSplitter.java   |  12 +-
 77 files changed, 577 insertions(+), 669 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 20a3a4a..6f943d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -21,10 +21,8 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -32,10 +30,10 @@ import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
 
-public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
+public abstract class FlumeSink<IN> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
-	private static final Log LOG = LogFactory.getLog(RMQSource.class);
+	private static final Log LOG = LogFactory.getLog(FlumeSink.class);
 
 	private transient FlinkRpcClientFacade client;
 	boolean initDone = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 8b102a8..b141efb 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,17 +21,16 @@ package org.apache.flink.streaming.connectors.flume;
 
 import java.util.List;
 
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
 
-public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class FlumeSource<OUT> extends SourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	String host;
@@ -43,7 +42,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	}
 
 	public class MyAvroSource extends AvroSource {
-		Collector<IN> collector;
+		Collector<OUT> collector;
 
 		/**
 		 * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
@@ -85,7 +84,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 		 */
 		private void collect(AvroFlumeEvent avroEvent) {
 			byte[] b = avroEvent.getBody().array();
-			IN tuple = FlumeSource.this.deserialize(b);
+			OUT tuple = FlumeSource.this.deserialize(b);
 			if (!closeWithoutSend) {
 				collector.collect(tuple);
 			}
@@ -108,7 +107,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The incoming message in a byte array
 	 * @return The deserialized message in the required format.
 	 */
-	public abstract IN deserialize(byte[] message);
+	public abstract OUT deserialize(byte[] message);
 
 	/**
 	 * Configures the AvroSource. Also sets the collector so the application can
@@ -117,7 +116,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	 * @param collector
 	 *            The collector used in the invoke function
 	 */
-	public void configureAvroSource(Collector<IN> collector) {
+	public void configureAvroSource(Collector<OUT> collector) {
 
 		avroSource = new MyAvroSource();
 		avroSource.collector = collector;
@@ -138,7 +137,7 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The Collector for sending data to the datastream
 	 */
 	@Override
-	public void invoke(Collector<IN> collector) throws Exception {
+	public void invoke(Collector<OUT> collector) throws Exception {
 		configureAvroSource(collector);
 		avroSource.start();
 		while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3c45cd4..414795b 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -20,13 +20,12 @@
 package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 
 public class FlumeTopology {
 
-	public static class MyFlumeSink extends FlumeSink<Tuple1<String>> {
+	public static class MyFlumeSink extends FlumeSink<String> {
 		private static final long serialVersionUID = 1L;
 
 		public MyFlumeSink(String host, int port) {
@@ -34,8 +33,8 @@ public class FlumeTopology {
 		}
 
 		@Override
-		public byte[] serialize(Tuple1<String> tuple) {
-			if (tuple.f0.equals("q")) {
+		public byte[] serialize(String tuple) {
+			if (tuple.equals("q")) {
 				try {
 					sendAndClose();
 				} catch (Exception e) {
@@ -43,12 +42,12 @@ public class FlumeTopology {
 							+ host, e);
 				}
 			}
-			return SerializationUtils.serialize((String) tuple.getField(0));
+			return SerializationUtils.serialize(tuple);
 		}
 
 	}
 
-	public static class MyFlumeSource extends FlumeSource<Tuple1<String>> {
+	public static class MyFlumeSource extends FlumeSource<String> {
 		private static final long serialVersionUID = 1L;
 
 		MyFlumeSource(String host, int port) {
@@ -56,14 +55,12 @@ public class FlumeTopology {
 		}
 
 		@Override
-		public Tuple1<String> deserialize(byte[] msg) {
+		public String deserialize(byte[] msg) {
 			String s = (String) SerializationUtils.deserialize(msg);
-			Tuple1<String> out = new Tuple1<String>();
-			out.f0 = s;
 			if (s.equals("q")) {
 				closeWithoutSend();
 			}
-			return out;
+			return s;
 		}
 
 	}
@@ -73,12 +70,12 @@ public class FlumeTopology {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream1 = env
+		DataStream<String> dataStream1 = env
 			.addSource(new MyFlumeSource("localhost", 41414))
 			.print();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream2 = env
+		DataStream<String> dataStream2 = env
 			.fromElements("one", "two", "three", "four", "five", "q")
 			.addSink(new MyFlumeSink("localhost", 42424));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 7e3f3db..955e8dc 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -25,10 +25,9 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN> {
+public abstract class KafkaSink<IN, OUT> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private kafka.javaapi.producer.Producer<Integer, OUT> producer;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 623e3b8..228069a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -29,12 +29,11 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class KafkaSource<OUT> extends SourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private final String zkQuorum;
@@ -45,7 +44,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 	private boolean closeWithoutSend = false;
 	private boolean sendAndClose = false;
 
-	IN outTuple;
+	OUT outTuple;
 
 	public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
 		this.zkQuorum = zkQuorum;
@@ -74,7 +73,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<IN> collector) throws Exception {
+	public void invoke(Collector<OUT> collector) throws Exception {
 		initializeConnection();
 
 		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
@@ -85,7 +84,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 		ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
 		while (it.hasNext()) {
-			IN out = deserialize(it.next().message());
+			OUT out = deserialize(it.next().message());
 			if (closeWithoutSend) {
 				break;
 			}
@@ -104,7 +103,7 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The incoming message in a byte array
 	 * @return The deserialized message in the required format.
 	 */
-	public abstract IN deserialize(byte[] message);
+	public abstract OUT deserialize(byte[] message);
 
 	/**
 	 * Closes the connection immediately and no further data will be sent.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index d7ed17a..4225cd3 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,16 +21,15 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public abstract class RMQSink<IN> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(RMQSource.class);
@@ -103,7 +102,7 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 	 *            The tuple used for the serialization
 	 * @return The serialized byte array.
 	 */
-	public abstract byte[] serialize(Tuple tuple);
+	public abstract byte[] serialize(IN tuple);
 
 	/**
 	 * Closes the connection.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index dfea55a..8303b1a 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -23,18 +23,16 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
+public abstract class RMQSource<OUT> extends SourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(RMQSource.class);
@@ -50,7 +48,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
-	IN outTuple;
+	OUT outTuple;
 
 	public RMQSource(String HOST_NAME, String QUEUE_NAME) {
 		this.HOST_NAME = HOST_NAME;
@@ -82,7 +80,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<IN> collector) throws Exception {
+	public void invoke(Collector<OUT> collector) throws Exception {
 		initializeConnection();
 
 		while (true) {
@@ -122,7 +120,7 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 	 *            The incoming message in a byte array
 	 * @return The deserialized message in the required format.
 	 */
-	public abstract IN deserialize(byte[] message);
+	public abstract OUT deserialize(byte[] message);
 
 	/**
 	 * Closes the connection immediately and no further data will be sent.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 828c2fa..94ae43f 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -20,15 +20,12 @@
 package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.commons.lang.SerializationUtils;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 
 public class RMQTopology {
 
-	public static final class MyRMQSink extends RMQSink<Tuple1<String>> {
+	public static final class MyRMQSink extends RMQSink<String> {
 		public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
 			super(HOST_NAME, QUEUE_NAME);
 		}
@@ -36,16 +33,16 @@ public class RMQTopology {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public byte[] serialize(Tuple t) {
-			if (t.getField(0).equals("q")) {
+		public byte[] serialize(String t) {
+			if (t.equals("q")) {
 				sendAndClose();
 			}
-			return SerializationUtils.serialize((String) t.getField(0));
+			return SerializationUtils.serialize((String) t);
 		}
 
 	}
 
-	public static final class MyRMQSource extends RMQSource<Tuple1<String>> {
+	public static final class MyRMQSource extends RMQSource<String> {
 
 		public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
 			super(HOST_NAME, QUEUE_NAME);
@@ -54,14 +51,12 @@ public class RMQTopology {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public Tuple1<String> deserialize(byte[] t) {
+		public String deserialize(byte[] t) {
 			String s = (String) SerializationUtils.deserialize(t);
-			Tuple1<String> out = new Tuple1<String>();
-			out.f0 = s;
 			if (s.equals("q")) {
 				closeWithoutSend();
 			}
-			return out;
+			return s;
 		}
 
 	}
@@ -71,12 +66,12 @@ public class RMQTopology {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream1 = env
+		DataStream<String> dataStream1 = env
 			.addSource(new MyRMQSource("localhost", "hello"))
 			.print();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<String>> dataStream2 = env
+		DataStream<String> dataStream2 = env
 			.fromElements("one", "two", "three", "four", "five", "q")
 			.addSink(new MyRMQSink("localhost", "hello"));
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
index 138fe05..cb868f5 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.connectors.twitter;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -41,7 +40,7 @@ public class TwitterLocal {
 	 * FlatMapFunction to determine the language of tweets if possible 
 	 */
 	public static class SelectLanguageFlatMap extends
-			JSONParseFlatMap<Tuple1<String>, Tuple1<String>> {
+			JSONParseFlatMap<String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -49,9 +48,9 @@ public class TwitterLocal {
 		 * Select the language from the incoming JSON text
 		 */
 		@Override
-		public void flatMap(Tuple1<String> value, Collector<Tuple1<String>> out) throws Exception {
+		public void flatMap(String value, Collector<String> out) throws Exception {
 
-			out.collect(new Tuple1<String>(colationOfNull(getField(value.f0, "lang"))));
+			out.collect(colationOfNull(getField(value, "lang")));
 		}
 
 		/**
@@ -81,7 +80,7 @@ public class TwitterLocal {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<Tuple1<String>> streamSource = env.addSource(new TwitterSource(path, 100),
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, 100),
 				SOURCE_PARALLELISM);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index bbff732..bc0995d 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -29,8 +29,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
@@ -46,9 +44,9 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
  * Implementation of {@link SourceFunction} specialized to emit tweets from Twitter.
  * It can connect to Twitter Streaming API, collect tweets and 
  */
-public class TwitterSource extends SourceFunction<Tuple1<String>> {
+public class TwitterSource extends SourceFunction<String> {
 
-	private static final Log LOG = LogFactory.getLog(DataStream.class);
+	private static final Log LOG = LogFactory.getLog(TwitterSource.class);
 
 	private static final long serialVersionUID = 1L;
 	private String authPath;
@@ -88,7 +86,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
 	 * 
 	 */
 	@Override
-	public void invoke(Collector<Tuple1<String>> collector) throws Exception {
+	public void invoke(Collector<String> collector) throws Exception {
 
 		initializeConnection();
 
@@ -169,7 +167,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
 	 * @param collector
 	 * @param piece
 	 */
-	protected void collectMessages(Collector<Tuple1<String>> collector, int piece) {
+	protected void collectMessages(Collector<String> collector, int piece) {
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Collecting tweets");
@@ -189,7 +187,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
 	 * @param collector
 	 * 
 	 */
-	protected void collectMessages(Collector<Tuple1<String>> collector) {
+	protected void collectMessages(Collector<String> collector) {
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Tweet-stream begins");
@@ -204,7 +202,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
 	 * Put one tweet into the collector.
 	 * @param collector
 	 */
-	protected void collectOneMessage(Collector<Tuple1<String>> collector) {
+	protected void collectOneMessage(Collector<String> collector) {
 		if (client.isDone()) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Client connection closed unexpectedly: "
@@ -215,7 +213,7 @@ public class TwitterSource extends SourceFunction<Tuple1<String>> {
 		try {
 			String msg = queue.poll(waitSec, TimeUnit.SECONDS);
 			if (msg != null) {
-				collector.collect(new Tuple1<String>(msg));
+				collector.collect(msg);
 			} else {
 				if (LOG.isInfoEnabled()) {
 					LOG.info("Did not receive a message in " + waitSec

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index 805bf06..ee986ea 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.connectors.twitter;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
@@ -47,21 +46,21 @@ public class TwitterStreaming {
 	}
 	
 	public static class SelectDataFlatMap extends
-			JSONParseFlatMap<Tuple1<String>, Tuple5<Long, Long, String, String, String>> {
+			JSONParseFlatMap<String, Tuple5<Long, Long, String, String, String>> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void flatMap(Tuple1<String> value,
+		public void flatMap(String value,
 				Collector<Tuple5<Long, Long, String, String, String>> out)
 				throws Exception {
 
 			out.collect(new Tuple5<Long, Long, String, String, String>(
-					convertDateString2Long(getField(value.f0, "id")),
-					convertDateString2LongDate(getField(value.f0, "created_at")),
-					colationOfNull(getField(value.f0, "user.name")),
-					colationOfNull(getField(value.f0, "text")),
-					getField(value.f0, "lang")));
+					convertDateString2Long(getField(value, "id")),
+					convertDateString2LongDate(getField(value, "created_at")),
+					colationOfNull(getField(value, "user.name")),
+					colationOfNull(getField(value, "text")),
+					getField(value, "lang")));
 		}
 		
 		protected String colationOfNull(String in){
@@ -94,7 +93,7 @@ public class TwitterStreaming {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(PARALLELISM);
 
-		DataStream<Tuple1<String>> streamSource = env.addSource(
+		DataStream<String> streamSource = env.addSource(
 				new TwitterSource(path,100), SOURCE_PARALLELISM);
 
 		DataStream<Tuple5<Long, Long, String, String, String>> selectedDataStream = streamSource

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index 1bde6a6..d0f1294 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -70,7 +70,7 @@ import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
  *            The type of the DataStream, i.e., the type of the elements of the
  *            DataStream.
  */
-public class DataStream<T extends Tuple> {
+public class DataStream<T> {
 
 	protected static Integer counter = 0;
 	protected final StreamExecutionEnvironment environment;
@@ -352,7 +352,7 @@ public class DataStream<T extends Tuple> {
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
+	public <R> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
 		return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
 				MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
 	}
@@ -372,7 +372,7 @@ public class DataStream<T extends Tuple> {
 	 *            {@link CoMapFunction#map2(Tuple)}
 	 * @return The transformed DataStream
 	 */
-	public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
+	public <T2, R> DataStream<R> coMapWith(
 			CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
 		return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
 				coMapper,
@@ -394,7 +394,7 @@ public class DataStream<T extends Tuple> {
 	 *            output type
 	 * @return The transformed DataStream.
 	 */
-	public <R extends Tuple> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
+	public <R> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
 		return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
 				FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
 	}
@@ -430,7 +430,7 @@ public class DataStream<T extends Tuple> {
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
+	public <R> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
 			int batchSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
 				GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
@@ -453,7 +453,7 @@ public class DataStream<T extends Tuple> {
 	 *            output type
 	 * @return The modified DataStream.
 	 */
-	public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
+	public <R> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
 			long windowSize) {
 		return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
 				GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
@@ -476,7 +476,7 @@ public class DataStream<T extends Tuple> {
 	 *            type of the return stream
 	 * @return the data stream constructed
 	 */
-	private <R extends Tuple> StreamOperator<T, R> addFunction(String functionName,
+	private <R> StreamOperator<T, R> addFunction(String functionName,
 			final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
 			UserTaskInvokable<T, R> functionInvokable) {
 
@@ -500,7 +500,7 @@ public class DataStream<T extends Tuple> {
 		return returnStream;
 	}
 
-	protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(
+	protected <T1, T2, R> DataStream<R> addCoFunction(
 			String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
 			final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
 			CoInvokable<T1, T2, R> functionInvokable) {
@@ -535,7 +535,7 @@ public class DataStream<T extends Tuple> {
 	 * @param typeNumber
 	 *            Number of the type (used at co-functions)
 	 */
-	<X extends Tuple> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+	<X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
 		for (int i = 0; i < inputStream.connectIDs.size(); i++) {
 			String inputID = inputStream.connectIDs.get(i);
 			StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
@@ -926,7 +926,7 @@ public class DataStream<T extends Tuple> {
 		return new IterativeDataStream<T>(this);
 	}
 
-	protected <R extends Tuple> DataStream<T> addIterationSource(String iterationID) {
+	protected <R> DataStream<T> addIterationSource(String iterationID) {
 		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
 
 		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index 1cfb625..bfce834 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.api;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 
 /**
@@ -29,7 +28,7 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
  * @param <T>
  *            Type of the DataStream
  */
-public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
+public class IterativeDataStream<T> extends StreamOperator<T, T> {
 
 	static Integer iterationCount = 0;
 
@@ -69,7 +68,7 @@ public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
 	 *            when used with directed emits
 	 * 
 	 */
-	public <R extends Tuple> DataStream<T> closeWith(DataStream<T> iterationResult,
+	public <R> DataStream<T> closeWith(DataStream<T> iterationResult,
 			String iterationName) {
 		DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3d49928..a102a00 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
@@ -74,12 +73,12 @@ public class StreamConfig {
 	// CONFIGS
 
 	public void setTypeWrapper(
-			TypeSerializerWrapper<? extends Tuple, ? extends Tuple, ? extends Tuple> typeWrapper) {
+			TypeSerializerWrapper<?, ?, ?> typeWrapper) {
 		config.setBytes("typeWrapper", SerializationUtils.serialize(typeWrapper));
 	}
 
 	@SuppressWarnings("unchecked")
-	public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
+	public <IN1, IN2, OUT> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
 		byte[] serializedWrapper = config.getBytes("typeWrapper", null);
 
 		if (serializedWrapper == null) {
@@ -106,7 +105,7 @@ public class StreamConfig {
 		return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
 	}
 
-	public void setUserInvokable(StreamComponentInvokable<? extends Tuple> invokableObject) {
+	public void setUserInvokable(StreamComponentInvokable<?> invokableObject) {
 		if (invokableObject != null) {
 			config.setClass(USER_FUNCTION, invokableObject.getClass());
 
@@ -125,7 +124,7 @@ public class StreamConfig {
 	// return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
 	// }
 
-	public <T extends Tuple> StreamComponentInvokable<T> getUserInvokableObject() {
+	public <T> StreamComponentInvokable<T> getUserInvokableObject() {
 		try {
 			return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
 		} catch (Exception e) {
@@ -186,7 +185,7 @@ public class StreamConfig {
 		}
 	}
 
-	public <T extends Tuple> OutputSelector<T> getOutputSelector() {
+	public <T> OutputSelector<T> getOutputSelector() {
 		try {
 			return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
 		} catch (Exception e) {
@@ -211,14 +210,14 @@ public class StreamConfig {
 		return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
 	}
 
-	public <T extends Tuple> void setPartitioner(int outputIndex,
+	public <T> void setPartitioner(int outputIndex,
 			StreamPartitioner<T> partitionerObject) {
 
 		config.setBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T extends Tuple> StreamPartitioner<T> getPartitioner(int outputIndex)
+	public <T> StreamPartitioner<T> getPartitioner(int outputIndex)
 			throws ClassNotFoundException, IOException {
 		return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(new ShufflePartitioner<T>())));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index 4539126..0e77912 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
@@ -152,11 +151,11 @@ public abstract class StreamExecutionEnvironment {
 	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The DataStream representing the text file.
 	 */
-	public DataStream<Tuple1<String>> readTextFile(String filePath) {
+	public DataStream<String> readTextFile(String filePath) {
 		return addSource(new FileSourceFunction(filePath), 1);
 	}
 
-	public DataStream<Tuple1<String>> readTextFile(String filePath, int parallelism) {
+	public DataStream<String> readTextFile(String filePath, int parallelism) {
 		return addSource(new FileSourceFunction(filePath), parallelism);
 	}
 
@@ -170,11 +169,11 @@ public abstract class StreamExecutionEnvironment {
 	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The DataStream representing the text file.
 	 */
-	public DataStream<Tuple1<String>> readTextStream(String filePath) {
+	public DataStream<String> readTextStream(String filePath) {
 		return addSource(new FileStreamFunction(filePath), 1);
 	}
 
-	public DataStream<Tuple1<String>> readTextStream(String filePath, int parallelism) {
+	public DataStream<String> readTextStream(String filePath, int parallelism) {
 		return addSource(new FileStreamFunction(filePath), parallelism);
 	}
 
@@ -191,15 +190,14 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
-	public <X extends Serializable> DataStream<Tuple1<X>> fromElements(X... data) {
-		DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
+	public <X extends Serializable> DataStream<X> fromElements(X... data) {
+		DataStream<X> returnStream = new DataStream<X>(this, "elements");
 
 		try {
-			SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
-			jobGraphBuilder.addSource(returnStream.getId(),
-					new SourceInvokable<Tuple1<X>>(function),
-					new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data[0], null, data[0]),
-					"source", SerializationUtils.serialize(function), 1);
+			SourceFunction<X> function = new FromElementsFunction<X>(data);
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<X>(function),
+					new ObjectTypeWrapper<X, Tuple, X>(data[0], null, data[0]), "source",
+					SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize elements");
 		}
@@ -218,22 +216,22 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
-	public <X extends Serializable> DataStream<Tuple1<X>> fromCollection(Collection<X> data) {
-		DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
+	@SuppressWarnings("unchecked")
+	public <X extends Serializable> DataStream<X> fromCollection(Collection<X> data) {
+		DataStream<X> returnStream = new DataStream<X>(this, "elements");
 
 		if (data.isEmpty()) {
 			throw new RuntimeException("Collection must not be empty");
 		}
 
 		try {
-			SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
-
-			jobGraphBuilder
-					.addSource(returnStream.getId(), new SourceInvokable<Tuple1<X>>(
-							new FromElementsFunction<X>(data)),
-							new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data.toArray()[0],
-									null, data.toArray()[0]), "source", SerializationUtils
-									.serialize(function), 1);
+			SourceFunction<X> function = new FromElementsFunction<X>(data);
+
+			jobGraphBuilder.addSource(
+					returnStream.getId(),
+					new SourceInvokable<X>(new FromElementsFunction<X>(data)),
+					new ObjectTypeWrapper<X, Tuple, X>((X) data.toArray()[0], null, (X) data
+							.toArray()[0]), "source", SerializationUtils.serialize(function), 1);
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize collection");
 		}
@@ -250,7 +248,7 @@ public abstract class StreamExecutionEnvironment {
 	 *            The number to stop at (inclusive)
 	 * @return A DataStrean, containing all number in the [from, to] interval.
 	 */
-	public DataStream<Tuple1<Long>> generateSequence(long from, long to) {
+	public DataStream<Long> generateSequence(long from, long to) {
 		return addSource(new GenSequenceFunction(from, to), 1);
 	}
 
@@ -265,7 +263,7 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return the data stream constructed
 	 */
-	public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
+	public <T> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
 		DataStream<T> returnStream = new DataStream<T>(this, "source");
 
 		try {
@@ -279,7 +277,7 @@ public abstract class StreamExecutionEnvironment {
 		return returnStream;
 	}
 
-	public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
+	public <T> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
 		return addSource(sourceFunction, 1);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
index d8adb97..7edde1c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
@@ -19,9 +19,8 @@
 
 package org.apache.flink.streaming.api;
 
-import org.apache.flink.api.java.tuple.Tuple;
 
-public class StreamOperator<IN extends Tuple, OUT extends Tuple> extends DataStream<OUT> {
+public class StreamOperator<IN, OUT > extends DataStream<OUT> {
 
 	protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
 		super(environment, operatorType);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index f968b83..73a5749 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -36,7 +35,7 @@ import org.apache.flink.util.StringUtils;
  * @param <T>
  *            Type of the Tuple collected.
  */
-public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T> {
+public class DirectedStreamCollector<T> extends StreamCollector<T> {
 
 	OutputSelector<T> outputSelector;
 	private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
@@ -47,7 +46,7 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
 	 * @param channelID
 	 *            Channel ID of the Task
 	 * @param serializationDelegate
-	 *            Serialization delegate used for tuple serialization
+	 *            Serialization delegate used for serialization
 	 * @param outputSelector
 	 *            User defined {@link OutputSelector}
 	 */
@@ -63,12 +62,12 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
 	 * Collects and emits a tuple to the outputs by reusing a StreamRecord
 	 * object.
 	 * 
-	 * @param tuple
-	 *            Tuple to be collected and emitted.
+	 * @param outputObject
+	 *            Object to be collected and emitted.
 	 */
 	@Override
-	public void collect(T tuple) {
-		streamRecord.setTuple(tuple);
+	public void collect(T outputObject) {
+		streamRecord.setObject(outputObject);
 		emit(streamRecord);
 	}
 
@@ -80,7 +79,7 @@ public class DirectedStreamCollector<T extends Tuple> extends StreamCollector<T>
 	 *            Record to emit.
 	 */
 	private void emit(StreamRecord<T> streamRecord) {
-		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getTuple());
+		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
 		streamRecord.setId(channelID);
 		serializationDelegate.setInstance(streamRecord);
 		for (String outputName : outputNames) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index c4262b6..6d63385 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -23,17 +23,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * Class for defining an OutputSelector for the directTo operator. Every output
- * tuple of a directed DataStream will run through this operator to select
+ * object of a directed DataStream will run through this operator to select
  * outputs.
  * 
  * @param <T>
- *            Type parameter of the directed tuples.
+ *            Type parameter of the directed tuples/objects.
  */
-public abstract class OutputSelector<T extends Tuple> implements Serializable {
+public abstract class OutputSelector<T> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	private Collection<String> outputs;
@@ -42,21 +40,21 @@ public abstract class OutputSelector<T extends Tuple> implements Serializable {
 		outputs = new ArrayList<String>();
 	}
 
-	Collection<String> getOutputs(T tuple) {
+	Collection<String> getOutputs(T outputObject) {
 		outputs.clear();
-		select(tuple, outputs);
+		select(outputObject, outputs);
 		return outputs;
 	}
 
 	/**
-	 * Method for selecting output names for the emitted tuples when using the
+	 * Method for selecting output names for the emitted objects when using the
 	 * directTo operator. The tuple will be emitted only to output names which
 	 * are added to the outputs collection.
 	 * 
-	 * @param tuple
-	 *            Tuple for which the output selection should be made.
+	 * @param outputObject
+	 *            Output object for which the output selection should be made.
 	 * @param outputs
 	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(T tuple, Collection<String> outputs);
+	public abstract void select(T outputObject, Collection<String> outputs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 544a695..4317f75 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.io.network.api.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -34,14 +33,14 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 
 /**
- * Collector for tuples in Apache Flink stream processing. The collected tuples
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
+ * Collector for tuples in Apache Flink stream processing. The collected
+ * tuples/obecjts will be wrapped with ID in a {@link StreamRecord} and then
+ * emitted to the outputs.
  * 
  * @param <T>
- *            Type of the Tuple collected.
+ *            Type of the Tuples/Objects collected.
  */
-public class StreamCollector<T extends Tuple> implements Collector<T> {
+public class StreamCollector<T> implements Collector<T> {
 
 	private static final Log LOG = LogFactory.getLog(StreamCollector.class);
 
@@ -57,9 +56,10 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
 	 * @param channelID
 	 *            Channel ID of the Task
 	 * @param serializationDelegate
-	 *            Serialization delegate used for tuple serialization
+	 *            Serialization delegate used for serialization
 	 */
-	public StreamCollector(int channelID, SerializationDelegate<StreamRecord<T>> serializationDelegate) {
+	public StreamCollector(int channelID,
+			SerializationDelegate<StreamRecord<T>> serializationDelegate) {
 
 		this.serializationDelegate = serializationDelegate;
 		this.streamRecord = new StreamRecord<T>();
@@ -92,15 +92,15 @@ public class StreamCollector<T extends Tuple> implements Collector<T> {
 	}
 
 	/**
-	 * Collects and emits a tuple to the outputs by reusing a StreamRecord
-	 * object.
+	 * Collects and emits a tuple/object to the outputs by reusing a
+	 * StreamRecord object.
 	 * 
-	 * @param tuple
-	 *            Tuple to be collected and emitted.
+	 * @param outputObject
+	 *            Object to be collected and emitted.
 	 */
 	@Override
-	public void collect(T tuple) {
-		streamRecord.setTuple(tuple);
+	public void collect(T outputObject) {
+		streamRecord.setObject(outputObject);
 		emit(streamRecord);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index 5885cbf..6e4d877 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -19,9 +19,8 @@
 package org.apache.flink.streaming.api.function.co;
 
 import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 
-public abstract class CoMapFunction<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends AbstractFunction {
+public abstract class CoMapFunction<IN1, IN2, OUT> extends AbstractFunction {
 	private static final long serialVersionUID = 1L;
 	
 	public abstract OUT map1(IN1 value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 7918e48..026c18e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.api.function.sink;
 
-import org.apache.flink.api.java.tuple.Tuple;
 
 /**
  * Dummy implementation of the SinkFunction writing every tuple to the standard
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
  * @param <IN>
  *            Input tuple type
  */
-public class PrintSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+public class PrintSinkFunction<IN> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index cc4fb96..867c9f8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -22,12 +22,11 @@ package org.apache.flink.streaming.api.function.sink;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 
-public abstract class SinkFunction<IN extends Tuple> extends AbstractFunction implements Serializable {
+public abstract class SinkFunction<IN> extends AbstractFunction implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	public abstract void invoke(IN tuple);
+	public abstract void invoke(IN value);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
index 18853b3..3e93a97 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormat.java
@@ -22,8 +22,6 @@ package org.apache.flink.streaming.api.function.sink;
 import java.io.Serializable;
 import java.util.ArrayList;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * Abstract class for formatting the output of the writeAsText and writeAsCsv
  * functions.
@@ -31,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple;
  * @param <IN>
  *            Input tuple type
  */
-public abstract class WriteFormat<IN extends Tuple> implements Serializable {
+public abstract class WriteFormat<IN> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
index e10a9c8..5fa099f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsCsv.java
@@ -25,15 +25,13 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * Writes tuples in csv format.
  *
  * @param <IN>
  *            Input tuple type
  */
-public class WriteFormatAsCsv<IN extends Tuple> extends WriteFormat<IN> {
+public class WriteFormatAsCsv<IN> extends WriteFormat<IN> {
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
index 2d591ae..6a82877 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteFormatAsText.java
@@ -25,15 +25,13 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * Writes tuples in text format.
  *
  * @param <IN>
  *            Input tuple type
  */
-public class WriteFormatAsText<IN extends Tuple> extends WriteFormat<IN> {
+public class WriteFormatAsText<IN> extends WriteFormat<IN> {
 	private static final long serialVersionUID = 1L;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
index d473190..774dd63 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunction.java
@@ -23,8 +23,6 @@ import java.io.FileNotFoundException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
  * Simple implementation of the SinkFunction writing tuples as simple text to
  * the file specified by path. Tuples are collected to a list and written to the
@@ -34,7 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple;
  * @param <IN>
  *            Input tuple type
  */
-public abstract class WriteSinkFunction<IN extends Tuple> extends SinkFunction<IN> {
+public abstract class WriteSinkFunction<IN> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	protected final String path;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
index 3797d13..c860c52 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByBatches.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.api.function.sink;
 
-import org.apache.flink.api.java.tuple.Tuple;
 
 /**
  * Implementation of WriteSinkFunction. Writes tuples to file in equally sized
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
  * @param <IN>
  *            Input tuple type
  */
-public class WriteSinkFunctionByBatches<IN extends Tuple> extends WriteSinkFunction<IN> {
+public class WriteSinkFunctionByBatches<IN> extends WriteSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private final int batchSize;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index cb77e6d..9271f36 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.streaming.api.function.sink;
 
-import org.apache.flink.api.java.tuple.Tuple;
 
 /**
  * Implementation of WriteSinkFunction. Writes tuples to file in every millis
@@ -28,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
  * @param <IN>
  *            Input tuple type
  */
-public class WriteSinkFunctionByMillis<IN extends Tuple> extends WriteSinkFunction<IN> {
+public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private final long millis;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index f6c2c72..3a732be 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -23,27 +23,24 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
-public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
+public class FileSourceFunction extends SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;
-	private Tuple1<String> outTuple = new Tuple1<String>();
 
 	public FileSourceFunction(String path) {
 		this.path = path;
 	}
 
 	@Override
-	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+	public void invoke(Collector<String> collector) throws IOException {
 		BufferedReader br = new BufferedReader(new FileReader(path));
 		String line = br.readLine();
 		while (line != null) {
 			if (line != "") {
-				outTuple.f0 = line;
-				collector.collect(outTuple);
+				collector.collect(line);
 			}
 			line = br.readLine();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index edadfc3..9cfb2ce 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -23,28 +23,25 @@ import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
-public class FileStreamFunction extends SourceFunction<Tuple1<String>> {
+public class FileStreamFunction extends SourceFunction<String> {
 	private static final long serialVersionUID = 1L;
 
 	private final String path;
-	private Tuple1<String> outTuple = new Tuple1<String>();
 
 	public FileStreamFunction(String path) {
 		this.path = path;
 	}
 
 	@Override
-	public void invoke(Collector<Tuple1<String>> collector) throws IOException {
+	public void invoke(Collector<String> collector) throws IOException {
 		while (true) {
 			BufferedReader br = new BufferedReader(new FileReader(path));
 			String line = br.readLine();
 			while (line != null) {
 				if (line != "") {
-					outTuple.f0 = line;
-					collector.collect(outTuple);
+					collector.collect(line);
 				}
 				line = br.readLine();
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index dfe29d2..89f5182 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -22,14 +22,12 @@ package org.apache.flink.streaming.api.function.source;
 import java.util.Arrays;
 import java.util.Collection;
 
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.util.Collector;
 
-public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
+public class FromElementsFunction<T> extends SourceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
 	Iterable<T> iterable;
-	Tuple1<T> outTuple = new Tuple1<T>();
 
 	public FromElementsFunction(T... elements) {
 		this.iterable = Arrays.asList(elements);
@@ -40,10 +38,9 @@ public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
 	}
 
 	@Override
-	public void invoke(Collector<Tuple1<T>> collector) throws Exception {
+	public void invoke(Collector<T> collector) throws Exception {
 		for (T element : iterable) {
-			outTuple.f0 = element;
-			collector.collect(outTuple);
+			collector.collect(element);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 706295e..d402374 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * Source Function used to generate the number sequence
  * 
  */
-public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
+public class GenSequenceFunction extends SourceFunction<Long> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -40,10 +40,9 @@ public class GenSequenceFunction extends SourceFunction<Tuple1<Long>> {
 	}
 
 	@Override
-	public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
+	public void invoke(Collector<Long> collector) throws Exception {
 		for (long i = from; i <= to; i++) {
-			outTuple.f0 = i;
-			collector.collect(outTuple);
+			collector.collect(i);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 971533f..01d4dac 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -20,10 +20,9 @@
 package org.apache.flink.streaming.api.function.source;
 
 import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.util.Collector;
 
-public abstract class SourceFunction<OUT extends Tuple> extends AbstractFunction {
+public abstract class SourceFunction<OUT> extends AbstractFunction {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 81cfa81..92b1ea6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -19,10 +19,9 @@
 
 package org.apache.flink.streaming.api.invokable;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
-public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, IN> {
+public class SinkInvokable<IN> extends StreamRecordInvokable<IN, IN> {
 	private static final long serialVersionUID = 1L;
 
 	private SinkFunction<IN> sinkFunction;
@@ -33,16 +32,16 @@ public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, I
 
 	@Override
 	protected void immutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			sinkFunction.invoke((IN) reuse.getTuple());
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			sinkFunction.invoke((IN) reuse.getObject());
 			resetReuse();
 		}
 	}
 
 	@Override
 	protected void mutableInvoke() throws Exception {
-		while (recordIterator.next(reuse) != null) {
-			sinkFunction.invoke((IN) reuse.getTuple());
+		while ((reuse = recordIterator.next(reuse)) != null) {
+			sinkFunction.invoke((IN) reuse.getObject());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 992a25e..c7f0f09 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 
-public class SourceInvokable<OUT extends Tuple> extends StreamComponentInvokable<OUT> implements
+public class SourceInvokable<OUT> extends StreamComponentInvokable<OUT> implements
 		Serializable {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index daa7378..c011284 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.util.Collector;
 
-public abstract class StreamComponentInvokable<OUT extends Tuple> implements Serializable {
+public abstract class StreamComponentInvokable<OUT> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1162caca/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 5be3c30..b1cdde1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -21,13 +21,12 @@ package org.apache.flink.streaming.api.invokable;
 
 import java.io.IOException;
 
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
+public abstract class StreamRecordInvokable<IN, OUT> extends
 		StreamComponentInvokable<OUT> {
 
 	private static final long serialVersionUID = 1L;


Mime
View raw message