zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject svn commit: r1241922 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/
Date Wed, 08 Feb 2012 14:54:30 GMT
Author: ivank
Date: Wed Feb  8 14:54:29 2012
New Revision: 1241922

URL: http://svn.apache.org/viewvc?rev=1241922&view=rev
Log:
BOOKKEEPER-165: Add versioning support for journal files (ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb  8 14:54:29 2012
@@ -47,6 +47,8 @@ Trunk (unreleased changes)
 
 	BOOKKEEPER-157:	For small packets, increasing number of bookies actually degrades performance.
(ivank via fpj)
 
+        BOOKKEEPER-165: Add versioning support for journal files (ivank)
+
       hedwig-server/
 
         BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Wed Feb  8 14:54:29 2012
@@ -302,12 +302,12 @@ public class Bookie extends Thread {
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
         ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
         for(Long id: logs) {
-            FileChannel recLog ;
+            JournalChannel recLog;
             if(id == markedLogId) {
-              long markedLogPosition = lastLogMark.txnLogPosition;
-              recLog = openChannel(id, markedLogPosition);
+                long markedLogPosition = lastLogMark.txnLogPosition;
+                recLog = new JournalChannel(journalDirectory, id, markedLogPosition);
             } else {
-              recLog = openChannel(id);
+                recLog = new JournalChannel(journalDirectory, id);
             }
 
             while(true) {
@@ -344,6 +344,7 @@ public class Bookie extends Thread {
                     putHandle(handle);
                 }
             }
+            recLog.close();
         }
         // pass zookeeper instance here
         // since GarbageCollector thread should only start after journal
@@ -602,7 +603,7 @@ public class Bookie extends Thread {
         }
     }
 
-    private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
+    private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
         int total = 0;
         while(bb.remaining() > 0) {
             int rc = fc.read(bb);
@@ -712,10 +713,6 @@ public class Bookie extends Thread {
 
     LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
 
-    public final static long preAllocSize = 4*1024*1024;
-
-    public final static ByteBuffer zeros = ByteBuffer.allocate(512);
-
     class LastLogMark {
         long txnLogId;
         long txnLogPosition;
@@ -825,7 +822,7 @@ public class Bookie extends Thread {
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
         try {
             long logId = 0;
-            FileChannel logFile = null;
+            JournalChannel logFile = null;
             BufferedChannel bc = null;
             long nextPrealloc = 0;
             long lastFlushPosition = 0;
@@ -835,12 +832,10 @@ public class Bookie extends Thread {
                 // new journal file to write
                 if (null == logFile) {
                     logId = System.currentTimeMillis();
-                    logFile = openChannel(logId);
-                    bc = new BufferedChannel(logFile, 65536);
-                    zeros.clear();
-                    nextPrealloc = preAllocSize;
+                    logFile = new JournalChannel(journalDirectory, logId);
+                    bc = logFile.getBufferedChannel();
+
                     lastFlushPosition = 0;
-                    logFile.write(zeros, nextPrealloc);
                 }
 
                 if (qe == null) {
@@ -884,11 +879,9 @@ public class Bookie extends Thread {
                 // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
                 bc.write(lenBuff);
                 bc.write(qe.entry);
-                if (bc.position() > nextPrealloc) {
-                    nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
-                    zeros.clear();
-                    logFile.write(zeros, nextPrealloc);
-                }
+
+                logFile.preAllocIfNeeded();
+
                 toFlush.add(qe);
                 qe = null;
             }
@@ -897,22 +890,6 @@ public class Bookie extends Thread {
         }
     }
 
-    private FileChannel openChannel(long logId) throws FileNotFoundException {
-        return openChannel(logId, 0);
-    }
-
-    private FileChannel openChannel(long logId, long position) throws FileNotFoundException
{
-        FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
-                Long.toHexString(logId) + ".txn"),
-                "rw").getChannel();
-        try {
-            logFile.position(position);
-        } catch (IOException e) {
-            LOG.error("Bookie journal file can seek to position :", e);
-        }
-        return logFile;
-    }
-
     public synchronized void shutdown() throws InterruptedException {
         if (!running) { // avoid shutdown twice
             return;

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
Wed Feb  8 14:54:29 2012
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.Arrays;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple wrapper around FileChannel to add versioning
+ * information to the file.
+ */
+class JournalChannel {
+    static Logger LOG = LoggerFactory.getLogger(JournalChannel.class);
+
+    final FileChannel fc;
+    final BufferedChannel bc;
+    final int formatVersion;
+    long nextPrealloc = 0;
+
+    final byte[] MAGIC_WORD = "BKLG".getBytes();
+
+    private final static int START_OF_FILE = -12345;
+
+    int HEADER_SIZE = 8; // 4byte magic word, 4 byte version
+    int MIN_COMPAT_JOURNAL_FORMAT_VERSION = 1;
+    int CURRENT_JOURNAL_FORMAT_VERSION = 2;
+
+    public final static long preAllocSize = 4*1024*1024;
+    public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+
+    JournalChannel(File journalDirectory, long logId) throws IOException {
+        this(journalDirectory, logId, START_OF_FILE);
+    }
+
+    JournalChannel(File journalDirectory, long logId, long position) throws IOException {
+        File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
+
+        LOG.info("Opening journal {}", fn);
+        if (!fn.exists()) { // new file, write version
+            fc = new RandomAccessFile(fn, "rw").getChannel();
+            formatVersion = CURRENT_JOURNAL_FORMAT_VERSION;
+
+            ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
+            bb.put(MAGIC_WORD);
+            bb.putInt(formatVersion);
+            bb.flip();
+            fc.write(bb);
+            fc.force(true);
+
+            bc = new BufferedChannel(fc, 65536);
+
+            nextPrealloc = preAllocSize;
+            fc.write(zeros, nextPrealloc);
+        } else {  // open an existing file
+            fc = new RandomAccessFile(fn, "r").getChannel();
+            bc = null; // readonly
+
+            ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
+            int c = fc.read(bb);
+            bb.flip();
+
+            if (c == HEADER_SIZE) {
+                byte[] first4 = new byte[4];
+                bb.get(first4);
+
+                if (Arrays.equals(first4, MAGIC_WORD)) {
+                    formatVersion = bb.getInt();
+                } else {
+                    // pre magic word journal, reset to 0;
+                    formatVersion = 1;
+                }
+            } else {
+                // no header, must be old version
+                formatVersion = 1;
+            }
+
+            if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION
+                || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) {
+                String err = String.format("Invalid journal version, unable to read."
+                        + " Expected between (%d) and (%d), got (%d)",
+                        MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION,
+                        formatVersion);
+                LOG.error(err);
+                throw new IOException(err);
+            }
+
+            try {
+                if (position == START_OF_FILE) {
+                    if (formatVersion >= 2) {
+                        fc.position(HEADER_SIZE);
+                    } else {
+                        fc.position(0);
+                    }
+                } else {
+                    fc.position(position);
+                }
+            } catch (IOException e) {
+                LOG.error("Bookie journal file can seek to position :", e);
+            }
+        }
+    }
+
+    int getFormatVersion() {
+        return formatVersion;
+    }
+
+    BufferedChannel getBufferedChannel() throws IOException {
+        if (bc == null) {
+            throw new IOException("Read only journal channel");
+        }
+        return bc;
+    }
+
+    void preAllocIfNeeded() throws IOException {
+        if (bc.position() > nextPrealloc) {
+            nextPrealloc = ((fc.size() + HEADER_SIZE) / preAllocSize + 1) * preAllocSize;
+            zeros.clear();
+            fc.write(zeros, nextPrealloc);
+        }
+    }
+
+    int read(ByteBuffer dst)
+            throws IOException {
+        return fc.read(dst);
+    }
+
+    void close() throws IOException {
+        fc.close();
+    }
+}
\ No newline at end of file

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
Wed Feb  8 14:54:29 2012
@@ -197,7 +197,7 @@ public class LedgerCache {
         }
     }
 
-    static final private String getLedgerName(long ledgerId) {
+    static final String getLedgerName(long ledgerId) {
         int parent = (int) (ledgerId & 0xff);
         int grandParent = (int) ((ledgerId & 0xff00) >> 8);
         StringBuilder sb = new StringBuilder();

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
Wed Feb  8 14:54:29 2012
@@ -0,0 +1,394 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.Set;
+import java.util.Arrays;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.ClientUtil;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class BookieJournalTest {
+    static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
+
+    private void writeIndexFileForLedger(File indexDir, long ledgerId,
+                                         byte[] masterKey)
+            throws Exception {
+        File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+        fn.getParentFile().mkdirs();
+        FileInfo fi = new FileInfo(fn);
+        fi.writeMasterKey(masterKey);
+        fi.close();
+    }
+
+    private void writeJunkJournal(File journalDir) throws Exception {
+        long logId = System.currentTimeMillis();
+        File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
+
+        FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
+
+        ByteBuffer zeros = ByteBuffer.allocate(512);
+        fc.write(zeros, 4*1024*1024);
+        fc.position(0);
+
+        for (int i = 1; i <= 10; i++) {
+            fc.write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+        }
+    }
+
+    private void writePreV2Journal(File journalDir, int numEntries) throws Exception {
+        long logId = System.currentTimeMillis();
+        File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
+
+        FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
+
+        ByteBuffer zeros = ByteBuffer.allocate(512);
+        fc.write(zeros, 4*1024*1024);
+        fc.position(0);
+
+        byte[] data = "JournalTestData".getBytes();
+        long lastConfirmed = -1;
+        for (int i = 1; i <= numEntries; i++) {
+            ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length,
data).toByteBuffer();
+            lastConfirmed = i;
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            lenBuff.putInt(packet.remaining());
+            lenBuff.flip();
+
+            fc.write(lenBuff);
+            fc.write(packet);
+        }
+    }
+
+    private JournalChannel writePostV2Journal(File journalDir, int numEntries) throws Exception
{
+        long logId = System.currentTimeMillis();
+        JournalChannel jc = new JournalChannel(journalDir, logId);
+
+        BufferedChannel bc = jc.getBufferedChannel();
+
+        byte[] data = new byte[1024];
+        Arrays.fill(data, (byte)'X');
+        long lastConfirmed = -1;
+        for (int i = 1; i <= numEntries; i++) {
+            ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length,
data).toByteBuffer();
+            lastConfirmed = i;
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            lenBuff.putInt(packet.remaining());
+            lenBuff.flip();
+
+            bc.write(lenBuff);
+            bc.write(packet);
+        }
+        bc.flush(true);
+
+        return jc;
+    }
+
+    /**
+     * test that we can open a journal written without the magic
+     * word at the start. This is for versions of bookkeeper before
+     * the magic word was introduced
+     */
+    @Test
+    public void testPreV2Journal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        writePreV2Journal(journalDir, 100);
+        writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = new Bookie(conf);
+
+        b.readEntry(1, 100);
+        try {
+            b.readEntry(1, 101);
+            fail("Shouldn't have found entry 101");
+        } catch (Bookie.NoEntryException e) {
+            // correct behaviour
+        }
+
+        b.shutdown();
+    }
+
+    /**
+     * Test that if the journal is all journal, we can not
+     * start the bookie. An admin should look to see what has
+     * happened in this case
+     */
+    @Test
+    public void testAllJunkJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        writeJunkJournal(journalDir);
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+        Bookie b = null;
+        try {
+            b = new Bookie(conf);
+            fail("Shouldn't have been able to start without admin");
+        } catch (Throwable t) {
+            // correct behaviour
+        } finally {
+            if (b != null) {
+                b.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Test that we can start with an empty journal.
+     * This can happen if the bookie crashes between creating the
+     * journal and writing the magic word. It could also happen before
+     * the magic word existed, if the bookie started but nothing was
+     * ever written.
+     */
+    @Test
+    public void testEmptyJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        writePreV2Journal(journalDir, 0);
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = new Bookie(conf);
+        b.shutdown();
+    }
+
+    /**
+     * Test that a journal can load if only the magic word and
+     * version are there.
+     */
+    @Test
+    public void testHeaderOnlyJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        writePostV2Journal(journalDir, 0);
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = new Bookie(conf);
+        b.shutdown();
+    }
+
+    /**
+     * Test that if a journal has junk at the end, it does not load.
+     * If the journal is corrupt like this, admin intervention is needed
+     */
+    @Test
+    public void testJunkEndedJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        JournalChannel jc = writePostV2Journal(journalDir, 0);
+        jc.getBufferedChannel().write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+        jc.getBufferedChannel().flush(true);
+
+        writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = null;
+        try {
+            b = new Bookie(conf);
+        } catch (Throwable t) {
+            // correct behaviour
+        } finally {
+            if (b != null) {
+                b.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Test that if the bookie crashes while writing the length
+     * of an entry, that we can recover.
+     *
+     * This is currently not the case, which is bad as recovery
+     * should be fine here. The bookie has crashed while writing
+     * but so the client has not be notified of success.
+     */
+    //    @Test TODO, fix and reenable
+    public void testTruncatedInLenJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        JournalChannel jc = writePostV2Journal(journalDir, 100);
+        ByteBuffer zeros = ByteBuffer.allocate(2048);
+
+        jc.fc.position(jc.getBufferedChannel().position() - 0x429);
+        jc.fc.write(zeros);
+        jc.fc.force(false);
+
+        writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = new Bookie(conf);
+
+        b.readEntry(1, 99);
+
+        try {
+            b.readEntry(1, 100);
+            fail("Shouldn't have found entry 100");
+        } catch (Bookie.NoEntryException e) {
+            // correct behaviour
+        }
+
+        b.shutdown();
+    }
+
+    /**
+     * Test that if the bookie crashes in the middle of writing
+     * the actual entry it can recover.
+     * In this case the entry will be available, but it will corrupt.
+     * This is ok, as the client will disregard the entry after looking
+     * at its checksum.
+     */
+    @Test
+    public void testTruncatedInEntryJournal() throws Exception {
+        File journalDir = File.createTempFile("bookie", "journal");
+        journalDir.delete();
+        journalDir.mkdir();
+
+        File ledgerDir = File.createTempFile("bookie", "ledger");
+        ledgerDir.delete();
+        ledgerDir.mkdir();
+
+        JournalChannel jc = writePostV2Journal(journalDir, 100);
+        ByteBuffer zeros = ByteBuffer.allocate(2048);
+
+        jc.fc.position(jc.getBufferedChannel().position() - 0x300);
+        jc.fc.write(zeros);
+        jc.fc.force(false);
+
+        writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+        ServerConfiguration conf = new ServerConfiguration()
+            .setZkServers(null)
+            .setJournalDirName(journalDir.getPath())
+            .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+        Bookie b = new Bookie(conf);
+        b.readEntry(1, 99);
+
+        // still able to read last entry, but it's junk
+        ByteBuffer buf = b.readEntry(1, 100);
+        assertEquals("Ledger Id is wrong", buf.getLong(), 1);
+        assertEquals("Entry Id is wrong", buf.getLong(), 100);
+        assertEquals("Last confirmed is wrong", buf.getLong(), 99);
+        assertEquals("Length is wrong", buf.getLong(), 100*1024);
+        buf.getLong(); // skip checksum
+        boolean allX = true;
+        for (int i = 0; i < 1024; i++) {
+            byte x = buf.get();
+            allX = allX && x == (byte)'X';
+        }
+        assertFalse("Some of buffer should have been zeroed", allX);
+
+        try {
+            b.readEntry(1, 101);
+            fail("Shouldn't have found entry 101");
+        } catch (Bookie.NoEntryException e) {
+            // correct behaviour
+        }
+
+        b.shutdown();
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
Wed Feb  8 14:54:29 2012
@@ -0,0 +1,30 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+public class ClientUtil {
+    public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed,

+                                        long length, byte[] data) {
+        CRC32DigestManager dm = new CRC32DigestManager(ledgerId);
+        return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
+                                                    data, 0, data.length);
+    }
+}
\ No newline at end of file



Mime
View raw message