Subject: svn commit: r1479085 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/test/
Date: Sat, 04 May 2013 12:22:10 -0000
To: commits@zookeeper.apache.org
From: ivank@apache.org

Author: ivank
Date: Sat May 4 12:22:10 2013
New Revision: 1479085

URL: http://svn.apache.org/r1479085
Log:
BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Sat May 4 12:22:10 2013
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
 
 BUGFIXES:
 
+ BOOKKEEPER-564: Better checkpoint mechanism (sijie & ivank)
+
 BOOKKEEPER-596: Ledgers are gc'ed by mistake in MSLedgerManagerFactory. (sijie & ivank)
 
 BOOKKEEPER-595: Crash of inprocess autorecovery daemon should not take down the bookie (ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Sat May 4 12:22:10 2013
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.meta.Ledger
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.GarbageCollectorThread.SafeEntryAdder;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -239,39 +240,70 @@ public class Bookie extends Thread {
 }
 
 /**
- * SyncThread is a background thread which flushes ledger index pages periodically.
- * Also it takes responsibility of garbage collecting journal files.
- *
- *

- * Before flushing, SyncThread first records a log marker {journalId, journalPos} in memory,
- * which indicates entries before this log marker would be persisted to ledger files.
- * Then sync thread begins flushing ledger index pages to ledger index files, flush entry
- * logger to ensure all entries persisted to entry loggers for future reads.
- *

+ * SyncThread is a background thread which help checkpointing ledger storage
+ * when a checkpoint is requested. After a ledger storage is checkpointed,
+ * the journal files added before checkpoint will be garbage collected.
 *

- * After all data has been persisted to ledger index files and entry loggers, it is safe
- * to persist the log marker to disk. If bookie failed after persist log mark,
- * bookie is able to relay journal entries started from last log mark without losing
- * any entries.
+ * After all data has been persisted to ledger index files and entry
+ * loggers, it is safe to complete a checkpoint by persisting the log marker
+ * to disk. If bookie failed after persist log mark, bookie is able to relay
+ * journal entries started from last log mark without losing any entries.
 *

*

- * Those journal files whose id are less than the log id in last log mark, could be
- * removed safely after persisting last log mark. We provide a setting to let user keeping
- * number of old journal files which may be used for manual recovery in critical disaster.
+ * Those journal files whose id are less than the log id in last log mark,
+ * could be removed safely after persisting last log mark. We provide a
+ * setting to let user keeping number of old journal files which may be used
+ * for manual recovery in critical disaster.
 *

*/ class SyncThread extends Thread { volatile boolean running = true; // flag to ensure sync thread will not be interrupted during flush final AtomicBoolean flushing = new AtomicBoolean(false); - // make flush interval as a parameter final int flushInterval; + public SyncThread(ServerConfiguration conf) { super("SyncThread"); flushInterval = conf.getFlushInterval(); LOG.debug("Flush Interval : {}", flushInterval); } + /** + * flush data up to given logMark and roll log if success + * @param checkpoint + */ + @VisibleForTesting + public void checkpoint(Checkpoint checkpoint) { + boolean flushFailed = false; + try { + if (running) { + checkpoint = ledgerStorage.checkpoint(checkpoint); + } else { + ledgerStorage.flush(); + } + } catch (NoWritableLedgerDirException e) { + LOG.error("No writeable ledger directories"); + flushFailed = true; + flushing.set(false); + transitionToReadOnlyMode(); + } catch (IOException e) { + LOG.error("Exception flushing Ledger", e); + flushFailed = true; + } + + // if flush failed, we should not roll last mark, otherwise we would + // have some ledgers are not flushed and their journal entries were lost + if (!flushFailed) { + try { + journal.checkpointComplete(checkpoint, running); + } catch (IOException e) { + flushing.set(false); + LOG.error("Marking checkpoint as complete failed", e); + transitionToReadOnlyMode(); + } + } + } + private Object suspensionLock = new Object(); private boolean suspended = false; @@ -299,61 +331,35 @@ public class Bookie extends Thread { @Override public void run() { try { - while (running) { + while(running) { synchronized (this) { try { wait(flushInterval); - if (!ledgerStorage.isFlushRequired()) { - continue; - } } catch (InterruptedException e) { Thread.currentThread().interrupt(); continue; } } + synchronized (suspensionLock) { while (suspended) { - suspensionLock.wait(); + try { + suspensionLock.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } } } - // 
try to mark flushing flag to make sure it would not be interrupted - // by shutdown during flushing. otherwise it will receive - // ClosedByInterruptException which may cause index file & entry logger - // closed and corrupted. + + // try to mark flushing flag to check if interrupted if (!flushing.compareAndSet(false, true)) { // set flushing flag failed, means flushing is true now // indicates another thread wants to interrupt sync thread to exit break; } + checkpoint(journal.newCheckpoint()); - // journal mark log - journal.markLog(); - - boolean flushFailed = false; - try { - ledgerStorage.flush(); - } catch (NoWritableLedgerDirException e) { - flushFailed = true; - flushing.set(false); - transitionToReadOnlyMode(); - } catch (IOException e) { - LOG.error("Exception flushing Ledger", e); - flushFailed = true; - } - - // if flush failed, we should not roll last mark, otherwise we would - // have some ledgers are not flushed and their journal entries were lost - if (!flushFailed) { - try { - journal.rollLog(); - journal.gcJournals(); - } catch (NoWritableLedgerDirException e) { - flushing.set(false); - transitionToReadOnlyMode(); - } - } - - // clear flushing flag flushing.set(false); } } catch (Throwable t) { @@ -365,9 +371,10 @@ public class Bookie extends Thread { // shutdown sync thread void shutdown() throws InterruptedException { + // Wake up and finish sync thread running = false; + // make a checkpoint when shutdown if (flushing.compareAndSet(false, true)) { - // if setting flushing flag succeed, means syncThread is not flushing now // it is safe to interrupt itself now this.interrupt(); } @@ -532,12 +539,12 @@ public class Bookie extends Thread { LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName()); ledgerManager = ledgerManagerFactory.newLedgerManager(); syncThread = new SyncThread(conf); + // instantiate the journal + journal = new Journal(conf, ledgerDirsManager); ledgerStorage = new InterleavedLedgerStorage(conf, 
ledgerManager, - ledgerDirsManager, + ledgerDirsManager, journal, new BookieSafeEntryAdder()); handles = new HandleFactoryImpl(ledgerStorage); - // instantiate the journal - journal = new Journal(conf, ledgerDirsManager); // ZK ephemeral node for this Bookie. zkBookieRegPath = this.bookieRegistrationPath + getMyId(); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java Sat May 4 12:22:10 2013 @@ -916,10 +916,10 @@ public class BookieShell implements Tool * Print last log mark */ protected void printLastLogMark() throws IOException { - LastLogMark lastLogMark = getJournal().getLastLogMark(); - System.out.println("LastLogMark: Journal Id - " + lastLogMark.getTxnLogId() + "(" - + Long.toHexString(lastLogMark.getTxnLogId()) + ".txn), Pos - " - + lastLogMark.getTxnLogPosition()); + LogMark lastLogMark = getJournal().getLastLogMark().getCurMark(); + System.out.println("LastLogMark: Journal Id - " + lastLogMark.getLogFileId() + "(" + + Long.toHexString(lastLogMark.getLogFileId()) + ".txn), Pos - " + + lastLogMark.getLogFileOffset()); } /** Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java?rev=1479085&view=auto ============================================================================== --- 
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CheckpointSource.java Sat May 4 12:22:10 2013 @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.bookie; + +import java.io.IOException; + +/** + * Interface to communicate checkpoint progress. + */ +public interface CheckpointSource { + + /** + * A checkpoint presented a time point. All entries added before this checkpoint are already persisted. + */ + public static interface Checkpoint extends Comparable { + + public static final Checkpoint MAX = new Checkpoint() { + + @Override + public int compareTo(Checkpoint o) { + if (o == MAX) { + return 0; + } + return 1; + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + }; + + public static final Checkpoint MIN = new Checkpoint() { + @Override + public int compareTo(Checkpoint o) { + if (o == MIN) { + return 0; + } + return 1; + } + + @Override + public boolean equals(Object o) { + return this == o; + } + }; + } + + /** + * Request a new a checkpoint. + * + * @return checkpoint. 
+ */ + public Checkpoint newCheckpoint(); + + /** + * Tell checkpoint source that the checkpoint is completed. + * If compact is true, the implementation could compact + * to reduce size of data containing old checkpoints. + * + * @param checkpoint + * The checkpoint that has been completed + * @param compact + * Flag to compact old checkpoints. + */ + public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException; +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Sat May 4 12:22:10 2013 @@ -21,6 +21,8 @@ package org.apache.bookkeeper.bookie; +import static com.google.common.base.Charsets.UTF_8; + import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -36,19 +38,23 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.base.Charsets.UTF_8; - import 
org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class manages the writing of the bookkeeper entries. All the new @@ -65,12 +71,16 @@ public class EntryLogger { private AtomicBoolean shouldCreateNewEntryLog = new AtomicBoolean(false); private long logId; + private volatile long leastUnflushedLogId; /** * The maximum size of a entry logger file. */ final long logSizeLimit; + private List logChannelsToFlush; private volatile BufferedChannel logChannel; + private final EntryLogListener listener; + /** * The 1K block at the head of the entry logger file * that contains the fingerprint and (future) meta-data @@ -110,12 +120,29 @@ public class EntryLogger { } /** + * Entry Log Listener + */ + static interface EntryLogListener { + /** + * Rotate a new entry log to write. + */ + public void onRotateEntryLog(); + } + + /** * Create an EntryLogger that stores it's log files in the given * directories */ public EntryLogger(ServerConfiguration conf, LedgerDirsManager ledgerDirsManager) throws IOException { + this(conf, ledgerDirsManager, null); + } + + public EntryLogger(ServerConfiguration conf, + LedgerDirsManager ledgerDirsManager, EntryLogListener listener) + throws IOException { this.ledgerDirsManager = ledgerDirsManager; + this.listener = listener; // log size limit this.logSizeLimit = conf.getEntryLogSizeLimit(); @@ -138,7 +165,7 @@ public class EntryLogger { logId = lastLogId; } } - + this.leastUnflushedLogId = logId + 1; initialize(); } @@ -147,6 +174,16 @@ public class EntryLogger { */ private ConcurrentHashMap channels = new ConcurrentHashMap(); + /** + * Get the least unflushed log id. Garbage collector thread should not process + * unflushed entry log file. + * + * @return least unflushed log id. 
+ */ + synchronized long getLeastUnflushedLogId() { + return leastUnflushedLogId; + } + synchronized long getCurrentLogId() { return logId; } @@ -187,14 +224,28 @@ public class EntryLogger { } /** + * Rolling a new log file to write. + */ + synchronized void rollLog() throws IOException { + createNewLog(); + } + + /** * Creates a new log file */ void createNewLog() throws IOException { - if (logChannel != null) { - logChannel.flush(true); + if (null != logChannel) { + if (null == logChannelsToFlush) { + logChannelsToFlush = new LinkedList(); + } + // flush the internal buffer back to filesystem but not sync disk + // so the readers could access the data from filesystem. + logChannel.flush(false); + logChannelsToFlush.add(logChannel); + if (null != listener) { + listener.onRotateEntryLog(); + } } - - // It would better not to overwrite existing entry log files String logFileName = null; do { logFileName = Long.toHexString(++logId) + ".log"; @@ -202,7 +253,7 @@ public class EntryLogger { File newLogFile = new File(dir, logFileName); if (newLogFile.exists()) { LOG.warn("Found existed entry log " + newLogFile - + " when trying to create it as a new log."); + + " when trying to create it as a new log."); logFileName = null; break; } @@ -324,21 +375,57 @@ public class EntryLogger { } } - synchronized void flush() throws IOException { + /** + * Flushes all rotated log channels. After log channels are flushed, + * move leastUnflushedLogId ptr to current logId. 
+ */ + void checkpoint() throws IOException { + flushRotatedLogs(); + } + + void flushRotatedLogs() throws IOException { + List tmpChannels = null; + long newUnflushedLogId; + synchronized (this) { + tmpChannels = logChannelsToFlush; + logChannelsToFlush = null; + newUnflushedLogId = logId; + } + if (null == tmpChannels) { + return; + } + for (BufferedChannel channel : tmpChannels) { + channel.flush(true); + } + // move the leastUnflushedLogId ptr + leastUnflushedLogId = newUnflushedLogId; + } + + void flush() throws IOException { + flushRotatedLogs(); + flushCurrentLog(); + } + + synchronized void flushCurrentLog() throws IOException { if (logChannel != null) { logChannel.flush(true); } } - synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException { - // Create new log if logSizeLimit reached or current disk is full - boolean createNewLog = shouldCreateNewEntryLog.get(); - if (createNewLog - || (logChannel.position() + entry.remaining() + 4 > logSizeLimit)) { - createNewLog(); - - // Reset the flag - if (createNewLog) { - shouldCreateNewEntryLog.set(false); + + long addEntry(long ledger, ByteBuffer entry) throws IOException { + return addEntry(ledger, entry, true); + } + + synchronized long addEntry(long ledger, ByteBuffer entry, boolean rollLog) throws IOException { + if (rollLog) { + // Create new log if logSizeLimit reached or current disk is full + boolean createNewLog = shouldCreateNewEntryLog.get(); + if (createNewLog || reachEntryLogLimit(entry.remaining() + 4)) { + createNewLog(); + // Reset the flag + if (createNewLog) { + shouldCreateNewEntryLog.set(false); + } } } ByteBuffer buff = ByteBuffer.allocate(4); @@ -347,11 +434,14 @@ public class EntryLogger { logChannel.write(buff); long pos = logChannel.position(); logChannel.write(entry); - //logChannel.flush(false); return (logId << 32L) | pos; } + synchronized boolean reachEntryLogLimit(long size) { + return logChannel.position() + size > logSizeLimit; + } + byte[] readEntry(long 
ledgerId, long entryId, long location) throws IOException, Bookie.NoEntryException { long entryLogId = location >> 32L; long pos = location & 0xffffffffL; Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java Sat May 4 12:22:10 2013 @@ -226,6 +226,9 @@ class FileInfo { private int readAbsolute(ByteBuffer bb, long start) throws IOException { checkOpen(false); + if (fc == null) { + return 0; + } int total = 0; while(bb.remaining() > 0) { int rc = fc.read(bb, start); @@ -284,47 +287,49 @@ class FileInfo { */ public synchronized void moveToNewLocation(File newFile, long size) throws IOException { checkOpen(false); - if (size > fc.size()) { - size = fc.size(); - } - File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC); - if (!rlocFile.exists()) { - checkParents(rlocFile); - if (!rlocFile.createNewFile()) { - throw new IOException("Creating new cache index file " + rlocFile + " failed "); + if (fc != null) { + if (size > fc.size()) { + size = fc.size(); + } + File rlocFile = new File(newFile.getParentFile(), newFile.getName() + LedgerCacheImpl.RLOC); + if (!rlocFile.exists()) { + checkParents(rlocFile); + if (!rlocFile.createNewFile()) { + throw new IOException("Creating new cache index file " + rlocFile + " failed "); + } } - } - // copy contents from old.idx to new.idx.rloc - FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel(); - try { - long written = 0; - while (written < size) { - 
long count = fc.transferTo(written, size, newFc); - if (count <= 0) { + // copy contents from old.idx to new.idx.rloc + FileChannel newFc = new RandomAccessFile(rlocFile, "rw").getChannel(); + try { + long written = 0; + while (written < size) { + long count = fc.transferTo(written, size, newFc); + if (count <= 0) { + throw new IOException("Copying to new location " + rlocFile + " failed"); + } + written += count; + } + if (written <= 0 && size > 0) { throw new IOException("Copying to new location " + rlocFile + " failed"); } - written += count; + } finally { + newFc.force(true); + newFc.close(); } - if (written <= 0 && size > 0) { - throw new IOException("Copying to new location " + rlocFile + " failed"); + // delete old.idx + fc.close(); + if (!delete()) { + LOG.error("Failed to delete the previous index file " + lf); + throw new IOException("Failed to delete the previous index file " + lf); } - } finally { - newFc.force(true); - newFc.close(); - } - // delete old.idx - fc.close(); - if (!delete()) { - LOG.error("Failed to delete the previous index file " + lf); - throw new IOException("Failed to delete the previous index file " + lf); - } - - // rename new.idx.rloc to new.idx - if (!rlocFile.renameTo(newFile)) { - LOG.error("Failed to rename " + rlocFile + " to " + newFile); - throw new IOException("Failed to rename " + rlocFile + " to " + newFile); + + // rename new.idx.rloc to new.idx + if (!rlocFile.renameTo(newFile)) { + LOG.error("Failed to rename " + rlocFile + " to " + newFile); + throw new IOException("Failed to rename " + rlocFile + " to " + newFile); + } + fc = new RandomAccessFile(newFile, mode).getChannel(); } - fc = new RandomAccessFile(newFile, mode).getChannel(); lf = newFile; } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1479085&r1=1479084&r2=1479085&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Sat May 4 12:22:10 2013
@@ -528,7 +528,7 @@ public class GarbageCollectorThread exte
         // Extract it for every entry log except for the current one.
         // Entry Log ID's are just a long value that starts at 0 and increments
         // by 1 when the log fills up and we roll to a new one.
-        long curLogId = entryLogger.getCurrentLogId();
+        long curLogId = entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
         for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
             // Comb the current entry log file if it has not already been extracted.
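Taken together, the changes above form a pipeline: rotating an entry log asks the journal for a checkpoint, the sync thread syncs only the rotated entry logs and then completes that checkpoint, and the garbage collector scans only logs below `leastUnflushedLogId`. The Java sketch below models that flow in memory under stated assumptions; it is not BookKeeper code. `CheckpointSketch`, `Mark`, `gcCandidates` and the `synced` list are hypothetical, fsync is simulated by recording log ids, `null` stands in for `Checkpoint.MAX`, rotation is triggered by hand instead of by `logSizeLimit`, and the locking that `CheckpointHolder` provides is left out because the demo is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the BOOKKEEPER-564 checkpoint flow. Names echo the
// diff, but no files are written: "syncing" an entry log only records its id.
public class CheckpointSketch {

    // A journal position (file id, offset); stands in for a journal-backed Checkpoint.
    static final class Mark implements Comparable<Mark> {
        final long fileId, offset;
        Mark(long fileId, long offset) { this.fileId = fileId; this.offset = offset; }
        @Override
        public int compareTo(Mark o) {
            int c = Long.compare(fileId, o.fileId);
            return c != 0 ? c : Long.compare(offset, o.offset);
        }
    }

    // Journal side: hands out checkpoints and remembers the last completed one.
    static final class Journal {
        long fileId = 1, offset = 0;
        Mark lastMark = new Mark(0, 0);  // a real journal would persist this mark
        void append(int bytes) { offset += bytes; }
        Mark newCheckpoint() { return new Mark(fileId, offset); }
        void checkpointComplete(Mark done) {
            if (done != null && done.compareTo(lastMark) > 0) {
                lastMark = done;  // journal files older than lastMark.fileId become removable
            }
        }
    }

    // Entry-log side: rotated logs wait for the next checkpoint to be synced.
    static final class EntryLogger {
        long logId = 1;                         // current log; a checkpoint never syncs it
        volatile long leastUnflushedLogId = 1;  // every log below this id is durable
        final List<Long> toFlush = new ArrayList<>();
        final List<Long> synced = new ArrayList<>();
        Runnable onRotate = () -> { };

        synchronized void rollLog() {           // real code rolls when logSizeLimit is hit
            toFlush.add(logId);
            logId++;
            onRotate.run();                     // request a checkpoint at the rotation point
        }

        void checkpoint() {
            List<Long> rotated;
            long newLeast;
            synchronized (this) {
                if (toFlush.isEmpty()) return;
                rotated = new ArrayList<>(toFlush);
                toFlush.clear();
                newLeast = logId;
            }
            synced.addAll(rotated);             // stands in for fsync of each rotated log
            leastUnflushedLogId = newLeast;
        }

        // Garbage collection may only scan logs that are already synced.
        List<Long> gcCandidates(long scannedLogId) {
            List<Long> ids = new ArrayList<>();
            for (long id = scannedLogId; id < leastUnflushedLogId; id++) ids.add(id);
            return ids;
        }
    }

    // Ledger storage: holds the checkpoint requested at the latest rotation.
    static final class Storage {
        final EntryLogger entryLogger = new EntryLogger();
        Mark pending;  // null plays the role of Checkpoint.MAX ("nothing pending")

        Storage(Journal journal) {
            entryLogger.onRotate = () -> {
                Mark cp = journal.newCheckpoint();
                if (pending == null || pending.compareTo(cp) < 0) pending = cp;
            };
        }

        // Sync rotated logs, then return the checkpoint the journal may complete.
        Mark checkpoint() {
            Mark done = pending;
            if (done == null) return null;
            entryLogger.checkpoint();
            if (pending == done) pending = null;  // keep a newer request if one arrived
            return done;
        }
    }

    public static void main(String[] args) {
        Journal journal = new Journal();
        Storage storage = new Storage(journal);
        journal.append(100);                    // entries land in entry log 1
        storage.entryLogger.rollLog();          // rotate: checkpoint requested at offset 100
        journal.append(50);                     // further entries land in entry log 2
        System.out.println("GC before checkpoint: " + storage.entryLogger.gcCandidates(1));
        journal.checkpointComplete(storage.checkpoint());  // one SyncThread iteration
        System.out.println("journal mark offset: " + journal.lastMark.offset);
        System.out.println("GC after checkpoint: " + storage.entryLogger.gcCandidates(1));
    }
}
```

Running `main` prints `GC before checkpoint: []`, then `journal mark offset: 100`, then `GC after checkpoint: [1]`. The mark stops at 100 rather than 150 because entries written after the rotation sit in the current entry log, which a checkpoint deliberately leaves unsynced, so the journal must still be able to replay them.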
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Sat May 4 12:22:10 2013 @@ -21,16 +21,16 @@ package org.apache.bookkeeper.bookie; -import java.nio.ByteBuffer; import java.io.IOException; +import java.nio.ByteBuffer; -import org.apache.bookkeeper.jmx.BKMBeanInfo; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.util.SnapshotMap; -import org.apache.zookeeper.ZooKeeper; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +39,34 @@ import org.slf4j.LoggerFactory; * This ledger storage implementation stores all entries in a single * file and maintains an index file for each ledger. 
*/ -class InterleavedLedgerStorage implements LedgerStorage { +class InterleavedLedgerStorage implements LedgerStorage, EntryLogListener { final static Logger LOG = LoggerFactory.getLogger(InterleavedLedgerStorage.class); + // Hold the last checkpoint + static class CheckpointHolder { + Checkpoint lastCheckpoint = Checkpoint.MAX; + + synchronized void setNextCheckpoint(Checkpoint cp) { + if (Checkpoint.MAX.equals(lastCheckpoint) || lastCheckpoint.compareTo(cp) < 0) { + lastCheckpoint = cp; + } + } + + synchronized void clearLastCheckpoint(Checkpoint done) { + if (0 == lastCheckpoint.compareTo(done)) { + lastCheckpoint = Checkpoint.MAX; + } + } + + synchronized Checkpoint getLastCheckpoint() { + return lastCheckpoint; + } + } + EntryLogger entryLogger; LedgerCache ledgerCache; + private final CheckpointSource checkpointSource; + private final CheckpointHolder checkpointHolder = new CheckpointHolder(); // A sorted map to stored all active ledger ids protected final SnapshotMap activeLedgers; @@ -56,12 +79,12 @@ class InterleavedLedgerStorage implement // this indicates that a write has happened since the last flush private volatile boolean somethingWritten = false; - InterleavedLedgerStorage(ServerConfiguration conf, - LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, - GarbageCollectorThread.SafeEntryAdder safeEntryAdder) - throws IOException { + InterleavedLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, + LedgerDirsManager ledgerDirsManager, CheckpointSource checkpointSource, + GarbageCollectorThread.SafeEntryAdder safeEntryAdder) throws IOException { activeLedgers = new SnapshotMap(); - entryLogger = new EntryLogger(conf, ledgerDirsManager); + this.checkpointSource = checkpointSource; + entryLogger = new EntryLogger(conf, ledgerDirsManager, this); ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager); gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger, activeLedgers, safeEntryAdder, 
ledgerManager); @@ -115,19 +138,8 @@ class InterleavedLedgerStorage implement long ledgerId = entry.getLong(); long entryId = entry.getLong(); entry.rewind(); - - /* - * Log the entry - */ - long pos = entryLogger.addEntry(ledgerId, entry); - - - /* - * Set offset of entry id to be the current ledger position - */ - ledgerCache.putEntryOffset(ledgerId, entryId, pos); - somethingWritten = true; + processEntry(ledgerId, entryId, entry); return entryId; } @@ -149,29 +161,29 @@ class InterleavedLedgerStorage implement return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset)); } - @Override - public boolean isFlushRequired() { - return somethingWritten; - }; - - @Override - public void flush() throws IOException { + private void flushOrCheckpoint(boolean isCheckpointFlush) + throws IOException { - if (!somethingWritten) { - return; - } - somethingWritten = false; boolean flushFailed = false; - try { ledgerCache.flushLedger(true); + } catch (LedgerDirsManager.NoWritableLedgerDirException e) { + throw e; } catch (IOException ioe) { LOG.error("Exception flushing Ledger cache", ioe); flushFailed = true; } try { - entryLogger.flush(); + // if it is just a checkpoint flush, we just flush rotated entry log files + // in entry logger. + if (isCheckpointFlush) { + entryLogger.checkpoint(); + } else { + entryLogger.flush(); + } + } catch (LedgerDirsManager.NoWritableLedgerDirException e) { + throw e; } catch (IOException ioe) { LOG.error("Exception flushing Ledger", ioe); flushFailed = true; @@ -182,7 +194,66 @@ class InterleavedLedgerStorage implement } @Override + public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException { + Checkpoint lastCheckpoint = checkpointHolder.getLastCheckpoint(); + // if checkpoint is less than last checkpoint, we don't need to do checkpoint again. 
+ if (lastCheckpoint.compareTo(checkpoint) > 0) { + return lastCheckpoint; + } + // we don't need to check somethingwritten since checkpoint + // is scheduled when rotate an entry logger file. and we could + // not set somethingWritten to false after checkpoint, since + // current entry logger file isn't flushed yet. + flushOrCheckpoint(true); + // after the ledger storage finished checkpointing, try to clear the done checkpoint + checkpointHolder.clearLastCheckpoint(lastCheckpoint); + return lastCheckpoint; + } + + @Override + synchronized public void flush() throws IOException { + if (!somethingWritten) { + return; + } + somethingWritten = false; + flushOrCheckpoint(false); + } + + @Override public BKMBeanInfo getJMXBean() { return ledgerCache.getJMXBean(); } + + protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throws IOException { + processEntry(ledgerId, entryId, entry, true); + } + + synchronized protected void processEntry(long ledgerId, long entryId, ByteBuffer entry, boolean rollLog) + throws IOException { + /* + * Touch dirty flag + */ + somethingWritten = true; + + /* + * Log the entry + */ + long pos = entryLogger.addEntry(ledgerId, entry, rollLog); + + /* + * Set offset of entry id to be the current ledger position + */ + ledgerCache.putEntryOffset(ledgerId, entryId, pos); + } + + @Override + public void onRotateEntryLog() { + // for interleaved ledger storage, we request a checkpoint when rotating a entry log file. + // the checkpoint represent the point that all the entries added before this point are already + // in ledger storage and ready to be synced to disk. + // TODO: we could consider remove checkpointSource and checkpointSouce#newCheckpoint + // later if we provide kind of LSN (Log/Journal Squeuence Number) + // mechanism when adding entry. 
+ checkpointHolder.setNextCheckpoint(checkpointSource.newCheckpoint()); + } } Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java Sat May 4 12:22:10 2013 @@ -32,6 +32,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; /** * Provide journal related management. */ -class Journal extends Thread { +class Journal extends Thread implements CheckpointSource { static Logger LOG = LoggerFactory.getLogger(Journal.class); @@ -84,43 +85,69 @@ class Journal extends Thread { } /** + * A wrapper over log mark to provide a checkpoint for users of journal + * to do checkpointing. 
+ */ + private static class LogMarkCheckpoint implements Checkpoint { + final LastLogMark mark; + + public LogMarkCheckpoint(LastLogMark checkpoint) { + this.mark = checkpoint; + } + + @Override + public int compareTo(Checkpoint o) { + if (o == Checkpoint.MAX) { + return -1; + } else if (o == Checkpoint.MIN) { + return 1; + } + return mark.getCurMark().compare(((LogMarkCheckpoint)o).mark.getCurMark()); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof LogMarkCheckpoint)) { + return false; + } + return 0 == compareTo((LogMarkCheckpoint)o); + } + + @Override + public int hashCode() { + return mark.hashCode(); + } + } + + /** * Last Log Mark */ class LastLogMark { - private long txnLogId; - private long txnLogPosition; - private LastLogMark lastMark; + private LogMark curMark; LastLogMark(long logId, long logPosition) { - this.txnLogId = logId; - this.txnLogPosition = logPosition; - } - synchronized void setLastLogMark(long logId, long logPosition) { - txnLogId = logId; - txnLogPosition = logPosition; - } - synchronized void markLog() { - lastMark = new LastLogMark(txnLogId, txnLogPosition); + this.curMark = new LogMark(logId, logPosition); } - synchronized LastLogMark getLastMark() { - return lastMark; + synchronized void setCurLogMark(long logId, long logPosition) { + curMark.setLogMark(logId, logPosition); } - synchronized long getTxnLogId() { - return txnLogId; + + synchronized LastLogMark markLog() { + return new LastLogMark(curMark.getLogFileId(), curMark.getLogFileOffset()); } - synchronized long getTxnLogPosition() { - return txnLogPosition; + + synchronized LogMark getCurMark() { + return curMark; } - synchronized void rollLog() throws NoWritableLedgerDirException { + synchronized void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException { byte buff[] = new byte[16]; ByteBuffer bb = ByteBuffer.wrap(buff); // we should record marked in markLog // which is safe since records before lastMark have been // persisted to disk 
(both index & entry logger) - bb.putLong(lastMark.getTxnLogId()); - bb.putLong(lastMark.getTxnLogPosition()); - LOG.debug("RollLog to persist last marked log : {}", lastMark); + lastMark.getCurMark().writeLogMark(bb); + LOG.debug("RollLog to persist last marked log : {}", lastMark.getCurMark()); List<File> writableLedgerDirs = ledgerDirsManager .getWritableLedgerDirs(); for (File dir : writableLedgerDirs) { @@ -151,6 +178,7 @@ class Journal extends Thread { synchronized void readLog() { byte buff[] = new byte[16]; ByteBuffer bb = ByteBuffer.wrap(buff); + LogMark mark = new LogMark(); for(File dir: ledgerDirsManager.getAllLedgerDirs()) { File file = new File(dir, "lastMark"); try { @@ -165,38 +193,31 @@ class Journal extends Thread { fis.close(); } bb.clear(); - long i = bb.getLong(); - long p = bb.getLong(); - if (i > txnLogId) { - txnLogId = i; - if(p > txnLogPosition) { - txnLogPosition = p; - } + mark.readLogMark(bb); + if (curMark.compare(mark) < 0) { + curMark.setLogMark(mark.getLogFileId(), mark.logFileOffset); } } catch (IOException e) { LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie"); } } } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - - sb.append("LastMark: logId - ").append(txnLogId) - .append(" , position - ").append(txnLogPosition); - - return sb.toString(); - } } /** * Filter to return list of journals for rolling */ - private class JournalRollingFilter implements JournalIdFilter { + private static class JournalRollingFilter implements JournalIdFilter { + + final LastLogMark lastMark; + + JournalRollingFilter(LastLogMark lastMark) { + this.lastMark = lastMark; + } + @Override public boolean accept(long journalId) { - if (journalId < lastLogMark.getLastMark().getTxnLogId()) { + if (journalId < lastMark.getCurMark().getLogFileId()) { return true; } else { return false; @@ -273,7 +294,7 @@ class Journal extends Thread { // read last log mark
lastLogMark.readLog(); - LOG.debug("Last Log Mark : {}", lastLogMark); + LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark()); } LastLogMark getLastLogMark() { @@ -281,64 +302,45 @@ class Journal extends Thread { } /** - * Records a LastLogMark in memory. - * - The LastLogMark contains two parts: first one is txnLogId - (file id of a journal) and the second one is txnLogPos (offset in - a journal). The LastLogMark indicates that those entries before - it have been persisted to both index and entry log files. - * - * - This method is called before flushing entry log files and ledger cache. - * + * Application tried to schedule a checkpoint. After all the txns added + * before checkpoint are persisted, a checkpoint will be returned + * to application. Application could use checkpoint to do its logic. */ - public void markLog() { - lastLogMark.markLog(); + @Override + public Checkpoint newCheckpoint() { + return new LogMarkCheckpoint(lastLogMark.markLog()); } /** - * Persists the LastLogMark marked by #markLog() to disk. + * Telling journal a checkpoint is finished. * - * - This action means entries added before LastLogMark whose entry data - and index pages were already persisted to disk. It is the time to safely - remove journal files created earlier than LastLogMark.txnLogId. - * - * - If the bookie has crashed before persisting LastLogMark to disk, - it still has journal files contains entries for which index pages may not - have been persisted. Consequently, when the bookie restarts, it inspects - journal files to restore those entries; data isn't lost. - * - * - This method is called after flushing entry log files and ledger cache successfully, which is to ensure LastLogMark is pesisted. - *
- * @see #markLog() - */ - public void rollLog() throws NoWritableLedgerDirException { - lastLogMark.rollLog(); - } - - /** - * Garbage collect older journals + * @throws IOException */ - public void gcJournals() { - // list the journals that have been marked - List logs = listJournalIds(journalDirectory, new JournalRollingFilter()); - // keep MAX_BACKUP_JOURNALS journal files before marked journal - if (logs.size() >= maxBackupJournals) { - int maxIdx = logs.size() - maxBackupJournals; - for (int i=0; i logs = listJournalIds(journalDirectory, new JournalRollingFilter(mark)); + // keep MAX_BACKUP_JOURNALS journal files before marked journal + if (logs.size() >= maxBackupJournals) { + int maxIdx = logs.size() - maxBackupJournals; + for (int i=0; i logs = listJournalIds(journalDirectory, new JournalIdFilter() { @Override public boolean accept(long journalId) { - if (journalId < markedLogId) { + if (journalId < markedLog.getLogFileId()) { return false; } return true; @@ -419,9 +421,9 @@ class Journal extends Thread { }); // last log mark may be missed due to no sync up before // validate filtered log ids only when we have markedLogId - if (markedLogId > 0) { - if (logs.size() == 0 || logs.get(0) != markedLogId) { - throw new IOException("Recovery log " + markedLogId + " is missing"); + if (markedLog.getLogFileId() > 0) { + if (logs.size() == 0 || logs.get(0) != markedLog.getLogFileId()) { + throw new IOException("Recovery log " + markedLog.getLogFileId() + " is missing"); } } LOG.debug("Try to relay journal logs : {}", logs); @@ -430,8 +432,8 @@ class Journal extends Thread { // system calls done. 
for(Long id: logs) { long logPosition = 0L; - if(id == markedLogId) { - logPosition = lastLogMark.getTxnLogPosition(); + if(id == markedLog.getLogFileId()) { + logPosition = markedLog.getLogFileOffset(); } scanJournal(id, logPosition, scanner); } @@ -501,7 +503,7 @@ class Journal extends Thread { //logFile.force(false); bc.flush(true); lastFlushPosition = bc.position(); - lastLogMark.setLastLogMark(logId, lastFlushPosition); + lastLogMark.setCurLogMark(logId, lastFlushPosition); for (QueueEntry e : toFlush) { e.cb.writeComplete(BookieException.Code.OK, e.ledgerId, e.entryId, null, e.ctx); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java Sat May 4 12:22:10 2013 @@ -24,6 +24,7 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.jmx.BKMBeanInfo; /** @@ -91,11 +92,6 @@ interface LedgerStorage { ByteBuffer getEntry(long ledgerId, long entryId) throws IOException; /** - * Whether there is data in the storage which needs to be flushed - */ - boolean isFlushRequired(); - - /** * Flushes all data in the storage. Once this is called, * add data written to the LedgerStorage up until this point * has been persisted to perminant storage @@ -103,6 +99,19 @@ interface LedgerStorage { void flush() throws IOException; /** + * Ask the ledger storage to sync data until the given checkpoint. 
+ * The ledger storage implementation do checkpoint and return the real checkpoint + * that it finished. The returned the checkpoint indicates that all entries added + * before that point already persist. + * + * @param checkpoint + * Check Point that {@link Checkpointer} proposed. + * @throws IOException + * @return the checkpoint that the ledger storage finished. + */ + Checkpoint checkpoint(Checkpoint checkpoint) throws IOException; + + /** * Get the JMX management bean for this LedgerStorage */ BKMBeanInfo getJMXBean(); Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java?rev=1479085&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java (added) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LogMark.java Sat May 4 12:22:10 2013 @@ -0,0 +1,83 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bookkeeper.bookie; + +import java.nio.ByteBuffer; + +/** + * Journal stream position + */ +class LogMark { + long logFileId; + long logFileOffset; + + public LogMark() { + setLogMark(0, 0); + } + + public LogMark(LogMark other) { + setLogMark(other.logFileId, other.logFileOffset); + } + + public LogMark(long logFileId, long logFileOffset) { + setLogMark(logFileId, logFileOffset); + } + + public long getLogFileId() { + return logFileId; + } + + public long getLogFileOffset() { + return logFileOffset; + } + + public void readLogMark(ByteBuffer bb) { + logFileId = bb.getLong(); + logFileOffset = bb.getLong(); + } + + public void writeLogMark(ByteBuffer bb) { + bb.putLong(logFileId); + bb.putLong(logFileOffset); + } + + public void setLogMark(long logFileId, long logFileOffset) { + this.logFileId = logFileId; + this.logFileOffset = logFileOffset; + } + + public int compare(LogMark other) { + long ret = this.logFileId - other.logFileId; + if (ret == 0) { + ret = this.logFileOffset - other.logFileOffset; + } + return (ret < 0)? -1 : ((ret > 0)? 
1 : 0); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("LogMark: logFileId - ").append(logFileId) + .append(" , logFileOffset - ").append(logFileOffset); + + return sb.toString(); + } +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java Sat May 4 12:22:10 2013 @@ -22,6 +22,8 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; +import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; + /** * Accessor class to avoid making Bookie internals public */ @@ -30,6 +32,8 @@ public class BookieAccessor { * Force a bookie to flush its ledger storage */ public static void forceFlush(Bookie b) throws IOException { + Checkpoint cp = b.journal.newCheckpoint(); b.ledgerStorage.flush(); + b.journal.checkpointComplete(cp, true); } -} \ No newline at end of file +} Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java (original) +++ 
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java Sat May 4 12:22:10 2013 @@ -276,6 +276,8 @@ public class CompactionTest extends Book bkc.deleteLedger(lhs[1].getId()); bkc.deleteLedger(lhs[2].getId()); LOG.info("Finished deleting the ledgers contains most entries."); + // restart bookies again to roll entry log files. + restartBookies(); Thread.sleep(baseConf.getMajorCompactionInterval() * 1000 + baseConf.getGcWaitTime()); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java?rev=1479085&r1=1479084&r2=1479085&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java Sat May 4 12:22:10 2013 @@ -96,6 +96,8 @@ public class LedgerDeleteTest extends Mu public void testLedgerDelete() throws Exception { // Write enough ledger entries so that we roll over the initial entryLog (0.log) LedgerHandle[] lhs = writeLedgerEntries(3, 1024, 1024); + // restart bookies to force rolling entry log files + restartBookies(); // Delete all of these ledgers from the BookKeeper client for (LedgerHandle lh : lhs) { @@ -136,7 +138,7 @@ public class LedgerDeleteTest extends Mu bkc.deleteLedger(lh.getId()); } LOG.info("Finished deleting all ledgers so waiting for the GC thread to clean up the entryLogs"); - Thread.sleep(2000); + Thread.sleep(2 * baseConf.getGcWaitTime()); /* * Verify that the first two entry logs ([0,1].log) have been deleted
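For readers following the checkpoint handshake in this change (rotating an entry log proposes a checkpoint, the ledger storage flushes, and the held checkpoint is cleared only if it equals the one just completed), here is a standalone Java sketch of the ordering and holder logic. It is an illustration under assumptions, not BookKeeper code. The names CheckpointSketch, Mark, Holder, setNext and clear are hypothetical. Mark stands in for LogMark: it orders by journal file id, then by offset, and serializes as two longs into a 16-byte buffer like the lastMark record. Holder stands in for InterleavedLedgerStorage.CheckpointHolder, with null playing the role of Checkpoint.MAX ("nothing pending"). Mark uses Long.compare rather than the subtraction in LogMark.compare. The two agree for the non-negative ids and offsets a journal produces.

```java
import java.nio.ByteBuffer;

public class CheckpointSketch {

    /** Hypothetical stand-in for LogMark: journal file id, then offset in that file. */
    static final class Mark implements Comparable<Mark> {
        final long fileId;
        final long offset;

        Mark(long fileId, long offset) {
            this.fileId = fileId;
            this.offset = offset;
        }

        // Order by file id first, then by offset inside the same file.
        @Override
        public int compareTo(Mark o) {
            int c = Long.compare(fileId, o.fileId);
            return c != 0 ? c : Long.compare(offset, o.offset);
        }

        // 16-byte record: two longs in the buffer's default (big-endian) order.
        void write(ByteBuffer bb) {
            bb.putLong(fileId);
            bb.putLong(offset);
        }

        // Arguments are evaluated left to right, so file id is read before offset.
        static Mark read(ByteBuffer bb) {
            return new Mark(bb.getLong(), bb.getLong());
        }
    }

    /** Hypothetical stand-in for CheckpointHolder; null means "no pending checkpoint". */
    static final class Holder {
        private Mark pending = null;

        // On entry-log rotation: keep only the newest proposed checkpoint.
        synchronized void setNext(Mark cp) {
            if (pending == null || pending.compareTo(cp) < 0) {
                pending = cp;
            }
        }

        // After a flush: clear only if no newer checkpoint arrived in the meantime.
        synchronized void clear(Mark done) {
            if (pending != null && pending.compareTo(done) == 0) {
                pending = null;
            }
        }

        synchronized Mark get() {
            return pending;
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        holder.setNext(new Mark(3, 100));
        holder.setNext(new Mark(2, 999));   // older proposal: ignored
        Mark inFlight = holder.get();       // (3, 100) is being checkpointed
        holder.setNext(new Mark(3, 200));   // another rotation during the flush
        holder.clear(inFlight);             // must not drop the newer (3, 200)
        System.out.println(holder.get().fileId + ":" + holder.get().offset);

        // Round-trip a mark through a 16-byte buffer.
        ByteBuffer bb = ByteBuffer.allocate(16);
        new Mark(7, 42).write(bb);
        bb.flip();
        Mark back = Mark.read(bb);
        System.out.println(back.fileId + ":" + back.offset);
    }
}
```

Running main prints 3:200 and then 7:42. The older proposal (2, 999) is ignored. Clearing the in-flight checkpoint (3, 100) leaves the newer (3, 200) pending. This is why clearLastCheckpoint in the diff resets the holder only when the completed checkpoint compares equal to the held one.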