kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-3806) Adjust default values of log.retention.hours and offsets.retention.minutes
Date Tue, 06 Mar 2018 06:22:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387348#comment-16387348

ASF GitHub Bot commented on KAFKA-3806:

hachikuji closed pull request #4648: KAFKA-3806: Increase offsets retention default to 7 days
URL: https://github.com/apache/kafka/pull/4648

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8b2fb1044e0..cf22305caf7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -158,7 +158,7 @@ object Defaults {
   val OffsetsTopicPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions
   val OffsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes
   val OffsetsTopicCompressionCodec: Int = OffsetConfig.DefaultOffsetsTopicCompressionCodec.codec
-  val OffsetsRetentionMinutes: Int = 24 * 60
+  val OffsetsRetentionMinutes: Int = 7 * 24 * 60
   val OffsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs
   val OffsetCommitTimeoutMs = OffsetConfig.DefaultOffsetCommitTimeoutMs
   val OffsetCommitRequiredAcks = OffsetConfig.DefaultOffsetCommitRequiredAcks
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 0793fa37a60..3e8a5359468 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -249,11 +249,11 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     Thread.sleep(retentionCheckInterval * 2)
     assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
-    // v1 version commit request with commit timestamp set to now - two days
+    // v1 version commit request with commit timestamp set to now - seven + a bit days
     // committed offset should expire
     val commitRequest2 = OffsetCommitRequest(
       groupId = group,
-      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata",
Time.SYSTEM.milliseconds - 2*24*60*60*1000L)),
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata",
Time.SYSTEM.milliseconds - (Defaults.OffsetsRetentionMinutes + 1) * 60 * 1000L)),
       versionId = 1
     assertEquals(Errors.NONE, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3ac293d8498..324f8df1403 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,67 @@
 <script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_1_2_0" href="#upgrade_1_2_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 1.2.x</a></h4>
+<p>Kafka 1.2.0 introduces wire protocol changes. By following the recommended rolling
upgrade plan below,
+    you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_120_notable">notable
changes in 1.2.0</a> before upgrading.
+<p><b>For a rolling upgrade:</b></p>
+    <li> Update server.properties on all brokers and add the following properties.
CURRENT_KAFKA_VERSION refers to the version you
+        are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version
currently in use. If you have previously
+        overridden the message format version, you should keep its current value. Alternatively,
if you are upgrading from a version prior
+        to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0,
0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1).</li>
+            <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a
href="#upgrade_10_performance_impact">potential performance impact
+                following the upgrade</a> for the details on what this configuration
+        </ul>
+        If you are upgrading from 0.11.0.x, 1.0.x, or 1.1.x and you have not overridden the
message format, then you only need to override
+        the inter-broker protocol format.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1).</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the code,
and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing
<code>inter.broker.protocol.version</code> and setting it to 1.1.
+    <li> Restart the brokers one by one for the new protocol version to take effect.</li>
+    <li> If you have overridden the message format version as instructed above, then
you need to do one more rolling restart to
+        upgrade it to its latest version. Once all (or most) consumers have been upgraded
to 0.11.0 or later,
+        change log.message.format.version to 1.2 on each broker and restart them one by one.
Note that the older Scala consumer
+        does not support the new message format introduced in 0.11, so to avoid the performance
cost of down-conversion (or to
+        take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once
semantics</a>), the newer Java consumer must be used.</li>
+<p><b>Additional Upgrade Notes:</b></p>
+    <li>If you are willing to accept downtime, you can simply take all the brokers
down, update the code and start them back up. They will start
+        with the new protocol by default.</li>
+    <li>Bumping the protocol version and restarting can be done any time after the
brokers are upgraded. It does not have to be immediately after.
+        Similarly for the message format version.</li>
+    <li>If you are using Java8 method references in your Kafka Streams code you might
need to update your code to resolve method ambiguties.
+        Hot-swaping the jar-file only might not work.</li>
+<h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in
+    <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a>
increases the default offset retention time from 1 day to 7 days. This makes it less likely
to "lose" offsets in an application that commits infrequently. It also increases the active
set of offsets and therefore can increase memory usage on the broker. Note that the console
consumer currently enables offset commit by default and can be the source of a large number
of offsets which this change will now preserve for 7 days instead of 1. You can preserve the
existing behavior by setting the broker config <code>offsets.retention.minutes</code>
to 1440.</li>
+<h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol
+<h5><a id="upgrade_120_streams" href="#upgrade_120_streams">Upgrading a 1.2.0
Kafka Streams Application</a></h5>
+    <li> Upgrading your Streams application from 1.1.0 to 1.2.0 does not require a
broker upgrade.
+        A Kafka Streams 1.2.0 application can connect to 1.2, 1.1, 1.0, 0.11.0, 0.10.2 and
0.10.1 brokers (it is not possible to connect to 0.10.0 brokers though). </li>
+    <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams
API changes in 1.2.0</a> for more details. </li>
 <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
 <p>Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling
upgrade plan below,
     you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_110_notable">notable
changes in 1.1.0</a> before upgrading.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

> Adjust default values of log.retention.hours and offsets.retention.minutes
> --------------------------------------------------------------------------
>                 Key: KAFKA-3806
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3806
>             Project: Kafka
>          Issue Type: Improvement
>          Components: config
>    Affects Versions:,
>            Reporter: Michal Turek
>            Priority: Minor
>             Fix For: 1.2.0
> Combination of default values of log.retention.hours (168 hours = 7 days) and offsets.retention.minutes
(1440 minutes = 1 day) may be dangerous in special cases. Offset retention should be always
greater than log retention.
> We have observed the following scenario and issue:
> - Producing of data to a topic was disabled two days ago by producer update, topic wasn't
> - Consumer consumed all data and properly committed offsets to Kafka.
> - Consumer made no more offset commits for that topic because there was no more incoming
data and there was nothing to confirm. (We have auto-commit disabled, I'm not sure how behaves
enabled auto-commit.)
> - After one day: Kafka cleared too old offsets according to offsets.retention.minutes.
> - After two days: Long-term running consumer was restarted after update, it didn't find
any committed offsets for that topic since they were deleted by offsets.retention.minutes
so it started consuming from the beginning.
> - The messages were still in Kafka due to larger log.retention.hours, about 5 days of
messages were read again.
> Known workaround to solve this issue:
> - Explicitly configure log.retention.hours and offsets.retention.minutes, don't use defaults.
> Proposals:
> - Prolong default value of offsets.retention.minutes to be at least twice larger than
> - Check these values during Kafka startup and log a warning if offsets.retention.minutes
is smaller than log.retention.hours.
> - Add a note to migration guide about differences between storing of offsets in ZooKeeper
and Kafka (http://kafka.apache.org/documentation.html#upgrade).

This message was sent by Atlassian JIRA

View raw message