flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [5/7] flink git commit: [FLINK-1874] [streaming] Connectors separated into individual projects
Date Thu, 28 May 2015 13:36:31 GMT
[FLINK-1874] [streaming] Connectors separated into individual projects


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665bcec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665bcec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665bcec7

Branch: refs/heads/master
Commit: 665bcec779ef16e9ae2ba535631765d329f81392
Parents: 72828b5
Author: mbalassi <mbalassi@apache.org>
Authored: Sat May 23 10:50:58 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu May 28 14:44:22 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    |   30 +-
 .../flink-connector-flume/pom.xml               |  171 +++
 .../streaming/connectors/flume/FlumeSink.java   |  141 +++
 .../streaming/connectors/flume/FlumeSource.java |  149 +++
 .../connectors/flume/FlumeTopology.java         |   49 +
 .../flink-connector-kafka/pom.xml               |  103 ++
 .../connectors/kafka/KafkaConsumerExample.java  |   58 +
 .../connectors/kafka/KafkaProducerExample.java  |   80 ++
 .../flink/streaming/connectors/kafka/Utils.java |   70 ++
 .../connectors/kafka/api/KafkaSink.java         |  193 ++++
 .../connectors/kafka/api/KafkaSource.java       |  219 ++++
 .../kafka/api/config/PartitionerWrapper.java    |   49 +
 .../api/persistent/PersistentKafkaSource.java   |  338 ++++++
 .../partitioner/KafkaConstantPartitioner.java   |   33 +
 .../SerializableKafkaPartitioner.java           |   24 +
 .../flink/streaming/kafka/KafkaITCase.java      | 1023 ++++++++++++++++++
 .../kafka/util/KafkaLocalSystemTime.java        |   48 +
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-connector-rabbitmq/pom.xml            |   59 +
 .../streaming/connectors/rabbitmq/RMQSink.java  |  111 ++
 .../connectors/rabbitmq/RMQSource.java          |  138 +++
 .../connectors/rabbitmq/RMQTopology.java        |   52 +
 .../flink-connector-twitter/pom.xml             |   95 ++
 .../connectors/json/JSONParseFlatMap.java       |  144 +++
 .../streaming/connectors/json/JSONParser.java   |  175 +++
 .../connectors/twitter/TwitterSource.java       |  322 ++++++
 .../connectors/twitter/TwitterStreaming.java    |   99 ++
 .../connectors/twitter/TwitterTopology.java     |   92 ++
 .../connectors/json/JSONParserTest.java         |   74 ++
 .../connectors/json/JSONParserTest2.java        |   95 ++
 .../src/test/resources/log4j-test.properties    |   27 +
 .../src/test/resources/logback-test.xml         |   30 +
 .../flink-streaming-connectors/pom.xml          |  215 +---
 .../streaming/connectors/ConnectorSource.java   |   38 -
 .../streaming/connectors/flume/FlumeSink.java   |  141 ---
 .../streaming/connectors/flume/FlumeSource.java |  149 ---
 .../connectors/flume/FlumeTopology.java         |   49 -
 .../connectors/json/JSONParseFlatMap.java       |  144 ---
 .../streaming/connectors/json/JSONParser.java   |  175 ---
 .../connectors/kafka/KafkaConsumerExample.java  |   58 -
 .../connectors/kafka/KafkaProducerExample.java  |   80 --
 .../flink/streaming/connectors/kafka/Utils.java |   70 --
 .../connectors/kafka/api/KafkaSink.java         |  193 ----
 .../connectors/kafka/api/KafkaSource.java       |  219 ----
 .../kafka/api/config/PartitionerWrapper.java    |   49 -
 .../api/persistent/PersistentKafkaSource.java   |  338 ------
 .../partitioner/KafkaConstantPartitioner.java   |   33 -
 .../SerializableKafkaPartitioner.java           |   24 -
 .../streaming/connectors/rabbitmq/RMQSink.java  |  111 --
 .../connectors/rabbitmq/RMQSource.java          |  138 ---
 .../connectors/rabbitmq/RMQTopology.java        |   52 -
 .../connectors/twitter/TwitterSource.java       |  322 ------
 .../connectors/twitter/TwitterStreaming.java    |   99 --
 .../connectors/twitter/TwitterTopology.java     |   92 --
 .../connectors/json/JSONParserTest.java         |   74 --
 .../connectors/json/JSONParserTest2.java        |   95 --
 .../streaming/connectors/kafka/KafkaITCase.java | 1023 ------------------
 .../kafka/util/KafkaLocalSystemTime.java        |   48 -
 .../src/test/resources/log4j-test.properties    |   27 -
 .../src/test/resources/logback-test.xml         |   30 -
 .../api/functions/source/ConnectorSource.java   |   38 +
 flink-staging/pom.xml                           |   12 +-
 63 files changed, 4397 insertions(+), 4087 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index ab23695..6565fbf 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1213,7 +1213,15 @@ To run an application using one of these connectors usually additional third par
 
 ### Apache Kafka
 
