Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D8142E31B for ; Fri, 8 Feb 2013 18:56:22 +0000 (UTC) Received: (qmail 68863 invoked by uid 500); 8 Feb 2013 18:56:22 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 68832 invoked by uid 500); 8 Feb 2013 18:56:22 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 68821 invoked by uid 99); 8 Feb 2013 18:56:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Feb 2013 18:56:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1480A3A962; Fri, 8 Feb 2013 18:56:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkreps@apache.org To: commits@kafka.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: KAFKA-748 Maintain mmap position after resizing index. Also add a few more assertions. Message-Id: <20130208185622.1480A3A962@tyr.zones.apache.org> Date: Fri, 8 Feb 2013 18:56:22 +0000 (UTC) Updated Branches: refs/heads/0.8 b89fc2be8 -> 790a1504c KAFKA-748 Maintain mmap position after resizing index. Also add a few more assertions. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/790a1504 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/790a1504 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/790a1504 Branch: refs/heads/0.8 Commit: 790a1504c3437b84ce2d910b9769a2b4ae481ed3 Parents: b89fc2b Author: Jay Kreps Authored: Fri Feb 8 10:50:38 2013 -0800 Committer: Jay Kreps Committed: Fri Feb 8 10:50:38 2013 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/Log.scala | 8 ++++- core/src/main/scala/kafka/log/OffsetIndex.scala | 20 +++++++++----- .../scala/unit/kafka/log/OffsetIndexTest.scala | 6 +++- 3 files changed, 23 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5cd36e0..eee0ed3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -126,6 +126,8 @@ private[kafka] class Log(val dir: File, /* Calculate the offset of the next message */ private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset()) + + debug("Completed load of log %s with log end offset %d".format(name, logEndOffset)) newGauge(name + "-" + "NumLogSegments", new Gauge[Int] { def getValue = numberOfSegments }) @@ -275,8 +277,10 @@ private[kafka] class Log(val dir: File, val lastOffset = offsetCounter.get - 1 (firstOffset, lastOffset) } else { - if(!messageSetInfo.offsetsMonotonic) - throw new IllegalArgumentException("Out of order offsets found in " + messages) + require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages) + require(messageSetInfo.firstOffset >= nextOffset.get, + "Attempt to append a message set beginning with offset %d to a log with log end offset %d." + .format(messageSetInfo.firstOffset, nextOffset.get)) (messageSetInfo.firstOffset, messageSetInfo.lastOffset) } http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 43b3575..0d67242 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -90,8 +90,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /* the last offset in the index */ var lastOffset = readLastOffset() - info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d" - .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset)) + info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" + .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) + require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset)) /* the maximum number of entries this index can hold */ def maxEntries = mmap.limit / 8 @@ -177,15 +178,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = */ def append(offset: Long, position: Int) { this synchronized { - if(isFull) - throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").") - if(size.get > 0 && offset <= lastOffset) - throw new IllegalArgumentException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset)) + require(!isFull, "Attempt to append to a full index (size = " + size + ").") + require(size.get == 0 || offset > lastOffset, "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset)) debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) this.mmap.putInt((offset - baseOffset).toInt) this.mmap.putInt(position) this.size.incrementAndGet() this.lastOffset = offset + require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") } } @@ -230,7 +230,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from * the file. */ - def trimToValidSize() = resize(entries * 8) + def trimToValidSize() { + this synchronized { + resize(entries * 8) + } + } /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in @@ -245,7 +249,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = val roundedNewSize = roundToExactMultiple(newSize, 8) try { raf.setLength(roundedNewSize) + val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + this.mmap.position(position) } finally { Utils.swallow(raf.close()) } http://git-wip-us.apache.org/repos/asf/kafka/blob/790a1504/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 952a40b..051ebe3 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -86,7 +86,7 @@ class OffsetIndexTest extends JUnitSuite { val offset = idx.baseOffset + i + 1 idx.append(offset, i) } - assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException]) + assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalArgumentException]) } @Test(expected = classOf[IllegalArgumentException]) @@ -105,7 +105,9 @@ class OffsetIndexTest extends JUnitSuite { val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset) assertEquals(first, idxRo.lookup(first.offset)) assertEquals(sec, idxRo.lookup(sec.offset)) - assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException]) + assertEquals(sec.offset, idxRo.lastOffset) + assertEquals(2, idxRo.entries) + assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalArgumentException]) } @Test