kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the log if no checkpoint exists. Reviewed by Timothy Chen and Neha Narkhede.
Date Thu, 10 Apr 2014 22:58:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 0ffec142a -> 48f1b7490


KAFKA-1373; Set first dirty (uncompacted) offset to first offset of the
log if no checkpoint exists. Reviewed by Timothy Chen and Neha Narkhede.
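
Put differently: if a compacted log has no entry in the cleaner offset checkpoint, the cleaner used to assume a first dirty offset of 0. When the log's earliest segment begins at a later offset (e.g. because older segments have already been removed), 0 is not a valid starting point, so the patch falls back to the base offset of the log's first segment. A minimal sketch of the new fallback, with illustrative names rather than code from the patch:

    // Illustrative sketch of the fallback this patch introduces (names are assumptions):
    // prefer the checkpointed offset, otherwise start from the log's first offset.
    def firstDirtyOffset(tp: TopicAndPartition,
                         log: Log,
                         checkpoints: Map[TopicAndPartition, Long]): Long =
      checkpoints.getOrElse(tp, log.logSegments.head.baseOffset)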


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/48f1b749
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/48f1b749
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/48f1b749

Branch: refs/heads/0.8.1
Commit: 48f1b74909df3c08553a7783ddae9d93c41da7cf
Parents: 0ffec14
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Thu Apr 10 11:53:23 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Apr 10 15:58:13 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleanerManager.scala       | 9 +++++----
 core/src/main/scala/kafka/server/KafkaConfig.scala          | 2 +-
 core/src/main/scala/kafka/server/OffsetCheckpoint.scala     | 2 +-
 .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala    | 2 +-
 4 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/48f1b749/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 43e5c1f..79e9d55 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -68,10 +68,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.compact)                               // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1))                      // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
-      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                             // must have some bytes
+      val cleanableLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                              .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                              .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                                                   lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)        // must have some bytes
                                    .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
       if(dirtyLogs.isEmpty) {
         None

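For context, a LogToClean's cleanable ratio is, roughly, the share of the log's bytes that sit at or beyond the first dirty offset; the sketch below is a simplified restatement under that assumption, not the actual class:

    // Simplified illustration (assumed semantics, not LogToClean itself):
    // with a first dirty offset of 0 on a log whose segments begin at a later offset,
    // cleanBytes is 0, every byte counts as dirty, and the ratio is pinned at 1.0,
    // so such a log always clears the minCleanableRatio filter above.
    def cleanableRatio(cleanBytes: Long, dirtyBytes: Long): Double =
      dirtyBytes.toDouble / (cleanBytes + dirtyBytes)
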
http://git-wip-us.apache.org/repos/asf/kafka/blob/48f1b749/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fd8e950..0a288f9 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
   
-  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
   
   /* the number of background threads to use for log cleaning */

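The KafkaConfig hunk only corrects the comment: the accepted values are "delete" and "compact" (the old "dedupe" name is gone). An illustrative server.properties fragment for a broker that should default to compaction:

    # default cleanup policy for topics without an override
    log.cleanup.policy=compact
    # the cleaner threads must be enabled for compaction to run
    log.cleaner.enable=true
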
http://git-wip-us.apache.org/repos/asf/kafka/blob/48f1b749/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 19f61a9..7af2f43 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
               val topic = pieces(0)
               val partition = pieces(1).toInt
               val offset = pieces(2).toLong
-              offsets += (TopicAndPartition(pieces(0), partition) -> offset)
+              offsets += (TopicAndPartition(topic, partition) -> offset)
               line = reader.readLine()
             }
             if(offsets.size != expectedSize)

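The OffsetCheckpoint hunk is a cosmetic cleanup: the loop already parses the topic into a local variable, so use it instead of indexing pieces(0) again. For reference, each data line is whitespace-separated topic, partition and offset, so a checkpoint file with two entries looks roughly like this (version line, entry count, then one entry per line):

    0
    2
    my-topic 0 5000
    my-topic 1 0
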
http://git-wip-us.apache.org/repos/asf/kafka/blob/48f1b749/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9aeb69d..5bfa764 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   def makeCleaner(parts: Int, 
                   minDirtyMessages: Int = 0, 
                   numThreads: Int = 1,
-                  defaultPolicy: String = "dedupe",
+                  defaultPolicy: String = "compact",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
     
     // create partitions and add them to the pool

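The test helper's default policy follows the rename as well, so a call that relies on the defaults now exercises compaction:

    // Illustrative usage of the test helper shown above; the defaults now imply "compact".
    val cleaner = makeCleaner(parts = 3)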
