kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3319: improve session timeout broker/client config documentation
Date Tue, 22 Mar 2016 20:09:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 e2d7f9e44 -> 687d2494f


KAFKA-3319: improve session timeout broker/client config documentation

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Grant Henke, Ismael Juma, Guozhang Wang

Closes #1106 from hachikuji/KAFKA-3319

(cherry picked from commit ca77d67058726fc9df9bdd7cc0217ee62ccc5106)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/687d2494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/687d2494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/687d2494

Branch: refs/heads/0.10.0
Commit: 687d2494ff27b05f21ea29e645a0b1b000334db6
Parents: e2d7f9e
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Mar 22 13:09:13 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Mar 22 13:09:29 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java      | 16 ++++++++++++----
 .../org/apache/kafka/common/protocol/Errors.java    |  3 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala  |  4 ++--
 3 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/687d2494/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 9101307..c97c8fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -43,11 +43,22 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String GROUP_ID_CONFIG = "group.id";
     private static final String GROUP_ID_DOC = "A unique string that identifies the consumer
group this consumer belongs to. This property is required if the consumer uses either the
group management functionality by using <code>subscribe(topic)</code> or the Kafka-based
offset management strategy.";
 
+    /** <code>max.poll.records</code> */
+    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
+    private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned
in a single call to poll().";
+
     /**
      * <code>session.timeout.ms</code>
      */
     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures
when using Kafka's group management facilities.";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures
when using Kafka's " +
+            "group management facilities. When a consumer's heartbeat is not received within
the session timeout, " +
+            "the broker will mark the consumer as failed and rebalance the group. Since heartbeats
are sent only " +
+            "when poll() is invoked, a higher session timeout allows more time for message
processing in the consumer's " +
+            "poll loop at the cost of a longer time to detect hard failures. See also <code>"
+ MAX_POLL_RECORDS_CONFIG + "</code> for " +
+            "another option to control the processing time in the poll loop. Note that the
value must be in the " +
+            "allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code>
" +
+            "and <code>group.max.session.timeout.ms</code>.";
 
     /**
      * <code>heartbeat.interval.ms</code>
@@ -168,9 +179,6 @@ public class ConsumerConfig extends AbstractConfig {
                                                         + "Implementing the <code>ConsumerInterceptor</code>
interface allows you to intercept (and possibly mutate) records "
                                                         + "received by the consumer. By default,
there are no interceptors.";
 
-    /** <code>max.poll.records</code> */
-    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
-    private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned
in a single call to poll().";
 
     /** <code>exclude.internal.topics</code> */
     public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";

http://git-wip-us.apache.org/repos/asf/kafka/blob/687d2494/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 90be014..0f33516 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -117,7 +117,8 @@ public enum Errors {
     UNKNOWN_MEMBER_ID(25,
             new UnknownMemberIdException("The coordinator is not aware of this member.")),
     INVALID_SESSION_TIMEOUT(26,
-            new InvalidSessionTimeoutException("The session timeout is not within an acceptable
range.")),
+            new InvalidSessionTimeoutException("The session timeout is not within the range
allowed by the broker " +
+                    "(as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).")),
     REBALANCE_IN_PROGRESS(27,
             new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
     INVALID_COMMIT_OFFSET_SIZE(28,

http://git-wip-us.apache.org/repos/asf/kafka/blob/687d2494/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a6018ad..dc2a0a0 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -480,8 +480,8 @@ object KafkaConfig {
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to
recover from the state that caused the previous failure (Controller fail over, replica lag
etc). This config determines the amount of time to wait before retrying."
   val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
   /** ********* Consumer coordinator configuration ***********/
-  val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered
consumers"
-  val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered
consumers"
+  val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered
consumers. Shorter timeouts leader to quicker failure detection at the cost of more frequent
consumer heartbeating, which can overwhelm broker resources."
+  val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered
consumers. Longer timeouts give consumers more time to process messages in between heartbeats
at the cost of a longer time to detect failures."
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an
offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading
offsets into the cache."


Mime
View raw message