flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [23/36] flink git commit: [streaming] Updated connector type handling to support generic classes by GenericSourceFunction interface
Date Wed, 07 Jan 2015 14:13:02 GMT
[streaming] Updated connector type handling to support generic classes by GenericSourceFunction
interface


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/380934b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/380934b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/380934b1

Branch: refs/heads/release-0.8
Commit: 380934b1beb3a27a2ca3b99bc03ca7a10905589d
Parents: e0fd9df
Author: Gyula Fora <gyfora@apache.org>
Authored: Thu Dec 25 18:00:26 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Jan 5 18:08:54 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/ConnectorSource.java   | 42 ++++++++++++++++++++
 .../streaming/connectors/flume/FlumeSink.java   | 20 +++++-----
 .../streaming/connectors/flume/FlumeSource.java | 15 ++++---
 .../connectors/flume/FlumeTopology.java         |  8 ++--
 .../streaming/connectors/kafka/KafkaSink.java   |  8 ++--
 .../streaming/connectors/kafka/KafkaSource.java | 15 ++++---
 .../connectors/kafka/KafkaTopology.java         | 21 ++--------
 .../streaming/connectors/rabbitmq/RMQSink.java  | 10 ++---
 .../connectors/rabbitmq/RMQSource.java          | 16 ++++----
 .../connectors/rabbitmq/RMQTopology.java        |  8 ++--
 .../connectors/util/DeserializationSchema.java  | 42 ++++++++++++++++++++
 .../connectors/util/DeserializationScheme.java  | 42 --------------------
 .../streaming/connectors/util/RawSchema.java    | 39 ++++++++++++++++++
 .../streaming/connectors/util/RawScheme.java    | 39 ------------------
 .../connectors/util/SerializationSchema.java    | 33 +++++++++++++++
 .../connectors/util/SerializationScheme.java    | 33 ---------------
 .../connectors/util/SimpleStringSchema.java     | 40 +++++++++++++++++++
 .../connectors/util/SimpleStringScheme.java     | 40 -------------------
 .../environment/StreamExecutionEnvironment.java | 10 ++++-
 .../function/source/GenericSourceFunction.java  | 25 ++++++++++++
 20 files changed, 282 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