-This connector provides access to data streams from [Apache Kafka](https://kafka.apache.org/).
+This connector provides access to data streams from [Apache Kafka](https://kafka.apache.org/). For using this connector add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-kafka-connector</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
 
 #### Installing Apache Kafka
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
@@ -1433,7 +1441,15 @@ More on Flume can be found [here](http://flume.apache.org).
 
 ### RabbitMQ
 
-This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/).
+This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/).For using this connector add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-rabbitmq-connector</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
 
 ##### Installing RabbitMQ
 Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
@@ -1497,7 +1513,15 @@ More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
 
 ### Twitter Streaming API
 
-Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
+Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.For using this connector add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-twitter-connector</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
 
 #### Authentication
 In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..9ed9777
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-flume</artifactId>
+	<name>flink-connector-flume</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<flume-ng.version>1.5.0</flume-ng.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flume</groupId>
+			<artifactId>flume-ng-core</artifactId>
+			<version>${flume-ng.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-io</groupId>
+					<artifactId>commons-io</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-codec</groupId>
+					<artifactId>commons-codec</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-cli</groupId>
+					<artifactId>commons-cli</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-lang</groupId>
+					<artifactId>commons-lang</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-core-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.codehaus.jackson</groupId>
+					<artifactId>jackson-mapper-asl</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.thoughtworks.paranamer</groupId>
+					<artifactId>paranamer</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.tukaani</groupId>
+					<artifactId>xz</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.velocity</groupId>
+					<artifactId>velocity</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>commons-collections</groupId>
+					<artifactId>commons-collections</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>servlet-api</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.mortbay.jetty</groupId>
+					<artifactId>jetty</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.code.gson</groupId>
+					<artifactId>gson</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.thrift</groupId>
+					<artifactId>libthrift</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.3</version>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>org.apache.flume:*</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..50f5770
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+	private transient FlinkRpcClientFacade client;
+	boolean initDone = false;
+	String host;
+	int port;
+	SerializationSchema<IN, byte[]> schema;
+
+	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+		this.host = host;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
+	 * them to Apache Flume.
+	 * 
+	 * @param value
+	 *            The tuple arriving from the datastream
+	 */
+	@Override
+	public void invoke(IN value) {
+
+		byte[] data = schema.serialize(value);
+		client.sendDataToFlume(data);
+
+	}
+
+	private class FlinkRpcClientFacade {
+		private RpcClient client;
+		private String hostname;
+		private int port;
+
+		/**
+		 * Initializes the connection to Apache Flume.
+		 * 
+		 * @param hostname
+		 *            The host
+		 * @param port
+		 *            The port.
+		 */
+		public void init(String hostname, int port) {
+			// Setup the RPC connection
+			this.hostname = hostname;
+			this.port = port;
+			int initCounter = 0;
+			while (true) {
+				if (initCounter >= 90) {
+					throw new RuntimeException("Cannot establish connection with" + port + " at "
+							+ host);
+				}
+				try {
+					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+				} catch (FlumeException e) {
+					// Wait one second if the connection failed before the next
+					// try
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException e1) {
+						if (LOG.isErrorEnabled()) {
+							LOG.error("Interrupted while trying to connect {} at {}", port, host);
+						}
+					}
+				}
+				if (client != null) {
+					break;
+				}
+				initCounter++;
+			}
+			initDone = true;
+		}
+
+		/**
+		 * Sends byte arrays as {@link Event} series to Apache Flume.
+		 * 
+		 * @param data
+		 *            The byte array to send to Apache FLume
+		 */
+		public void sendDataToFlume(byte[] data) {
+			Event event = EventBuilder.withBody(data);
+
+			try {
+				client.append(event);
+
+			} catch (EventDeliveryException e) {
+				// clean up and recreate the client
+				client.close();
+				client = null;
+				client = RpcClientFactory.getDefaultInstance(hostname, port);
+			}
+		}
+
+	}
+
+	@Override
+	public void close() {
+		client.client.close();
+	}
+
+	@Override
+	public void open(Configuration config) {
+		client = new FlinkRpcClientFacade();
+		client.init(host, port);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..8fecd0f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,149 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import java.util.List;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+//import org.apache.flink.util.Collector;
+//import org.apache.flume.Context;
+//import org.apache.flume.channel.ChannelProcessor;
+//import org.apache.flume.source.AvroSource;
+//import org.apache.flume.source.avro.AvroFlumeEvent;
+//import org.apache.flume.source.avro.Status;
+//
+//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+//	private static final long serialVersionUID = 1L;
+//
+//	String host;
+//	String port;
+//	volatile boolean finished = false;
+//
+//	private volatile boolean isRunning = false;
+//
+//	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+//		super(deserializationSchema);
+//		this.host = host;
+//		this.port = Integer.toString(port);
+//	}
+//
+//	public class MyAvroSource extends AvroSource {
+//		Collector<OUT> output;
+//
+//		/**
+//		 * Sends the AvroFlumeEvent from it's argument list to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param avroEvent
+//		 *            The event that should be sent to the dataStream
+//		 * @return A {@link Status}.OK message if sending the event was
+//		 *         successful.
+//		 */
+//		@Override
+//		public Status append(AvroFlumeEvent avroEvent) {
+//			collect(avroEvent);
+//			return Status.OK;
+//		}
+//
+//		/**
+//		 * Sends the AvroFlumeEvents from it's argument list to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param events
+//		 *            The events that is sent to the dataStream
+//		 * @return A Status.OK message if sending the events was successful.
+//		 */
+//		@Override
+//		public Status appendBatch(List<AvroFlumeEvent> events) {
+//			for (AvroFlumeEvent avroEvent : events) {
+//				collect(avroEvent);
+//			}
+//
+//			return Status.OK;
+//		}
+//
+//		/**
+//		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+//		 * {@link DataStream}.
+//		 *
+//		 * @param avroEvent
+//		 *            The event that is sent to the dataStream
+//		 */
+//		private void collect(AvroFlumeEvent avroEvent) {
+//			byte[] b = avroEvent.getBody().array();
+//			OUT out = FlumeSource.this.schema.deserialize(b);
+//
+//			if (schema.isEndOfStream(out)) {
+//				FlumeSource.this.finished = true;
+//				this.stop();
+//				FlumeSource.this.notifyAll();
+//			} else {
+//				output.collect(out);
+//			}
+//
+//		}
+//
+//	}
+//
+//	MyAvroSource avroSource;
+//
+//	/**
+//	 * Configures the AvroSource. Also sets the output so the application can
+//	 * use it from outside of the invoke function.
+//	 *
+//	 * @param output
+//	 *            The output used in the invoke function
+//	 */
+//	public void configureAvroSource(Collector<OUT> output) {
+//
+//		avroSource = new MyAvroSource();
+//		avroSource.output = output;
+//		Context context = new Context();
+//		context.put("port", port);
+//		context.put("bind", host);
+//		avroSource.configure(context);
+//		// An instance of a ChannelProcessor is required for configuring the
+//		// avroSource although it will not be used in this case.
+//		ChannelProcessor cp = new ChannelProcessor(null);
+//		avroSource.setChannelProcessor(cp);
+//	}
+//
+//	/**
+//	 * Configures the AvroSource and runs until the user calls a close function.
+//	 *
+//	 * @param output
+//	 *            The Collector for sending data to the datastream
+//	 */
+//	@Override
+//	public void run(Collector<OUT> output) throws Exception {
+//		isRunning = true;
+//		configureAvroSource(output);
+//		avroSource.start();
+//		while (!finished && isRunning) {
+//			this.wait();
+//		}
+//	}
+//
+//	@Override
+//	public void cancel() {
+//		isRunning = false;
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..f630bce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,49 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.util.serialization.SerializationSchema;
+//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+//
+//public class FlumeTopology {
+//
+//	public static void main(String[] args) throws Exception {
+//
+//		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+//
+//		@SuppressWarnings("unused")
+//		DataStream<String> dataStream1 = env.addSource(
+//				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
+//				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
+//
+//		env.execute();
+//	}
+//
+//	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public byte[] serialize(String element) {
+//			return element.getBytes();
+//		}
+//	}
+//
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
new file mode 100644
index 0000000..e9c9da2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-kafka</artifactId>
+	<name>flink-connector-kafka</name>
+
+	<packaging>jar</packaging>
+
+	<!-- Allow users to pass custom connector versions -->
+	<properties>
+		<kafka.version>0.8.2.0</kafka.version>
+	</properties>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_${scala.binary.version}</artifactId>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.sun.jmx</groupId>
+					<artifactId>jmxri</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.sun.jdmk</groupId>
+					<artifactId>jmxtools</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>net.sf.jopt-simple</groupId>
+					<artifactId>jopt-simple</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-reflect</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.scala-lang</groupId>
+					<artifactId>scala-compiler</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.yammer.metrics</groupId>
+					<artifactId>metrics-annotation</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.xerial.snappy</groupId>
+					<artifactId>snappy-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>2.7.1</version>
+		</dependency>
+
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
new file mode 100644
index 0000000..fe6684d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+
+public class KafkaConsumerExample {
+
+	private static String host;
+	private static int port;
+	private static String topic;
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+
+		DataStream<String> kafkaStream = env
+				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
+		kafkaStream.print();
+
+		env.execute();
+	}
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length == 3) {
+			host = args[0];
+			port = Integer.parseInt(args[1]);
+			topic = args[2];
+			return true;
+		} else {
+			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
new file mode 100644
index 0000000..4dd5577
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+
+public class KafkaProducerExample {
+
+	private static String host;
+	private static int port;
+	private static String topic;
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+
+		@SuppressWarnings({ "unused", "serial" })
+		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
+
+			private int index = 0;
+
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return index >= 20;
+			}
+
+			@Override
+			public String next() throws Exception {
+				if (index < 20) {
+					String result = "message #" + index;
+					index++;
+					return result;
+				}
+
+				return "q";
+			}
+
+		}).addSink(
+				new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
+		)
+		.setParallelism(3);
+
+		env.execute();
+	}
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length == 3) {
+			host = args[0];
+			port = Integer.parseInt(args[1]);
+			topic = args[2];
+			return true;
+		} else {
+			System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
new file mode 100644
index 0000000..4286196
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.io.IOException;
+
+/**
+ * Helper classes for the Kafka connector.
+ */
+public class Utils {
+
+	/**
+	 * (De)serialization schema that converts records to and from byte arrays using the Flink
+	 * {@link TypeSerializer} of the type extracted from a sample object.
+	 *
+	 * @param <T>
+	 * 		Type of the (de)serialized records
+	 */
+	public static class TypeInformationSerializationSchema<T>
+			implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
+
+		private final TypeSerializer<T> serializer;
+		private final TypeInformation<T> ti;
+
+		/**
+		 * Creates the schema for the runtime type of {@code type}.
+		 *
+		 * @param type
+		 * 		Sample object whose type information is extracted via {@link TypeExtractor#getForObject}
+		 * @param ec
+		 * 		Execution config used to create the serializer
+		 */
+		@SuppressWarnings("unchecked")
+		public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
+			// the caller is responsible for passing a sample object whose type matches T
+			this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
+			this.serializer = ti.createSerializer(ec);
+		}
+
+		@Override
+		public T deserialize(byte[] message) {
+			try {
+				return serializer.deserialize(new ByteArrayInputView(message));
+			} catch (IOException e) {
+				throw new RuntimeException("Unable to deserialize message", e);
+			}
+		}
+
+		/** Always returns {@code false}: this schema never ends the stream. */
+		@Override
+		public boolean isEndOfStream(T nextElement) {
+			return false;
+		}
+
+		/**
+		 * Serializes {@code element} into a byte array that contains exactly the bytes
+		 * written by the serializer.
+		 */
+		@Override
+		public byte[] serialize(T element) {
+			DataOutputSerializer dos = new DataOutputSerializer(16);
+			try {
+				serializer.serialize(element, dos);
+			} catch (IOException e) {
+				throw new RuntimeException("Unable to serialize record", e);
+			}
+			// getByteArray() exposes the whole internal buffer, which may be larger than what
+			// was written; only the first length() bytes belong to the record
+			return java.util.Arrays.copyOf(dos.getByteArray(), dos.length());
+		}
+
+		@Override
+		public TypeInformation<T> getProducedType() {
+			return ti;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
new file mode 100644
index 0000000..0965b29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.DefaultEncoder;
+
+
+/**
+ * Sink that emits its inputs to a Kafka topic.
+ *
+ * <p>The Kafka producer is created in {@link #open(Configuration)}. Records are sent without a
+ * message key; the input record itself is given to Kafka as the partition key, and its
+ * serialized form (via the {@link SerializationSchema}) as the message.
+ *
+ * @param <IN>
+ * 		Type of the sink input
+ */
+public class KafkaSink<IN> extends RichSinkFunction<IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+	// created in open(); runtime state that must not be shipped with the serialized sink
+	private transient Producer<IN, byte[]> producer;
+	private final Properties userDefinedProperties;
+	private final String topicId;
+	private final String brokerList;
+	private final SerializationSchema<IN, byte[]> schema;
+	// optional partitioner instance, handed to Kafka through PartitionerWrapper
+	private SerializableKafkaPartitioner partitioner;
+	// optional partitioner class, handed to Kafka by name via "partitioner.class"
+	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
+
+	/**
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses (host:port) of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 */
+	public KafkaSink(String brokerList, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema) {
+		this(brokerList, topicId, new Properties(), serializationSchema);
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic with custom Producer configuration.
+	 * The given properties are applied on top of the sink's defaults in {@link #open(Configuration)},
+	 * so they may override any default, including "metadata.broker.list" (which is otherwise
+	 * set from {@code brokerList}).
+	 *
+	 * @param brokerList
+	 * 		Comma separated addresses (host:port) of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param producerConfig
+	 * 		Configurations of the Kafka producer; {@code null} is treated as empty
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 * @throws NullPointerException
+	 * 		if {@code brokerList}, {@code topicId} or {@code serializationSchema} is null
+	 */
+	public KafkaSink(String brokerList, String topicId, Properties producerConfig,
+			SerializationSchema<IN, byte[]> serializationSchema) {
+		Preconditions.checkNotNull(brokerList, "Broker list not set");
+		for (String broker : brokerList.split(",")) {
+			NetUtils.ensureCorrectHostnamePort(broker);
+		}
+		Preconditions.checkNotNull(topicId, "TopicID not set");
+		Preconditions.checkNotNull(serializationSchema, "Serialization schema not set");
+
+		this.brokerList = brokerList;
+		this.topicId = topicId;
+		this.schema = serializationSchema;
+		// a missing configuration behaves like an empty one instead of failing later in open()
+		this.userDefinedProperties = producerConfig == null ? new Properties() : producerConfig;
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 * 		Comma separated addresses (host:port) of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 * @param partitioner
+	 * 		User defined partitioner instance; must be serializable.
+	 */
+	public KafkaSink(String brokerList, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+		this(brokerList, topicId, serializationSchema);
+		ClosureCleaner.ensureSerializable(partitioner);
+		this.partitioner = partitioner;
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic whose partitioner is given by class.
+	 *
+	 * @param brokerList
+	 * 		Comma separated addresses (host:port) of the brokers
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 * @param partitioner
+	 * 		Class of the user defined partitioner; passed to Kafka by name.
+	 */
+	public KafkaSink(String brokerList,
+			String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema,
+			Class<? extends SerializableKafkaPartitioner> partitioner) {
+		this(brokerList, topicId, serializationSchema);
+		this.partitionerClass = partitioner;
+	}
+
+	/**
+	 * Initializes the connection to Kafka.
+	 *
+	 * @throws RuntimeException
+	 * 		if creating the producer fails with a {@link NullPointerException}
+	 */
+	@Override
+	public void open(Configuration configuration) {
+
+		Properties properties = new Properties();
+
+		properties.put("metadata.broker.list", brokerList);
+		properties.put("request.required.acks", "-1");
+		properties.put("message.send.max.retries", "10");
+
+		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
+
+		// this will not be used as the key will not be serialized
+		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+
+		// user-defined settings override the defaults above
+		properties.putAll(userDefinedProperties);
+
+		if (partitioner != null) {
+			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+			// stored as a plain object (not a String); PartitionerWrapper reads it back from the map
+			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+		}
+		if (partitionerClass != null) {
+			// Properties.getProperty() returns null for non-String values, so the class has to be
+			// given by name. getName() is the binary name Class.forName() understands, which
+			// (unlike getCanonicalName()) also works for nested classes.
+			properties.put("partitioner.class", partitionerClass.getName());
+		}
+
+		ProducerConfig config = new ProducerConfig(properties);
+
+		try {
+			producer = new Producer<IN, byte[]>(config);
+		} catch (NullPointerException e) {
+			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Kafka.
+	 *
+	 * @param next
+	 * 		The incoming data
+	 */
+	@Override
+	public void invoke(IN next) {
+		byte[] serialized = schema.serialize(next);
+
+		// Sending message without serializable key; the record itself is the partition key.
+		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
+	}
+
+	/** Closes the Kafka producer if it was created. */
+	@Override
+	public void close() {
+		if (producer != null) {
+			producer.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
new file mode 100644
index 0000000..4a7ec15
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source that listens to a Kafka topic using the high level Kafka API.
+ * 
+ * @param <OUT>
+ *            Type of the messages on the topic.
+ */
+public class KafkaSource<OUT> extends ConnectorSource<OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+	private final String zookeeperAddress;
+	private final String groupId;
+	private final String topicId;
+	private Properties customProperties;
+
+	private transient ConsumerConnector consumer;
+	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
+
+	private long zookeeperSyncTimeMillis;
+	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
+	private static final String DEFAULT_GROUP_ID = "flink-group";
+
+	// We must read this in reachedEnd() to check for the end. We keep it to return it in
+	// next()
+	private OUT nextElement;
+
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param groupId
+	 * 			   ID of the consumer group.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 */
+	public KafkaSource(String zookeeperAddress,
+					String topicId, String groupId,
+					DeserializationSchema<OUT> deserializationSchema,
+					long zookeeperSyncTimeMillis) {
+		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
+	}
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 * 
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param groupId
+	 * 			   ID of the consumer group.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 * @param customProperties
+	 * 			  Custom properties for Kafka
+	 */
+	public KafkaSource(String zookeeperAddress,
+					String topicId, String groupId,
+					DeserializationSchema<OUT> deserializationSchema,
+					long zookeeperSyncTimeMillis, Properties customProperties) {
+		super(deserializationSchema);
+		Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
+		Preconditions.checkNotNull(topicId, "Topic ID is null");
+		Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
+		Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be positive");
+
+		this.zookeeperAddress = zookeeperAddress;
+		this.groupId = groupId;
+		this.topicId = topicId;
+		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
+		this.customProperties = customProperties;
+	}
+
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 * @param zookeeperSyncTimeMillis
+	 *            Synchronization time with zookeeper.
+	 */
+	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
+	}
+	/**
+	 * Creates a KafkaSource that consumes a topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param deserializationSchema
+	 *            User defined deserialization schema.
+	 */
+	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
+		this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+	}
+
+	/**
+	 * Initializes the connection to Kafka.
+	 */
+	private void initializeConnection() {
+		Properties props = new Properties();
+		props.put("zookeeper.connect", zookeeperAddress);
+		props.put("group.id", groupId);
+		props.put("zookeeper.session.timeout.ms", "10000");
+		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
+		props.put("auto.commit.interval.ms", "1000");
+
+		if(customProperties != null) {
+			for(Map.Entry<Object, Object> e : props.entrySet()) {
+				if(props.contains(e.getKey())) {
+					LOG.warn("Overwriting property "+e.getKey()+" with value "+e.getValue());
+				}
+				props.put(e.getKey(), e.getValue());
+			}
+		}
+
+		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+
+		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
+				.createMessageStreams(Collections.singletonMap(topicId, 1));
+		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
+		KafkaStream<byte[], byte[]> stream = streams.get(0);
+
+		consumer.commitOffsets();
+
+		consumerIterator = stream.iterator();
+	}
+
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		initializeConnection();
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		if (consumer != null) {
+			consumer.shutdown();
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws Exception {
+		if (nextElement != null) {
+			return false;
+		} else if (consumerIterator.hasNext()) {
+			OUT out = schema.deserialize(consumerIterator.next().message());
+			if (schema.isEndOfStream(out)) {
+				return true;
+			}
+			nextElement = out;
+		}
+		return false;
+	}
+
+	@Override
+	public OUT next() throws Exception {
+		if (!reachedEnd()) {
+			OUT result = nextElement;
+			nextElement = null;
+			return result;
+		} else {
+			throw new RuntimeException("Source exhausted");
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
new file mode 100644
index 0000000..7ae17df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.api.config;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties - map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
+ *
+ * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable).
+ * This is a hack because the put() method is called on the underlying Hashmap.
+ *
+ * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
+ *
+ * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
+
+	private Partitioner wrapped;
+	public PartitionerWrapper(VerifiableProperties properties) {
+		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
+	}
+
+	@Override
+	public int partition(Object value, int numberOfPartitions) {
+		return wrapped.partition(value, numberOfPartitions);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
new file mode 100644
index 0000000..032ed08
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.persistent;
+
+import com.google.common.base.Preconditions;
+import kafka.common.TopicAndPartition;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Source for reading from Kafka using Flink Streaming Fault Tolerance.
+ * This source updates the committed offsets in Zookeeper based on the internal checkpointing of Flink:
+ * the offsets captured in {@link #snapshotState(long, long)} are written to Zookeeper only once
+ * {@link #commitCheckpoint(long)} is called for that checkpoint.
+ *
+ * <p>Note that the auto commit feature of Kafka needs to be disabled, and "offsets.storage" must be
+ * set to "zookeeper", for using this source. The constructor rejects any other configuration.
+ *
+ * @param <OUT> The type of the records produced by the {@link DeserializationSchema}.
+ */
+public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
+		ResultTypeQueryable<OUT>,
+		CheckpointCommitter,
+		CheckpointedAsynchronously<long[]> {
+	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+	// not Java-serializable; written/read manually in writeObject()/readObject()
+	protected transient ConsumerConfig consumerConfig;
+	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
+	private transient ConsumerConnector consumer;
+
+	private String topicName;
+	private DeserializationSchema<OUT> deserializationSchema;
+
+	// highest offset emitted per partition (index = partition id), -1 if nothing was read yet
+	private transient long[] lastOffsets;
+	private transient ZkClient zkClient;
+	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
+
+	// We set this in reachedEnd to carry it over to next()
+	private OUT nextElement = null;
+
+	/**
+	 * Creates a source reading the given topic through the Kafka High Level Consumer.
+	 *
+	 * @param topicName The Kafka topic to read from.
+	 * @param deserializationSchema Turns the raw message bytes into records of type OUT.
+	 * @param consumerConfig Passed into the Kafka High Level Consumer. Specify at least "group.id"
+	 *                       and "zookeeper.connect". For a full list of possible values, check this out:
+	 *                       https://kafka.apache.org/documentation.html#consumerconfigs
+	 * @throws NullPointerException if any of the arguments is null.
+	 * @throws IllegalArgumentException if auto commit is enabled or "offsets.storage" is not "zookeeper".
+	 */
+	public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
+		Preconditions.checkNotNull(topicName);
+		Preconditions.checkNotNull(deserializationSchema);
+		Preconditions.checkNotNull(consumerConfig);
+
+		this.topicName = topicName;
+		this.deserializationSchema = deserializationSchema;
+		this.consumerConfig = consumerConfig;
+		if(consumerConfig.autoCommitEnable()) {
+			throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
+					"This source can only be used with auto commit disabled because the " +
+					"source is committing to zookeeper by itself (not using the KafkaConsumer).");
+		}
+		if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+			// we can currently only commit to ZK.
+			throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
+		}
+	}
+
+	// ---------------------- ParallelSourceFunction Lifecycle -----------------
+
+
+	/**
+	 * Connects to Kafka (one message stream for the topic) and to Zookeeper, and sizes the
+	 * per-partition offset bookkeeping according to the topic's partition count.
+	 */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		// Store the connector in the field right away, so that a subsequent close() can shut it
+		// down even if one of the sanity checks below throws.
+		this.consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
+		// we request only one stream per consumer instance. Kafka will make sure that each consumer group
+		// will see each message only once.
+		Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+		Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
+		if(streams.size() != 1) {
+			throw new RuntimeException("Expected only one message stream but got "+streams.size());
+		}
+		List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+		if(kafkaStreams == null) {
+			throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+		}
+		if(kafkaStreams.size() != 1) {
+			throw new RuntimeException("Requested 1 stream from Kafka, but got "+kafkaStreams.size()+" streams");
+		}
+		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
+		this.iteratorToRead = kafkaStreams.get(0).iterator();
+
+		zkClient = new ZkClient(consumerConfig.zkConnect(),
+			consumerConfig.zkSessionTimeoutMs(),
+			consumerConfig.zkConnectionTimeoutMs(),
+			new KafkaZKStringSerializer());
+
+		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
+		int numPartitions = getNumberOfPartitions();
+		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
+		this.lastOffsets = new long[numPartitions];
+		this.commitedOffsets = new long[numPartitions];
+		Arrays.fill(this.lastOffsets, -1);
+		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+
+		nextElement = null;
+	}
+
+	/**
+	 * Fetches the next record into {@code nextElement} unless one is already buffered.
+	 * Messages whose offset is not higher than the last one seen for their partition are skipped.
+	 *
+	 * @return true if no record is available, i.e. the Kafka iterator is exhausted or the
+	 *         deserialization schema signaled the end of the stream.
+	 */
+	@Override
+	public boolean reachedEnd() throws Exception {
+		if (nextElement != null) {
+			return false;
+		}
+
+		while (iteratorToRead.hasNext()) {
+			MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
+			if(lastOffsets[message.partition()] >= message.offset()) {
+				LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
+				continue;
+			}
+			lastOffsets[message.partition()] = message.offset();
+
+			OUT out = deserializationSchema.deserialize(message.message());
+			if (deserializationSchema.isEndOfStream(out)) {
+				LOG.info("DeserializationSchema signaled end of stream for this source");
+				break;
+			}
+
+			nextElement = out;
+			if (LOG.isTraceEnabled()) {
+				LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+			}
+			break;
+		}
+
+		return nextElement == null;
+	}
+
+	/**
+	 * Returns the record buffered by {@link #reachedEnd()} and clears the buffer.
+	 *
+	 * @throws RuntimeException if the source is exhausted.
+	 */
+	@Override
+	public OUT next() throws Exception {
+		if (!reachedEnd()) {
+			OUT result = nextElement;
+			nextElement = null;
+			return result;
+		} else {
+			throw new RuntimeException("Source exhausted");
+		}
+	}
+
+	/**
+	 * Shuts down the Kafka consumer and closes the Zookeeper client.
+	 * Safe to call when {@link #open(Configuration)} failed midway or was never called.
+	 */
+	@Override
+	public void close() {
+		LOG.info("Closing Kafka consumer");
+		// try/finally: the Zookeeper client must be released even if shutting down the consumer fails.
+		try {
+			if (consumer != null) {
+				consumer.shutdown();
+			}
+		} finally {
+			if (zkClient != null) {
+				zkClient.close();
+			}
+		}
+	}
+
+
+	// ---------------------- State / Checkpoint handling  -----------------
+	// this source is keeping the partition offsets in Zookeeper
+
+	// offsets captured per checkpoint id, waiting for commitCheckpoint()
+	// NOTE(review): snapshotState() and commitCheckpoint() both mutate this HashMap; confirm they are never invoked concurrently.
+	private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
+
+	/**
+	 * Captures a copy of the current per-partition offsets and remembers it until the checkpoint is committed.
+	 *
+	 * @return the captured offsets, or null if the source has not been opened yet.
+	 */
+	@Override
+	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		if(lastOffsets == null) {
+			LOG.warn("State snapshot requested on not yet opened source. Returning null");
+			return null;
+		}
+		LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+
+		long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+		pendingCheckpoints.put(checkpointId, currentOffsets);
+		return currentOffsets;
+	}
+
+	@Override
+	public void restoreState(long[] state) {
+		// Offsets are committed to Zookeeper in commitCheckpoint(), which is where the Kafka
+		// consumer reads its start position from, so there is nothing to restore here.
+	}
+
+
+	/**
+	 * Notification on completed checkpoints: writes the offsets captured for this checkpoint to Zookeeper.
+	 * Pending checkpoints older than this one are discarded, since they are subsumed by it.
+	 *
+	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 */
+	@Override
+	public void commitCheckpoint(long checkpointId) {
+		LOG.info("Commit checkpoint {}", checkpointId);
+		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
+		if(checkpointOffsets == null) {
+			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+			return;
+		}
+		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
+
+		// Older pending checkpoints can only hold lower or equal offsets (lastOffsets never decreases
+		// per partition), so they are subsumed. Dropping them keeps the map from growing without
+		// bound when the notification for some checkpoint never arrives.
+		for (java.util.Iterator<Long> it = pendingCheckpoints.keySet().iterator(); it.hasNext(); ) {
+			if (it.next() < checkpointId) {
+				it.remove();
+			}
+		}
+
+		for(int partition = 0; partition < checkpointOffsets.length; partition++) {
+			long offset = checkpointOffsets[partition];
+			// -1 means no message of this partition has been read yet
+			if(offset != -1) {
+				setOffset(partition, offset);
+			}
+		}
+	}
+
+	// --------------------- Zookeeper / Offset handling -----------------------------
+
+	/**
+	 * Looks up the number of partitions of {@code topicName} in Zookeeper.
+	 *
+	 * @throws IllegalArgumentException if Zookeeper has no partition information for the topic.
+	 */
+	private int getNumberOfPartitions() {
+		scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
+		scala.collection.mutable.Map<String, Seq<Object>> list =  ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
+		Option<Seq<Object>> topicOption = list.get(topicName);
+		if(topicOption.isEmpty()) {
+			throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
+		}
+		Seq<Object> topic = topicOption.get();
+		return topic.size();
+	}
+
+	/**
+	 * Commits the offset for a partition to Zookeeper, unless an equal or higher offset was already committed.
+	 */
+	protected void setOffset(int partition, long offset) {
+		if(commitedOffsets[partition] < offset) {
+			setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
+			commitedOffsets[partition] = offset;
+		} else {
+			LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+		}
+	}
+
+
+
+	// the following two methods are static to allow access from the outside as well (Testcases)
+
+	/**
+	 * Writes the offset of a partition to the consumer group's offset path in Zookeeper.
+	 * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
+	 */
+	public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+		LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+	}
+
+	/**
+	 * Reads the offset of a partition from the consumer group's offset path in Zookeeper.
+	 *
+	 * @throws NumberFormatException if the stored value is not a valid long.
+	 */
+	public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
+		TopicAndPartition tap = new TopicAndPartition(topic, partition);
+		ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+		scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
+		return Long.parseLong(data._1());
+	}
+
+
+	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
+	// ConsumerConfig is not serializable, so only its underlying Properties are written.
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.defaultWriteObject();
+		out.writeObject(consumerConfig.props().props());
+	}
+
+	// Rebuilds the transient ConsumerConfig from the Properties written by writeObject().
+	private void readObject(ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		Properties props = (Properties) in.readObject();
+		consumerConfig = new ConsumerConfig(props);
+	}
+
+
+	@Override
+	public TypeInformation<OUT> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+
+
+	// ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there)  -----------------
+
+	/**
+	 * Stores Zookeeper node data as UTF-8 encoded strings.
+	 */
+	public static class KafkaZKStringSerializer implements ZkSerializer {
+
+		@Override
+		public byte[] serialize(Object data) throws ZkMarshallingError {
+			try {
+				return ((String) data).getBytes("UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+			if (bytes == null) {
+				return null;
+			} else {
+				try {
+					return new String(bytes, "UTF-8");
+				} catch (UnsupportedEncodingException e) {
+					throw new RuntimeException(e);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
new file mode 100644
index 0000000..661d0bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Partitioner that assigns every record to one fixed partition, independent of the record value
+ * and of the number of partitions reported for the topic.
+ */
+public class KafkaConstantPartitioner implements SerializableKafkaPartitioner {
+	private static final long serialVersionUID = 1L;
+
+	/** The partition index handed out for every record. */
+	private int partition;
+
+	/**
+	 * @param partition Index of the partition that receives all records.
+	 */
+	public KafkaConstantPartitioner(int partition) {
+		this.partition = partition;
+	}
+
+	/**
+	 * Ignores both arguments.
+	 *
+	 * @return the partition index given at construction time.
+	 */
+	@Override
+	public int partition(Object value, int numberOfPartitions) {
+		return this.partition;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
new file mode 100644
index 0000000..77a774e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import kafka.producer.Partitioner;
+import java.io.Serializable;
+
+/**
+ * A Kafka {@link Partitioner} that is also {@link Serializable}, so that user-provided
+ * partitioner instances (not just partitioner class names) can be serialized.
+ */
+public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
+}


Mime
View raw message