From: mmerli@apache.org
To: commits@bookkeeper.apache.org
Subject: bookkeeper git commit: BOOKKEEPER-926: Compacted entries are not properly synced before updating index
Date: Wed, 4 May 2016 13:56:04 +0000 (UTC)
Message-Id: <9ea04fb2cf9a4d0f983fdc498e17ab56@git.apache.org>

Repository: bookkeeper
Updated Branches:
  refs/heads/master f8e0331f1 -> d32010f5f


BOOKKEEPER-926: Compacted entries are not properly synced before updating index

Author: Matteo Merli
Reviewers: Guo Sijie

Closes #41 from merlimat/bk-926

Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/d32010f5
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/d32010f5
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/d32010f5

Branch: refs/heads/master
Commit: d32010f5fcc6a040a56dc8b983cc14d107cff2df
Parents: f8e0331
Author: Matteo Merli
Authored: Wed May 4 06:55:46 2016 -0700
Committer: Matteo Merli
Committed: Wed May 4 06:55:46 2016 -0700

----------------------------------------------------------------------
 .../bookie/GarbageCollectorThread.java          | 96 ++++++--------------
 .../bookkeeper/bookie/CompactionTest.java       | 63 ++++++++++++-
 2 files changed, 91 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
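The problem being fixed is an ordering one: during compaction, live entries
are copied from an old entry log into the current one, and the ledger index
is then updated to point at the new locations. Before this patch, nothing
guaranteed that the copied entries had actually been synced to disk before
the index update, so a crash in between could leave the index pointing at
entries that were never persisted. The sketch below is not the committed
code; it is a minimal illustration of the flush-before-index-update pattern
the patch adopts, with hypothetical EntryLog and LedgerIndex interfaces
standing in for BookKeeper's EntryLogger and ledger storage.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for EntryLogger and the ledger index storage.
interface EntryLog {
    long addEntry(long ledgerId, ByteBuffer entry) throws IOException;
    void flush() throws IOException; // fsync all written-but-unflushed logs
}

interface LedgerIndex {
    void updateLocations(List<Long> newOffsets) throws IOException;
}

class Compactor {
    private final EntryLog entryLog;
    private final LedgerIndex index;
    private final List<Long> offsets = new ArrayList<Long>();

    Compactor(EntryLog entryLog, LedgerIndex index) {
        this.entryLog = entryLog;
        this.index = index;
    }

    void copyEntry(long ledgerId, ByteBuffer entry) throws IOException {
        // Re-append the surviving entry and remember its new location.
        offsets.add(entryLog.addEntry(ledgerId, entry));
    }

    void flush() throws IOException {
        if (offsets.isEmpty()) {
            return;
        }
        try {
            // Sync the copied entries *before* publishing their new locations:
            // if we crash here, the index still points at the old entry log.
            entryLog.flush();
            index.updateLocations(offsets);
        } finally {
            offsets.clear();
        }
    }
}
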
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d32010f5/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 2821ec8..e5ee8d7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -126,7 +126,7 @@ public class GarbageCollectorThread extends BookieThread {
     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
-    class CompactionScannerFactory implements EntryLogger.EntryLogListener {
+    class CompactionScannerFactory {
         List<EntryLocation> offsets = new ArrayList<EntryLocation>();

         EntryLogScanner newScanner(final EntryLogMetadata meta) {
@@ -141,66 +141,38 @@ public class GarbageCollectorThread extends BookieThread {
                 }

                 @Override
-                public void process(final long ledgerId, long offset, ByteBuffer entry)
-                        throws IOException {
+                public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException {
                     throttler.acquire(entry.remaining());
-                    synchronized (CompactionScannerFactory.this) {
-                        if (offsets.size() > maxOutstandingRequests) {
-                            waitEntrylogFlushed();
-                        }
-                        entry.getLong(); // discard ledger id, we already have it
-                        long entryId = entry.getLong();
-                        entry.rewind();
-
-                        long newoffset = entryLogger.addEntry(ledgerId, entry);
-                        offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+
+                    if (offsets.size() > maxOutstandingRequests) {
+                        flush();
                     }
+
+                    entry.getLong(); // discard ledger id, we already have it
+                    long entryId = entry.getLong();
+                    entry.rewind();
+
+                    long newoffset = entryLogger.addEntry(ledgerId, entry);
+                    offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
+                }
             };
         }

-        final Object flushLock = new Object();
-
-        @Override
-        public void onRotateEntryLog() {
-            synchronized (flushLock) {
-                flushLock.notifyAll();
+        void flush() throws IOException {
+            if (offsets.isEmpty()) {
+                LOG.debug("Skipping entry log flushing, as there are no offset!");
+                return;
             }
-        }

-        synchronized private void waitEntrylogFlushed() throws IOException {
+            // Before updating the index, we want to wait until all the compacted entries are
+            // flushed into the entryLog
             try {
-                if (offsets.size() <= 0) {
-                    LOG.debug("Skipping entry log flushing, as there is no offset!");
-                    return;
-                }
-
-                EntryLocation lastOffset = offsets.get(offsets.size()-1);
-                long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
-                while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) {
-                    synchronized (flushLock) {
-                        flushLock.wait(1000);
-                    }
+                entryLogger.flush();

-                    lastOffset = offsets.get(offsets.size()-1);
-                    lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location);
-                }
-                if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) {
-                    throw new IOException("Shutdown before flushed");
-                }
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                throw new IOException("Interrupted waiting for flush", ie);
+                ledgerStorage.updateEntriesLocations(offsets);
+            } finally {
+                offsets.clear();
             }
-
-            ledgerStorage.updateEntriesLocations(offsets);
-            offsets.clear();
-        }
-
-        synchronized void flush() throws IOException {
-            waitEntrylogFlushed();
-
-            ledgerStorage.flushEntriesLocationsIndex();
         }
     }

@@ -227,7 +199,6 @@ public class GarbageCollectorThread extends BookieThread {
         this.compactionRateByEntries = conf.getCompactionRateByEntries();
         this.compactionRateByBytes = conf.getCompactionRateByBytes();
         this.scannerFactory = new CompactionScannerFactory();
-        entryLogger.addListener(this.scannerFactory);

         this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
             @Override
@@ -456,7 +427,6 @@ public class GarbageCollectorThread extends BookieThread {
         List<EntryLogMetadata> logsToCompact = new ArrayList<EntryLogMetadata>();
         logsToCompact.addAll(entryLogMetaMap.values());
         Collections.sort(logsToCompact, sizeComparator);
-        List<Long> toRemove = new ArrayList<Long>();

         for (EntryLogMetadata meta : logsToCompact) {
             if (meta.getUsage() >= threshold) {
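----------------------------------------------------------------------
In the hunks above, the listener-based machinery (onRotateEntryLog(),
flushLock, waitEntrylogFlushed()) is dropped in favour of a plain
synchronous flush(): force the entry logger to sync, then publish the new
entry locations. The hunks that follow move that flush() call into the
per-log compaction loop, so each old entry log is removed only once its
surviving entries are safely on disk. Below is a hedged sketch of that
loop; the compact/flush/remove hooks and EntryLogMeta are simplified
stand-ins for compactEntryLog(), scannerFactory.flush(), removeEntryLog()
and EntryLogMetadata, not the committed code.

import java.io.IOException;
import java.util.List;

class CompactionLoopSketch {
    interface EntryLogMeta {
        long getEntryLogId();
        double getUsage(); // fraction of the log still referenced by live ledgers
    }

    void compact(EntryLogMeta meta) throws IOException { /* copy live entries */ }
    void flush() throws IOException { /* fsync copies, then update the index */ }
    void remove(long logId) { /* delete the old entry log file */ }

    void compactLogs(List<EntryLogMeta> logsToCompact, double threshold) {
        // logsToCompact is assumed sorted by ascending usage.
        for (EntryLogMeta meta : logsToCompact) {
            if (meta.getUsage() >= threshold) {
                break; // nothing further qualifies
            }
            try {
                compact(meta); // 1. copy live entries into the current entry log
                flush();       // 2. sync them and publish their new locations
                remove(meta.getEntryLogId()); // 3. only now is the old log disposable
            } catch (IOException ioe) {
                // Flushing failed: the old entry log is kept, so no data is lost.
                return;
            }
        }
    }
}
----------------------------------------------------------------------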
@@ -464,11 +434,15 @@ public class GarbageCollectorThread extends BookieThread {
             }
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(), threshold);
+                LOG.debug("Compacting entry log {} below threshold {}", meta.getEntryLogId(), threshold);
             }
             try {
                 compactEntryLog(scannerFactory, meta);
-                toRemove.add(meta.getEntryLogId());
+                scannerFactory.flush();
+
+                LOG.info("Removing entry log {} after compaction", meta.getEntryLogId());
+                removeEntryLog(meta.getEntryLogId());
+
             } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) {
                 LOG.warn("No writable ledger directory available, aborting compaction", nwlde);
                 break;
@@ -483,18 +457,6 @@ public class GarbageCollectorThread extends BookieThread {
                 return;
             }
         }
-        try {
-            // compaction finished, flush any outstanding offsets
-            scannerFactory.flush();
-        } catch (IOException ioe) {
-            LOG.error("Cannot flush compacted entries, skip removal", ioe);
-            return;
-        }
-
-        // offsets have been flushed, its now safe to remove the old entrylogs
-        for (Long l : toRemove) {
-            removeEntryLog(l);
-        }
     }

     /**
@@ -545,7 +507,7 @@ public class GarbageCollectorThread extends BookieThread {
             return;
         }

-        LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId());
+        LOG.info("Compacting entry log : {} - Usage: {} %", entryLogMeta.getEntryLogId(), entryLogMeta.getUsage());

         try {
             entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(),

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/d32010f5/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 4f3bb87..5d384ba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -33,6 +33,8 @@ import java.util.Arrays;
 import java.util.Collection;

 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -49,7 +51,7 @@ import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
-
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -114,6 +116,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE);
         // Disable skip list for compaction
         baseConf.setGcWaitTime(gcWaitTime);
+        baseConf.setFlushInterval(100);
         baseConf.setMinorCompactionThreshold(minorCompactionThreshold);
         baseConf.setMajorCompactionThreshold(majorCompactionThreshold);
         baseConf.setMinorCompactionInterval(minorCompactionInterval);
@@ -631,4 +634,62 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         storage.gcThread.resumeMinorGC();
         storage.gcThread.resumeMajorGC();
     }
+
+    @Test(timeout = 60000)
+    public void testCompactionWithEntryLogRollover() throws Exception {
+        // Disable bookie gc during this test
+        baseConf.setGcWaitTime(60000);
+        baseConf.setMinorCompactionInterval(0);
+        baseConf.setMajorCompactionInterval(0);
+        restartBookies();
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        // remove ledger2 and ledger3
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+
+        InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage;
+        GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread;
+        CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory;
+        long entryLogId = 0;
+        EntryLogger entryLogger = ledgerStorage.entryLogger;
+
+        LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId());
+
+        // Compact entryLog 0
+        EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId));
+
+        entryLogger.scanEntryLog(entryLogId, scanner);
+
+        long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId();
+        LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction);
+
+        // Add more entries to trigger entrylog roll over
+        LedgerHandle[] lhs2 = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs2) {
+            lh.close();
+        }
+
+        // Wait for entry logger to move forward
+        while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) {
+            Thread.sleep(100);
+        }
+
+        long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId();
+        LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing);
+
+        Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing);
+
+        // Wait for entries to be flushed on entry logs and update index
+        // This operation should succeed even if the entry log rolls over after the last entry was compacted
+        compactionScannerFactory.flush();
+    }
 }
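
----------------------------------------------------------------------
The new test exercises the corner case that motivated the explicit flush:
an entry log that rolls over after the last entry was compacted. Since
compacted entries are appended to whatever log file is current, a rotation
can spread them across several files, and any flush keyed to a single log
id, or to a rotation notification as before this patch, can miss data or
block indefinitely. The toy model below illustrates that behaviour under
stated assumptions: RollingLog is a hypothetical stand-in for EntryLogger,
not BookKeeper code.

import java.util.ArrayList;
import java.util.List;

public class RolloverSketch {
    static final long LOG_SIZE_LIMIT = 1024;

    static class RollingLog {
        long currentLogId = 0;
        long bytesInCurrent = 0;
        final List<Long> unflushedLogIds = new ArrayList<Long>();

        RollingLog() {
            unflushedLogIds.add(0L);
        }

        long append(byte[] entry) {
            if (bytesInCurrent + entry.length > LOG_SIZE_LIMIT) {
                // Rotation: subsequent compacted entries land in a newer file.
                currentLogId++;
                unflushedLogIds.add(currentLogId);
                bytesInCurrent = 0;
            }
            bytesInCurrent += entry.length;
            return (currentLogId << 32) | bytesInCurrent; // opaque location token
        }

        void flush() {
            // Sync *all* unflushed logs. A flush tied to one log id -- or a
            // wait for the next rotation notification -- could miss entries
            // that rolled over into a newer file, or never complete at all.
            unflushedLogIds.clear();
        }
    }

    public static void main(String[] args) {
        RollingLog log = new RollingLog();
        for (int i = 0; i < 10; i++) {
            log.append(new byte[300]); // forces several rotations
        }
        log.flush();
        System.out.println("unflushed logs after flush: " + log.unflushedLogIds);
    }
}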