kafka-commits mailing list archives

From jun...@apache.org
Subject kafka git commit: KAFKA-2988; Change default configuration of the log cleaner
Date Thu, 07 Jan 2016 17:57:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a9ff3f2ec -> ee1770e00


KAFKA-2988; Change default configuration of the log cleaner

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #686 from granthenke/compaction
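For readers tracking this change: the cleaner is now on by default, so below is a hedged sketch of the server.properties overrides an operator might set to restore or tune the previous behavior (illustrative values, not recommendations made by this commit):

```properties
# Option A: restore the pre-KAFKA-2988 behavior and disable the cleaner.
# Only safe if no topic (including the internal offsets topic) uses
# cleanup.policy=compact.
log.cleaner.enable=false

# Option B: keep the cleaner enabled (the new default) but size the dedupe
# buffer explicitly. 134217728 bytes = 128 MB, the new default; the old
# default was 500 MB. Pick a value that fits your heap budget.
log.cleaner.dedupe.buffer.size=134217728
```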


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee1770e0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee1770e0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee1770e0

Branch: refs/heads/trunk
Commit: ee1770e00e841ba68dc25f03a8d3f2ac647b0eb3
Parents: a9ff3f2
Author: Grant Henke <granthenke@gmail.com>
Authored: Thu Jan 7 09:57:35 2016 -0800
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Jan 7 09:57:35 2016 -0800

----------------------------------------------------------------------
 config/server.properties                            | 16 ++++++----------
 core/src/main/scala/kafka/server/KafkaConfig.scala  |  6 +++---
 .../src/test/scala/unit/kafka/utils/TestUtils.scala |  5 +++--
 docs/upgrade.html                                   |  1 +
 4 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1770e0/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 80ee2fc..ddb695a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -4,9 +4,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,7 +40,7 @@ listeners=PLAINTEXT://:9092
 
 # The number of threads handling network requests
 num.network.threads=3
- 
+
 # The number of threads doing disk I/O
 num.io.threads=8
 
@@ -71,11 +71,11 @@ num.recovery.threads.per.data.dir=1
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk. 
+# the OS cache lazily. The following configurations control the flush of data to disk.
 # There are a few important trade-offs here:
 #    1. Durability: Unflushed data may be lost if you are not using replication.
 #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
 # The settings below allow one to configure the flush policy to flush data after a period of time or
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
@@ -102,14 +102,10 @@ log.retention.hours=168
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
 log.segment.bytes=1073741824
 
-# The interval at which log segments are checked to see if they can be deleted according 
+# The interval at which log segments are checked to see if they can be deleted according
 # to the retention policies
 log.retention.check.interval.ms=300000
 
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1770e0/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0cb11ab..fc87a4d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -79,12 +79,12 @@ object Defaults {
   val LogCleanupPolicy = Delete
   val LogCleanerThreads = 1
   val LogCleanerIoMaxBytesPerSecond = Double.MaxValue
-  val LogCleanerDedupeBufferSize = 500 * 1024 * 1024L
+  val LogCleanerDedupeBufferSize = 128 * 1024 * 1024L
   val LogCleanerIoBufferSize = 512 * 1024
   val LogCleanerDedupeBufferLoadFactor = 0.9d
   val LogCleanerBackoffMs = 15 * 1000
   val LogCleanerMinCleanRatio = 0.5d
-  val LogCleanerEnable = false
+  val LogCleanerEnable = true
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
   val LogIndexIntervalBytes = 4096
@@ -401,7 +401,7 @@ object KafkaConfig {
   "will allow more log to be cleaned at once but will lead to more hash collisions"
   val LogCleanerBackoffMsDoc = "The amount of time to sleep when there are no logs to clean"
  val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning"
-  val LogCleanerEnableDoc = "Should we enable log cleaning?"
+  val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server? Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1770e0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0221373..ac05314 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -104,7 +104,7 @@ object TestUtils extends Logging {
     })
     f
   }
-  
+
   /**
   * Create a random log directory in the format <string>-<int> used for Kafka partition logs.
   * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory.
@@ -213,6 +213,7 @@ object TestUtils extends Logging {
     props.put("controlled.shutdown.enable", enableControlledShutdown.toString)
     props.put("delete.topic.enable", enableDeleteTopic.toString)
     props.put("controlled.shutdown.retry.backoff.ms", "100")
+    props.put("log.cleaner.dedupe.buffer.size", "2097152")
 
     if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
       props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
@@ -470,7 +471,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
-        
+
     /* Only use these if not already set */
     val defaultProps = Map(
       ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1770e0/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c0a3078..98ac570 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -39,6 +39,7 @@
     <li> Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly. </li>
     <li> Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync. </li>
     <li> Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics. </li>
     <li> MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration. </li>
     <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual, only custom code directly importing these classes will be affected. </li>
     <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
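The dedupe buffer size traded off in this commit bounds how many unique keys one cleaning pass can track. A back-of-envelope sketch, assuming roughly 24 bytes per tracked key (a 16-byte hash plus an 8-byte offset, as in Kafka's offset map of this era) and the 0.9 load factor from Defaults.LogCleanerDedupeBufferLoadFactor:

```java
public class DedupeBufferCapacity {
    // Defaults.LogCleanerDedupeBufferLoadFactor in KafkaConfig.scala.
    static final double LOAD_FACTOR = 0.9;
    // Assumption: ~24 bytes per tracked key (16-byte hash + 8-byte offset).
    static final int BYTES_PER_ENTRY = 24;

    // Approximate number of unique keys one cleaner pass can deduplicate.
    static long approxKeys(long bufferBytes) {
        return (long) (bufferBytes * LOAD_FACTOR / BYTES_PER_ENTRY);
    }

    public static void main(String[] args) {
        // Old default: 500 MB buffer -> roughly 19.7 million keys per pass.
        System.out.println("500 MB: ~" + approxKeys(500L * 1024 * 1024) + " keys");
        // New default: 128 MB buffer -> roughly 5 million keys per pass.
        System.out.println("128 MB: ~" + approxKeys(128L * 1024 * 1024) + " keys");
    }
}
```

Under these assumptions, the new default still covers millions of distinct keys per pass while freeing most of the heap the old 500 MB default reserved on every broker.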

