From: guozhang@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: KAFKA-2674: clarify onPartitionsRevoked behavior
Date: Mon, 9 Nov 2015 19:11:29 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 e176fcc7f -> 0977c03ec


KAFKA-2674: clarify onPartitionsRevoked behavior

Author: Jason Gustafson
Reviewers: Guozhang Wang

Closes #467 from hachikuji/KAFKA-2674

(cherry picked from commit 359be3a682951fd469d690df8d9e7a5a89a9d03b)
Signed-off-by: Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0977c03e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0977c03e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0977c03e

Branch: refs/heads/0.9.0
Commit: 0977c03ec31c009a7bb9ae1bb678afe060aa0347
Parents: e176fcc
Author: Jason Gustafson
Authored: Mon Nov 9 11:17:18 2015 -0800
Committer: Guozhang Wang
Committed: Mon Nov 9 11:17:27 2015 -0800

----------------------------------------------------------------------
 .../consumer/ConsumerRebalanceListener.java     | 37 +++++++++++---------
 1 file changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0977c03e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
index 671b6f2..8af405c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java
@@ -57,16 +57,17 @@ import org.apache.kafka.common.TopicPartition;
  *           this.consumer = consumer;
  *       }
  *
- *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- *           // read the offsets from an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              consumer.seek(partition, readOffsetFromExternalStore(partition));
- *       }
  *       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  *           // save the offsets in an external store using some custom code not described here
  *           for(TopicPartition partition: partitions)
  *              saveOffsetInExternalStore(consumer.position(partition));
  *       }
+ *
+ *       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ *           // read the offsets from an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *       }
  *   }
  * }
  *
@@ -74,6 +75,20 @@ import org.apache.kafka.common.TopicPartition;
 public interface ConsumerRebalanceListener {
 
     /**
+     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+     * custom offset store to prevent duplicate data.
+     * <p>
+     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+     * <p>
+     * NOTE: This method is only called before rebalances. It is not called prior to {@link KafkaConsumer#close()}.
+     *
+     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     */
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions);
+
+    /**
      * A callback method the user can implement to provide handling of customized offsets on completion of a successful
      * partition re-assignment. This method will be called after an offset re-assignment completes and before the
      * consumer starts fetching data.
@@ -86,16 +101,4 @@ public interface ConsumerRebalanceListener {
      *            assigned to the consumer)
      */
     public void onPartitionsAssigned(Collection<TopicPartition> partitions);
-
-    /**
-     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
-     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
-     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
-     * custom offset store to prevent duplicate data
-     * <p>
-     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
-     *
-     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
-     */
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions);
 }
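
The callback ordering this change documents (revoke before the rebalance, assign after it and before fetching resumes) can be illustrated without the Kafka client. The sketch below is only a toy model under stated assumptions: RebalanceSketch, Listener, ToyConsumer, StoreBackedListener and demo are hypothetical names, partitions are plain strings, and the "external store" is an in-memory map. It is not the Kafka API and does not reproduce how the real consumer drives a rebalance.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: every type here is a hypothetical stand-in, NOT the Kafka client API.
class RebalanceSketch {

    /** Mirrors the two callbacks of ConsumerRebalanceListener, using String partition names. */
    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsAssigned(Collection<String> partitions);
    }

    /** Toy consumer that tracks one position per partition it currently owns. */
    static class ToyConsumer {
        private final Map<String, Long> positions = new HashMap<>();

        long position(String partition) { return positions.get(partition); }

        void seek(String partition, long offset) { positions.put(partition, offset); }

        /** Assumed ordering: revoke callback first, then reassignment, then assign callback. */
        void rebalance(Listener listener, Collection<String> newAssignment) {
            List<String> previouslyOwned = new ArrayList<>(positions.keySet());
            listener.onPartitionsRevoked(previouslyOwned);   // positions are still readable here
            positions.clear();
            for (String partition : newAssignment) {
                positions.put(partition, 0L);                // default starting position
            }
            listener.onPartitionsAssigned(newAssignment);    // runs before fetching would resume
        }
    }

    /** Saves positions to the store on revoke and restores them on assign. */
    static class StoreBackedListener implements Listener {
        private final ToyConsumer consumer;
        private final Map<String, Long> store;

        StoreBackedListener(ToyConsumer consumer, Map<String, Long> store) {
            this.consumer = consumer;
            this.store = store;
        }

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            for (String partition : partitions) {
                store.put(partition, consumer.position(partition));
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            for (String partition : partitions) {
                consumer.seek(partition, store.getOrDefault(partition, 0L));
            }
        }
    }

    /** Moves "topic-0" from one toy consumer to another and returns where the new owner resumes. */
    static long demo() {
        Map<String, Long> store = new HashMap<>();

        ToyConsumer first = new ToyConsumer();
        first.seek("topic-0", 42L);  // pretend the first consumer owns topic-0 and is at offset 42
        first.rebalance(new StoreBackedListener(first, store), Collections.<String>emptyList());

        ToyConsumer second = new ToyConsumer();
        second.rebalance(new StoreBackedListener(second, store),
                Collections.singletonList("topic-0"));
        return second.position("topic-0");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the revoke callback is, per the NOTE added in this commit, not called prior to KafkaConsumer#close(), an application following this pattern would presumably also need to save its offsets itself before closing the consumer.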