flink-commits mailing list archives

From se...@apache.org
Subject [13/14] flink git commit: [FLINK-1638] [streaming] Kafka low level API example, documentation and fixes
Date Tue, 10 Mar 2015 14:00:13 GMT
[FLINK-1638] [streaming] Kafka low level API example, documentation and fixes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5ba95d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5ba95d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5ba95d

Branch: refs/heads/master
Commit: ed5ba95dee0c3aa4b8767313369778e7afce5155
Parents: 5327d56
Author: mbalassi <mbalassi@apache.org>
Authored: Thu Mar 5 17:34:51 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |  14 +--
 .../kafka/KafkaSimpleConsumerExample.java       |  63 +++++++++
 .../connectors/kafka/api/KafkaSource.java       |   2 +-
 .../kafka/api/simple/KafkaConsumerIterator.java | 128 ++++++++++++++++---
 .../KafkaDeserializingConsumerIterator.java     |   5 +-
 .../kafka/api/simple/KafkaTopicFactory.java     |   3 +
 .../kafka/api/simple/MessageWithOffset.java     |   3 +
 .../kafka/api/simple/PersistentKafkaSource.java |  29 ++---
 .../kafka/api/simple/SimpleKafkaSource.java     |  37 +++---
 9 files changed, 213 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 0a0c623..d9bb7d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
@@ -37,15 +38,10 @@ public class KafkaConsumerExample {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
 
-		@SuppressWarnings("unused")
-		DataStream<String> stream1 = env
-				.addSource(
-//						new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
-//						new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
-						new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
-				.registerState("kafka", new OperatorState<Long>(null))
-				.setParallelism(3)
-				.print().setParallelism(3);
+		DataStream<String> kafkaStream = env
+				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
+
+		kafkaStream.print();
 
 		env.execute();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
new file mode 100644
index 0000000..b8d4d2c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+
+public class KafkaSimpleConsumerExample {
+
+	private static String host;
+	private static int port;
+	private static String topic;
+	private static int partition;
+	private static long offset;
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+
+		DataStream<String> kafkaStream = env
+				.addSource(new PersistentKafkaSource<String>(topic, host, port, partition, offset, new JavaDefaultStringSchema()));
+
+		kafkaStream.print();
+
+		env.execute();
+	}
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length == 5) {
+			host = args[0];
+			port = Integer.parseInt(args[1]);
+			topic = args[2];
+			partition = Integer.parseInt(args[3]);
+			offset = Long.parseLong(args[4]);
+			return true;
+		} else {
+			System.err.println("Usage: KafkaSimpleConsumerExample <host> <port> <topic> <partition> <offset>");
+			return false;
+		}
+	}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 0c6cd4a..1baaba7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
 /**
- * Source that listens to a Kafka topic.
+ * Source that listens to a Kafka topic using the high level Kafka API.
  * 
  * @param <OUT>
  *            Type of the messages on the topic.

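For comparison, a minimal sketch of constructing each of the two sources touched by this commit, using only the constructor signatures visible in the diff; the broker address localhost:9092, topic myTopic, partition 0 and offset 0L are placeholder values, and the imports are the same as in the examples above:

	StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

	// High level Kafka API: broker address passed as a single "host:port" string,
	// partition assignment and offset tracking left to Kafka.
	DataStream<String> highLevel = env.addSource(
			new KafkaSource<String>("localhost:9092", "myTopic", new JavaDefaultStringSchema()));

	// Low level (simple) Kafka API: connects directly to one broker and reads a
	// single partition, starting from an explicit offset.
	DataStream<String> lowLevel = env.addSource(
			new PersistentKafkaSource<String>("myTopic", "localhost", 9092, 0, 0L,
					new JavaDefaultStringSchema()));
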
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
index 6a01e43..92d351a 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -36,11 +36,18 @@ import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Iterates over the records received from a partition of a Kafka topic as byte arrays.
+ */
 public class KafkaConsumerIterator {
 
 	private static final long serialVersionUID = 1L;
 
+	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 1000L;
+
 	private List<String> hosts;
 	private String topic;
 	private int port;
@@ -54,11 +61,21 @@ public class KafkaConsumerIterator {
 	private transient Iterator<MessageAndOffset> iter;
 	private transient FetchResponse fetchResponse;
 
-	public KafkaConsumerIterator(String host, int port, String topic, int partition,
+	/**
+	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
+	 * we use the so-called simple or low level Kafka API, thus connecting directly to one of the brokers.
+	 *
+	 * @param hostName Hostname of a known Kafka broker
+	 * @param port Port of the known Kafka broker
+	 * @param topic Name of the topic to listen to
+	 * @param partition Partition in the chosen topic
+	 * @param waitOnEmptyFetch wait time on empty fetch in millis
+	 */
+	public KafkaConsumerIterator(String hostName, int port, String topic, int partition,
 			long waitOnEmptyFetch) {
 
 		this.hosts = new ArrayList<String>();
-		hosts.add(host);
+		hosts.add(hostName);
 		this.port = port;
 
 		this.topic = topic;
@@ -68,14 +85,37 @@ public class KafkaConsumerIterator {
 		replicaBrokers = new ArrayList<String>();
 	}
 
-	private void initialize() {
+	/**
+	 * Constructor without configurable wait time on empty fetch. For connecting to the Kafka service
+	 * we use the so-called simple or low level Kafka API, thus connecting directly to one of the brokers.
+	 *
+	 * @param hostName Hostname of a known Kafka broker
+	 * @param port Port of the known Kafka broker
+	 * @param topic Name of the topic to listen to
+	 * @param partition Partition in the chosen topic
+	 */
+	public KafkaConsumerIterator(String hostName, int port, String topic, int partition){
+		this(hostName, port, topic, partition, DEFAULT_WAIT_ON_EMPTY_FETCH);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Initializing a connection
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Initializes the connection by detecting the lead broker of
+	 * the topic partition and establishing a connection to it.
+	 */
+	private void initialize() throws InterruptedException {
 		PartitionMetadata metadata;
 		do {
 			metadata = findLeader(hosts, port, topic, partition);
-			try {
-				Thread.sleep(1000L);
-			} catch (InterruptedException e) {
-				e.printStackTrace();
+			if (metadata == null) {
+				try {
+					Thread.sleep(waitOnEmptyFetch);
+				} catch (InterruptedException e) {
+					throw new InterruptedException("Establishing connection to Kafka failed");
+				}
 			}
 		} while (metadata == null);
 
@@ -90,7 +130,10 @@ public class KafkaConsumerIterator {
 		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
 	}
 
-	public void initializeFromBeginning() {
+	/**
+	 * Initializes a connection from the earliest available offset.
+	 */
+	public void initializeFromBeginning() throws InterruptedException {
 		initialize();
 		readOffset = getLastOffset(consumer, topic, partition,
 				kafka.api.OffsetRequest.EarliestTime(), clientName);
@@ -98,7 +141,10 @@ public class KafkaConsumerIterator {
 		resetFetchResponse(readOffset);
 	}
 
-	public void initializeFromCurrent() {
+	/**
+	 * Initializes a connection from the latest available offset.
+	 */
+	public void initializeFromCurrent() throws InterruptedException {
 		initialize();
 		readOffset = getLastOffset(consumer, topic, partition,
 				kafka.api.OffsetRequest.LatestTime(), clientName);
@@ -106,28 +152,48 @@ public class KafkaConsumerIterator {
 		resetFetchResponse(readOffset);
 	}
 
-	public void initializeFromOffset(long offset) {
+	/**
+	 * Initializes a connection from the specified offset.
+	 *
+	 * @param offset Desired Kafka offset
+	 */
+	public void initializeFromOffset(long offset) throws InterruptedException {
 		initialize();
 		readOffset = offset;
 		resetFetchResponse(readOffset);
 	}
 
+
+	// --------------------------------------------------------------------------------------------
+	//  Iterator methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Convenience method to emulate iterator behaviour.
+	 *
+	 * @return whether the iterator has a next element (always true, as the stream is unbounded)
+	 */
 	public boolean hasNext() {
 		return true;
 	}
 
-	public byte[] next() {
+	/**
+	 * Returns the next message received from Kafka as a
+	 * byte array.
+	 *
+	 * @return next message as a byte array.
+	 */
+	public byte[] next() throws InterruptedException {
 		return nextWithOffset().getMessage();
 	}
 
-	private void resetFetchResponse(long offset) {
-		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
-				.addFetch(topic, partition, offset, 100000).build();
-		fetchResponse = consumer.fetch(req);
-		iter = fetchResponse.messageSet(topic, partition).iterator();
-	}
-
-	public MessageWithOffset nextWithOffset() {
+	/**
+	 * Returns the next message and its offset received from
+	 * Kafka encapsulated in a POJO.
+	 *
+	 * @return next message and its offset.
+	 */
+	public MessageWithOffset nextWithOffset() throws InterruptedException {
 
 		synchronized (fetchResponse) {
 			while (!iter.hasNext()) {
@@ -135,7 +201,7 @@ public class KafkaConsumerIterator {
 				try {
 					Thread.sleep(waitOnEmptyFetch);
 				} catch (InterruptedException e) {
-					e.printStackTrace();
+					throw new InterruptedException("Fetching from Kafka was interrupted");
 				}
 			}
 
@@ -152,10 +218,16 @@ public class KafkaConsumerIterator {
 
 			byte[] bytes = new byte[payload.limit()];
 			payload.get(bytes);
+
 			return new MessageWithOffset(messageAndOffset.offset(), bytes);
 		}
 	}
 
+	/**
+	 * Resets the iterator to a given offset.
+	 *
+	 * @param offset Desired Kafka offset.
+	 */
 	public void reset(long offset) {
 		synchronized (fetchResponse) {
 			readOffset = offset;
@@ -163,6 +235,20 @@ public class KafkaConsumerIterator {
 		}
 	}
 
+	// --------------------------------------------------------------------------------------------
+	//  Internal utilities
+	// --------------------------------------------------------------------------------------------
+
+	private void resetFetchResponse(long offset) {
+		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+				.addFetch(topic, partition, offset, 100000).build();
+		fetchResponse = consumer.fetch(req);
+
+		//TODO deal with broker failures
+
+		iter = fetchResponse.messageSet(topic, partition).iterator();
+	}
+
 	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
 			int a_partition) {
 		PartitionMetadata returnMetaData = null;
@@ -212,7 +298,7 @@ public class KafkaConsumerIterator {
 		OffsetResponse response = consumer.getOffsetsBefore(request);
 
 		if (response.hasError()) {
-			throw new RuntimeException("Error fetching data Offset Data the Broker. Reason: "
+			throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
 					+ response.errorCode(topic, partition));
 		}
 		long[] offsets = response.offsets(topic, partition);

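The sources drive this iterator from their run() methods; a minimal sketch of using it standalone, assuming a reachable broker and placeholder connection values (the surrounding method must propagate InterruptedException):

	KafkaConsumerIterator iterator = new KafkaConsumerIterator("localhost", 9092, "myTopic", 0);
	iterator.initializeFromCurrent(); // or initializeFromBeginning() / initializeFromOffset(42L)
	while (iterator.hasNext()) { // always true; the iterator blocks while fetches come back empty
		MessageWithOffset msg = iterator.nextWithOffset();
		System.out.println(msg.getOffset() + " :: " + new String(msg.getMessage()));
	}
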
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
index e1d02ef..6ca4c81 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -23,12 +23,13 @@ public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator {
 
 	private DeserializationSchema<IN> deserializationSchema;
 
-	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) {
+	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch,
+												DeserializationSchema<IN> deserializationSchema) {
 		super(host, port, topic, partition, waitOnEmptyFetch);
 		this.deserializationSchema = deserializationSchema;
 	}
 
-	public IN nextRecord() {
+	public IN nextRecord() throws InterruptedException {
 		return deserializationSchema.deserialize(next());
 	}
 

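A minimal sketch of the deserializing variant, which layers a DeserializationSchema over the raw byte iterator; connection values are placeholders and 1000L is the wait on empty fetch in milliseconds:

	KafkaDeserializingConsumerIterator<String> iterator =
			new KafkaDeserializingConsumerIterator<String>("localhost", 9092, "myTopic", 0, 1000L,
					new JavaDefaultStringSchema());
	iterator.initializeFromCurrent();
	String record = iterator.nextRecord(); // next raw message, run through the schema
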
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
index f949b9a..9e6dea7 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java
@@ -26,6 +26,9 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
 
 import kafka.admin.AdminUtils;
 
+/**
+ * Factory for creating Kafka topics with a custom number of partitions.
+ */
 public class KafkaTopicFactory {
 
 	public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {

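A minimal sketch of calling the factory, assuming a ZooKeeper instance at a placeholder address:

	// Creates topic "myTopic" with 2 partitions and a replication factor of 1.
	KafkaTopicFactory.createTopic("localhost:2181", "myTopic", 2, 1);
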
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
index 6b8f4dd..c5b8e32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.connectors.kafka.api.simple;
 
+/**
+ * POJO encapsulating a record received from Kafka together with its offset.
+ */
 public class MessageWithOffset {
 	private long offset;
 	private byte[] message;

http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 5f15d12..fd428c0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -23,6 +23,14 @@ import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
+/**
+ * Kafka source persisting its offset through the {@link OperatorState} interface.
+ * This allows the offset to be restored to the latest one that has been acknowledged
+ * by the whole execution graph.
+ *
+ * @param <OUT>
+ *            Type of the messages on the topic.
+ */
 public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 
 	private static final long serialVersionUID = 1L;
@@ -31,22 +31,14 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 
 	private transient OperatorState<Long> kafkaOffSet;
 
-	/**
-	 * Partition index is set automatically by instance id.
-	 * 
-	 * @param topicId
-	 * @param host
-	 * @param port
-	 * @param deserializationSchema
-	 */
-	public PersistentKafkaSource(String topicId, String host, int port, long initialOffset,
+	public PersistentKafkaSource(String topicId, String host, int port, int partition, long initialOffset,
 			DeserializationSchema<OUT> deserializationSchema) {
-		super(topicId, host, port, deserializationSchema);
+		super(topicId, host, port, partition, deserializationSchema);
 		this.initialOffset = initialOffset;
 	}
 
 	@Override
-	public void open(Configuration parameters) {
+	public void open(Configuration parameters) throws InterruptedException {
 		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
 		@SuppressWarnings("unchecked")
 		OperatorState<Long> lastKafkaOffSet = (OperatorState<Long>) context.getState("kafka");
@@ -62,21 +62,16 @@ public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
 	}
 
 	@Override
-	protected void setInitialOffset(Configuration config) {
+	protected void setInitialOffset(Configuration config) throws InterruptedException {
 		iterator.initializeFromOffset(kafkaOffSet.getState());
 	}
 
-	@Override
-	protected void gotMessage(MessageWithOffset msg) {
-		System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage()));
-	}
 
 	@Override
 	public void run(Collector<OUT> collector) throws Exception {
 		MessageWithOffset msg;
 		while (iterator.hasNext()) {
 			msg = iterator.nextWithOffset();
-			gotMessage(msg);
 			OUT out = schema.deserialize(msg.getMessage());
 			collector.collect(out);
 			kafkaOffSet.update(msg.getOffset());

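open() restores the last acknowledged offset from the OperatorState registered under the name "kafka"; a minimal sketch of the wiring, mirroring the code removed from KafkaConsumerExample earlier in this commit (connection values are placeholders):

	DataStream<String> stream = env
			.addSource(new PersistentKafkaSource<String>("myTopic", "localhost", 9092, 0, 0L,
					new JavaDefaultStringSchema()))
			.registerState("kafka", new OperatorState<Long>(null));
	stream.print();

The source looks the state up by the same name via context.getState("kafka") in open(), so the two names must match.
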
http://git-wip-us.apache.org/repos/asf/flink/blob/ed5ba95d/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index fa925b3..61fd173 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -23,47 +23,42 @@ import org.apache.flink.streaming.connectors.ConnectorSource;
 import org.apache.flink.streaming.connectors.util.DeserializationSchema;
 import org.apache.flink.util.Collector;
 
+/**
+ * Source that listens to a Kafka topic using the low level or simple Kafka API.
+ *
+ * @param <OUT>
+ *            Type of the messages on the topic.
+ */
 public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 	private static final long serialVersionUID = 1L;
 
 	private String topicId;
-	private final String host;
+	private final String hostName;
 	private final int port;
+	private final int partition;
 	protected KafkaConsumerIterator iterator;
 
-	/**
-	 * Partition index is set automatically by instance id.
-	 * @param topicId
-	 * @param host
-	 * @param port
-	 * @param deserializationSchema
-	 */
-	public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+	public SimpleKafkaSource(String topic, String hostName, int port, int partition,
+								DeserializationSchema<OUT> deserializationSchema) {
 		super(deserializationSchema);
-		this.topicId = topicId;
-		this.host = host;
+		this.topicId = topic;
+		this.hostName = hostName;
 		this.port = port;
+		this.partition = partition;
 	}
 
 	private void initializeConnection() {
-		//TODO: Fix this
-		int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
-		iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
+		iterator = new KafkaConsumerIterator(hostName, port, topicId, partition);
 	}
 
-	protected void setInitialOffset(Configuration config) {
+	protected void setInitialOffset(Configuration config) throws InterruptedException {
 		iterator.initializeFromCurrent();
 	}
 
-	//This just for debug purposes
-	protected void gotMessage(MessageWithOffset msg) {
-	}
-
 	@Override
 	public void run(Collector<OUT> collector) throws Exception {
 		while (iterator.hasNext()) {
 			MessageWithOffset msg = iterator.nextWithOffset();
-			gotMessage(msg);
 			OUT out = schema.deserialize(msg.getMessage());
 			collector.collect(out);
 		}
@@ -75,7 +70,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 
 	@Override
-	public void open(Configuration config) {
+	public void open(Configuration config) throws InterruptedException {
 		initializeConnection();
 		setInitialOffset(config);
 	}

