flink-commits mailing list archives

From se...@apache.org
Subject [22/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
Date Thu, 27 Aug 2015 11:25:39 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
new file mode 100644
index 0000000..8800954
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
@@ -0,0 +1,1130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.clients.Metadata;
+import org.apache.flink.kafka_backport.clients.consumer.internals.DelayedTask;
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.metrics.MetricConfig;
+import org.apache.flink.kafka_backport.clients.ClientUtils;
+import org.apache.flink.kafka_backport.clients.NetworkClient;
+import org.apache.flink.kafka_backport.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.flink.kafka_backport.clients.consumer.internals.Coordinator;
+import org.apache.flink.kafka_backport.clients.consumer.internals.Fetcher;
+import org.apache.flink.kafka_backport.clients.consumer.internals.SubscriptionState;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.Metric;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.metrics.JmxReporter;
+import org.apache.flink.kafka_backport.common.metrics.Metrics;
+import org.apache.flink.kafka_backport.common.metrics.MetricsReporter;
+import org.apache.flink.kafka_backport.common.network.Selector;
+import org.apache.flink.kafka_backport.common.serialization.Deserializer;
+import org.apache.flink.kafka_backport.common.utils.SystemTime;
+import org.apache.flink.kafka_backport.common.utils.Time;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.kafka_backport.common.utils.Utils.min;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A Kafka client that consumes records from a Kafka cluster.
+ * <p>
+ * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
+ * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * consumers to load balance consumption using consumer groups (as described below).
+ * <p>
+ * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * Failure to close the consumer after use will leak these connections.
+ * <p>
+ * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
+ *
+ * <h3>Offsets and Consumer Position</h3>
+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
+ * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
+ * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
+ * are actually two notions of position relevant to the user of the consumer.
+ * <p>
+ * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
+ * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
+ * every time the consumer calls {@link #poll(long)} and receives messages.
+ * <p>
+ * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
+ * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
+ * offsets periodically, or it can choose to control this committed position manually by calling
+ * {@link #commit(CommitType) commit}.
+ * <p>
+ * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
+ * detail below.
+ * 
+ * <h3>Consumer Groups</h3>
+ * 
+ * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
+ * processing records. These processes can either be running on the same machine or, as is more likely, they can be
+ * distributed over many machines to provide additional scalability and fault tolerance for processing.
+ * <p>
+ * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
+ * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
+ * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
+ * process joins the group, partitions will be moved from existing consumers to this new process.
+ * <p>
+ * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
+ * topic; if they both specify the same group they will each get about half the records.
+ * <p>
+ * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
+ * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
+ * given topic without duplicating data (additional consumers are actually quite cheap).
+ * <p>
+ * This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
+ * a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
+ * delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
+ * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
+ * have its own consumer group, so each process would subscribe to all the records published to the topic.
+ * <p>
+ * In addition, when offsets are committed they are always committed for a given consumer group.
+ * <p>
+ * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
+ * partition balancing.
+ * 
+ * <h3>Usage Examples</h3>
+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
+ * demonstrate how to use them.
+ * 
+ * <h4>Simple Processing</h4>
+ * This example demonstrates the simplest usage of Kafka's consumer API.
+ * 
+ * <pre>
+ *     Properties props = new Properties();
+ *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
+ *     props.put(&quot;group.id&quot;, &quot;test&quot;);
+ *     props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
+ *     props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
+ *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
+ *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.StringDeserializer&quot;);
+ *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.StringDeserializer&quot;);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+ *     while (true) {
+ *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
+ *         for (ConsumerRecord&lt;String, String&gt; record : records)
+ *             System.out.printf(&quot;offset = %d, key = %s, value = %s&quot;, record.offset(), record.key(), record.value());
+ *     }
+ * </pre>
+ * 
+ * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
+ * the config <code>auto.commit.interval.ms</code>.
+ * <p>
+ * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
+ * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
+ * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
+ * case there are servers down when the client is connecting).
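+ * <p>
+ * For example, two brokers could be listed like this (the host names are placeholders; any reachable broker
+ * is enough to bootstrap the client):
+ * <pre>
+ *     props.put(&quot;bootstrap.servers&quot;, &quot;broker1:9092,broker2:9092&quot;);
+ * </pre>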
+ * <p>
+ * In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
+ * called <i>test</i> as described above.
+ * <p>
+ * The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
+ * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
+ * to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
+ * considered dead and its partitions will be assigned to another process.
+ * <p>
+ * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
+ * are saying that our record's key and value will just be simple strings.
+ * 
+ * <h4>Controlling When Messages Are Considered Consumed</h4>
+ * 
+ * In this example we will consume a batch of records and batch them up in memory; when we have sufficient records
+ * batched, we will insert them into a database. If we allowed offsets to auto-commit as in the previous example, messages
+ * would be considered consumed after they were given out by the consumer, and it would be possible that our process
+ * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
+ * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
+ * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from the last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
+ * time but in failure cases could be duplicated.
+ * 
+ * <pre>
+ *     Properties props = new Properties();
+ *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
+ *     props.put(&quot;group.id&quot;, &quot;test&quot;);
+ *     props.put(&quot;enable.auto.commit&quot;, &quot;false&quot;);
+ *     props.put(&quot;auto.commit.interval.ms&quot;, &quot;1000&quot;);
+ *     props.put(&quot;session.timeout.ms&quot;, &quot;30000&quot;);
+ *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.StringDeserializer&quot;);
+ *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.StringDeserializer&quot;);
+ *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
+ *     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+ *     int commitInterval = 200;
+ *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
+ *     while (true) {
+ *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
+ *         for (ConsumerRecord&lt;String, String&gt; record : records) {
+ *             buffer.add(record);
+ *             if (buffer.size() &gt;= commitInterval) {
+ *                 insertIntoDb(buffer);
+ *                 consumer.commit(CommitType.SYNC);
+ *                 buffer.clear();
+ *             }
+ *         }
+ *     }
+ * </pre>
+ * 
+ * <h4>Subscribing To Specific Partitions</h4>
+ * 
+ * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
+ * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
+ * instances of our program can divide up the work of processing records.
+ * <p>
+ * In this mode the consumer will just get the partitions it subscribes to, and if the consumer instance fails, no attempt
+ * will be made to rebalance partitions to other instances.
+ * <p>
+ * There are several cases where this makes sense:
+ * <ul>
+ * <li>The first case is if the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
+ * <li>Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
+ * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * will be restarted on another machine.
+ * </ul>
+ * <p>
+ * This mode is easy to specify: rather than subscribing to the topic, the consumer just subscribes to particular
+ * partitions:
+ * 
+ * <pre>
+ *     String topic = &quot;foo&quot;;
+ *     TopicPartition partition0 = new TopicPartition(topic, 0);
+ *     TopicPartition partition1 = new TopicPartition(topic, 1);
+ *     consumer.subscribe(partition0);
+ *     consumer.subscribe(partition1);
+ * </pre>
+ * 
+ * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
+ * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ * <p>
+ * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
+ * balancing) using the same consumer instance.
+ * 
+ * <h4>Managing Your Own Offsets</h4>
+ * 
+ * The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own
+ * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
+ * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
+ * possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
+ * stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
+ * <p>
+ * Here are a couple of examples of this type of usage:
+ * <ul>
+ * <li>If the results of the consumption are being stored in a relational database, storing the offset in the database
+ * as well can allow committing both the results and offset in a single transaction. Thus either the transaction will
+ * succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset
+ * won't be updated.
+ * <li>If the results are being stored in a local store it may be possible to store the offset there as well. For
+ * example a search index could be built by subscribing to a particular partition and storing both the offset and the
+ * indexed data together. If this is done in a way that is atomic, it is often possible to ensure that even if a crash
+ * causes unsynced data to be lost, whatever is left has the corresponding offset stored as well. This means the indexing
+ * process that comes back having lost recent updates simply resumes indexing from what it has, ensuring that no updates
+ * are lost.
+ * </ul>
+ * 
+ * Each record comes with its own offset, so to manage your own offset you just need to do the following:
+ * <ol>
+ * <li>Configure <code>enable.auto.commit=false</code>
+ * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
+ * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
+ * </ol>
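+ * 
+ * As a rough sketch of this pattern (process(), readOffsetFromStore() and saveOffsetInStore() are
+ * hypothetical placeholders for the application's own logic, not part of this API):
+ * 
+ * <pre>
+ *     TopicPartition partition = new TopicPartition(&quot;foo&quot;, 0);
+ *     consumer.subscribe(partition);
+ *     consumer.seek(partition, readOffsetFromStore(partition));
+ *     while (true) {
+ *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
+ *         for (ConsumerRecord&lt;String, String&gt; record : records) {
+ *             process(record);
+ *             saveOffsetInStore(partition, record.offset() + 1); // the next offset to read, stored with the results
+ *         }
+ *     }
+ * </pre>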
+ * 
+ * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
+ * search index use case described above). If the partition assignment is done automatically special care will also be
+ * needed to handle the case where partition assignments change. This can be handled using a special callback specified
+ * using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
+ * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * offset for those partitions by implementing
+ * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
+ * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ * <p>
+ * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
+ * partitions that are moved elsewhere.
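+ * <p>
+ * A callback along these lines could, for instance, persist positions in the application's own store on
+ * every rebalance (saveOffsetInStore() and readOffsetFromStore() are again hypothetical placeholders):
+ * 
+ * <pre>
+ *     public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
+ *         public void onPartitionsRevoked(Consumer&lt;?, ?&gt; consumer, Collection&lt;TopicPartition&gt; partitions) {
+ *             // save the current position before the partitions are taken away
+ *             for (TopicPartition partition : partitions)
+ *                 saveOffsetInStore(partition, consumer.position(partition));
+ *         }
+ *
+ *         public void onPartitionsAssigned(Consumer&lt;?, ?&gt; consumer, Collection&lt;TopicPartition&gt; partitions) {
+ *             // seek to the saved position for the newly assigned partitions
+ *             for (TopicPartition partition : partitions)
+ *                 consumer.seek(partition, readOffsetFromStore(partition));
+ *         }
+ *     }
+ * </pre>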
+ * 
+ * <h4>Controlling The Consumer's Position</h4>
+ * 
+ * In most use cases the consumer will simply consume records from beginning to end, periodically committing its
+ * position (either automatically or manually). However, Kafka allows the consumer to manually control its position,
+ * moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
+ * the most recent records without actually consuming the intermediate records.
+ * <p>
+ * There are several instances where manually controlling the consumer's position can be useful.
+ * <p>
+ * One case is time-sensitive record processing: it may make sense for a consumer that falls far enough behind to not
+ * attempt to catch up processing all records, but rather just skip to the most recent records.
+ * <p>
+ * Another use case is for a system that maintains local state as described in the previous section. In such a system
+ * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
+ * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
+ * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ * 
+ * Kafka allows the consumer to specify a new position using {@link #seek(TopicPartition, long)}. Special
+ * methods for seeking to the earliest and latest offset the server maintains are also available (
+ * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
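+ * <p>
+ * For instance, a consumer that only cares about recent data might skip ahead on startup, as in this
+ * minimal sketch using a manually subscribed partition:
+ * 
+ * <pre>
+ *     TopicPartition partition = new TopicPartition(&quot;foo&quot;, 0);
+ *     consumer.subscribe(partition);
+ *     consumer.seekToEnd(partition); // skip straight to the most recent records
+ * </pre>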
+ * 
+ *
+ * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
+ * 
+ * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application
+ * making the call. It is the responsibility of the user to ensure that multi-threaded access
+ * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}.
+ *
+ * <p>
+ * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
+ * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
+ * blocking on the operation. This can be used to shut down the consumer from another thread. The following
+ * snippet shows the typical pattern:
+ *
+ * <pre>
+ * public class KafkaConsumerRunner implements Runnable {
+ *     private final AtomicBoolean closed = new AtomicBoolean(false);
+ *     private final KafkaConsumer consumer;
+ *
+ *     public void run() {
+ *         try {
+ *             consumer.subscribe("topic");
+ *             while (!closed.get()) {
+ *                 ConsumerRecords records = consumer.poll(10000);
+ *                 // Handle new records
+ *             }
+ *         } catch (ConsumerWakeupException e) {
+ *             // Ignore exception if closing
+ *             if (!closed.get()) throw e;
+ *         } finally {
+ *             consumer.close();
+ *         }
+ *     }
+ *
+ *     // Shutdown hook which can be called from a separate thread
+ *     public void shutdown() {
+ *         closed.set(true);
+ *         consumer.wakeup();
+ *     }
+ * }
+ * </pre>
+ *
+ * Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
+ *
+ * <pre>
+ *     closed.set(true);
+ *     consumer.wakeup();
+ * </pre>
+ *
+ * <p>
+ * We have intentionally avoided implementing a particular threading model for processing. This leaves several
+ * options for implementing multi-threaded processing of records.
+ *
+ * 
+ * <h4>1. One Consumer Per Thread</h4>
+ * 
+ * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
+ * <ul>
+ * <li><b>PRO</b>: It is the easiest to implement
+ * <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed
+ * <li><b>PRO</b>: It makes in-order processing on a per-partition basis very easy to implement (each thread just
+ * processes messages in the order it receives them).
+ * <li><b>CON</b>: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles
+ * connections very efficiently so this is generally a small cost.
+ * <li><b>CON</b>: Multiple consumers means more requests being sent to the server and slightly less batching of data
+ * which can cause some drop in I/O throughput.
+ * <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
+ * </ul>
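+ * 
+ * One way to arrange this is to reuse the <code>KafkaConsumerRunner</code> pattern shown above and submit one
+ * runner, each owning its own consumer instance, per thread. For example:
+ * 
+ * <pre>
+ *     int numThreads = 3;
+ *     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ *     for (int i = 0; i &lt; numThreads; i++)
+ *         executor.submit(new KafkaConsumerRunner());
+ * </pre>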
+ * 
+ * <h4>2. Decouple Consumption and Processing</h4>
+ * 
+ * Another alternative is to have one or more consumer threads that do all data consumption and hand off
+ * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
+ * the record processing.
+ * 
+ * This option likewise has pros and cons:
+ * <ul>
+ * <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
+ * possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
+ * <li><b>CON</b>: Guaranteeing order across the processors requires particular care: since the threads execute
+ * independently, an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of
+ * thread execution timing. For processing that has no ordering requirements this is not a problem.
+ * <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
+ * that processing is complete for that partition.
+ * </ul>
+ * 
+ * There are many possible variations on this approach. For example each processor thread can have its own queue, and
+ * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
+ * commit.
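+ * 
+ * A bare-bones version of the hand-off might look like the following sketch, where queue sizing, shutdown and
+ * commit handling are omitted and process() is a placeholder for the application's processing logic:
+ * 
+ * <pre>
+ *     final BlockingQueue&lt;ConsumerRecords&lt;String, String&gt;&gt; queue =
+ *             new ArrayBlockingQueue&lt;ConsumerRecords&lt;String, String&gt;&gt;(64);
+ *
+ *     // consumer thread: the only thread that touches the consumer
+ *     while (true)
+ *         queue.put(consumer.poll(100));
+ *
+ *     // each processor thread: takes batches off the queue and handles them
+ *     while (true)
+ *         process(queue.take());
+ * </pre>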
+ * 
+ */
+public class KafkaConsumer<K, V> implements Consumer<K, V> {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+    private static final long NO_CURRENT_THREAD = -1L;
+    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+
+    private final Coordinator coordinator;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final Fetcher<K, V> fetcher;
+
+    private final Time time;
+    private final ConsumerNetworkClient client;
+    private final Metrics metrics;
+    private final SubscriptionState subscriptions;
+    private final Metadata metadata;
+    private final long retryBackoffMs;
+    private final boolean autoCommit;
+    private final long autoCommitIntervalMs;
+    private boolean closed = false;
+
+    // currentThread holds the threadId of the current thread accessing KafkaConsumer
+    // and is used to prevent multi-threaded access
+    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
+    // refcount is used to allow reentrant access by the thread who has acquired currentThread
+    private final AtomicInteger refcount = new AtomicInteger(0);
+
+    // TODO: This timeout controls how long we should wait before retrying a request. We should be able
+    //       to leverage the work of KAFKA-2120 to get this value from configuration.
+    private long requestTimeoutMs = 5000L;
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+     * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be
+     * either strings or objects of the appropriate type (for example a numeric configuration would accept either the
+     * string "42" or the integer 42).
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * 
+     * @param configs The consumer configs
+     */
+    public KafkaConsumer(Map<String, Object> configs) {
+        this(configs, null, null, null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration, a
+     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * 
+     * @param configs The consumer configs
+     * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+     *            end of every rebalance operation.
+     * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+     *            won't be called in the consumer when the deserializer is passed in directly.
+     * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+     *            won't be called in the consumer when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Map<String, Object> configs,
+                         ConsumerRebalanceCallback callback,
+                         Deserializer<K> keyDeserializer,
+                         Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
+            callback,
+            keyDeserializer,
+            valueDeserializer);
+    }
+
+    /**
+     * A consumer is instantiated by providing a {@link Properties} object as configuration. Valid
+     * configuration strings are documented at {@link ConsumerConfig}.
+     */
+    public KafkaConsumer(Properties properties) {
+        this(properties, null, null, null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a {@link Properties} object as configuration and a
+     * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
+     * <p>
+     * Valid configuration strings are documented at {@link ConsumerConfig}
+     * 
+     * @param properties The consumer configuration properties
+     * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+     *            end of every rebalance operation.
+     * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+     *            won't be called in the consumer when the deserializer is passed in directly.
+     * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+     *            won't be called in the consumer when the deserializer is passed in directly.
+     */
+    public KafkaConsumer(Properties properties,
+                         ConsumerRebalanceCallback callback,
+                         Deserializer<K> keyDeserializer,
+                         Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
+             callback,
+             keyDeserializer,
+             valueDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KafkaConsumer(ConsumerConfig config,
+                          ConsumerRebalanceCallback callback,
+                          Deserializer<K> keyDeserializer,
+                          Deserializer<V> valueDeserializer) {
+        try {
+            log.debug("Starting the Kafka consumer");
+            if (callback == null)
+                callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+                        ConsumerRebalanceCallback.class);
+            this.time = new SystemTime();
+            this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+
+            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                            TimeUnit.MILLISECONDS);
+            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+            String jmxPrefix = "kafka.consumer";
+            if (clientId.length() <= 0)
+                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                    MetricsReporter.class);
+            reporters.add(new JmxReporter(jmxPrefix));
+            this.metrics = new Metrics(metricConfig, reporters, time);
+            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+            String metricGrpPrefix = "consumer";
+            Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+            metricsTags.put("client-id", clientId);
+            NetworkClient netClient = new NetworkClient(
+                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags),
+                    this.metadata,
+                    clientId,
+                    100, // a fixed large enough value will suffice
+                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
+                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+            this.subscriptions = new SubscriptionState(offsetResetStrategy);
+            this.coordinator = new Coordinator(this.client,
+                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
+                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
+                    config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    requestTimeoutMs,
+                    retryBackoffMs,
+                    wrapRebalanceCallback(callback));
+            if (keyDeserializer == null) {
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.keyDeserializer.configure(config.originals(), true);
+            } else {
+                this.keyDeserializer = keyDeserializer;
+            }
+            if (valueDeserializer == null) {
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.valueDeserializer.configure(config.originals(), false);
+            } else {
+                this.valueDeserializer = valueDeserializer;
+            }
+            this.fetcher = new Fetcher<K, V>(this.client,
+                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
+                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                    this.keyDeserializer,
+                    this.valueDeserializer,
+                    this.metadata,
+                    this.subscriptions,
+                    metrics,
+                    metricGrpPrefix,
+                    metricsTags,
+                    this.time,
+                    this.retryBackoffMs);
+
+            config.logUnused();
+
+            if (autoCommit)
+                scheduleAutoCommitTask(autoCommitIntervalMs);
+
+            log.debug("Kafka consumer created");
+        } catch (Throwable t) {
+            // call close methods if internal objects are already constructed
+            // this is to prevent resource leak. see KAFKA-2121
+            close(true);
+            // now propagate the exception
+            throw new KafkaException("Failed to construct kafka consumer", t);
+        }
+    }
+
+    /**
+     * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
+     * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
+     * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
+     * then this will give the set of topic partitions currently assigned to the consumer (which may be none if the assignment
+     * hasn't happened yet, or the partitions are in the process of getting reassigned).
+     */
+    public Set<TopicPartition> subscriptions() {
+        acquire();
+        try {
+            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+     * <p>
+     * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
+     * group and will trigger a rebalance operation if any of the following events occur:
+     * <ul>
+     * <li>The number of partitions changes for any of the subscribed topics
+     * <li>A topic is created or deleted
+     * <li>An existing member of the consumer group dies
+     * <li>A new member is added to an existing consumer group via the join API
+     * </ul>
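+     * <p>
+     * For example, building on the usage shown in the class documentation:
+     * <pre>
+     *     consumer.subscribe(&quot;foo&quot;);
+     *     consumer.subscribe(&quot;bar&quot;); // incrementally adds &quot;bar&quot; to the existing subscription
+     * </pre>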
+     * 
+     * @param topics A variable list of topics that the consumer wants to subscribe to
+     */
+    @Override
+    public void subscribe(String... topics) {
+        acquire();
+        try {
+            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
+            for (String topic : topics)
+                this.subscriptions.subscribe(topic);
+            metadata.addTopics(topics);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Incrementally subscribes to specific topic partitions and does not use the consumer's group management
+     * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+     * metadata change.
+     * <p>
+     *
+     * @param partitions Partitions to incrementally subscribe to
+     */
+    @Override
+    public void subscribe(TopicPartition... partitions) {
+        acquire();
+        try {
+            log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
+            for (TopicPartition tp : partitions) {
+                this.subscriptions.subscribe(tp);
+                metadata.addTopics(tp.topic());
+            }
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Unsubscribe from the specific topics. This will trigger a rebalance operation, and records for these topics will
+     * not be returned from the next {@link #poll(long) poll()} onwards.
+     * 
+     * @param topics Topics to unsubscribe from
+     */
+    public void unsubscribe(String... topics) {
+        acquire();
+        try {
+            log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
+            // throw an exception if the topic was never subscribed to
+            for (String topic : topics)
+                this.subscriptions.unsubscribe(topic);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Unsubscribe from the specific topic partitions. Records for these partitions will not be returned from the next
+     * {@link #poll(long) poll()} onwards.
+     * 
+     * @param partitions Partitions to unsubscribe from
+     */
+    public void unsubscribe(TopicPartition... partitions) {
+        acquire();
+        try {
+            log.debug("Unsubscribed from partition(s): {}", Utils.join(partitions, ", "));
+            // throw an exception if the partition was never subscribed to
+            for (TopicPartition partition : partitions)
+                this.subscriptions.unsubscribe(partition);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+     * subscribed to any topics or partitions before polling for data.
+     * <p>
+     * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
+     * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
+     * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
+     * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
+     * 
+     * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
+     *            immediately with any records available now. Must not be negative.
+     * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+     * 
+     * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
+     *             offset reset policy has been configured.
+     */
+    @Override
+    public ConsumerRecords<K, V> poll(long timeout) {
+        acquire();
+        try {
+            if (timeout < 0)
+                throw new IllegalArgumentException("Timeout must not be negative");
+
+            // poll for new data until the timeout expires
+            long remaining = timeout;
+            while (remaining >= 0) {
+                long start = time.milliseconds();
+                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
+                long end = time.milliseconds();
+
+                if (!records.isEmpty()) {
+                    // if data is available, then return it, but first send off the
+                    // next round of fetches to enable pipelining while the user is
+                    // handling the fetched records.
+                    fetcher.initFetches(metadata.fetch());
+                    client.poll(0);
+                    return new ConsumerRecords<K, V>(records);
+                }
+
+                remaining -= end - start;
+
+                // nothing was available, so we should backoff before retrying
+                if (remaining > 0) {
+                    Utils.sleep(min(remaining, retryBackoffMs));
+                    remaining -= time.milliseconds() - end;
+                }
+            }
+
+            return ConsumerRecords.empty();
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Do one round of polling. In addition to checking for new data, this does any needed
+     * heart-beating, auto-commits, and offset updates.
+     * @param timeout The maximum time to block in the underlying poll
+     * @return The fetched records (may be empty)
+     */
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
+        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
+        coordinator.ensureCoordinatorKnown();
+
+        // ensure we have partitions assigned if we expect to
+        if (subscriptions.partitionsAutoAssigned())
+            coordinator.ensurePartitionAssignment();
+
+        // fetch positions if we have partitions we're subscribed to that we
+        // don't know the offset for
+        if (!subscriptions.hasAllFetchPositions())
+            updateFetchPositions(this.subscriptions.missingFetchPositions());
+
+        // init any new fetches (won't resend pending fetches)
+        Cluster cluster = this.metadata.fetch();
+        fetcher.initFetches(cluster);
+        client.poll(timeout);
+        return fetcher.fetchedRecords();
+    }
+
+    private void scheduleAutoCommitTask(final long interval) {
+        DelayedTask task = new DelayedTask() {
+            public void run(long now) {
+                commit(CommitType.ASYNC);
+                client.schedule(this, now + interval);
+            }
+        };
+        client.schedule(task, time.milliseconds() + interval);
+    }
+
+    /**
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * <p>
+     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+     * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+     * to the caller).
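+     * <p>
+     * For example, a synchronous commit of an explicit position for one partition might look like the
+     * following sketch (the topic name and offset are illustrative only):
+     * <pre>
+     *     Map&lt;TopicPartition, Long&gt; offsets = new HashMap&lt;TopicPartition, Long&gt;();
+     *     offsets.put(new TopicPartition(&quot;foo&quot;, 0), 42L);
+     *     consumer.commit(offsets, CommitType.SYNC);
+     * </pre>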
+     *
+     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param commitType Control whether the commit is blocking
+     */
+    @Override
+    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+        commit(offsets, commitType, null);
+    }
+
+    /**
+     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * <p>
+     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     *
+     * @param offsets The list of offsets per partition that should be committed to Kafka.
+     * @param commitType Control whether the commit is blocking
+     * @param callback Callback to invoke when the commit completes
+     */
+    @Override
+    public void commit(final Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+        acquire();
+        try {
+            log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+            coordinator.commitOffsets(offsets, commitType, callback);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e.
+     * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In
+     * this case, the error is either passed to the callback (if provided) or thrown to the caller.
+     *
+     * @param commitType Whether or not the commit should block until it is acknowledged.
+     * @param callback Callback to invoke when the commit completes
+     */
+    @Override
+    public void commit(CommitType commitType, ConsumerCommitCallback callback) {
+        acquire();
+        try {
+            // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+            Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
+            commit(allConsumed, commitType, callback);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * Asynchronous commits (i.e. {@link CommitType#ASYNC}) will not block. Any errors encountered during an asynchronous
+     * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use
+     * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC})
+     * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown
+     * to the caller).
+     *
+     * @param commitType Whether or not the commit should block until it is acknowledged.
+     */
+    @Override
+    public void commit(CommitType commitType) {
+        commit(commitType, null);
+    }
+
+    /**
+     * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
+     * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
+     * you may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.
+     */
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        acquire();
+        try {
+            log.debug("Seeking to offset {} for partition {}", offset, partition);
+            this.subscriptions.seek(partition, offset);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Seek to the first offset for each of the given partitions
+     */
+    public void seekToBeginning(TopicPartition... partitions) {
+        acquire();
+        try {
+            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+                    : Arrays.asList(partitions);
+            for (TopicPartition tp : parts) {
+                log.debug("Seeking to beginning of partition {}", tp);
+                subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
+            }
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Seek to the last offset for each of the given partitions
+     */
+    public void seekToEnd(TopicPartition... partitions) {
+        acquire();
+        try {
+            Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+                    : Arrays.asList(partitions);
+            for (TopicPartition tp : parts) {
+                log.debug("Seeking to end of partition {}", tp);
+                subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+            }
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * 
+     * @param partition The partition to get the position for
+     * @return The offset
+     * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
+     *             available.
+     */
+    public long position(TopicPartition partition) {
+        acquire();
+        try {
+            if (!this.subscriptions.assignedPartitions().contains(partition))
+                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+            Long offset = this.subscriptions.consumed(partition);
+            if (offset == null) {
+                updateFetchPositions(Collections.singleton(partition));
+                return this.subscriptions.consumed(partition);
+            } else {
+                return offset;
+            }
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     * <p>
+     * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
+     * consumer hasn't yet initialized its cache of committed offsets.
+     * 
+     * @param partition The partition to check
+     * @return The last committed offset or null if no offset has been committed
+     * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
+     *             partition.
+     */
+    @Override
+    public long committed(TopicPartition partition) {
+        acquire();
+        try {
+            Long committed;
+            if (subscriptions.assignedPartitions().contains(partition)) {
+                committed = this.subscriptions.committed(partition);
+                if (committed == null) {
+                    coordinator.refreshCommittedOffsetsIfNeeded();
+                    committed = this.subscriptions.committed(partition);
+                }
+            } else {
+                Map<TopicPartition, Long> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
+                committed = offsets.get(partition);
+            }
+
+            if (committed == null)
+                throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+
+            return committed;
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Get the metrics kept by the consumer
+     */
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
+    /**
+     * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+     * does not already have any metadata about the given topic.
+     * 
+     * @param topic The topic to get partition metadata for
+     * @return The list of partitions
+     */
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic) {
+        acquire();
+        try {
+            Cluster cluster = this.metadata.fetch();
+            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+            if (parts == null) {
+                metadata.add(topic);
+                client.awaitMetadataUpdate();
+                parts = metadata.fetch().partitionsForTopic(topic);
+            }
+            return parts;
+        } finally {
+            release();
+        }
+    }
+
+    @Override
+    public void close() {
+        acquire();
+        try {
+            if (closed) return;
+            close(false);
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Wake up the consumer. This method is thread-safe and is useful in particular to abort a long poll.
+     * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}.
+     */
+    @Override
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    private void close(boolean swallowException) {
+        log.trace("Closing the Kafka consumer.");
+        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        this.closed = true;
+        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
+        ClientUtils.closeQuietly(client, "consumer network client", firstException);
+        ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
+        ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
+        log.debug("The Kafka consumer has closed.");
+        if (firstException.get() != null && !swallowException) {
+            throw new KafkaException("Failed to close kafka consumer", firstException.get());
+        }
+    }
+
+    private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) {
+        return new Coordinator.RebalanceCallback() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                callback.onPartitionsAssigned(KafkaConsumer.this, partitions);
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                callback.onPartitionsRevoked(KafkaConsumer.this, partitions);
+            }
+        };
+    }
+
+    /**
+     * Set the fetch position to the committed position (if there is one)
+     * or reset it using the offset reset policy the user has configured.
+     *
+     * @param partitions The partitions whose fetch positions need updating
+     * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
+     *             defined
+     */
+    private void updateFetchPositions(Set<TopicPartition> partitions) {
+        // refresh commits for all assigned partitions
+        coordinator.refreshCommittedOffsetsIfNeeded();
+
+        // then do any offset lookups in case some positions are not known
+        fetcher.updateFetchPositions(partitions);
+    }
+
+    /*
+     * Check that the consumer hasn't been closed.
+     */
+    private void ensureNotClosed() {
+        if (this.closed)
+            throw new IllegalStateException("This consumer has already been closed.");
+    }
+
+    /**
+     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
+     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
+     * supported).
+     * @throws IllegalStateException if the consumer has been closed
+     * @throws ConcurrentModificationException if another thread already has the lock
+     */
+    private void acquire() {
+        ensureNotClosed();
+        long threadId = Thread.currentThread().getId();
+        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
+            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
+        refcount.incrementAndGet();
+    }
+
+    /**
+     * Release the light lock protecting the consumer from multi-threaded access.
+     */
+    private void release() {
+        if (refcount.decrementAndGet() == 0)
+            currentThread.set(NO_CURRENT_THREAD);
+    }
+}
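
The Javadocs above describe an API that must be driven from a single thread: every method except wakeup() throws ConcurrentModificationException when invoked concurrently, and wakeup() exists precisely so another thread can abort a blocking poll. The sketch below illustrates that contract; the class, method, and topic names are made up for illustration, and it relies only on the Consumer interface methods shown in this patch.

import java.util.List;

import org.apache.flink.kafka_backport.clients.consumer.Consumer;
import org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException;
import org.apache.flink.kafka_backport.common.PartitionInfo;
import org.apache.flink.kafka_backport.common.TopicPartition;

public class ConsumerUsageSketch {

    /**
     * Runs on the single thread that owns the consumer. Only wakeup() may be
     * called from other threads; anything else would throw
     * ConcurrentModificationException.
     */
    public static void pollOnce(Consumer<byte[], byte[]> consumer, String topic) {
        // partitionsFor() may block on a metadata fetch if the topic is unknown.
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        for (PartitionInfo p : partitions) {
            consumer.subscribe(new TopicPartition(p.topic(), p.partition()));
        }

        try {
            // Fetch for up to one second; a concurrent wakeup() aborts this call.
            consumer.poll(1000);

            for (PartitionInfo p : partitions) {
                TopicPartition tp = new TopicPartition(p.topic(), p.partition());
                // position() is the offset of the next record to consume; committed()
                // may block and throws NoOffsetForPartitionException if nothing was
                // ever committed for the partition.
                System.out.println(tp + ": position=" + consumer.position(tp)
                        + ", committed=" + consumer.committed(tp));
            }
        } catch (ConsumerWakeupException e) {
            // Raised on this thread when another thread called wakeup().
        }
    }
}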

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java
new file mode 100644
index 0000000..1d08519
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.clients.consumer.internals.SubscriptionState;
+import org.apache.flink.kafka_backport.common.Metric;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is
+ * <i>not thread-safe</i>.
+ * <p>
+ * Unlike the real consumer, this mock performs no network I/O: records added via {@link #addRecord(ConsumerRecord)}
+ * are buffered in memory and handed back by the next call to {@link #poll(long)}.
+ */
+public class MockConsumer<K, V> implements Consumer<K, V> {
+
+    private final Map<String, List<PartitionInfo>> partitions;
+    private final SubscriptionState subscriptions;
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean closed;
+
+    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
+        this.subscriptions = new SubscriptionState(offsetResetStrategy);
+        this.partitions = new HashMap<String, List<PartitionInfo>>();
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        this.closed = false;
+    }
+    
+    @Override
+    public synchronized Set<TopicPartition> subscriptions() {
+        return this.subscriptions.assignedPartitions();
+    }
+
+    @Override
+    public synchronized void subscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.subscribe(topic);
+    }
+
+    @Override
+    public synchronized void subscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.subscribe(partition);
+    }
+
+    public synchronized void unsubscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.unsubscribe(topic);
+    }
+
+    public synchronized void unsubscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.unsubscribe(partition);
+    }
+
+    @Override
+    public synchronized ConsumerRecords<K, V> poll(long timeout) {
+        ensureNotClosed();
+        // update the consumed offset
+        for (Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
+            List<ConsumerRecord<K, V>> recs = entry.getValue();
+            if (!recs.isEmpty())
+                this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+        }
+
+        ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        return copy;
+    }
+
+    public synchronized void addRecord(ConsumerRecord<K, V> record) {
+        ensureNotClosed();
+        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        this.subscriptions.assignedPartitions().add(tp);
+        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
+        if (recs == null) {
+            recs = new ArrayList<ConsumerRecord<K, V>>();
+            this.records.put(tp, recs);
+        }
+        recs.add(record);
+    }
+
+    @Override
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+        ensureNotClosed();
+        for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+            subscriptions.committed(entry.getKey(), entry.getValue());
+        if (callback != null) {
+            callback.onComplete(offsets, null);
+        }
+    }
+
+    @Override
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        commit(offsets, commitType, null);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
+        ensureNotClosed();
+        commit(this.subscriptions.allConsumed(), commitType, callback);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType) {
+        commit(commitType, null);
+    }
+
+    @Override
+    public synchronized void seek(TopicPartition partition, long offset) {
+        ensureNotClosed();
+        subscriptions.seek(partition, offset);
+    }
+
+    @Override
+    public synchronized long committed(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.committed(partition);
+    }
+
+    @Override
+    public synchronized long position(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.consumed(partition);
+    }
+
+    @Override
+    public synchronized void seekToBeginning(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void seekToEnd(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        ensureNotClosed();
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public synchronized List<PartitionInfo> partitionsFor(String topic) {
+        ensureNotClosed();
+        List<PartitionInfo> parts = this.partitions.get(topic);
+        if (parts == null)
+            return Collections.emptyList();
+        else
+            return parts;
+    }
+
+    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+        ensureNotClosed();
+        this.partitions.put(topic, partitions);
+    }
+
+    @Override
+    public synchronized void close() {
+        ensureNotClosed();
+        this.closed = true;
+    }
+
+    @Override
+    public void wakeup() {
+
+    }
+
+    private void ensureNotClosed() {
+        if (this.closed)
+            throw new IllegalStateException("This consumer has already been closed.");
+    }
+}
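
Because MockConsumer keeps all state in memory, it can stand in for a broker in unit tests: records queued with addRecord() are returned by the next poll(), which also records the last offset as consumed. The sketch below uses only the methods shown above, plus two assumptions that are not part of this excerpt: ConsumerRecords is iterable and ConsumerRecord has a (topic, partition, offset, key, value) constructor, as in the upstream consumer API.

import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka_backport.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka_backport.clients.consumer.MockConsumer;
import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
import org.apache.flink.kafka_backport.common.TopicPartition;

public class MockConsumerSketch {

    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("test-topic", 0);
        consumer.subscribe(tp);

        // Assumed ConsumerRecord constructor: (topic, partition, offset, key, value).
        consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "k0", "v0"));
        consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 1L, "k1", "v1"));

        // poll() hands back the queued records and advances the consumed position
        // to the offset of the last record per partition.
        ConsumerRecords<String, String> records = consumer.poll(0);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + " -> " + record.value());
        }

        // Reflects the offset recorded by the poll above (1 in this case).
        System.out.println("position = " + consumer.position(tp));

        consumer.close();
    }
}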

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
new file mode 100644
index 0000000..19ae0a6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Indicates that there is no stored offset and no defined offset reset policy
+ */
+public class NoOffsetForPartitionException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NoOffsetForPartitionException(String message) {
+        super(message);
+    }
+
+}
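
This exception reaches user code through methods like position() and committed() when a partition has no stored offset and the reset strategy is NONE, so callers that disable automatic resets have to handle it themselves. A minimal sketch of that handling, with the fallback offset chosen by the caller (the helper name is made up):

import org.apache.flink.kafka_backport.clients.consumer.Consumer;
import org.apache.flink.kafka_backport.clients.consumer.NoOffsetForPartitionException;
import org.apache.flink.kafka_backport.common.TopicPartition;

public class OffsetFallbackSketch {

    /**
     * Returns the consumer's position for the partition, seeking to an explicit
     * fallback offset when no offset is stored and no reset policy is defined.
     */
    public static long positionOrSeek(Consumer<?, ?> consumer, TopicPartition tp, long fallbackOffset) {
        try {
            return consumer.position(tp);
        } catch (NoOffsetForPartitionException e) {
            // No committed offset and OffsetResetStrategy.NONE: pick the start offset ourselves.
            consumer.seek(tp, fallbackOffset);
            return fallbackOffset;
        }
    }
}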

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
new file mode 100644
index 0000000..70c254f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public enum OffsetResetStrategy {
+    LATEST, EARLIEST, NONE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
new file mode 100644
index 0000000..a6d16cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.ClientResponse;
+import org.apache.flink.kafka_backport.clients.Metadata;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException;
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.requests.AbstractRequest;
+import org.apache.flink.kafka_backport.common.requests.RequestHeader;
+import org.apache.flink.kafka_backport.common.requests.RequestSend;
+import org.apache.flink.kafka_backport.clients.ClientRequest;
+import org.apache.flink.kafka_backport.clients.KafkaClient;
+import org.apache.flink.kafka_backport.clients.RequestCompletionHandler;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Higher level consumer access to the network layer with basic support for futures and
+ * task scheduling. NOT thread-safe!
+ *
+ * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
+ * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
+ * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
+ * understand, but there are opportunities to provide timeout or retry capabilities in the future.
+ * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
+ */
+public class ConsumerNetworkClient implements Closeable {
+    private final KafkaClient client;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
+    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+    private final Metadata metadata;
+    private final Time time;
+    private final long retryBackoffMs;
+
+    public ConsumerNetworkClient(KafkaClient client,
+                                 Metadata metadata,
+                                 Time time,
+                                 long retryBackoffMs) {
+        this.client = client;
+        this.metadata = metadata;
+        this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    /**
+     * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
+     * should only be used for coarse synchronization.
+     * @param task The task to be scheduled
+     * @param at The time it should run
+     */
+    public void schedule(DelayedTask task, long at) {
+        delayedTasks.add(task, at);
+    }
+
+    /**
+     * Unschedule a task. This will remove all instances of the task from the task queue.
+     * This is a no-op if the task is not scheduled.
+     * @param task The task to be unscheduled.
+     */
+    public void unschedule(DelayedTask task) {
+        delayedTasks.remove(task);
+    }
+
+    /**
+     * Send a new request. Note that the request is not actually transmitted on the
+     * network until one of the {@link #poll(long)} variants is invoked. At this
+     * point the request will either be transmitted successfully or will fail.
+     * Use the returned future to obtain the result of the send.
+     * @param node The destination of the request
+     * @param api The Kafka API call
+     * @param request The request payload
+     * @return A future which indicates the result of the send.
+     */
+    public RequestFuture<ClientResponse> send(Node node,
+                                              ApiKeys api,
+                                              AbstractRequest request) {
+        long now = time.milliseconds();
+        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
+        RequestHeader header = client.nextRequestHeader(api);
+        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
+        put(node, new ClientRequest(now, true, send, future));
+        return future;
+    }
+
+    private void put(Node node, ClientRequest request) {
+        List<ClientRequest> nodeUnsent = unsent.get(node);
+        if (nodeUnsent == null) {
+            nodeUnsent = new ArrayList<ClientRequest>();
+            unsent.put(node, nodeUnsent);
+        }
+        nodeUnsent.add(request);
+    }
+
+    public Node leastLoadedNode() {
+        return client.leastLoadedNode(time.milliseconds());
+    }
+
+    /**
+     * Block until the metadata has been refreshed.
+     */
+    public void awaitMetadataUpdate() {
+        int version = this.metadata.requestUpdate();
+        do {
+            poll(Long.MAX_VALUE);
+        } while (this.metadata.version() == version);
+    }
+
+    /**
+     * Wake up an active poll. This will cause the polling thread to throw an exception either
+     * on the current poll if one is active, or the next poll.
+     */
+    public void wakeup() {
+        this.wakeup.set(true);
+        this.client.wakeup();
+    }
+
+    /**
+     * Block indefinitely until the given request future has finished.
+     * @param future The request future to await.
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(RequestFuture<?> future) {
+        while (!future.isDone())
+            poll(Long.MAX_VALUE);
+    }
+
+    /**
+     * Block until the provided request future has finished or the timeout has expired.
+     * @param future The request future to wait for
+     * @param timeout The maximum duration (in ms) to wait for the request
+     * @return true if the future is done, false otherwise
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public boolean poll(RequestFuture<?> future, long timeout) {
+        long now = time.milliseconds();
+        long deadline = now + timeout;
+        while (!future.isDone() && now < deadline) {
+            poll(deadline - now, now);
+            now = time.milliseconds();
+        }
+        return future.isDone();
+    }
+
+    /**
+     * Poll for any network IO. All send requests will either be transmitted on the network
+     * or failed when this call completes.
+     * @param timeout The maximum time to wait for an IO event.
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(long timeout) {
+        poll(timeout, time.milliseconds());
+    }
+
+    private void poll(long timeout, long now) {
+        // send all the requests we can send now
+        pollUnsentRequests(now);
+
+        // ensure we don't poll any longer than the deadline for
+        // the next scheduled task
+        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
+        clientPoll(timeout, now);
+
+        // execute scheduled tasks
+        now = time.milliseconds();
+        delayedTasks.poll(now);
+
+        // try again to send requests since buffer space may have been
+        // cleared or a connect finished in the poll
+        pollUnsentRequests(now);
+
+        // fail all requests that couldn't be sent
+        clearUnsentRequests(now);
+
+    }
+
+    /**
+     * Block until all pending requests from the given node have finished.
+     * @param node The node to await requests from
+     */
+    public void awaitPendingRequests(Node node) {
+        while (pendingRequestCount(node) > 0)
+            poll(retryBackoffMs);
+    }
+
+    /**
+     * Get the count of pending requests to the given node. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @param node The node in question
+     * @return The number of pending requests
+     */
+    public int pendingRequestCount(Node node) {
+        List<ClientRequest> pending = unsent.get(node);
+        int unsentCount = pending == null ? 0 : pending.size();
+        return unsentCount + client.inFlightRequestCount(node.idString());
+    }
+
+    /**
+     * Get the total count of pending requests from all nodes. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @return The total count of pending requests
+     */
+    public int pendingRequestCount() {
+        int total = 0;
+        for (List<ClientRequest> requests: unsent.values())
+            total += requests.size();
+        return total + client.inFlightRequestCount();
+    }
+
+    private void pollUnsentRequests(long now) {
+        while (trySend(now))
+            clientPoll(0, now);
+    }
+
+    private void clearUnsentRequests(long now) {
+        // clear all unsent requests and fail their corresponding futures
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                RequestFutureCompletionHandler handler =
+                        (RequestFutureCompletionHandler) request.callback();
+                handler.raise(SendFailedException.INSTANCE);
+                iterator.remove();
+            }
+        }
+        unsent.clear();
+    }
+
+    private boolean trySend(long now) {
+        // send any requests that can be sent now
+        boolean requestsSent = false;
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Node node = requestEntry.getKey();
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                if (client.ready(node, now)) {
+                    client.send(request);
+                    iterator.remove();
+                    requestsSent = true;
+                } else if (client.connectionFailed(node)) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.onComplete(new ClientResponse(request, now, true, null));
+                    iterator.remove();
+                }
+            }
+        }
+        return requestsSent;
+    }
+
+    private void clientPoll(long timeout, long now) {
+        client.poll(timeout, now);
+        if (wakeup.get()) {
+            clearUnsentRequests(now);
+            wakeup.set(false);
+            throw new ConsumerWakeupException();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.close();
+    }
+
+    public static class RequestFutureCompletionHandler
+            extends RequestFuture<ClientResponse>
+            implements RequestCompletionHandler {
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            complete(response);
+        }
+    }
+}
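
Internally, the coordinator and fetcher use this client by queueing a request with send() and then driving the network with one of the poll() variants until the returned future completes. The sketch below shows that pattern; the succeeded()/value()/exception() accessors on RequestFuture are assumptions taken from the upstream consumer internals, since that class is not part of this diff.

import org.apache.flink.kafka_backport.clients.ClientResponse;
import org.apache.flink.kafka_backport.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.flink.kafka_backport.clients.consumer.internals.RequestFuture;
import org.apache.flink.kafka_backport.common.Node;
import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
import org.apache.flink.kafka_backport.common.requests.AbstractRequest;

public class NetworkClientUsageSketch {

    /**
     * Queues a request and blocks until it has been transmitted and answered.
     * Must run on the single thread that owns the client (it is not thread-safe).
     */
    public static ClientResponse sendAndAwait(ConsumerNetworkClient client,
                                              Node node,
                                              ApiKeys api,
                                              AbstractRequest request) {
        // send() only enqueues the request; nothing goes on the wire until poll() runs.
        RequestFuture<ClientResponse> future = client.send(node, api, request);

        // Blocks until the future is done, looping over poll(Long.MAX_VALUE) internally.
        client.poll(future);

        // Assumed RequestFuture accessors, mirroring the upstream internals.
        if (future.succeeded()) {
            return future.value();
        }
        throw new RuntimeException("Request to " + node + " failed", future.exception());
    }
}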

