flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [22/27] incubator-flink git commit: [streaming] Source parallelism + connector rework
Date Sun, 04 Jan 2015 20:51:12 GMT
[streaming] Source parallelism + connector rework


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2425885c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2425885c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2425885c

Branch: refs/heads/master
Commit: 2425885c0391da04ff600c538fe56081dc44ab1f
Parents: b9d0241
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Dec 23 22:13:22 2014 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Sat Jan 3 21:44:42 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |  64 +++-------
 .../streaming/connectors/flume/FlumeSource.java |  61 +++------
 .../connectors/flume/FlumeTopology.java         |  71 ++---------
 .../streaming/connectors/kafka/KafkaSink.java   |  49 ++-----
 .../streaming/connectors/kafka/KafkaSource.java |  56 +++-----
 .../connectors/kafka/KafkaTopology.java         |  52 ++------
 .../streaming/connectors/rabbitmq/RMQSink.java  |  61 +++------
 .../connectors/rabbitmq/RMQSource.java          |  56 +++-----
 .../connectors/rabbitmq/RMQTopology.java        | 127 ++++++-------------
 .../connectors/twitter/TwitterSource.java       |   4 +-
 .../connectors/util/DeserializationScheme.java  |  42 ++++++
 .../streaming/connectors/util/RawScheme.java    |  39 ++++++
 .../connectors/util/SerializationScheme.java    |  33 +++++
 .../connectors/util/SimpleStringScheme.java     |  40 ++++++
 .../streaming/api/datastream/DataStream.java    |   2 +-
 .../api/datastream/DataStreamSource.java        |  77 ++++++-----
 .../environment/StreamExecutionEnvironment.java |  84 ++++++++----
 .../function/source/GenSequenceFunction.java    |  92 ++++++++------
 .../function/source/ParallelSourceFunction.java |  26 ++++
 .../source/RichParallelSourceFunction.java      |  27 ++++
 .../api/function/source/RichSourceFunction.java |  50 ++++----
 .../apache/flink/streaming/api/SourceTest.java  |  19 +--
 .../streaming/StreamExecutionEnvironment.scala  |  11 +-
 23 files changed, 562 insertions(+), 581 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index a8fc57e..3e68c45 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,18 +17,19 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationScheme;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public abstract class FlumeSink<IN> implements SinkFunction<IN> {
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
@@ -37,17 +38,17 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 	boolean initDone = false;
 	String host;
 	int port;
-	private boolean sendAndClose = false;
-	private boolean closeWithoutSend = false;
+	SerializationScheme<IN, byte[]> scheme;
 
-	public FlumeSink(String host, int port) {
+	public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> scheme) {
 		this.host = host;
 		this.port = port;
+		this.scheme = scheme;
 	}
 
 	/**
-	 * Receives tuples from the Apache Flink {@link DataStream} and forwards them to
-	 * Apache Flume.
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
+	 * them to Apache Flume.
 	 * 
 	 * @param value
 	 *            The tuple arriving from the datastream
@@ -60,25 +61,11 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 			client.init(host, port);
 		}
 
-		byte[] data = serialize(value);
-		if (!closeWithoutSend) {
-			client.sendDataToFlume(data);
-		}
-		if (sendAndClose) {
-			client.close();
-		}
+		byte[] data = scheme.serialize(value);
+		client.sendDataToFlume(data);
 
 	}
 
-	/**
-	 * Serializes tuples into byte arrays.
-	 * 
-	 * @param value
-	 *            The tuple used for the serialization
-	 * @return The serialized byte array.
-	 */
-	public abstract byte[] serialize(IN value);
-
 	private class FlinkRpcClientFacade {
 		private RpcClient client;
 		private String hostname;
@@ -99,7 +86,8 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 			int initCounter = 0;
 			while (true) {
 				if (initCounter >= 90) {
-					throw new RuntimeException("Cannot establish connection with" + port + " at " + host);
+					throw new RuntimeException("Cannot establish connection with" + port + " at "
+							+ host);
 				}
 				try {
 					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
@@ -142,28 +130,10 @@ public abstract class FlumeSink<IN> implements SinkFunction<IN> {
 			}
 		}
 
-		/**
-		 * Closes the RpcClient.
-		 */
-		public void close() {
-			client.close();
-		}
-
 	}
 
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
-	}
-
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
-		client.close();
-		closeWithoutSend = true;
+	@Override
+	public void close() {
+		client.client.close();
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index d5faa34..ceb2be8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.connectors.flume;
 import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationScheme;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;
@@ -28,15 +29,18 @@ import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
 
-public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
+public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	String host;
 	String port;
+	DeserializationScheme<OUT> scheme;
+	volatile boolean finished = false;
 
-	FlumeSource(String host, int port) {
+	FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) {
 		this.host = host;
 		this.port = Integer.toString(port);
+		this.scheme = scheme;
 	}
 
 	public class MyAvroSource extends AvroSource {
@@ -48,7 +52,8 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
 		 * 
 		 * @param avroEvent
 		 *            The event that should be sent to the dataStream
-		 * @return A {@link Status}.OK message if sending the event was successful.
+		 * @return A {@link Status}.OK message if sending the event was
+		 *         successful.
 		 */
 		@Override
 		public Status append(AvroFlumeEvent avroEvent) {
@@ -82,30 +87,21 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
 		 */
 		private void collect(AvroFlumeEvent avroEvent) {
 			byte[] b = avroEvent.getBody().array();
-			OUT tuple = FlumeSource.this.deserialize(b);
-			if (!closeWithoutSend) {
-				collector.collect(tuple);
-			}
-			if (sendAndClose) {
-				sendDone = true;
+			OUT out = FlumeSource.this.scheme.deserialize(b);
+
+			if (scheme.isEndOfStream(out)) {
+				FlumeSource.this.finished = true;
+				this.stop();
+				FlumeSource.this.notifyAll();
+			} else {
+				collector.collect(out);
 			}
+
 		}
 
 	}
 
 	MyAvroSource avroSource;
-	private volatile boolean closeWithoutSend = false;
-	private boolean sendAndClose = false;
-	private volatile boolean sendDone = false;
-
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public abstract OUT deserialize(byte[] message);
 
 	/**
 	 * Configures the AvroSource. Also sets the collector so the application can
@@ -138,26 +134,9 @@ public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
 	public void invoke(Collector<OUT> collector) throws Exception {
 		configureAvroSource(collector);
 		avroSource.start();
-		while (true) {
-			if (closeWithoutSend || sendDone) {
-				break;
-			}
+		while (!finished) {
+			this.wait();
 		}
-		avroSource.stop();
-	}
-
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
-	}
-
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
-		closeWithoutSend = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 8dc3c5a..366b1a5 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -17,80 +17,33 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
-import org.apache.commons.lang.SerializationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
 
 public class FlumeTopology {
-	private static final Logger LOG = LoggerFactory.getLogger(FlumeTopology.class);
 
-	public static class MyFlumeSink extends FlumeSink<String> {
-		private static final long serialVersionUID = 1L;
+	public static void main(String[] args) throws Exception {
 
-		public MyFlumeSink(String host, int port) {
-			super(host, port);
-		}
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
-		@Override
-		public byte[] serialize(String tuple) {
-			if (tuple.equals("q")) {
-				try {
-					sendAndClose();
-				} catch (Exception e) {
-					throw new RuntimeException("Error while closing Flume connection with " + port
-							+ " at " + host, e);
-				}
-			}
-			return SerializationUtils.serialize(tuple);
-		}
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream1 = env.addSource(
+				new FlumeSource<String>("localhost", 41414, new SimpleStringScheme())).addSink(
+				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
 
+		env.execute();
 	}
 
-	public static final class MyFlumePrintSink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(String value) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("String: <{}> arrived from Flume", value);
-			}
-		}
-
-	}
+	public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
 
-	public static class MyFlumeSource extends FlumeSource<String> {
 		private static final long serialVersionUID = 1L;
 
-		MyFlumeSource(String host, int port) {
-			super(host, port);
-		}
-
 		@Override
-		public String deserialize(byte[] msg) {
-			String s = (String) SerializationUtils.deserialize(msg);
-			if (s.equals("q")) {
-				closeWithoutSend();
-			}
-			return s;
+		public byte[] serialize(String element) {
+			return element.getBytes();
 		}
-
 	}
 
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream1 = env.addSource(new MyFlumeSource("localhost", 41414))
-				.addSink(new MyFlumePrintSink());
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
-				"q").addSink(new MyFlumeSink("localhost", 42424));
-
-		env.execute();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index f7f998f..71e25f2 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -23,22 +23,24 @@ import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationScheme;
 
-public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
+public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private kafka.javaapi.producer.Producer<Integer, OUT> producer;
 	private Properties props;
 	private String topicId;
 	private String brokerAddr;
-	private boolean sendAndClose = false;
-	private boolean closeWithoutSend = false;
 	private boolean initDone = false;
+	private SerializationScheme<IN, OUT> scheme;
 
-	public KafkaSink(String topicId, String brokerAddr) {
+	public KafkaSink(String topicId, String brokerAddr,
+			SerializationScheme<IN, OUT> serializationScheme) {
 		this.topicId = topicId;
 		this.brokerAddr = brokerAddr;
+		this.scheme = serializationScheme;
 
 	}
 
@@ -60,49 +62,22 @@ public abstract class KafkaSink<IN, OUT> implements SinkFunction<IN> {
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 * 
-	 * @param value
+	 * @param next
 	 *            The incoming data
 	 */
 	@Override
-	public void invoke(IN value) {
+	public void invoke(IN next) {
 		if (!initDone) {
 			initialize();
 		}
 
-		OUT out = serialize(value);
-		KeyedMessage<Integer, OUT> data = new KeyedMessage<Integer, OUT>(topicId, out);
+		producer.send(new KeyedMessage<Integer, OUT>(topicId, scheme.serialize(next)));
 
-		if (!closeWithoutSend) {
-			producer.send(data);
-		}
-
-		if (sendAndClose) {
-			producer.close();
-		}
 	}
 
-	/**
-	 * Serializes tuples into byte arrays.
-	 * 
-	 * @param value
-	 *            The tuple used for the serialization
-	 * @return The serialized byte array.
-	 */
-	public abstract OUT serialize(IN value);
-
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
+	@Override
+	public void close() {
 		producer.close();
-		closeWithoutSend = true;
-	}
-
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 75c3983..49e9144 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -27,28 +27,29 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationScheme;
 import org.apache.flink.util.Collector;
 
-public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
+public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private final String zkQuorum;
 	private final String groupId;
 	private final String topicId;
-	private final int numThreads;
 	private ConsumerConnector consumer;
-	private boolean closeWithoutSend = false;
-	private boolean sendAndClose = false;
+	private DeserializationScheme<OUT> scheme;
 
 	OUT outTuple;
 
-	public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
+	public KafkaSource(String zkQuorum, String groupId, String topicId,
+			DeserializationScheme<OUT> deserializationScheme) {
 		this.zkQuorum = zkQuorum;
 		this.groupId = groupId;
 		this.topicId = topicId;
-		this.numThreads = numThreads;
+		this.scheme = deserializationScheme;
 	}
 
 	/**
@@ -58,7 +59,7 @@ public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
 		Properties props = new Properties();
 		props.put("zookeeper.connect", zkQuorum);
 		props.put("group.id", groupId);
-		props.put("zookeeper.session.timeout.ms", "400");
+		props.put("zookeeper.session.timeout.ms", "2000");
 		props.put("zookeeper.sync.time.ms", "200");
 		props.put("auto.commit.interval.ms", "1000");
 		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
@@ -72,48 +73,25 @@ public abstract class KafkaSource<OUT> extends RichSourceFunction<OUT> {
 	 */
 	@Override
 	public void invoke(Collector<OUT> collector) throws Exception {
-		initializeConnection();
 
-		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-		topicCountMap.put(topicId, numThreads);
 		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
-				.createMessageStreams(topicCountMap);
+				.createMessageStreams(Collections.singletonMap(topicId, 1));
+
 		KafkaStream<byte[], byte[]> stream = consumerMap.get(topicId).get(0);
 		ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
 		while (it.hasNext()) {
-			OUT out = deserialize(it.next().message());
-			if (closeWithoutSend) {
+			OUT out = scheme.deserialize(it.next().message());
+			if (scheme.isEndOfStream(out)) {
 				break;
 			}
 			collector.collect(out);
-			if (sendAndClose) {
-				break;
-			}
 		}
 		consumer.shutdown();
 	}
 
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public abstract OUT deserialize(byte[] message);
-
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
-		closeWithoutSend = true;
-	}
-
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 4d043c4..16f123e 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -21,13 +21,14 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaTopology {
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
-	
+
 	public static final class MySource implements SourceFunction<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -41,41 +42,6 @@ public class KafkaTopology {
 		}
 	}
 
-	public static final class MyKafkaSource extends KafkaSource<String> {
-		private static final long serialVersionUID = 1L;
-
-		public MyKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
-			super(zkQuorum, groupId, topicId, numThreads);
-		}
-
-		@Override
-		public String deserialize(byte[] msg) {
-			String s = new String(msg);
-			if (s.equals("q")) {
-				closeWithoutSend();
-			}
-			return new String(s);
-		}
-
-	}
-
-	public static final class MyKafkaSink extends KafkaSink<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		public MyKafkaSink(String topicId, String brokerAddr) {
-			super(topicId, brokerAddr);
-		}
-
-		@Override
-		public String serialize(String tuple) {
-			if (tuple.equals("q")) {
-				sendAndClose();
-			}
-			return tuple;
-		}
-
-	}
-	
 	public static final class MyKafkaPrintSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 1L;
 
@@ -87,21 +53,19 @@ public class KafkaTopology {
 		}
 	}
 
-	private static final int SOURCE_PARALELISM = 1;
-
 	public static void main(String[] args) throws Exception {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		@SuppressWarnings("unused")
 		DataStream<String> stream1 = env
-			.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1)).setParallelism(1)
-			.addSink(new MyKafkaPrintSink());
+				.addSource(
+						new KafkaSource<String>("localhost:2181", "group", "test",
+								new SimpleStringScheme())).addSink(new MyKafkaPrintSink());
 
 		@SuppressWarnings("unused")
-		DataStream<String> stream2 = env
-			.addSource(new MySource())
-			.addSink(new MyKafkaSink("test", "localhost:9092"));
+		DataStream<String> stream2 = env.addSource(new MySource()).addSink(
+				new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringScheme()));
 
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index f7197d5..c4f4615 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -19,32 +19,32 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationScheme;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 
-public abstract class RMQSink<IN> implements SinkFunction<IN> {
+public class RMQSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
 
-	private boolean sendAndClose = false;
-	private boolean closeWithoutSend = false;
-
 	private String QUEUE_NAME;
 	private String HOST_NAME;
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
 	private transient Channel channel;
-	private boolean initDone = false;
+	private SerializationScheme<IN, byte[]> scheme;
 
-	public RMQSink(String HOST_NAME, String QUEUE_NAME) {
+	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationScheme<IN, byte[]> scheme) {
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
+		this.scheme = scheme;
 	}
 
 	/**
@@ -60,8 +60,6 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
 		} catch (IOException e) {
 			throw new RuntimeException(e);
 		}
-
-		initDone = true;
 	}
 
 	/**
@@ -72,37 +70,21 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
 	 */
 	@Override
 	public void invoke(IN value) {
-		if (!initDone) {
-			initializeConnection();
-		}
-
 		try {
 			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-			byte[] msg = serialize(value);
-			if (!closeWithoutSend) {
-				channel.basicPublish("", QUEUE_NAME, null, msg);
-			}
+			byte[] msg = scheme.serialize(value);
+
+			channel.basicPublish("", QUEUE_NAME, null, msg);
+
 		} catch (IOException e) {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
 			}
 		}
 
-		if (sendAndClose) {
-			closeChannel();
-		}
 	}
 
 	/**
-	 * Serializes tuples into byte arrays.
-	 * 
-	 * @param value
-	 *            The tuple used for the serialization
-	 * @return The serialized byte array.
-	 */
-	public abstract byte[] serialize(IN value);
-
-	/**
 	 * Closes the connection.
 	 */
 	private void closeChannel() {
@@ -110,25 +92,20 @@ public abstract class RMQSink<IN> implements SinkFunction<IN> {
 			channel.close();
 			connection.close();
 		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
+			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+					+ " at " + HOST_NAME, e);
 		}
 
 	}
 
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
-		closeChannel();
-		closeWithoutSend = true;
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
 	}
 
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
+	@Override
+	public void close() {
+		closeChannel();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index ee83ffd..240deab 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -19,8 +19,10 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import java.io.IOException;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationScheme;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,15 +32,13 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
-public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
+public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
 
 	private final String QUEUE_NAME;
 	private final String HOST_NAME;
-	private boolean closeWithoutSend = false;
-	private boolean sendAndClose = false;
 
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
@@ -46,9 +46,11 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
-	OUT outTuple;
+	private final DeserializationScheme<OUT> scheme;
 
-	public RMQSource(String HOST_NAME, String QUEUE_NAME) {
+	OUT out;
+	public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationScheme<OUT> scheme) {
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
+		this.scheme = scheme;
 	}
@@ -79,7 +81,6 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 	 */
 	@Override
 	public void invoke(Collector<OUT> collector) throws Exception {
-		initializeConnection();
 
 		while (true) {
 
@@ -91,17 +92,23 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 				}
 			}
 
-			outTuple = deserialize(delivery.getBody());
-			if (closeWithoutSend) {
+			out = scheme.deserialize(delivery.getBody());
+			if (scheme.isEndOfStream(out)) {
 				break;
 			} else {
-				collector.collect(outTuple);
-			}
-			if (sendAndClose) {
-				break;
+				collector.collect(out);
 			}
 		}
 
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+
+	@Override
+	public void close() {
 		try {
 			connection.close();
 		} catch (IOException e) {
@@ -111,27 +118,4 @@ public abstract class RMQSource<OUT> extends RichSourceFunction<OUT> {
 
 	}
 
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public abstract OUT deserialize(byte[] message);
-
-	/**
-	 * Closes the connection immediately and no further data will be sent.
-	 */
-	public void closeWithoutSend() {
-		closeWithoutSend = true;
-	}
-
-	/**
-	 * Closes the connection only when the next message is sent after this call.
-	 */
-	public void sendAndClose() {
-		sendAndClose = true;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index f640ee0..4f82b24 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -1,95 +1,52 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import org.apache.commons.lang.SerializationUtils;
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RMQTopology {
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
-	
-	public static final class MyRMQSink extends RMQSink<String> {
-		public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
-			super(HOST_NAME, QUEUE_NAME);
-		}
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public byte[] serialize(String t) {
-			if (t.equals("q")) {
-				sendAndClose();
-			}
-			return SerializationUtils.serialize((String) t);
-		}
-
-	}
+import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+
+public class RMQTopology {
+
+	public static void main(String[] args) throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream1 = env.addSource(
+				new RMQSource<String>("localhost", "hello", new SimpleStringScheme())).print();
+
+		@SuppressWarnings("unused")
+		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
+				"q").addSink(
+				new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
+
+		env.execute();
+	}
+
+	public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
 
-	public static final class MyRMQPrintSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void invoke(String value) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("String: <{}> arrived from RMQ", value);
-			}
+		public byte[] serialize(String element) {
+			return element.getBytes();
 		}
-		
 	}
-	
-	public static final class MyRMQSource extends RMQSource<String> {
-
-		public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
-			super(HOST_NAME, QUEUE_NAME);
-		}
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String deserialize(byte[] t) {
-			String s = (String) SerializationUtils.deserialize(t);
-			if (s.equals("q")) {
-				closeWithoutSend();
-			}
-			return s;
-		}
-
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream1 = env
-			.addSource(new MyRMQSource("localhost", "hello"))
-			.addSink(new MyRMQPrintSink());
-
-		@SuppressWarnings("unused")
-		DataStream<String> dataStream2 = env
-			.fromElements("one", "two", "three", "four", "five", "q")
-			.addSink(new MyRMQSink("localhost", "hello"));
-
-		env.execute();
-	}
-}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index 3ffa8ae..ddb2538 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -25,7 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
  * Implementation of {@link SourceFunction} specialized to emit tweets from
  * Twitter. It can connect to Twitter Streaming API, collect tweets and
  */
