flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [07/14] flink git commit: [FLINK-1638] [streaming] Added connector for low level Kafka Consumer API
Date Tue, 10 Mar 2015 14:00:07 GMT
[FLINK-1638] [streaming] Added connector for low level Kafka Consumer API


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccb87cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccb87cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccb87cf

Branch: refs/heads/master
Commit: 0ccb87cf76af1ea662c8209c9f083eb16c51d8e8
Parents: 452c39a
Author: mbalassi <mbalassi@apache.org>
Authored: Tue Mar 3 21:14:58 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |   4 +-
 .../connectors/kafka/api/KafkaSource.java       |  10 +-
 .../kafka/api/simple/KafkaConsumerIterator.java | 221 +++++++++++++++++++
 .../KafkaDeserializingConsumerIterator.java     |  35 +++
 .../kafka/api/simple/MessageWithOffset.java     |  44 ++++
 .../kafka/api/simple/SimpleKafkaSource.java     |  68 ++++++
 6 files changed, 379 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 20c9bd7..587d7b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
@@ -39,7 +40,8 @@ public class KafkaConsumerExample {
 		@SuppressWarnings("unused")
 		DataStream<String> stream1 = env
 				.addSource(
-						new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
+//						new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
+						new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
 				.setParallelism(3)
 				.print().setParallelism(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index f4097e0..3075608 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -52,6 +52,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	private long zookeeperSyncTimeMillis;
 	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
+	private static final String DEFAULT_GROUP_ID = "flink-group";
 
 	private volatile boolean isRunning = false;
 
@@ -67,16 +68,21 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 * @param zookeeperSyncTimeMillis
 	 *            Synchronization time with zookeeper.
 	 */
-	public KafkaSource(String zookeeperHost, String topicId,
+	public KafkaSource(String zookeeperHost, String topicId, String groupId,
 			DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
 		super(deserializationSchema);
 		this.zookeeperHost = zookeeperHost;
-		this.groupId = "flink-group";
+		this.groupId = groupId;
 		this.topicId = topicId;
 		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
 	}
 
 	public KafkaSource(String zookeeperHost, String topicId,
+					   DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis){
+		this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+	}
+
+	public KafkaSource(String zookeeperHost, String topicId,
 			DeserializationSchema<OUT> deserializationSchema) {
 		this(zookeeperHost, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
new file mode 100644
index 0000000..6a01e43
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaConsumerIterator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+public class KafkaConsumerIterator {
+
+	private static final long serialVersionUID = 1L;
+
+	private List<String> hosts;
+	private String topic;
+	private int port;
+	private int partition;
+	private long readOffset;
+	private long waitOnEmptyFetch;
+	private transient SimpleConsumer consumer;
+	private List<String> replicaBrokers;
+	private String clientName;
+
+	private transient Iterator<MessageAndOffset> iter;
+	private transient FetchResponse fetchResponse;
+
+	public KafkaConsumerIterator(String host, int port, String topic, int partition,
+			long waitOnEmptyFetch) {
+
+		this.hosts = new ArrayList<String>();
+		hosts.add(host);
+		this.port = port;
+
+		this.topic = topic;
+		this.partition = partition;
+		this.waitOnEmptyFetch = waitOnEmptyFetch;
+
+		replicaBrokers = new ArrayList<String>();
+	}
+
+	private void initialize() {
+		PartitionMetadata metadata;
+		do {
+			metadata = findLeader(hosts, port, topic, partition);
+			try {
+				Thread.sleep(1000L);
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			}
+		} while (metadata == null);
+
+		if (metadata.leader() == null) {
+			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
+					+ ":" + port);
+		}
+
+		String leadBroker = metadata.leader().host();
+		clientName = "Client_" + topic + "_" + partition;
+
+		consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+	}
+
+	public void initializeFromBeginning() {
+		initialize();
+		readOffset = getLastOffset(consumer, topic, partition,
+				kafka.api.OffsetRequest.EarliestTime(), clientName);
+
+		resetFetchResponse(readOffset);
+	}
+
+	public void initializeFromCurrent() {
+		initialize();
+		readOffset = getLastOffset(consumer, topic, partition,
+				kafka.api.OffsetRequest.LatestTime(), clientName);
+
+		resetFetchResponse(readOffset);
+	}
+
+	public void initializeFromOffset(long offset) {
+		initialize();
+		readOffset = offset;
+		resetFetchResponse(readOffset);
+	}
+
+	public boolean hasNext() {
+		return true;
+	}
+
+	public byte[] next() {
+		return nextWithOffset().getMessage();
+	}
+
+	private void resetFetchResponse(long offset) {
+		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+				.addFetch(topic, partition, offset, 100000).build();
+		fetchResponse = consumer.fetch(req);
+		iter = fetchResponse.messageSet(topic, partition).iterator();
+	}
+
+	public MessageWithOffset nextWithOffset() {
+
+		synchronized (fetchResponse) {
+			while (!iter.hasNext()) {
+				resetFetchResponse(readOffset);
+				try {
+					Thread.sleep(waitOnEmptyFetch);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+
+			MessageAndOffset messageAndOffset = iter.next();
+			long currentOffset = messageAndOffset.offset();
+
+			while (currentOffset < readOffset) {
+				messageAndOffset = iter.next();
+				currentOffset = messageAndOffset.offset();
+			}
+
+			readOffset = messageAndOffset.nextOffset();
+			ByteBuffer payload = messageAndOffset.message().payload();
+
+			byte[] bytes = new byte[payload.limit()];
+			payload.get(bytes);
+			return new MessageWithOffset(messageAndOffset.offset(), bytes);
+		}
+	}
+
+	public void reset(long offset) {
+		synchronized (fetchResponse) {
+			readOffset = offset;
+			resetFetchResponse(offset);
+		}
+	}
+
+	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
+			int a_partition) {
+		PartitionMetadata returnMetaData = null;
+		loop: for (String seed : a_hosts) {
+			SimpleConsumer consumer = null;
+			try {
+				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
+				List<String> topics = Collections.singletonList(a_topic);
+				TopicMetadataRequest req = new TopicMetadataRequest(topics);
+				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+				List<TopicMetadata> metaData = resp.topicsMetadata();
+				for (TopicMetadata item : metaData) {
+					for (PartitionMetadata part : item.partitionsMetadata()) {
+						if (part.partitionId() == a_partition) {
+							returnMetaData = part;
+							break loop;
+						}
+					}
+				}
+			} catch (Exception e) {
+				throw new RuntimeException("Error communicating with Broker [" + seed
+						+ "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: "
+						+ e);
+			} finally {
+				if (consumer != null) {
+					consumer.close();
+				}
+			}
+		}
+		if (returnMetaData != null) {
+			replicaBrokers.clear();
+			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
+				replicaBrokers.add(replica.host());
+			}
+		}
+		return returnMetaData;
+	}
+
+	private static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
+			long whichTime, String clientName) {
+		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
+				kafka.api.OffsetRequest.CurrentVersion(), clientName);
+		OffsetResponse response = consumer.getOffsetsBefore(request);
+
+		if (response.hasError()) {
+			throw new RuntimeException("Error fetching data Offset Data the Broker. Reason: "
+					+ response.errorCode(topic, partition));
+		}
+		long[] offsets = response.offsets(topic, partition);
+		return offsets[0];
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
new file mode 100644
index 0000000..e1d02ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaDeserializingConsumerIterator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+public class KafkaDeserializingConsumerIterator<IN> extends KafkaConsumerIterator {
+
+	private DeserializationSchema<IN> deserializationSchema;
+
+	public KafkaDeserializingConsumerIterator(String host, int port, String topic, int partition, long waitOnEmptyFetch, DeserializationSchema<IN> deserializationSchema) {
+		super(host, port, topic, partition, waitOnEmptyFetch);
+		this.deserializationSchema = deserializationSchema;
+	}
+
+	public IN nextRecord() {
+		return deserializationSchema.deserialize(next());
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
new file mode 100644
index 0000000..6b8f4dd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/MessageWithOffset.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+public class MessageWithOffset {
+	private long offset;
+	private byte[] message;
+
+	public MessageWithOffset(long offset, byte[] message) {
+		this.offset = offset;
+		this.message = message;
+	}
+
+	public long getOffset() {
+		return offset;
+	}
+
+	public void setOffset(long offset) {
+		this.offset = offset;
+	}
+
+	public byte[] getMessage() {
+		return message;
+	}
+
+	public void setMessage(byte[] message) {
+		this.message = message;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ccb87cf/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
new file mode 100644
index 0000000..db75571
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+
+public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private String topicId;
+	private final String host;
+	private final int port;
+	private KafkaConsumerIterator iterator;
+
+	/**
+	 * Partition index is set automatically by instance id.
+	 * @param topicId
+	 * @param host
+	 * @param port
+	 */
+	public SimpleKafkaSource(String topicId,
+							 String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+		super(deserializationSchema);
+		this.topicId = topicId;
+		this.host = host;
+		this.port = port;
+	}
+
+	private void initializeConnection() {
+		int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
+		iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
+		iterator.initializeFromCurrent();
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void invoke(Collector<OUT> collector) throws Exception {
+		while (iterator.hasNext()) {
+			MessageWithOffset msg = iterator.nextWithOffset();
+			OUT out = schema.deserialize(msg.getMessage());
+			collector.collect(out);
+		}
+	}
+
+	@Override
+	public void open(Configuration config) {
+		initializeConnection();
+	}
+}


Mime
View raw message