From: sewen@apache.org
To: commits@flink.apache.org
Date: Thu, 27 Aug 2015 11:25:39 -0000
Subject: [22/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
new file mode 100644
index 0000000..8800954
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/KafkaConsumer.java
@@ -0,0 +1,1130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */ +package org.apache.flink.kafka_backport.clients.consumer; + +import org.apache.flink.kafka_backport.clients.Metadata; +import org.apache.flink.kafka_backport.clients.consumer.internals.DelayedTask; +import org.apache.flink.kafka_backport.common.KafkaException; +import org.apache.flink.kafka_backport.common.metrics.MetricConfig; +import org.apache.flink.kafka_backport.clients.ClientUtils; +import org.apache.flink.kafka_backport.clients.NetworkClient; +import org.apache.flink.kafka_backport.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.flink.kafka_backport.clients.consumer.internals.Coordinator; +import org.apache.flink.kafka_backport.clients.consumer.internals.Fetcher; +import org.apache.flink.kafka_backport.clients.consumer.internals.SubscriptionState; +import org.apache.flink.kafka_backport.common.Cluster; +import org.apache.flink.kafka_backport.common.Metric; +import org.apache.flink.kafka_backport.common.MetricName; +import org.apache.flink.kafka_backport.common.PartitionInfo; +import org.apache.flink.kafka_backport.common.TopicPartition; +import org.apache.flink.kafka_backport.common.metrics.JmxReporter; +import org.apache.flink.kafka_backport.common.metrics.Metrics; +import org.apache.flink.kafka_backport.common.metrics.MetricsReporter; +import org.apache.flink.kafka_backport.common.network.Selector; +import org.apache.flink.kafka_backport.common.serialization.Deserializer; +import org.apache.flink.kafka_backport.common.utils.SystemTime; +import org.apache.flink.kafka_backport.common.utils.Time; +import org.apache.flink.kafka_backport.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.kafka_backport.common.utils.Utils.min; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * A Kafka client that consumes records from a Kafka cluster. + *

+ * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of + * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of + * consumers to load balance consumption using consumer groups (as described below). + *

+ * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to. + * Failure to close the consumer after use will leak these connections. + *

+ * The consumer is not thread-safe. See Multi-threaded Processing for more details. + * + *

Offsets and Consumer Position

+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of + * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer + * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There + * are actually two notions of position relevant to the user of the consumer. + *

+ * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
+ * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
+ * every time the consumer receives messages in a call to {@link #poll(long)}.
+ *

+ * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the + * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit + * offsets periodically, or it can choose to control this committed position manually by calling + * {@link #commit(CommitType) commit}. + *

+ * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further + * detail below. + * + *
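+ * A minimal sketch of the two notions, assuming a consumer that is already subscribed to partition 0 of a
+ * hypothetical topic "foo":
+ *
+ *     TopicPartition tp = new TopicPartition("foo", 0);
+ *     consumer.poll(100);                        // fetch records; the position advances as records are returned
+ *     long next = consumer.position(tp);         // offset of the next record that will be given out
+ *     consumer.commit(CommitType.SYNC);          // durably save the consumed offsets
+ *     long committed = consumer.committed(tp);   // offset the consumer would recover to after a failure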

Consumer Groups

+ * + * Kafka uses the concept of consumer groups to allow a pool of processes to divide up the work of consuming and + * processing records. These processes can either be running on the same machine or, as is more likely, they can be + * distributed over many machines to provide additional scalability and fault tolerance for processing. + *

+ * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the + * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic + * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two + * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a + * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new + * process joins the group, partitions will be moved from existing consumers to this new process. + *

+ * So if two processes subscribe to a topic, each specifying a different group, they will each get all the records in
+ * that topic; if they both specify the same group they will each get about half the records.
+ *

+ * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of + * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a + * given topic without duplicating data (additional consumers are actually quite cheap). + *

+ * This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
+ * a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
+ * delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
+ * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
+ * have its own consumer group, so each process would subscribe to all the records published to the topic.
+ *

+ * In addition, when offsets are committed they are always committed for a given consumer group. + *

+ * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic + * partition balancing. + * + *

Usage Examples

+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to + * demonstrate how to use them. + * + *

Simple Processing

+ * This example demonstrates the simplest usage of Kafka's consumer API.
+ *
+ *
+ *     Properties props = new Properties();
+ *     props.put("bootstrap.servers", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("enable.auto.commit", "true");
+ *     props.put("auto.commit.interval.ms", "1000");
+ *     props.put("session.timeout.ms", "30000");
+ *     props.put("key.deserializer", "org.apache.StringDeserializer");
+ *     props.put("value.deserializer", "org.apache.StringDeserializer");
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ *     consumer.subscribe("foo", "bar");
+ *     while (true) {
+ *         ConsumerRecords<String, String> records = consumer.poll(100);
+ *         for (ConsumerRecord<String, String> record : records)
+ *             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
+ *     }
+ * 
+ * + * Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by + * the config auto.commit.interval.ms. + *

+ * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the + * configuration bootstrap.servers. This list is just used to discover the rest of the brokers in the + * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in + * case there are servers down when the client is connecting). + *

+ * In this example the client is subscribing to the topics foo and bar as part of a group of consumers + * called test as described above. + *

+ * The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
+ * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
+ * to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be
+ * considered dead and its partitions will be assigned to another process.
+ *

+ * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we + * are saying that our record's key and value will just be simple strings. + * + *
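+ * If plain strings are not sufficient, a custom {@link Deserializer} implementation can be used instead. A minimal
+ * sketch (the class name and the UTF-8 decoding are only an example; error handling is omitted):
+ *
+ *     public class Utf8Deserializer implements Deserializer<String> {
+ *         public void configure(Map<String, ?> configs, boolean isKey) { }
+ *
+ *         public String deserialize(String topic, byte[] data) {
+ *             return data == null ? null : new String(data, StandardCharsets.UTF_8);
+ *         }
+ *
+ *         public void close() { }
+ *     }
+ *
+ * Such a deserializer can either be named in the key.deserializer / value.deserializer configuration or passed
+ * directly to one of the {@link KafkaConsumer} constructors that accept {@link Deserializer} instances.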

Controlling When Messages Are Considered Consumed

+ *
+ * In this example we will consume a batch of records and batch them up in memory; when we have sufficient records
+ * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example, messages
+ * would be considered consumed after they were given out by the consumer, and it would be possible that our process
+ * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
+ * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
+ * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from the last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least-once delivery" guarantees, as each message will likely be delivered one
+ * time but in failure cases could be duplicated.
+ *
+ *     Properties props = new Properties();
+ *     props.put("bootstrap.servers", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("enable.auto.commit", "false");
+ *     props.put("auto.commit.interval.ms", "1000");
+ *     props.put("session.timeout.ms", "30000");
+ *     props.put("key.deserializer", "org.apache.StringDeserializer");
+ *     props.put("value.deserializer", "org.apache.StringDeserializer");
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ *     consumer.subscribe("foo", "bar");
+ *     int commitInterval = 200;
+ *     List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
+ *     while (true) {
+ *         ConsumerRecords<String, String> records = consumer.poll(100);
+ *         for (ConsumerRecord<String, String> record : records) {
+ *             buffer.add(record);
+ *             if (buffer.size() >= commitInterval) {
+ *                 insertIntoDb(buffer);
+ *                 consumer.commit(CommitType.SYNC);
+ *                 buffer.clear();
+ *             }
+ *         }
+ *     }
+ * 
+ * + *

Subscribing To Specific Partitions

+ *
+ * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
+ * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
+ * instances of our program can divide up the work of processing records.
+ *

+ * In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt + * will be made to rebalance partitions to other instances. + *

+ * There are several cases where this makes sense: + *

    + *
  • The first case is if the process is maintaining some kind of local state associated with that partition (like a + * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk. + *
  • Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a + * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In + * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process + * will be restarted on another machine. + *
+ *

+ * This mode is easy to use: rather than subscribing to the topic, the consumer just subscribes to particular
+ * partitions:
+ *
+ *

+ *     String topic = "foo";
+ *     TopicPartition partition0 = new TopicPartition(topic, 0);
+ *     TopicPartition partition1 = new TopicPartition(topic, 1);
+ *     consumer.subscribe(partition0);
+ *     consumer.subscribe(partition1);
+ * 
+ * + * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only + * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made. + *

+ * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load + * balancing) using the same consumer instance. + * + *

Managing Your Own Offsets

+ *
+ * The consumer application need not use Kafka's built-in offset storage; it can store offsets in a store of its own
+ * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
+ * consumption in the same system, in such a way that both the results and offsets are stored atomically. This is not
+ * always possible, but when it is it will make the consumption fully atomic and give "exactly-once" semantics that are
+ * stronger than the default "at-least-once" semantics you get with Kafka's offset commit functionality.
+ *

+ * Here are a couple of examples of this type of usage: + *

    + *
  • If the results of the consumption are being stored in a relational database, storing the offset in the database + * as well can allow committing both the results and offset in a single transaction. Thus either the transaction will + * succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset + * won't be updated. + *
  • If the results are being stored in a local store it may be possible to store the offset there as well. For
+ * example a search index could be built by subscribing to a particular partition and storing both the offset and the
+ * indexed data together. If this is done atomically, then even if a crash causes unsynced data to be lost, whatever
+ * is left has the corresponding offset stored as well. This means that the indexing process that comes back having
+ * lost recent updates simply resumes indexing from what it has, ensuring that no updates are lost.
+ *
+ *
+ * Each record comes with its own offset, so to manage your own offset you just need to do the following
+ * (a short sketch of this pattern appears at the end of this section):
+ *
    + *
  1. Configure enable.auto.commit=false + *
  2. Use the offset provided with each {@link ConsumerRecord} to save your position. + *
  3. On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}. + *
+ *
+ * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
+ * search index use case described above). If the partition assignment is done automatically, special care will also be
+ * needed to handle the case where partition assignments change. This can be handled using a special callback specified
+ * using rebalance.callback.class, which specifies an implementation of the interface
+ * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * offset for those partitions by implementing
+ * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
+ * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ *

+ * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for + * partitions that are moved elsewhere. + * + *
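+ * A minimal sketch of the three steps above, reusing the props from the earlier examples (processRecord,
+ * saveOffsetInStore and readOffsetFromStore are hypothetical helpers that stand in for your own storage layer;
+ * "foo"/0 is an arbitrary partition):
+ *
+ *     props.put("enable.auto.commit", "false");                       // step 1: disable automatic offset commits
+ *     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ *     TopicPartition partition = new TopicPartition("foo", 0);
+ *     consumer.subscribe(partition);
+ *     consumer.seek(partition, readOffsetFromStore(partition));       // step 3: restore the position on (re)start
+ *     while (true) {
+ *         ConsumerRecords<String, String> records = consumer.poll(100);
+ *         for (ConsumerRecord<String, String> record : records) {
+ *             processRecord(record);
+ *             saveOffsetInStore(partition, record.offset() + 1);      // step 2: store the position to resume from
+ *         }
+ *     }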

Controlling The Consumer's Position

+ *
+ * In most use cases the consumer will simply consume records from beginning to end, periodically committing its
+ * position (either automatically or manually). However, Kafka allows the consumer to manually control its position,
+ * moving forwards or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
+ * the most recent records without actually consuming the intermediate records.
+ *

+ * There are several instances where manually controlling the consumer's position can be useful. + *

+ * One case is time-sensitive record processing: it may make sense for a consumer that falls far enough behind to not
+ * attempt to catch up by processing all records, but rather to just skip to the most recent records.
+ *

+ * Another use case is for a system that maintains local state as described in the previous section. In such a system
+ * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
+ * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
+ * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ *
+ * Kafka allows the consumer to specify a new position using {@link #seek(TopicPartition, long)}. Special
+ * methods for seeking to the earliest and latest offset the server maintains are also available
+ * ({@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
+ *
+ *
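+ * For example (a sketch; "foo" and the offset used are arbitrary):
+ *
+ *     TopicPartition partition = new TopicPartition("foo", 0);
+ *     consumer.seekToEnd(partition);        // skip ahead to the most recent records
+ *     consumer.seekToBeginning(partition);  // or re-consume everything the server still retains
+ *     consumer.seek(partition, 12345L);     // or jump to a specific offset, e.g. one recovered from local state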

Multi-threaded Processing

+ * + * The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application + * making the call. It is the responsibility of the user to ensure that multi-threaded access + * is properly synchronized. Un-synchronized access will result in {@link ConcurrentModificationException}. + * + *

+ * The only exception to this rule is {@link #wakeup()}, which can safely be used from an external thread to
+ * interrupt an active operation. In this case, a {@link ConsumerWakeupException} will be thrown from the thread
+ * blocking on the operation. This can be used to shut down the consumer from another thread. The following
+ * snippet shows the typical pattern:
+ *
+ *

+ * public class KafkaConsumerRunner implements Runnable {
+ *     private final AtomicBoolean closed = new AtomicBoolean(false);
+ *     private final KafkaConsumer consumer;
+ *
+ *     public void run() {
+ *         try {
+ *             consumer.subscribe("topic");
+ *             while (!closed.get()) {
+ *                 ConsumerRecords records = consumer.poll(10000);
+ *                 // Handle new records
+ *             }
+ *         } catch (ConsumerWakeupException e) {
+ *             // Ignore exception if closing
+ *             if (!closed.get()) throw e;
+ *         } finally {
+ *             consumer.close();
+ *         }
+ *     }
+ *
+ *     // Shutdown hook which can be called from a separate thread
+ *     public void shutdown() {
+ *         closed.set(true);
+ *         consumer.wakeup();
+ *     }
+ * }
+ * 
+ *
+ * Then in a separate thread, the consumer can be shut down by setting the closed flag and waking up the consumer.
+ *
+ *
+ *     closed.set(true);
+ *     consumer.wakeup();
+ * 
+ * + *

+ * We have intentionally avoided implementing a particular threading model for processing. This leaves several + * options for implementing multi-threaded processing of records. + * + * + *

1. One Consumer Per Thread

+ *
+ * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach
+ * (a short sketch follows the list):
+ *
    + *
  • PRO: It is the easiest to implement + *
  • PRO: It is often the fastest as no inter-thread co-ordination is needed + *
  • PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just + * processes messages in the order it receives them). + *
  • CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles + * connections very efficiently so this is generally a small cost. + *
  • CON: Multiple consumers means more requests being sent to the server and slightly less batching of data + * which can cause some drop in I/O throughput. + *
  • CON: The number of total threads across all processes will be limited by the total number of partitions. + *
+ * + *
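+ * A minimal sketch of this approach, reusing the KafkaConsumerRunner from the snippet above (it assumes each
+ * runner constructs its own KafkaConsumer internally, which the snippet leaves out):
+ *
+ *     int numThreads = 3;   // usually no more than the number of partitions
+ *     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ *     List<KafkaConsumerRunner> runners = new ArrayList<KafkaConsumerRunner>();
+ *     for (int i = 0; i < numThreads; i++) {
+ *         KafkaConsumerRunner runner = new KafkaConsumerRunner();    // one consumer instance per thread
+ *         runners.add(runner);
+ *         executor.submit(runner);
+ *     }
+ *     // on shutdown: stop every runner (each one wakes up and closes its own consumer)
+ *     for (KafkaConsumerRunner runner : runners)
+ *         runner.shutdown();
+ *     executor.shutdown();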

2. Decouple Consumption and Processing

+ *
+ * Another alternative is to have one or more consumer threads that do all data consumption and hand off
+ * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
+ * the record processing.
+ *
+ * This option likewise has pros and cons:
+ *
    + *
  • PRO: This option allows independently scaling the number of consumers and processors. This makes it + * possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions. + *
  • CON: Guaranteeing order across the processors requires particular care: since the threads execute
+ * independently, an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of
+ * thread execution timing. For processing that has no ordering requirements this is not a problem.
+ *
  • CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure + * that processing is complete for that partition. + *
+ * + * There are many possible variations on this approach. For example each processor thread can have it's own queue, and + * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify + * commit. + * + */ +public class KafkaConsumer implements Consumer { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); + private static final long NO_CURRENT_THREAD = -1L; + private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + + private final Coordinator coordinator; + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + private final Fetcher fetcher; + + private final Time time; + private final ConsumerNetworkClient client; + private final Metrics metrics; + private final SubscriptionState subscriptions; + private final Metadata metadata; + private final long retryBackoffMs; + private final boolean autoCommit; + private final long autoCommitIntervalMs; + private boolean closed = false; + + // currentThread holds the threadId of the current thread accessing KafkaConsumer + // and is used to prevent multi-threaded access + private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); + // refcount is used to allow reentrant access by the thread who has acquired currentThread + private final AtomicInteger refcount = new AtomicInteger(0); + + // TODO: This timeout controls how long we should wait before retrying a request. We should be able + // to leverage the work of KAFKA-2120 to get this value from configuration. + private long requestTimeoutMs = 5000L; + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings + * are documented here. Values can be + * either strings or objects of the appropriate type (for example a numeric configuration would accept either the + * string "42" or the integer 42). + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * + * @param configs The consumer configs + */ + public KafkaConsumer(Map configs) { + this(configs, null, null, null); + } + + /** + * A consumer is instantiated by providing a set of key-value pairs as configuration, a + * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and + * end of every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaConsumer(Map configs, + ConsumerRebalanceCallback callback, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { + this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), + callback, + keyDeserializer, + valueDeserializer); + } + + /** + * A consumer is instantiated by providing a {@link Properties} object as configuration. Valid + * configuration strings are documented at {@link ConsumerConfig} A consumer is instantiated by providing a + * {@link Properties} object as configuration. Valid configuration strings are documented at + * {@link ConsumerConfig} + */ + public KafkaConsumer(Properties properties) { + this(properties, null, null, null); + } + + /** + * A consumer is instantiated by providing a {@link Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * + * @param properties The consumer configuration properties + * @param callback A callback interface that the user can implement to manage customized offsets on the start and + * end of every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method + * won't be called in the consumer when the deserializer is passed in directly. + */ + public KafkaConsumer(Properties properties, + ConsumerRebalanceCallback callback, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { + this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), + callback, + keyDeserializer, + valueDeserializer); + } + + @SuppressWarnings("unchecked") + private KafkaConsumer(ConsumerConfig config, + ConsumerRebalanceCallback callback, + Deserializer keyDeserializer, + Deserializer valueDeserializer) { + try { + log.debug("Starting the Kafka consumer"); + if (callback == null) + callback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG, + ConsumerRebalanceCallback.class); + this.time = new SystemTime(); + this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); + + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + String jmxPrefix = "kafka.consumer"; + if (clientId.length() <= 0) + clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); + List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(metricConfig, reporters, time); + this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), 0); + + String metricGrpPrefix = "consumer"; + Map metricsTags = new LinkedHashMap(); + metricsTags.put("client-id", clientId); + NetworkClient netClient = new NetworkClient( + new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags), + this.metadata, + clientId, + 100, // a fixed large enough value will suffice + config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), + config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); + this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); + OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); + this.subscriptions = new SubscriptionState(offsetResetStrategy); + this.coordinator = new Coordinator(this.client, + 
config.getString(ConsumerConfig.GROUP_ID_CONFIG), + config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), + this.subscriptions, + metrics, + metricGrpPrefix, + metricsTags, + this.time, + requestTimeoutMs, + retryBackoffMs, + wrapRebalanceCallback(callback)); + if (keyDeserializer == null) { + this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + this.keyDeserializer.configure(config.originals(), true); + } else { + this.keyDeserializer = keyDeserializer; + } + if (valueDeserializer == null) { + this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + this.valueDeserializer.configure(config.originals(), false); + } else { + this.valueDeserializer = valueDeserializer; + } + this.fetcher = new Fetcher(this.client, + config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), + config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), + config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), + config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), + this.keyDeserializer, + this.valueDeserializer, + this.metadata, + this.subscriptions, + metrics, + metricGrpPrefix, + metricsTags, + this.time, + this.retryBackoffMs); + + config.logUnused(); + + if (autoCommit) + scheduleAutoCommitTask(autoCommitIntervalMs); + + log.debug("Kafka consumer created"); + } catch (Throwable t) { + // call close methods if internal objects are already constructed + // this is to prevent resource leak. see KAFKA-2121 + close(true); + // now propagate the exception + throw new KafkaException("Failed to construct kafka consumer", t); + } + } + + /** + * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to + * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that + * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)} + * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment + * hasn't happened yet, or the partitions are in the process of getting reassigned). + */ + public Set subscriptions() { + acquire(); + try { + return Collections.unmodifiableSet(this.subscriptions.assignedPartitions()); + } finally { + release(); + } + } + + /** + * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality + *

+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
+ * group and will trigger a rebalance operation if any of the following events occur:
+ *

    + *
  • Number of partitions change for any of the subscribed list of topics + *
  • Topic is created or deleted + *
  • An existing member of the consumer group dies + *
  • A new member is added to an existing consumer group via the join API + *
+ * + * @param topics A variable list of topics that the consumer wants to subscribe to + */ + @Override + public void subscribe(String... topics) { + acquire(); + try { + log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); + for (String topic : topics) + this.subscriptions.subscribe(topic); + metadata.addTopics(topics); + } finally { + release(); + } + } + + /** + * Incrementally subscribes to a specific topic partition and does not use the consumer's group management + * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic + * metadata change. + *

+ * + * @param partitions Partitions to incrementally subscribe to + */ + @Override + public void subscribe(TopicPartition... partitions) { + acquire(); + try { + log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", ")); + for (TopicPartition tp : partitions) { + this.subscriptions.subscribe(tp); + metadata.addTopics(tp.topic()); + } + } finally { + release(); + } + } + + /** + * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not + * be returned from the next {@link #poll(long) poll()} onwards + * + * @param topics Topics to unsubscribe from + */ + public void unsubscribe(String... topics) { + acquire(); + try { + log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", ")); + // throw an exception if the topic was never subscribed to + for (String topic : topics) + this.subscriptions.unsubscribe(topic); + } finally { + release(); + } + } + + /** + * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next + * {@link #poll(long) poll()} onwards + * + * @param partitions Partitions to unsubscribe from + */ + public void unsubscribe(TopicPartition... partitions) { + acquire(); + try { + log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", ")); + // throw an exception if the partition was never subscribed to + for (TopicPartition partition : partitions) + this.subscriptions.unsubscribe(partition); + } finally { + release(); + } + } + + /** + * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have + * subscribed to any topics or partitions before polling for data. + *

+ * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used. + * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every + * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed + * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions. + * + * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns + * immediately with any records available now. Must not be negative. + * @return map of topic to records since the last fetch for the subscribed list of topics and partitions + * + * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic + * offset reset policy has been configured. + */ + @Override + public ConsumerRecords poll(long timeout) { + acquire(); + try { + if (timeout < 0) + throw new IllegalArgumentException("Timeout must not be negative"); + + // poll for new data until the timeout expires + long remaining = timeout; + while (remaining >= 0) { + long start = time.milliseconds(); + Map>> records = pollOnce(remaining); + long end = time.milliseconds(); + + if (!records.isEmpty()) { + // if data is available, then return it, but first send off the + // next round of fetches to enable pipelining while the user is + // handling the fetched records. + fetcher.initFetches(metadata.fetch()); + client.poll(0); + return new ConsumerRecords(records); + } + + remaining -= end - start; + + // nothing was available, so we should backoff before retrying + if (remaining > 0) { + Utils.sleep(min(remaining, retryBackoffMs)); + remaining -= time.milliseconds() - end; + } + } + + return ConsumerRecords.empty(); + } finally { + release(); + } + } + + /** + * Do one round of polling. In addition to checking for new data, this does any needed + * heart-beating, auto-commits, and offset updates. + * @param timeout The maximum time to block in the underlying poll + * @return The fetched records (may be empty) + */ + private Map>> pollOnce(long timeout) { + // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894) + coordinator.ensureCoordinatorKnown(); + + // ensure we have partitions assigned if we expect to + if (subscriptions.partitionsAutoAssigned()) + coordinator.ensurePartitionAssignment(); + + // fetch positions if we have partitions we're subscribed to that we + // don't know the offset for + if (!subscriptions.hasAllFetchPositions()) + updateFetchPositions(this.subscriptions.missingFetchPositions()); + + // init any new fetches (won't resend pending fetches) + Cluster cluster = this.metadata.fetch(); + fetcher.initFetches(cluster); + client.poll(timeout); + return fetcher.fetchedRecords(); + } + + private void scheduleAutoCommitTask(final long interval) { + DelayedTask task = new DelayedTask() { + public void run(long now) { + commit(CommitType.ASYNC); + client.schedule(this, now + interval); + } + }; + client.schedule(task, time.milliseconds() + interval); + } + + /** + * Commits the specified offsets for the specified list of topics and partitions to Kafka. + *

+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every + * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + *

+ * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous + * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use + * {@link #commit(Map, CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC}) + * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown + * to the caller). + * + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @param commitType Control whether the commit is blocking + */ + @Override + public void commit(final Map offsets, CommitType commitType) { + commit(offsets, commitType, null); + } + + /** + * Commits the specified offsets for the specified list of topics and partitions to Kafka. + *

+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every + * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + *

+ * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous + * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e. + * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In + * this case, the error is either passed to the callback (if provided) or thrown to the caller. + * + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @param commitType Control whether the commit is blocking + * @param callback Callback to invoke when the commit completes + */ + @Override + public void commit(final Map offsets, CommitType commitType, ConsumerCommitCallback callback) { + acquire(); + try { + log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets); + coordinator.commitOffsets(offsets, commitType, callback); + } finally { + release(); + } + } + + /** + * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after + * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + *

+ * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous + * commit are either passed to the callback (if provided) or silently discarded. Synchronous commits (i.e. + * {@link CommitType#SYNC}) block until either the commit succeeds or an unrecoverable error is encountered. In + * this case, the error is either passed to the callback (if provided) or thrown to the caller. + * + * @param commitType Whether or not the commit should block until it is acknowledged. + * @param callback Callback to invoke when the commit completes + */ + @Override + public void commit(CommitType commitType, ConsumerCommitCallback callback) { + acquire(); + try { + // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance) + Map allConsumed = new HashMap(this.subscriptions.allConsumed()); + commit(allConsumed, commitType, callback); + } finally { + release(); + } + } + + /** + * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. + *

+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after + * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + *

+ * Asynchronous commits (i.e. {@link CommitType#ASYNC} will not block. Any errors encountered during an asynchronous + * commit are silently discarded. If you need to determine the result of an asynchronous commit, you should use + * {@link #commit(CommitType, ConsumerCommitCallback)}. Synchronous commits (i.e. {@link CommitType#SYNC}) + * block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown + * to the caller). + * + * @param commitType Whether or not the commit should block until it is acknowledged. + */ + @Override + public void commit(CommitType commitType) { + commit(commitType, null); + } + + /** + * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API + * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that + * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets + */ + @Override + public void seek(TopicPartition partition, long offset) { + acquire(); + try { + log.debug("Seeking to offset {} for partition {}", offset, partition); + this.subscriptions.seek(partition, offset); + } finally { + release(); + } + } + + /** + * Seek to the first offset for each of the given partitions + */ + public void seekToBeginning(TopicPartition... partitions) { + acquire(); + try { + Collection parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() + : Arrays.asList(partitions); + for (TopicPartition tp : parts) { + log.debug("Seeking to beginning of partition {}", tp); + subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); + } + } finally { + release(); + } + } + + /** + * Seek to the last offset for each of the given partitions + */ + public void seekToEnd(TopicPartition... partitions) { + acquire(); + try { + Collection parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() + : Arrays.asList(partitions); + for (TopicPartition tp : parts) { + log.debug("Seeking to end of partition {}", tp); + subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); + } + } finally { + release(); + } + } + + /** + * Returns the offset of the next record that will be fetched (if a record with that offset exists). + * + * @param partition The partition to get the position for + * @return The offset + * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is + * available. + */ + public long position(TopicPartition partition) { + acquire(); + try { + if (!this.subscriptions.assignedPartitions().contains(partition)) + throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer."); + Long offset = this.subscriptions.consumed(partition); + if (offset == null) { + updateFetchPositions(Collections.singleton(partition)); + return this.subscriptions.consumed(partition); + } else { + return offset; + } + } finally { + release(); + } + } + + /** + * Fetches the last committed offset for the given partition (whether the commit happened by this process or + * another). This offset will be used as the position for the consumer in the event of a failure. + *

+ * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the + * consumer hasn't yet initialized it's cache of committed offsets. + * + * @param partition The partition to check + * @return The last committed offset or null if no offset has been committed + * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given + * partition. + */ + @Override + public long committed(TopicPartition partition) { + acquire(); + try { + Long committed; + if (subscriptions.assignedPartitions().contains(partition)) { + committed = this.subscriptions.committed(partition); + if (committed == null) { + coordinator.refreshCommittedOffsetsIfNeeded(); + committed = this.subscriptions.committed(partition); + } + } else { + Map offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition)); + committed = offsets.get(partition); + } + + if (committed == null) + throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition); + + return committed; + } finally { + release(); + } + } + + /** + * Get the metrics kept by the consumer + */ + @Override + public Map metrics() { + return Collections.unmodifiableMap(this.metrics.metrics()); + } + + /** + * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it + * does not already have any metadata about the given topic. + * + * @param topic The topic to get partition metadata for + * @return The list of partitions + */ + @Override + public List partitionsFor(String topic) { + acquire(); + try { + Cluster cluster = this.metadata.fetch(); + List parts = cluster.partitionsForTopic(topic); + if (parts == null) { + metadata.add(topic); + client.awaitMetadataUpdate(); + parts = metadata.fetch().partitionsForTopic(topic); + } + return parts; + } finally { + release(); + } + } + + @Override + public void close() { + acquire(); + try { + if (closed) return; + close(false); + } finally { + release(); + } + } + + /** + * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. + * The thread which is blocking in an operation will throw {@link ConsumerWakeupException}. 
+ */ + @Override + public void wakeup() { + this.client.wakeup(); + } + + private void close(boolean swallowException) { + log.trace("Closing the Kafka consumer."); + AtomicReference firstException = new AtomicReference(); + this.closed = true; + ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); + ClientUtils.closeQuietly(client, "consumer network client", firstException); + ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); + ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); + log.debug("The Kafka consumer has closed."); + if (firstException.get() != null && !swallowException) { + throw new KafkaException("Failed to close kafka consumer", firstException.get()); + } + } + + private Coordinator.RebalanceCallback wrapRebalanceCallback(final ConsumerRebalanceCallback callback) { + return new Coordinator.RebalanceCallback() { + @Override + public void onPartitionsAssigned(Collection partitions) { + callback.onPartitionsAssigned(KafkaConsumer.this, partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + callback.onPartitionsRevoked(KafkaConsumer.this, partitions); + } + }; + } + + /** + * Set the fetch position to the committed position (if there is one) + * or reset it using the offset reset policy the user has configured. + * + * @param partitions The partitions that needs updating fetch positions + * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is + * defined + */ + private void updateFetchPositions(Set partitions) { + // refresh commits for all assigned partitions + coordinator.refreshCommittedOffsetsIfNeeded(); + + // then do any offset lookups in case some positions are not known + fetcher.updateFetchPositions(partitions); + } + + /* + * Check that the consumer hasn't been closed. + */ + private void ensureNotClosed() { + if (this.closed) + throw new IllegalStateException("This consumer has already been closed."); + } + + /** + * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking + * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not + * supported). + * @throws IllegalStateException if the consumer has been closed + * @throws ConcurrentModificationException if another thread already has the lock + */ + private void acquire() { + ensureNotClosed(); + long threadId = Thread.currentThread().getId(); + if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) + throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); + refcount.incrementAndGet(); + } + + /** + * Release the light lock protecting the consumer from multi-threaded access. 
+ */ + private void release() { + if (refcount.decrementAndGet() == 0) + currentThread.set(NO_CURRENT_THREAD); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java new file mode 100644 index 0000000..1d08519 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/MockConsumer.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.flink.kafka_backport.clients.consumer; + +import org.apache.flink.kafka_backport.clients.consumer.internals.SubscriptionState; +import org.apache.flink.kafka_backport.common.Metric; +import org.apache.flink.kafka_backport.common.MetricName; +import org.apache.flink.kafka_backport.common.PartitionInfo; +import org.apache.flink.kafka_backport.common.TopicPartition; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +// ---------------------------------------------------------------------------- +// This class is copied from the Apache Kafka project. +// +// The class is part of a "backport" of the new consumer API, in order to +// give Flink access to its functionality until the API is properly released. +// +// This is a temporary workaround! +// ---------------------------------------------------------------------------- + +/** + * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not + * threadsafe + *

+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
+ * communicate with. Failure to close the consumer after use will leak these resources.
+ */
+public class MockConsumer<K, V> implements Consumer<K, V> {
+
+    private final Map<String, List<PartitionInfo>> partitions;
+    private final SubscriptionState subscriptions;
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean closed;
+
+    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
+        this.subscriptions = new SubscriptionState(offsetResetStrategy);
+        this.partitions = new HashMap<String, List<PartitionInfo>>();
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        this.closed = false;
+    }
+
+    @Override
+    public synchronized Set<TopicPartition> subscriptions() {
+        return this.subscriptions.assignedPartitions();
+    }
+
+    @Override
+    public synchronized void subscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.subscribe(topic);
+    }
+
+    @Override
+    public synchronized void subscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.subscribe(partition);
+    }
+
+    public synchronized void unsubscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.unsubscribe(topic);
+    }
+
+    public synchronized void unsubscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.unsubscribe(partition);
+    }
+
+    @Override
+    public synchronized ConsumerRecords<K, V> poll(long timeout) {
+        ensureNotClosed();
+        // update the consumed offset
+        for (Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
+            List<ConsumerRecord<K, V>> recs = entry.getValue();
+            if (!recs.isEmpty())
+                this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+        }
+
+        ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        return copy;
+    }
+
+    public synchronized void addRecord(ConsumerRecord<K, V> record) {
+        ensureNotClosed();
+        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        this.subscriptions.assignedPartitions().add(tp);
+        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
+        if (recs == null) {
+            recs = new ArrayList<ConsumerRecord<K, V>>();
+            this.records.put(tp, recs);
+        }
+        recs.add(record);
+    }
+
+    @Override
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback) {
+        ensureNotClosed();
+        for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+            subscriptions.committed(entry.getKey(), entry.getValue());
+        if (callback != null) {
+            callback.onComplete(offsets, null);
+        }
+    }
+
+    @Override
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        commit(offsets, commitType, null);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType, ConsumerCommitCallback callback) {
+        ensureNotClosed();
+        commit(this.subscriptions.allConsumed(), commitType, callback);
+    }
+
+    @Override
+    public synchronized void commit(CommitType commitType) {
+        commit(commitType, null);
+    }
+
+    @Override
+    public synchronized void seek(TopicPartition partition, long offset) {
+        ensureNotClosed();
+        subscriptions.seek(partition, offset);
+    }
+
+    @Override
+    public synchronized long committed(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.committed(partition);
+    }
+
+    @Override
+    public synchronized long position(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.consumed(partition);
+    }
+
+    @Override
+    public synchronized void seekToBeginning(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void seekToEnd(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        ensureNotClosed();
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public synchronized List<PartitionInfo> partitionsFor(String topic) {
+        ensureNotClosed();
+        List<PartitionInfo> parts = this.partitions.get(topic);
+        if (parts == null)
+            return Collections.emptyList();
+        else
+            return parts;
+    }
+
+    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+        ensureNotClosed();
+        this.partitions.put(topic, partitions);
+    }
+
+    @Override
+    public synchronized void close() {
+        ensureNotClosed();
+        this.closed = true;
+    }
+
+    @Override
+    public void wakeup() {
+
+    }
+
+    private void ensureNotClosed() {
+        if (this.closed)
+            throw new IllegalStateException("This consumer has already been closed.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
new file mode 100644
index 0000000..19ae0a6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/NoOffsetForPartitionException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+//
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+//
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Indicates that there is no stored offset and no defined offset reset policy
+ */
+public class NoOffsetForPartitionException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NoOffsetForPartitionException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
new file mode 100644
index 0000000..70c254f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/OffsetResetStrategy.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+//
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+//
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public enum OffsetResetStrategy {
+    LATEST, EARLIEST, NONE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
new file mode 100644
index 0000000..a6d16cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/ConsumerNetworkClient.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.ClientResponse;
+import org.apache.flink.kafka_backport.clients.Metadata;
+import org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException;
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.requests.AbstractRequest;
+import org.apache.flink.kafka_backport.common.requests.RequestHeader;
+import org.apache.flink.kafka_backport.common.requests.RequestSend;
+import org.apache.flink.kafka_backport.clients.ClientRequest;
+import org.apache.flink.kafka_backport.clients.KafkaClient;
+import org.apache.flink.kafka_backport.clients.RequestCompletionHandler;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.utils.Time;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Higher level consumer access to the network layer with basic support for futures and
+ * task scheduling. NOT thread-safe!
+ *
+ * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
+ * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
+ * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
+ * understand, but there are opportunities to provide timeout or retry capabilities in the future.
+ * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
+ */
+public class ConsumerNetworkClient implements Closeable {
+    private final KafkaClient client;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+    private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue();
+    private final Map<Node, List<ClientRequest>> unsent = new HashMap<Node, List<ClientRequest>>();
+    private final Metadata metadata;
+    private final Time time;
+    private final long retryBackoffMs;
+
+    public ConsumerNetworkClient(KafkaClient client,
+                                 Metadata metadata,
+                                 Time time,
+                                 long retryBackoffMs) {
+        this.client = client;
+        this.metadata = metadata;
+        this.time = time;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    /**
+     * Schedule a new task to be executed at the given time. This is "best-effort" scheduling and
+     * should only be used for coarse synchronization.
+     * @param task The task to be scheduled
+     * @param at The time it should run
+     */
+    public void schedule(DelayedTask task, long at) {
+        delayedTasks.add(task, at);
+    }
+
+    /**
+     * Unschedule a task. This will remove all instances of the task from the task queue.
+     * This is a no-op if the task is not scheduled.
+     * @param task The task to be unscheduled.
+     */
+    public void unschedule(DelayedTask task) {
+        delayedTasks.remove(task);
+    }
+
+    /**
+     * Send a new request. Note that the request is not actually transmitted on the
+     * network until one of the {@link #poll(long)} variants is invoked. At this
+     * point the request will either be transmitted successfully or will fail.
+     * Use the returned future to obtain the result of the send.
+     * @param node The destination of the request
+     * @param api The Kafka API call
+     * @param request The request payload
+     * @return A future which indicates the result of the send.
+     */
+    public RequestFuture<ClientResponse> send(Node node,
+                                              ApiKeys api,
+                                              AbstractRequest request) {
+        long now = time.milliseconds();
+        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
+        RequestHeader header = client.nextRequestHeader(api);
+        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
+        put(node, new ClientRequest(now, true, send, future));
+        return future;
+    }
+
+    private void put(Node node, ClientRequest request) {
+        List<ClientRequest> nodeUnsent = unsent.get(node);
+        if (nodeUnsent == null) {
+            nodeUnsent = new ArrayList<ClientRequest>();
+            unsent.put(node, nodeUnsent);
+        }
+        nodeUnsent.add(request);
+    }
+
+    public Node leastLoadedNode() {
+        return client.leastLoadedNode(time.milliseconds());
+    }
+
+    /**
+     * Block until the metadata has been refreshed.
+     */
+    public void awaitMetadataUpdate() {
+        int version = this.metadata.requestUpdate();
+        do {
+            poll(Long.MAX_VALUE);
+        } while (this.metadata.version() == version);
+    }
+
+    /**
+     * Wake up an active poll. This will cause the polling thread to throw an exception either
+     * on the current poll if one is active, or the next poll.
+     */
+    public void wakeup() {
+        this.wakeup.set(true);
+        this.client.wakeup();
+    }
+
+    /**
+     * Block indefinitely until the given request future has finished.
+     * @param future The request future to await.
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(RequestFuture<?> future) {
+        while (!future.isDone())
+            poll(Long.MAX_VALUE);
+    }
+
+    /**
+     * Block until the provided request future has finished or the timeout has expired.
+     * @param future The request future to wait for
+     * @param timeout The maximum duration (in ms) to wait for the request
+     * @return true if the future is done, false otherwise
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public boolean poll(RequestFuture<?> future, long timeout) {
+        long now = time.milliseconds();
+        long deadline = now + timeout;
+        while (!future.isDone() && now < deadline) {
+            poll(deadline - now, now);
+            now = time.milliseconds();
+        }
+        return future.isDone();
+    }
+
+    /**
+     * Poll for any network IO. All send requests will either be transmitted on the network
+     * or failed when this call completes.
+     * @param timeout The maximum time to wait for an IO event.
+     * @throws org.apache.flink.kafka_backport.clients.consumer.ConsumerWakeupException if {@link #wakeup()} is called from another thread
+     */
+    public void poll(long timeout) {
+        poll(timeout, time.milliseconds());
+    }
+
+    private void poll(long timeout, long now) {
+        // send all the requests we can send now
+        pollUnsentRequests(now);
+
+        // ensure we don't poll any longer than the deadline for
+        // the next scheduled task
+        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
+        clientPoll(timeout, now);
+
+        // execute scheduled tasks
+        now = time.milliseconds();
+        delayedTasks.poll(now);
+
+        // try again to send requests since buffer space may have been
+        // cleared or a connect finished in the poll
+        pollUnsentRequests(now);
+
+        // fail all requests that couldn't be sent
+        clearUnsentRequests(now);
+    }
+
+    /**
+     * Block until all pending requests from the given node have finished.
+     * @param node The node to await requests from
+     */
+    public void awaitPendingRequests(Node node) {
+        while (pendingRequestCount(node) > 0)
+            poll(retryBackoffMs);
+    }
+
+    /**
+     * Get the count of pending requests to the given node. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @param node The node in question
+     * @return The number of pending requests
+     */
+    public int pendingRequestCount(Node node) {
+        List<ClientRequest> pending = unsent.get(node);
+        int unsentCount = pending == null ? 0 : pending.size();
+        return unsentCount + client.inFlightRequestCount(node.idString());
+    }
+
+    /**
+     * Get the total count of pending requests from all nodes. This includes both requests that
+     * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission.
+     * @return The total count of pending requests
+     */
+    public int pendingRequestCount() {
+        int total = 0;
+        for (List<ClientRequest> requests: unsent.values())
+            total += requests.size();
+        return total + client.inFlightRequestCount();
+    }
+
+    private void pollUnsentRequests(long now) {
+        while (trySend(now))
+            clientPoll(0, now);
+    }
+
+    private void clearUnsentRequests(long now) {
+        // clear all unsent requests and fail their corresponding futures
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                RequestFutureCompletionHandler handler =
+                        (RequestFutureCompletionHandler) request.callback();
+                handler.raise(SendFailedException.INSTANCE);
+                iterator.remove();
+            }
+        }
+        unsent.clear();
+    }
+
+    private boolean trySend(long now) {
+        // send any requests that can be sent now
+        boolean requestsSent = false;
+        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
+            Node node = requestEntry.getKey();
+            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
+            while (iterator.hasNext()) {
+                ClientRequest request = iterator.next();
+                if (client.ready(node, now)) {
+                    client.send(request);
+                    iterator.remove();
+                    requestsSent = true;
+                } else if (client.connectionFailed(node)) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.onComplete(new ClientResponse(request, now, true, null));
+                    iterator.remove();
+                }
+            }
+        }
+        return requestsSent;
+    }
+
+    private void clientPoll(long timeout, long now) {
+        client.poll(timeout, now);
+        if (wakeup.get()) {
+            clearUnsentRequests(now);
+            wakeup.set(false);
+            throw new ConsumerWakeupException();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.close();
+    }
+
+    public static class RequestFutureCompletionHandler
+            extends RequestFuture<ClientResponse>
+            implements RequestCompletionHandler {
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            complete(response);
+        }
+    }
+}
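For readers following along: send() above only queues the request, nothing goes on the wire until
one of the poll() variants runs, and the returned future is how the caller picks up the outcome.
A minimal sketch of that hand-off against the backported API (an illustration under assumptions,
not part of this commit: the helper name is invented, ApiKeys.METADATA is only an example key, and
the future is assumed to expose succeeded()/value()/exception() as in the upstream RequestFuture):

    // Queue a single request, drive the network client until its future completes, return the raw response.
    static ClientResponse sendAndAwait(ConsumerNetworkClient networkClient, AbstractRequest request) {
        Node node = networkClient.leastLoadedNode();                  // pick the least-loaded broker
        RequestFuture<ClientResponse> future =
                networkClient.send(node, ApiKeys.METADATA, request);  // queued only; sent on the next poll
        networkClient.poll(future);                                   // blocks; ConsumerWakeupException on wakeup()
        if (!future.succeeded())
            throw future.exception();                                 // e.g. the SendFailedException raised above
        return future.value();
    }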