-public class TwitterSource extends RichSourceFunction<String> {
+public class TwitterSource extends RichParallelSourceFunction<String> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
new file mode 100644
index 0000000..43420a3
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface DeserializationScheme<T> extends Serializable {
+
+	/**
+	 * Deserializes a single incoming message into the stream's element type.
+	 * 
+	 * @param message
+	 *            The incoming message as a byte array
+	 * @return The deserialized element.
+	 */
+	public T deserialize(byte[] message);
+
+	/**
+	 * Decides whether the given element signals the end of the stream. If
+	 * true is returned, the element is not emitted and the source stops.
+	 * 
+	 * @param nextElement
+	 *            The element to test for the end-of-stream signal
+	 * @return true if the stream should shut down, false otherwise
+	 */
+	public boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
new file mode 100644
index 0000000..34db00e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class RawScheme implements DeserializationScheme<byte[]>,
+		SerializationScheme<byte[], byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public byte[] deserialize(byte[] message) {
+		return message;
+	}
+
+	@Override
+	public boolean isEndOfStream(byte[] nextElement) {
+		return false;
+	}
+
+	@Override
+	public byte[] serialize(byte[] element) {
+		return element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
new file mode 100644
index 0000000..bb25885
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface SerializationScheme<T,R> extends Serializable {
+
+	/**
+	 * Serializes the incoming element to a specified type.
+	 * 
+	 * @param element
+	 *            The incoming element to be serialized
+	 * @return The serialized element.
+	 */
+	public R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
new file mode 100644
index 0000000..9d1ce6c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class SimpleStringScheme implements DeserializationScheme<String>,
+		SerializationScheme<String, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public String deserialize(byte[] message) {
+		return new String(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return false;
+	}
+
+	@Override
+	public String serialize(String element) {
+		return element;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 422df5b..8d0020e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1005,7 +1005,7 @@ public class DataStream<OUT> {
 
 	protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
 
-		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
+		DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, true);
 
 		jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism, waitTime);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 978ea42..a649f59 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -1,38 +1,47 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * The DataStreamSource represents the starting point of a DataStream.
- *
- * @param <OUT>
- *            Type of the DataStream created.
- */
-public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
-
-	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo) {
-		super(environment, operatorType, outTypeInfo);
-	}
-
-	public DataStreamSource(DataStream<OUT> dataStream) {
-		super(dataStream);
-	}
-}
+
+/**
+ * The DataStreamSource represents the starting point of a DataStream.
+ * 
+ * @param <OUT>
+ *            Type of the DataStream created.
+ */
+public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
+
+	final boolean isParallel; // when false, setParallelism rejects values above 1
+
+	public DataStreamSource(StreamExecutionEnvironment environment, String operatorType,
+			TypeInformation<OUT> outTypeInfo, boolean isParallel) {
+		super(environment, operatorType, outTypeInfo);
+		this.isParallel = isParallel;
+	}
+
+	@Override
+	public DataStreamSource<OUT> setParallelism(int dop) {
+		if (dop > 1 && !isParallel) {
+			throw new IllegalArgumentException("Source: " + this.id + " is not a parallel source");
+		} else {
+			return (DataStreamSource<OUT>) super.setParallelism(dop);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6108975..ecc2545 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,8 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.api.invokable.SourceInvokable;
@@ -222,7 +224,7 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> readTextStream(String filePath) {
 		checkIfFileExists(filePath);
-		return addSource(new FileStreamFunction(filePath));
+		return addSource(new FileStreamFunction(filePath), null, "textStream");
 	}
 
 	private static void checkIfFileExists(String filePath) {
@@ -260,14 +262,10 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
-				outTypeInfo);
 
 		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
-		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
-				null, outTypeInfo, "source", 1);
 
-		return returnStream;
+		return addSource(function, outTypeInfo, "elements");
 	}
 
 	/**
@@ -292,13 +290,9 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
-				outTypeInfo);
-
-		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
-				new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source", 1);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
 
-		return returnStream;
+		return addSource(function, outTypeInfo, "collection");
 	}
 
 	/**
@@ -316,7 +310,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A DataStream, containing the strings received from socket.
 	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
-		return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
+		return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null,
+				"socketStream");
 	}
 
 	/**
@@ -348,20 +343,26 @@ public abstract class StreamExecutionEnvironment {
 		if (from > to) {
 			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
 		}
-		return addSource(new GenSequenceFunction(from, to));
+		return addSource(new GenSequenceFunction(from, to), null, "sequence");
 	}
 
 	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
 			TypeInformation<String> typeInfo) {
 		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
-		DataStreamSource<String> returnStream = addSource(function);
+		DataStreamSource<String> returnStream = addSource(function, null, "fileSource");
 		jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat);
 		return returnStream;
 	}
 
 	/**
 	 * Create a DataStream using a user defined source function for arbitrary
-	 * source functionality.
+	 * source functionality.<p> By default sources have a parallelism of 1. To
+	 * enable parallel execution, the user defined source should implement
+	 * {@link ParallelSourceFunction} or extend
+	 * {@link RichParallelSourceFunction}. In these cases the resulting source
+	 * will have the parallelism of the environment. To change this afterwards
+	 * call {@link DataStreamSource#setParallelism(int)}.
+	 * The source is named after the class name of the given function.
 	 * 
 	 * @param function
 	 *            the user defined function
@@ -370,22 +371,19 @@ public abstract class StreamExecutionEnvironment {
 	 * @return the data stream constructed
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
-		TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
-				function.getClass(), 0, null, null);
-
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
-
-		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
-				null, outTypeInfo, "source", 1);
-
-		return returnStream;
+		return addSource(function, null);
 	}
 
 	/**
 	 * Ads a data source with a custom type information thus opening a
 	 * {@link DataStream}. Only in very special cases does the user need to
 	 * support type information. Otherwise use
-	 * {@link #addSource(SourceFunction)}
+	 * {@link #addSource(SourceFunction)}.<p> By default sources have a
+	 * parallelism of 1. To enable parallel execution, the user defined source
+	 * should implement {@link ParallelSourceFunction} or extend
+	 * {@link RichParallelSourceFunction}. In these cases the resulting source
+	 * will have the parallelism of the environment. To change this afterwards
+	 * call {@link DataStreamSource#setParallelism(int)}.
 	 * 
 	 * @param function
 	 *            the user defined function
@@ -397,11 +395,41 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
 			TypeInformation<OUT> outTypeInfo) {
+		return addSource(function, outTypeInfo, function.getClass().getName());
+	}
+
+	/**
+	 * Adds a named data source thus opening a {@link DataStream}. If
+	 * {@code outTypeInfo} is null, the output type is extracted from the
+	 * function. Functions implementing {@link ParallelSourceFunction} get the
+	 * environment's degree of parallelism, all others run with parallelism 1.
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param outTypeInfo
+	 *            the type information for the stream, or null to extract it
+	 * @param sourceName
+	 *            Name of the data source
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
+			TypeInformation<OUT> outTypeInfo, String sourceName) {
+
+		if (outTypeInfo == null) {
+			outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(),
+					0, null, null);
+		}
+
+		boolean isParallel = function instanceof ParallelSourceFunction;
+		int dop = isParallel ? getDegreeOfParallelism() : 1;
 
-		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName,
+				outTypeInfo, isParallel);
 
 		jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
-				null, outTypeInfo, "source", 1);
+				null, outTypeInfo, sourceName, dop);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 69601ff..3afd06e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -1,45 +1,53 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
-
-/**
- * Source Function used to generate the number sequence
- * 
- */
-public class GenSequenceFunction implements SourceFunction<Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	long from;
-	long to;
-
-	public GenSequenceFunction(long from, long to) {
-		this.from = from;
-		this.to = to;
-	}
-
-	@Override
-	public void invoke(Collector<Long> collector) throws Exception {
-		for (long i = from; i <= to; i++) {
-			collector.collect(i);
-		}
-	}
-
-}
+import org.apache.flink.util.NumberSequenceIterator;
+
+/**
+ * Parallel source function that emits the {@code long} values of the sequence
+ * from {@code from} to {@code to}, as produced by {@link NumberSequenceIterator}.
+ * <p>
+ * The complete range is split into one partition per parallel subtask in
+ * {@link #open(Configuration)}; each subtask then emits only the partition at its
+ * own subtask index.
+ */
+public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The complete range; shipped with the function and split at runtime. */
+	private final NumberSequenceIterator fullIterator;
+
+	/**
+	 * The partition owned by this subtask. It is only known after {@link #open},
+	 * so it is runtime state and not part of the serialized function.
+	 */
+	private transient NumberSequenceIterator splitIterator;
+
+	/**
+	 * @param from
+	 *            first number of the sequence
+	 * @param to
+	 *            last number of the sequence
+	 */
+	public GenSequenceFunction(long from, long to) {
+		fullIterator = new NumberSequenceIterator(from, to);
+	}
+
+	/**
+	 * Emits every number of this subtask's partition into the collector.
+	 *
+	 * @throws IllegalStateException
+	 *             if {@link #open(Configuration)} has not been called yet
+	 */
+	@Override
+	public void invoke(Collector<Long> collector) throws Exception {
+		if (splitIterator == null) {
+			// Without open() we do not know which partition this instance owns;
+			// fail with a clear message instead of a NullPointerException.
+			throw new IllegalStateException(
+					"GenSequenceFunction.open() must be called before invoke()");
+		}
+		while (splitIterator.hasNext()) {
+			collector.collect(splitIterator.next());
+		}
+	}
+
+	/**
+	 * Selects the partition of the full range that belongs to this subtask.
+	 */
+	@Override
+	public void open(Configuration config) {
+		int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
+		int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
new file mode 100644
index 0000000..46d4fe9
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Marker for {@link SourceFunction}s that may run as several parallel instances.
+ * <p>
+ * {@code StreamExecutionEnvironment#addSource} registers sources implementing this
+ * interface with the environment's degree of parallelism; all other source
+ * functions are registered with a parallelism of 1. Every parallel instance runs
+ * {@link #invoke(Collector)}, so implementations usually emit only their own share
+ * of the data (for example by using the subtask index, as
+ * {@code GenSequenceFunction} does).
+ *
+ * @param <OUT>
+ *            type of the records emitted by the source
+ */
+public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
+
+	/**
+	 * Emits this instance's records into the given collector.
+	 *
+	 * @param collector
+	 *            the collector receiving the emitted records
+	 * @throws Exception
+	 *             if producing the records fails
+	 */
+	@Override
+	void invoke(Collector<OUT> collector) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
new file mode 100644
index 0000000..5bbfd4c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements
+		ParallelSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
index d212c3c..4b947c7 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
@@ -1,27 +1,27 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.function.source;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
-		SourceFunction<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-}
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements
+		SourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
index 156a8f2..9be7de6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
 import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.util.MockCollector;
 import org.apache.flink.streaming.util.MockSource;
@@ -39,21 +38,16 @@ public class SourceTest {
 	@Test
 	public void fromElementsTest() {
+		// Elements passed as varargs must be emitted in exactly the given order.
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1, 2, 3));
+		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(1,
+				2, 3));
 		assertEquals(expectedList, actualList);
 	}
 
 	@Test
 	public void fromCollectionTest() {
+		// Elements of a collection must be emitted in the collection's iteration order.
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(Arrays.asList(1, 2, 3)));
-		assertEquals(expectedList, actualList);
-	}
-
-	@Test
-	public void genSequenceTest() {
-		List<Long> expectedList = Arrays.asList(1L, 2L, 3L);
-		List<Long> actualList = MockSource.createAndExecute(new GenSequenceFunction(1, 3));
+		List<Integer> actualList = MockSource.createAndExecute(new FromElementsFunction<Integer>(
+				Arrays.asList(1, 2, 3)));
 		assertEquals(expectedList, actualList);
 	}
 
@@ -62,14 +56,15 @@ public class SourceTest {
 		List<String> expectedList = Arrays.asList("a", "b", "c");
 		List<String> actualList = new ArrayList<String>();
 
-		byte[] data = {'a', '\n', 'b', '\n', 'c', '\n'};
+		byte[] data = { 'a', '\n', 'b', '\n', 'c', '\n' };
 
 		Socket socket = mock(Socket.class);
 		when(socket.getInputStream()).thenReturn(new ByteArrayInputStream(data));
 		when(socket.isClosed()).thenReturn(false);
 		when(socket.isConnected()).thenReturn(true);
 
-		new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(actualList), socket);
+		new SocketTextStreamFunction("", 0, '\n').streamFromSocket(new MockCollector<String>(
+				actualList), socket);
 		assertEquals(expectedList, actualList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2425885c/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index 9c66b24..bcd586e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -145,14 +145,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     data: Seq[T]): DataStream[T] = {
     Validate.notNull(data, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
-    val returnStream = new DataStreamSource[T](javaEnv,
-      "elements", typeInfo);
 
-    javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
-      new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
-        .asJavaCollection(data))), null, typeInfo,
-      "source", 1);
-    returnStream
+    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
+        .asJavaCollection(data))
+        
+    javaEnv.addSource(sourceFunction, typeInfo)
   }
 
   /**


Mime
View raw message