From: nehanarkhede@apache.org
To: commits@kafka.apache.org
Subject: git commit: KAFKA-670 Clean spurious .index files; reviewed by Neha Narkhede
Message-Id: <20121213205430.4E44281C3D9@tyr.zones.apache.org>
Date: Thu, 13 Dec 2012 20:54:30 +0000 (UTC)

Updated Branches:
  refs/heads/0.8 f9702c66b -> 3244bcafe


KAFKA-670 Clean spurious .index files; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3244bcaf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3244bcaf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3244bcaf

Branch: refs/heads/0.8
Commit: 3244bcafe4233611fc0366f9df53ec98b44ca571
Parents: f9702c6
Author: Edward Jay Kreps
Authored: Thu Dec 13 10:15:32 2012 -0800
Committer: Neha Narkhede
Committed: Thu Dec 13 10:15:32 2012 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala          |   31 +++++++++++------
 core/src/test/scala/unit/kafka/log/LogTest.scala |   26 ++++++++++++++
 2 files changed, 46 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 96bf2ed..66c07af 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -142,18 +142,27 @@ private[kafka] class Log(val dir: File,
     val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) {
-        if(!file.canRead)
-          throw new IOException("Could not read file " + file)
+      for(file <- ls if file.isFile) {
         val filename = file.getName()
-        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
-        // TODO: we should ideally rebuild any missing index files, instead of erroring out
-        if(!Log.indexFilename(dir, start).exists)
-          throw new IllegalStateException("Found log file with no corresponding index file.")
-        logSegments.add(new LogSegment(dir = dir,
-                                       startOffset = start,
-                                       indexIntervalBytes = indexIntervalBytes,
-                                       maxIndexSize = maxIndexSize))
+        if(!file.canRead) {
+          throw new IOException("Could not read file " + file)
+        } else if(filename.endsWith(IndexFileSuffix)) {
+          // ensure that we have a corresponding log file for this index file
+          val log = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+          if(!log.exists) {
+            warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+            file.delete()
+          }
+        } else if(filename.endsWith(LogFileSuffix)) {
+          val offset = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+          // TODO: we should ideally rebuild any missing index files, instead of erroring out
+          if(!Log.indexFilename(dir, offset).exists)
+            throw new IllegalStateException("Found log file with no corresponding index file.")
+          logSegments.add(new LogSegment(dir = dir,
+                                         startOffset = offset,
+                                         indexIntervalBytes = indexIntervalBytes,
+                                         maxIndexSize = maxIndexSize))
+        }
       }
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/3244bcaf/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index afaa284..900d0e2 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -391,6 +391,32 @@ class LogTest extends JUnitSuite {
       log.delete()
     }
   }
+
+  /**
+   * When we open a log any index segments without an associated log segment should be deleted.
+   */
+  @Test
+  def testBogusIndexSegmentsAreRemoved() {
+    val bogusIndex1 = Log.indexFilename(logDir, 0)
+    val bogusIndex2 = Log.indexFilename(logDir, 5)
+
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
+                      maxMessageSize = config.maxMessageSize,
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 1,
+                      needsRecovery = false)
+
+    assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
+    assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+
+    // check that we can append to the log
+    for(i <- 0 until 10)
+      log.append(set)
+
+    log.delete()
+  }
 
   @Test
   def testReopenThenTruncate() {
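
A note on what the patch does, for readers skimming the diff: the directory scan now looks at .index files as well as .log files. An .index file whose sibling .log file is missing is treated as leftover garbage (e.g. from a crash or a partial delete), logged with a warning, and deleted; previously the loop only scanned .log files, so such an orphaned index file was silently left on disk. The snippet below is a minimal standalone sketch of that cleanup rule, not Kafka code: OrphanedIndexSweep, sweep(), and the two suffix constants are invented for illustration (the patch itself uses IndexFileSuffix and LogFileSuffix inside the Log class).

    import java.io.File

    object OrphanedIndexSweep {
      // Illustrative suffix constants; the real code uses Kafka's IndexFileSuffix / LogFileSuffix.
      val IndexSuffix = ".index"
      val LogSuffix   = ".log"

      /** Delete every .index file in `dir` that has no sibling .log file; return the files removed. */
      def sweep(dir: File): Seq[File] = {
        val files = Option(dir.listFiles()).getOrElse(Array.empty[File])
        for {
          file <- files.toSeq
          if file.isFile && file.getName.endsWith(IndexSuffix)
          logFile = new File(file.getAbsolutePath.replace(IndexSuffix, LogSuffix))
          if !logFile.exists
          if file.delete()   // only report files that were actually removed
        } yield file
      }
    }

Running OrphanedIndexSweep.sweep(new File("/tmp/kafka-logs/my-topic-0")) against a partition directory (the path is only an example) would perform the same pruning the patched Log now does while loading segments, which is what testBogusIndexSegmentsAreRemoved asserts through the bogusIndex2.exists check.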