From commits-return-9586-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed May 30 17:30:05 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 287D318063B for ; Wed, 30 May 2018 17:30:03 +0200 (CEST) Received: (qmail 4902 invoked by uid 500); 30 May 2018 15:30:03 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 4892 invoked by uid 99); 30 May 2018 15:30:03 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 May 2018 15:30:03 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 887648076B; Wed, 30 May 2018 15:30:02 +0000 (UTC) Date: Wed, 30 May 2018 15:30:02 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152769420182.6957.13903913829821738717@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: d99f4a0ffa65ae3a654b043547f114a16fbe4905 X-Git-Newrev: f24a62d4acfc48e23dd4bbb854668a727e9445a8 X-Git-Rev: f24a62d4acfc48e23dd4bbb854668a727e9445a8 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f24a62d KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014) f24a62d is described below commit f24a62d4acfc48e23dd4bbb854668a727e9445a8 Author: ConcurrencyPractitioner AuthorDate: Wed May 30 08:29:54 2018 -0700 KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014) This patch implements the consumer timeout APIs from KIP-266 (everything except `poll()`, which was done separately). Reviewers: John Roesler , Jason Gustafson --- .../apache/kafka/clients/consumer/Consumer.java | 51 +++- .../kafka/clients/consumer/KafkaConsumer.java | 305 +++++++++++++++++++-- .../kafka/clients/consumer/MockConsumer.java | 46 ++++ 3 files changed, 370 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index acb53e1..2e8ad2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -98,6 +98,10 @@ public interface Consumer extends Closeable { void commitSync(Map offsets); /** + * @see KafkaConsumer#commitSync(Map, Duration) + */ + void commitSync(final Map offsets, final Duration timeout); + /** * @see KafkaConsumer#commitAsync() */ void commitAsync(); @@ -131,6 +135,11 @@ public interface Consumer extends Closeable { * @see KafkaConsumer#position(TopicPartition) */ long position(TopicPartition partition); + + /** + * @see KafkaConsumer#position(TopicPartition, Duration) + */ + long position(TopicPartition partition, final Duration timeout); /** * @see KafkaConsumer#committed(TopicPartition) @@ -138,6 +147,11 @@ public interface Consumer extends Closeable { OffsetAndMetadata committed(TopicPartition partition); /** + * @see 
KafkaConsumer#committed(TopicPartition, Duration) + */ + OffsetAndMetadata committed(TopicPartition partition, final Duration timeout); + + /** * @see KafkaConsumer#metrics() */ Map metrics(); @@ -148,11 +162,21 @@ public interface Consumer extends Closeable { List partitionsFor(String topic); /** + * @see KafkaConsumer#partitionsFor(String, Duration) + */ + List partitionsFor(String topic, Duration timeout); + + /** * @see KafkaConsumer#listTopics() */ Map> listTopics(); /** + * @see KafkaConsumer#listTopics(Duration) + */ + Map> listTopics(Duration timeout); + + /** * @see KafkaConsumer#paused() */ Set paused(); @@ -168,21 +192,36 @@ public interface Consumer extends Closeable { void resume(Collection partitions); /** - * @see KafkaConsumer#offsetsForTimes(java.util.Map) + * @see KafkaConsumer#offsetsForTimes(Map) */ Map offsetsForTimes(Map timestampsToSearch); /** - * @see KafkaConsumer#beginningOffsets(java.util.Collection) + * @see KafkaConsumer#offsetsForTimes(Map, Duration) + */ + Map offsetsForTimes(Map timestampsToSearch, Duration timeout); + + /** + * @see KafkaConsumer#beginningOffsets(Collection) */ Map beginningOffsets(Collection partitions); /** - * @see KafkaConsumer#endOffsets(java.util.Collection) + * @see KafkaConsumer#beginningOffsets(Collection, Duration) + */ + Map beginningOffsets(Collection partitions, Duration timeout); + + /** + * @see KafkaConsumer#endOffsets(Collection) */ Map endOffsets(Collection partitions); /** + * @see KafkaConsumer#endOffsets(Collection, Duration) + */ + Map endOffsets(Collection partitions, Duration timeoutMs); + + /** * @see KafkaConsumer#close() */ void close(); @@ -190,9 +229,15 @@ public interface Consumer extends Closeable { /** * @see KafkaConsumer#close(long, TimeUnit) */ + @Deprecated void close(long timeout, TimeUnit unit); /** + * @see KafkaConsumer#close(Duration) + */ + void close(Duration timeout); + + /** * @see KafkaConsumer#wakeup() */ void wakeup(); diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 602c9d7..feaadd1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -1321,16 +1322,55 @@ public class KafkaConsumer implements Consumer { */ @Override public void commitSync(final Map offsets) { + commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Commit the specified offsets for the specified list of topics and partitions. + *

+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every + * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. The committed offset should be the next message your application will consume, + * i.e. lastProcessedMessageOffset + 1. + *

+ * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. + *

+ * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} + * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. + * + * @param offsets A map of offsets by partition with associated metadata + * @param timeout The maximum amount of time to await completion of the offset commit + * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. + * This can only occur if you are using automatic group management with {@link #subscribe(Collection)}, + * or if there is an active group with the same groupId which is using group management. + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws java.lang.IllegalArgumentException if the committed offset is negative + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata + * is too large or if the topic does not exist). 
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion + * of the offset commit + */ + @Override + public void commitSync(final Map offsets, final Duration timeout) { acquireAndEnsureOpen(); try { - coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE); + if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) { + throw new TimeoutException("Committing offsets synchronously took too long."); + } } finally { release(); } } /** - * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition. + * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition. * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)} */ @Override @@ -1426,6 +1466,7 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException if {@code partitions} is {@code null} * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer */ + @Override public void seekToBeginning(Collection partitions) { if (partitions == null) throw new IllegalArgumentException("Partitions collection cannot be null"); @@ -1453,6 +1494,7 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException if {@code partitions} is {@code null} * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer */ + @Override public void seekToEnd(Collection partitions) { if (partitions == null) throw new IllegalArgumentException("Partitions collection cannot be null"); @@ -1490,20 +1532,59 @@ public class KafkaConsumer implements Consumer { * configured groupId. 
See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ + @Override public long position(TopicPartition partition) { + return position(partition, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Get the offset of the next record that will be fetched (if a record with that offset exists). + * This method may issue a remote call to the server if there is no current position + * for the given partition. + *

+ * This call will block until the position can be determined, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. + * + * @param partition The partition to get the position for + * @param timeout The maximum amount of time to await determination of the current position + * @return The current position of the consumer (that is, the offset of the next record to be fetched) + * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer + * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for + * the partition + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the + * passed timeout expires + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. 
See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public long position(TopicPartition partition, final Duration timeout) { + final long timeoutMs = timeout.toMillis(); acquireAndEnsureOpen(); try { if (!this.subscriptions.isAssigned(partition)) throw new IllegalStateException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.position(partition); - while (offset == null) { + final long startMs = time.milliseconds(); + long finishMs = startMs; + + while (offset == null && finishMs - startMs < timeoutMs) { // batch update fetch positions for any partitions without a valid position - while (!updateFetchPositions(Long.MAX_VALUE)) { - log.warn("Still updating fetch positions"); + if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs))) { + break; } - client.poll(retryBackoffMs); + finishMs = time.milliseconds(); + + client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - startMs)); offset = this.subscriptions.position(partition); + finishMs = time.milliseconds(); } + if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired."); return offset; } finally { release(); @@ -1529,14 +1610,37 @@ public class KafkaConsumer implements Consumer { */ @Override public OffsetAndMetadata committed(TopicPartition partition) { + return committed(partition, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Get the last committed offset for the given partition (whether the commit happened by this process or + * another). This offset will be used as the position for the consumer in the event of a failure. + *

+ * This call will block to do a remote call to get the latest committed offsets from the server. + * + * @param partition The partition to check + * @param timeout The maximum amount of time to await the current committed offset + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * expiration of the timeout + */ + @Override + public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { acquireAndEnsureOpen(); try { - Map offsets = null; - while (offsets == null) { - offsets = coordinator.fetchCommittedOffsets( - Collections.singleton(partition), - Long.MAX_VALUE - ); + Map offsets = coordinator.fetchCommittedOffsets( + Collections.singleton(partition), timeout.toMillis()); + if (offsets == null) { + throw new TimeoutException("Unable to find committed offsets for partition within set duration."); } return offsets.get(partition); } finally { @@ -1565,13 +1669,38 @@ public class KafkaConsumer implements Consumer { * this function is called * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. 
See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before - * expiration of the configured request timeout * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires. */ @Override public List partitionsFor(String topic) { + return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it + * does not already have any metadata about the given topic. + * + * @param topic The topic to get partition metadata for + * @param timeout The maximum amount of time to await topic metadata + * + * @return The list of partitions + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. 
See + * the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if topic metadata cannot be fetched before expiration + * of the passed timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public List partitionsFor(String topic, Duration timeout) { acquireAndEnsureOpen(); + long timeoutMs = timeout.toMillis(); try { Cluster cluster = this.metadata.fetch(); List parts = cluster.partitionsForTopic(topic); @@ -1579,7 +1708,7 @@ public class KafkaConsumer implements Consumer { return parts; Map> topicMetadata = fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs); + new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs); return topicMetadata.get(topic); } finally { release(); @@ -1596,15 +1725,35 @@ public class KafkaConsumer implements Consumer { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called - * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before - * expiration of the configured request timeout * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires. */ @Override public Map> listTopics() { + return listTopics(Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a + * remote call to the server. 
+ * + * @param timeout The maximum time this operation will block to fetch topic metadata + * + * @return The map of topics and its partitions + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before + * expiration of the passed timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public Map> listTopics(Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.getAllTopicMetadata(requestTimeoutMs); + return fetcher.getAllTopicMetadata(timeout.toMillis()); } finally { release(); } @@ -1683,12 +1832,39 @@ public class KafkaConsumer implements Consumer { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws IllegalArgumentException if the target timestamp is negative * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * expiration of the configured {@code request.timeout.ms} + * the amount of time allocated by {@code request.timeout.ms} expires. * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up * the offsets by timestamp */ @Override public Map offsetsForTimes(Map timestampsToSearch) { + return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the + * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. + * + * This is a blocking call. 
The consumer does not have to be assigned the partitions. + * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null + * will be returned for that partition. + * + * @param timestampsToSearch the mapping from partition to the timestamp to look up. + * @param timeout The maximum amount of time to await retrieval of the offsets + * + * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater + * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no + * such message. + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details + * @throws IllegalArgumentException if the target timestamp is negative + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the passed timeout + * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up + * the offsets by timestamp + */ + @Override + public Map offsetsForTimes(Map timestampsToSearch, Duration timeout) { acquireAndEnsureOpen(); try { for (Map.Entry entry : timestampsToSearch.entrySet()) { @@ -1698,7 +1874,7 @@ public class KafkaConsumer implements Consumer { throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". 
The target time cannot be negative."); } - return fetcher.offsetsByTimes(timestampsToSearch, requestTimeoutMs); + return fetcher.offsetsByTimes(timestampsToSearch, timeout.toMillis()); } finally { release(); } @@ -1715,14 +1891,35 @@ public class KafkaConsumer implements Consumer { * @return The earliest available offsets for the given partitions * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured {@code request.timeout.ms} */ @Override public Map beginningOffsets(Collection partitions) { + return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get the first offset for the given partitions. + *

+ * This method does not change the current consumer position of the partitions. + * + * @see #seekToBeginning(Collection) + * + * @param partitions the partitions to get the earliest offsets + * @param timeout The maximum amount of time to await retrieval of the beginning offsets + * + * @return The earliest available offsets for the given partitions + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the passed timeout + */ + @Override + public Map beginningOffsets(Collection partitions, Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.beginningOffsets(partitions, requestTimeoutMs); + return fetcher.beginningOffsets(partitions, timeout.toMillis()); } finally { release(); } @@ -1744,14 +1941,40 @@ public class KafkaConsumer implements Consumer { * @return The end offsets for the given partitions. * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before - * expiration of the configured {@code request.timeout.ms} + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires */ @Override public Map endOffsets(Collection partitions) { + return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get the end offsets for the given partitions. 
In the default {@code read_uncommitted} isolation level, the end + * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For + * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of + * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been + * written to, the end offset is 0. + * + *

+ * This method does not change the current consumer position of the partitions. + * + * @see #seekToEnd(Collection) + * + * @param partitions the partitions to get the end offsets. + * @param timeout The maximum amount of time to await retrieval of the end offsets + * + * @return The end offsets for the given partitions. + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before + * expiration of the passed timeout + */ + @Override + public Map endOffsets(Collection partitions, Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.endOffsets(partitions, requestTimeoutMs); + return fetcher.endOffsets(partitions, timeout.toMillis()); } finally { release(); } @@ -1760,7 +1983,7 @@ public class KafkaConsumer implements Consumer { /** * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * If auto-commit is enabled, this will commit the current offsets if possible within the default - * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()} + * timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()} * cannot be used to interrupt close. * * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted @@ -1769,7 +1992,7 @@ public class KafkaConsumer implements Consumer { */ @Override public void close() { - close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } /** @@ -1786,15 +2009,39 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException If the {@code timeout} is negative. 
* @throws InterruptException If the thread is interrupted before or while this function is called * @throws org.apache.kafka.common.KafkaException for any other error during close + * + * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}. */ + @Deprecated + @Override public void close(long timeout, TimeUnit timeUnit) { - if (timeout < 0) + close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout))); + } + + /** + * Tries to close the consumer cleanly within the specified timeout. This method waits up to + * {@code timeout} for the consumer to complete pending commits and leave the group. + * If auto-commit is enabled, this will commit the current offsets if possible within the + * timeout. If the consumer is unable to complete offset commits and gracefully leave the group + * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be + * used to interrupt close. + * + * @param timeout The maximum time to wait for consumer to close gracefully. The value must be + * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. + * + * @throws IllegalArgumentException If the {@code timeout} is negative. 
+ * @throws InterruptException If the thread is interrupted before or while this function is called + * @throws org.apache.kafka.common.KafkaException for any other error during close + */ + @Override + public void close(Duration timeout) { + if (timeout.toMillis() < 0) throw new IllegalArgumentException("The timeout cannot be negative."); acquire(); try { if (!closed) { closed = true; - close(timeUnit.toMillis(timeout), false); + close(timeout.toMillis(), false); } } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 479a9ff..3502156 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -252,6 +252,11 @@ public class MockConsumer implements Consumer { } @Override + public void commitSync(Map offsets, final Duration timeout) { + commitSync(offsets); + } + + @Override public synchronized void seek(TopicPartition partition, long offset) { ensureNotClosed(); subscriptions.seek(partition, offset); @@ -267,6 +272,11 @@ public class MockConsumer implements Consumer { } @Override + public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { + return committed(partition); + } + + @Override public synchronized long position(TopicPartition partition) { ensureNotClosed(); if (!this.subscriptions.isAssigned(partition)) @@ -280,6 +290,11 @@ public class MockConsumer implements Consumer { } @Override + public synchronized long position(TopicPartition partition, final Duration timeout) { + return position(partition); + } + + @Override public synchronized void seekToBeginning(Collection partitions) { ensureNotClosed(); for (TopicPartition tp : partitions) @@ -470,4 +485,35 @@ public class MockConsumer implements Consumer { } return offsets.size() > 1 ? 
offsets.remove(0) : offsets.get(0); } + + @Override + public List partitionsFor(String topic, Duration timeout) { + return partitionsFor(topic); + } + + @Override + public Map> listTopics(Duration timeout) { + return listTopics(); + } + + @Override + public Map offsetsForTimes(Map timestampsToSearch, + Duration timeout) { + return offsetsForTimes(timestampsToSearch); + } + + @Override + public Map beginningOffsets(Collection partitions, Duration timeout) { + return beginningOffsets(partitions); + } + + @Override + public Map endOffsets(Collection partitions, Duration duration) { + return endOffsets(partitions); + } + + @Override + public void close(Duration timeout) { + close(); + } } -- To stop receiving notification emails like this one, please contact jgus@apache.org.