new file mode 100644
index 0000000..1623943
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
+		GenericSourceFunction<OUT> {
+
+	private static final long serialVersionUID = 1L;
+	protected DeserializationSchema<OUT> schema;
+
+	public ConnectorSource(DeserializationSchema<OUT> schema) {
+		this.schema = schema;
+	}
+
+	@Override
+	public TypeInformation<OUT> getType() {
+		return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
+				null, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 3e68c45..8a2f2b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -17,9 +17,10 @@
 
 package org.apache.flink.streaming.connectors.flume;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -38,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	boolean initDone = false;
 	String host;
 	int port;
-	SerializationScheme<IN, byte[]> scheme;
+	SerializationSchema<IN, byte[]> scheme;
 
-	public FlumeSink(String host, int port, SerializationScheme<IN, byte[]> scheme) {
+	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
 		this.host = host;
 		this.port = port;
-		this.scheme = scheme;
+		this.scheme = schema;
 	}
 
 	/**
@@ -56,11 +57,6 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 
-		if (!initDone) {
-			client = new FlinkRpcClientFacade();
-			client.init(host, port);
-		}
-
 		byte[] data = scheme.serialize(value);
 		client.sendDataToFlume(data);
 
@@ -136,4 +132,10 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	public void close() {
 		client.client.close();
 	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index ceb2be8..4f6ec2d 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.connectors.flume;
 import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;
@@ -29,18 +29,17 @@ import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
 
-public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
+public class FlumeSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	String host;
 	String port;
-	DeserializationScheme<OUT> scheme;
 	volatile boolean finished = false;
 
-	FlumeSource(String host, int port, DeserializationScheme<OUT> scheme) {
+	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
 		this.host = host;
 		this.port = Integer.toString(port);
-		this.scheme = scheme;
 	}
 
 	public class MyAvroSource extends AvroSource {
@@ -87,9 +86,9 @@ public class FlumeSource<OUT> implements ParallelSourceFunction<OUT> {
 		 */
 		private void collect(AvroFlumeEvent avroEvent) {
 			byte[] b = avroEvent.getBody().array();
-			OUT out = FlumeSource.this.scheme.deserialize(b);
+			OUT out = FlumeSource.this.schema.deserialize(b);
 
-			if (scheme.isEndOfStream(out)) {
+			if (schema.isEndOfStream(out)) {
 				FlumeSource.this.finished = true;
 				this.stop();
 				FlumeSource.this.notifyAll();

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 366b1a5..3cfd7d4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 
 public class FlumeTopology {
 
@@ -30,13 +30,13 @@ public class FlumeTopology {
 
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream1 = env.addSource(
-				new FlumeSource<String>("localhost", 41414, new SimpleStringScheme())).addSink(
+				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
 				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
 
 		env.execute();
 	}
 
-	public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
+	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 71e25f2..9bb87a0 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -24,7 +24,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
 public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
@@ -34,13 +34,13 @@ public class KafkaSink<IN, OUT> extends RichSinkFunction<IN> {
 	private String topicId;
 	private String brokerAddr;
 	private boolean initDone = false;
-	private SerializationScheme<IN, OUT> scheme;
+	private SerializationSchema<IN, OUT> scheme;
 
 	public KafkaSink(String topicId, String brokerAddr,
-			SerializationScheme<IN, OUT> serializationScheme) {
+			SerializationSchema<IN, OUT> serializationSchema) {
 		this.topicId = topicId;
 		this.brokerAddr = brokerAddr;
-		this.scheme = serializationScheme;
+		this.scheme = serializationSchema;
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 49e9144..7328500 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -29,27 +29,26 @@ import kafka.javaapi.consumer.ConsumerConnector;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
-public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
+public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private final String zkQuorum;
 	private final String groupId;
 	private final String topicId;
 	private ConsumerConnector consumer;
-	private DeserializationScheme<OUT> scheme;
 
 	OUT outTuple;
 
 	public KafkaSource(String zkQuorum, String groupId, String topicId,
-			DeserializationScheme<OUT> deserializationScheme) {
+			DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
 		this.zkQuorum = zkQuorum;
 		this.groupId = groupId;
 		this.topicId = topicId;
-		this.scheme = deserializationScheme;
 	}
 
 	/**
@@ -81,8 +80,8 @@ public class KafkaSource<OUT> extends RichParallelSourceFunction<OUT> {
 		ConsumerIterator<byte[], byte[]> it = stream.iterator();
 
 		while (it.hasNext()) {
-			OUT out = scheme.deserialize(it.next().message());
-			if (scheme.isEndOfStream(out)) {
+			OUT out = schema.deserialize(it.next().message());
+			if (schema.isEndOfStream(out)) {
 				break;
 			}
 			collector.collect(out);

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
index 16f123e..7801d56 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTopology.java
@@ -19,15 +19,11 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KafkaTopology {
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopology.class);
 
 	public static final class MySource implements SourceFunction<String> {
 		private static final long serialVersionUID = 1L;
@@ -42,17 +38,6 @@ public class KafkaTopology {
 		}
 	}
 
-	public static final class MyKafkaPrintSink implements SinkFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(String value) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("String: <{}> arrived from Kafka", value);
-			}
-		}
-	}
-
 	public static void main(String[] args) throws Exception {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -61,11 +46,11 @@ public class KafkaTopology {
 		DataStream<String> stream1 = env
 				.addSource(
 						new KafkaSource<String>("localhost:2181", "group", "test",
-								new SimpleStringScheme())).addSink(new MyKafkaPrintSink());
+								new SimpleStringSchema())).print();
 
 		@SuppressWarnings("unused")
 		DataStream<String> stream2 = env.addSource(new MySource()).addSink(
-				new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringScheme()));
+				new KafkaSink<String, String>("test", "localhost:9092", new SimpleStringSchema()));
 
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index c4f4615..38c4f5f 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
 	private transient Channel channel;
-	private SerializationScheme<IN, byte[]> scheme;
+	private SerializationSchema<IN, byte[]> scheme;
 
-	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationScheme<IN, byte[]> scheme) {
+	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
-		this.scheme = scheme;
+		this.scheme = schema;
 	}
 
 	/**
@@ -56,6 +56,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		try {
 			connection = factory.newConnection();
 			channel = connection.createChannel();
+			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
 		} catch (IOException e) {
 			throw new RuntimeException(e);
@@ -71,7 +72,6 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 		try {
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 			byte[] msg = scheme.serialize(value);
 
 			channel.basicPublish("", QUEUE_NAME, null, msg);

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 240deab..7ce864e 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -21,8 +21,8 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationScheme;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
-public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
+public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(RMQSource.class);
@@ -46,11 +46,11 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
-	private DeserializationScheme<OUT> scheme;
-
 	OUT out;
 
-	public RMQSource(String HOST_NAME, String QUEUE_NAME, DeserializationScheme<OUT> scheme) {
+	public RMQSource(String HOST_NAME, String QUEUE_NAME,
+			DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
 	}
@@ -92,8 +92,8 @@ public class RMQSource<OUT> extends RichParallelSourceFunction<OUT> {
 				}
 			}
 
-			out = scheme.deserialize(delivery.getBody());
-			if (scheme.isEndOfStream(out)) {
+			out = schema.deserialize(delivery.getBody());
+			if (schema.isEndOfStream(out)) {
 				break;
 			} else {
 				collector.collect(out);

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index 4f82b24..a6ca9ae 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationScheme;
-import org.apache.flink.streaming.connectors.util.SimpleStringScheme;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
 
 public class RMQTopology {
 
@@ -30,7 +30,7 @@ public class RMQTopology {
 
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream1 = env.addSource(
-				new RMQSource<String>("localhost", "hello", new SimpleStringScheme())).print();
+				new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
 
 		@SuppressWarnings("unused")
 		DataStream<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
@@ -40,7 +40,7 @@ public class RMQTopology {
 		env.execute();
 	}
 
-	public static class StringToByteSerializer implements SerializationScheme<String, byte[]> {
+	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
new file mode 100644
index 0000000..4507a1d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public T deserialize(byte[] message);
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted
+	 * 
+	 * @param nextElement
+	 *            The element to test for end signal
+	 * @return The end signal, if true the stream shuts down
+	 */
+	public boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
deleted file mode 100644
index 43420a3..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationScheme.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationScheme<T> extends Serializable {
-
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public T deserialize(byte[] message);
-
-	/**
-	 * Method to decide whether the element signals the end of the stream. If
-	 * true is returned the element won't be emitted
-	 * 
-	 * @param nextElement
-	 *            The element to test for end signal
-	 * @return The end signal, if true the stream shuts down
-	 */
-	public boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
new file mode 100644
index 0000000..29c749a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class RawSchema implements DeserializationSchema<byte[]>,
+		SerializationSchema<byte[], byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public byte[] deserialize(byte[] message) {
+		return message;
+	}
+
+	@Override
+	public boolean isEndOfStream(byte[] nextElement) {
+		return false;
+	}
+
+	@Override
+	public byte[] serialize(byte[] element) {
+		return element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
deleted file mode 100644
index 34db00e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawScheme.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawScheme implements DeserializationScheme<byte[]>,
-		SerializationScheme<byte[], byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public byte[] deserialize(byte[] message) {
-		return message;
-	}
-
-	@Override
-	public boolean isEndOfStream(byte[] nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(byte[] element) {
-		return element;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
new file mode 100644
index 0000000..7c32312
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+import java.io.Serializable;
+
+public interface SerializationSchema<T,R> extends Serializable {
+
+	/**
+	 * Serializes the incoming element to a specified type.
+	 * 
+	 * @param element
+	 *            The incoming element to be serialized
+	 * @return The serialized element.
+	 */
+	public R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
deleted file mode 100644
index bb25885..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationScheme.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationScheme<T,R> extends Serializable {
-
-	/**
-	 * Serializes the incoming element to a specified type.
-	 * 
-	 * @param element
-	 *            The incoming element to be serialized
-	 * @return The serialized element.
-	 */
-	public R serialize(T element);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
new file mode 100644
index 0000000..4b21580
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.util;
+
+public class SimpleStringSchema implements DeserializationSchema<String>,
+		SerializationSchema<String, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public String deserialize(byte[] message) {
+		return new String(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return false;
+	}
+
+	@Override
+	public String serialize(String element) {
+		return element;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
deleted file mode 100644
index 9d1ce6c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringScheme.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringScheme implements DeserializationScheme<String>,
-		SerializationScheme<String, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public String deserialize(byte[] message) {
-		return new String(message);
-	}
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return false;
-	}
-
-	@Override
-	public String serialize(String element) {
-		return element;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c7792a4..6fa9d74 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.function.source.FileSourceFunction;
 import org.apache.flink.streaming.api.function.source.FileStreamFunction;
 import org.apache.flink.streaming.api.function.source.FromElementsFunction;
 import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
@@ -414,12 +415,17 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return the data stream constructed
 	 */
+	@SuppressWarnings("unchecked")
 	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
 			TypeInformation<OUT> outTypeInfo, String sourceName) {
 
 		if (outTypeInfo == null) {
-			outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(),
-					0, null, null);
+			if (function instanceof GenericSourceFunction) {
+				outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+			} else {
+				outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+						function.getClass(), 0, null, null);
+			}
 		}
 
 		boolean isParallel = function instanceof ParallelSourceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/380934b1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
new file mode 100644
index 0000000..664d39a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public interface GenericSourceFunction<T> {
+
+	public TypeInformation<T> getType();
+}


Mime
View raw message