kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-670 Clean spurious .index files; reviewed by Neha Narkhede
Date Thu, 13 Dec 2012 20:54:30 GMT
Updated Branches:
  refs/heads/0.8 f9702c66b -> 3244bcafe


KAFKA-670 Clean spurious .index files; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3244bcaf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3244bcaf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3244bcaf

Branch: refs/heads/0.8
Commit: 3244bcafe4233611fc0366f9df53ec98b44ca571
Parents: f9702c6
Author: Edward Jay Kreps <jkreps@apache.org>
Authored: Thu Dec 13 10:15:32 2012 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Dec 13 10:15:32 2012 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          |   31 +++++++++++------
 core/src/test/scala/unit/kafka/log/LogTest.scala |   26 ++++++++++++++
 2 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 96bf2ed..66c07af 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -142,18 +142,27 @@ private[kafka] class Log(val dir: File,
     val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix))
{
-        if(!file.canRead)
-          throw new IOException("Could not read file " + file)
+      for(file <- ls if file.isFile) {
         val filename = file.getName()
-        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-        // TODO: we should ideally rebuild any missing index files, instead of erroring out
-        if(!Log.indexFilename(dir, start).exists)
-          throw new IllegalStateException("Found log file with no corresponding index file.")
-        logSegments.add(new LogSegment(dir = dir, 
-                                       startOffset = start,
-                                       indexIntervalBytes = indexIntervalBytes, 
-                                       maxIndexSize = maxIndexSize))
+        if(!file.canRead) {
+          throw new IOException("Could not read file " + file)
+        } else if(filename.endsWith(IndexFileSuffix)) {
+          // ensure that we have a corresponding log file for this index file
+          val log = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+          if(!log.exists) {
+            warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+            file.delete()
+          }
+        } else if(filename.endsWith(LogFileSuffix)) {
+          val offset = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+          // TODO: we should ideally rebuild any missing index files, instead of erroring
out
+          if(!Log.indexFilename(dir, offset).exists)
+            throw new IllegalStateException("Found log file with no corresponding index file.")
+          logSegments.add(new LogSegment(dir = dir, 
+                                         startOffset = offset,
+                                         indexIntervalBytes = indexIntervalBytes, 
+                                         maxIndexSize = maxIndexSize))
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index afaa284..900d0e2 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -391,6 +391,32 @@ class LogTest extends JUnitSuite {
       log.delete()
     }
   }
+  
+  /**
+   * When we open a log any index segments without an associated log segment should be deleted.
+   */
+  @Test
+  def testBogusIndexSegmentsAreRemoved() {
+    val bogusIndex1 = Log.indexFilename(logDir, 0)
+    val bogusIndex2 = Log.indexFilename(logDir, 5)
+    
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val log = new Log(logDir, 
+                      maxLogFileSize = set.sizeInBytes * 5, 
+                      maxMessageSize = config.maxMessageSize,
+                      maxIndexSize = 1000, 
+                      indexIntervalBytes = 1, 
+                      needsRecovery = false)
+    
+    assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length
> 0)
+    assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+    
+    // check that we can append to the log
+    for(i <- 0 until 10)
+      log.append(set)
+      
+    log.delete()
+  }
 
   @Test
   def testReopenThenTruncate() {


Mime
View raw message