Author: ivank
Date: Wed Feb 8 14:54:29 2012
New Revision: 1241922
URL: http://svn.apache.org/viewvc?rev=1241922&view=rev
Log:
BOOKKEEPER-165: Add versioning support for journal files (ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb 8 14:54:29 2012
@@ -47,6 +47,8 @@ Trunk (unreleased changes)
BOOKKEEPER-157: For small packets, increasing number of bookies actually degrades performance.
(ivank via fpj)
+ BOOKKEEPER-165: Add versioning support for journal files (ivank)
+
hedwig-server/
BOOKKEEPER-77: Add a console client for hedwig (Sijie Guo via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
Wed Feb 8 14:54:29 2012
@@ -302,12 +302,12 @@ public class Bookie extends Thread {
ByteBuffer lenBuff = ByteBuffer.allocate(4);
ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
for(Long id: logs) {
- FileChannel recLog ;
+ JournalChannel recLog;
if(id == markedLogId) {
- long markedLogPosition = lastLogMark.txnLogPosition;
- recLog = openChannel(id, markedLogPosition);
+ long markedLogPosition = lastLogMark.txnLogPosition;
+ recLog = new JournalChannel(journalDirectory, id, markedLogPosition);
} else {
- recLog = openChannel(id);
+ recLog = new JournalChannel(journalDirectory, id);
}
while(true) {
@@ -344,6 +344,7 @@ public class Bookie extends Thread {
putHandle(handle);
}
}
+ recLog.close();
}
// pass zookeeper instance here
// since GarbageCollector thread should only start after journal
@@ -602,7 +603,7 @@ public class Bookie extends Thread {
}
}
- private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
+ private static int fullRead(JournalChannel fc, ByteBuffer bb) throws IOException {
int total = 0;
while(bb.remaining() > 0) {
int rc = fc.read(bb);
@@ -712,10 +713,6 @@ public class Bookie extends Thread {
LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<QueueEntry>();
- public final static long preAllocSize = 4*1024*1024;
-
- public final static ByteBuffer zeros = ByteBuffer.allocate(512);
-
class LastLogMark {
long txnLogId;
long txnLogPosition;
@@ -825,7 +822,7 @@ public class Bookie extends Thread {
ByteBuffer lenBuff = ByteBuffer.allocate(4);
try {
long logId = 0;
- FileChannel logFile = null;
+ JournalChannel logFile = null;
BufferedChannel bc = null;
long nextPrealloc = 0;
long lastFlushPosition = 0;
@@ -835,12 +832,10 @@ public class Bookie extends Thread {
// new journal file to write
if (null == logFile) {
logId = System.currentTimeMillis();
- logFile = openChannel(logId);
- bc = new BufferedChannel(logFile, 65536);
- zeros.clear();
- nextPrealloc = preAllocSize;
+ logFile = new JournalChannel(journalDirectory, logId);
+ bc = logFile.getBufferedChannel();
+
lastFlushPosition = 0;
- logFile.write(zeros, nextPrealloc);
}
if (qe == null) {
@@ -884,11 +879,9 @@ public class Bookie extends Thread {
// logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
bc.write(lenBuff);
bc.write(qe.entry);
- if (bc.position() > nextPrealloc) {
- nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
- zeros.clear();
- logFile.write(zeros, nextPrealloc);
- }
+
+ logFile.preAllocIfNeeded();
+
toFlush.add(qe);
qe = null;
}
@@ -897,22 +890,6 @@ public class Bookie extends Thread {
}
}
- private FileChannel openChannel(long logId) throws FileNotFoundException {
- return openChannel(logId, 0);
- }
-
- private FileChannel openChannel(long logId, long position) throws FileNotFoundException
{
- FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
- Long.toHexString(logId) + ".txn"),
- "rw").getChannel();
- try {
- logFile.position(position);
- } catch (IOException e) {
- LOG.error("Bookie journal file can seek to position :", e);
- }
- return logFile;
- }
-
public synchronized void shutdown() throws InterruptedException {
if (!running) { // avoid shutdown twice
return;
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/JournalChannel.java
Wed Feb 8 14:54:29 2012
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.Arrays;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple wrapper around FileChannel to add versioning
+ * information to the file.
+ */
+class JournalChannel {
+ static Logger LOG = LoggerFactory.getLogger(JournalChannel.class);
+
+ final FileChannel fc;
+ final BufferedChannel bc;
+ final int formatVersion;
+ long nextPrealloc = 0;
+
+ final byte[] MAGIC_WORD = "BKLG".getBytes();
+
+ private final static int START_OF_FILE = -12345;
+
+ int HEADER_SIZE = 8; // 4byte magic word, 4 byte version
+ int MIN_COMPAT_JOURNAL_FORMAT_VERSION = 1;
+ int CURRENT_JOURNAL_FORMAT_VERSION = 2;
+
+ public final static long preAllocSize = 4*1024*1024;
+ public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+
+ JournalChannel(File journalDirectory, long logId) throws IOException {
+ this(journalDirectory, logId, START_OF_FILE);
+ }
+
+ JournalChannel(File journalDirectory, long logId, long position) throws IOException {
+ File fn = new File(journalDirectory, Long.toHexString(logId) + ".txn");
+
+ LOG.info("Opening journal {}", fn);
+ if (!fn.exists()) { // new file, write version
+ fc = new RandomAccessFile(fn, "rw").getChannel();
+ formatVersion = CURRENT_JOURNAL_FORMAT_VERSION;
+
+ ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
+ bb.put(MAGIC_WORD);
+ bb.putInt(formatVersion);
+ bb.flip();
+ fc.write(bb);
+ fc.force(true);
+
+ bc = new BufferedChannel(fc, 65536);
+
+ nextPrealloc = preAllocSize;
+ fc.write(zeros, nextPrealloc);
+ } else { // open an existing file
+ fc = new RandomAccessFile(fn, "r").getChannel();
+ bc = null; // readonly
+
+ ByteBuffer bb = ByteBuffer.allocate(HEADER_SIZE);
+ int c = fc.read(bb);
+ bb.flip();
+
+ if (c == HEADER_SIZE) {
+ byte[] first4 = new byte[4];
+ bb.get(first4);
+
+ if (Arrays.equals(first4, MAGIC_WORD)) {
+ formatVersion = bb.getInt();
+ } else {
+ // pre magic word journal, reset to 0;
+ formatVersion = 1;
+ }
+ } else {
+ // no header, must be old version
+ formatVersion = 1;
+ }
+
+ if (formatVersion < MIN_COMPAT_JOURNAL_FORMAT_VERSION
+ || formatVersion > CURRENT_JOURNAL_FORMAT_VERSION) {
+ String err = String.format("Invalid journal version, unable to read."
+ + " Expected between (%d) and (%d), got (%d)",
+ MIN_COMPAT_JOURNAL_FORMAT_VERSION, CURRENT_JOURNAL_FORMAT_VERSION,
+ formatVersion);
+ LOG.error(err);
+ throw new IOException(err);
+ }
+
+ try {
+ if (position == START_OF_FILE) {
+ if (formatVersion >= 2) {
+ fc.position(HEADER_SIZE);
+ } else {
+ fc.position(0);
+ }
+ } else {
+ fc.position(position);
+ }
+ } catch (IOException e) {
+ LOG.error("Bookie journal file can seek to position :", e);
+ }
+ }
+ }
+
+ int getFormatVersion() {
+ return formatVersion;
+ }
+
+ BufferedChannel getBufferedChannel() throws IOException {
+ if (bc == null) {
+ throw new IOException("Read only journal channel");
+ }
+ return bc;
+ }
+
+ void preAllocIfNeeded() throws IOException {
+ if (bc.position() > nextPrealloc) {
+ nextPrealloc = ((fc.size() + HEADER_SIZE) / preAllocSize + 1) * preAllocSize;
+ zeros.clear();
+ fc.write(zeros, nextPrealloc);
+ }
+ }
+
+ int read(ByteBuffer dst)
+ throws IOException {
+ return fc.read(dst);
+ }
+
+ void close() throws IOException {
+ fc.close();
+ }
+}
\ No newline at end of file
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1241922&r1=1241921&r2=1241922&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
(original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
Wed Feb 8 14:54:29 2012
@@ -197,7 +197,7 @@ public class LedgerCache {
}
}
- static final private String getLedgerName(long ledgerId) {
+ static final String getLedgerName(long ledgerId) {
int parent = (int) (ledgerId & 0xff);
int grandParent = (int) ((ledgerId & 0xff00) >> 8);
StringBuilder sb = new StringBuilder();
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
Wed Feb 8 14:54:29 2012
@@ -0,0 +1,394 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.Set;
+import java.util.Arrays;
+
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.ClientUtil;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class BookieJournalTest {
+ static Logger LOG = LoggerFactory.getLogger(BookieJournalTest.class);
+
+ private void writeIndexFileForLedger(File indexDir, long ledgerId,
+ byte[] masterKey)
+ throws Exception {
+ File fn = new File(indexDir, LedgerCache.getLedgerName(ledgerId));
+ fn.getParentFile().mkdirs();
+ FileInfo fi = new FileInfo(fn);
+ fi.writeMasterKey(masterKey);
+ fi.close();
+ }
+
+ private void writeJunkJournal(File journalDir) throws Exception {
+ long logId = System.currentTimeMillis();
+ File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
+
+ FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
+
+ ByteBuffer zeros = ByteBuffer.allocate(512);
+ fc.write(zeros, 4*1024*1024);
+ fc.position(0);
+
+ for (int i = 1; i <= 10; i++) {
+ fc.write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+ }
+ }
+
+ private void writePreV2Journal(File journalDir, int numEntries) throws Exception {
+ long logId = System.currentTimeMillis();
+ File fn = new File(journalDir, Long.toHexString(logId) + ".txn");
+
+ FileChannel fc = new RandomAccessFile(fn, "rw").getChannel();
+
+ ByteBuffer zeros = ByteBuffer.allocate(512);
+ fc.write(zeros, 4*1024*1024);
+ fc.position(0);
+
+ byte[] data = "JournalTestData".getBytes();
+ long lastConfirmed = -1;
+ for (int i = 1; i <= numEntries; i++) {
+ ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length,
data).toByteBuffer();
+ lastConfirmed = i;
+ ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ lenBuff.putInt(packet.remaining());
+ lenBuff.flip();
+
+ fc.write(lenBuff);
+ fc.write(packet);
+ }
+ }
+
+ private JournalChannel writePostV2Journal(File journalDir, int numEntries) throws Exception
{
+ long logId = System.currentTimeMillis();
+ JournalChannel jc = new JournalChannel(journalDir, logId);
+
+ BufferedChannel bc = jc.getBufferedChannel();
+
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'X');
+ long lastConfirmed = -1;
+ for (int i = 1; i <= numEntries; i++) {
+ ByteBuffer packet = ClientUtil.generatePacket(1, i, lastConfirmed, i*data.length,
data).toByteBuffer();
+ lastConfirmed = i;
+ ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ lenBuff.putInt(packet.remaining());
+ lenBuff.flip();
+
+ bc.write(lenBuff);
+ bc.write(packet);
+ }
+ bc.flush(true);
+
+ return jc;
+ }
+
+ /**
+ * test that we can open a journal written without the magic
+ * word at the start. This is for versions of bookkeeper before
+ * the magic word was introduced
+ */
+ @Test
+ public void testPreV2Journal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ writePreV2Journal(journalDir, 100);
+ writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = new Bookie(conf);
+
+ b.readEntry(1, 100);
+ try {
+ b.readEntry(1, 101);
+ fail("Shouldn't have found entry 101");
+ } catch (Bookie.NoEntryException e) {
+ // correct behaviour
+ }
+
+ b.shutdown();
+ }
+
+ /**
+ * Test that if the journal is all journal, we can not
+ * start the bookie. An admin should look to see what has
+ * happened in this case
+ */
+ @Test
+ public void testAllJunkJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ writeJunkJournal(journalDir);
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+ Bookie b = null;
+ try {
+ b = new Bookie(conf);
+ fail("Shouldn't have been able to start without admin");
+ } catch (Throwable t) {
+ // correct behaviour
+ } finally {
+ if (b != null) {
+ b.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test that we can start with an empty journal.
+ * This can happen if the bookie crashes between creating the
+ * journal and writing the magic word. It could also happen before
+ * the magic word existed, if the bookie started but nothing was
+ * ever written.
+ */
+ @Test
+ public void testEmptyJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ writePreV2Journal(journalDir, 0);
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = new Bookie(conf);
+ b.shutdown();
+ }
+
+ /**
+ * Test that a journal can load if only the magic word and
+ * version are there.
+ */
+ @Test
+ public void testHeaderOnlyJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ writePostV2Journal(journalDir, 0);
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = new Bookie(conf);
+ b.shutdown();
+ }
+
+ /**
+ * Test that if a journal has junk at the end, it does not load.
+ * If the journal is corrupt like this, admin intervention is needed
+ */
+ @Test
+ public void testJunkEndedJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ JournalChannel jc = writePostV2Journal(journalDir, 0);
+ jc.getBufferedChannel().write(ByteBuffer.wrap("JunkJunkJunk".getBytes()));
+ jc.getBufferedChannel().flush(true);
+
+ writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = null;
+ try {
+ b = new Bookie(conf);
+ } catch (Throwable t) {
+ // correct behaviour
+ } finally {
+ if (b != null) {
+ b.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test that if the bookie crashes while writing the length
+ * of an entry, that we can recover.
+ *
+ * This is currently not the case, which is bad as recovery
+ * should be fine here. The bookie has crashed while writing
+ * but so the client has not be notified of success.
+ */
+ // @Test TODO, fix and reenable
+ public void testTruncatedInLenJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ JournalChannel jc = writePostV2Journal(journalDir, 100);
+ ByteBuffer zeros = ByteBuffer.allocate(2048);
+
+ jc.fc.position(jc.getBufferedChannel().position() - 0x429);
+ jc.fc.write(zeros);
+ jc.fc.force(false);
+
+ writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = new Bookie(conf);
+
+ b.readEntry(1, 99);
+
+ try {
+ b.readEntry(1, 100);
+ fail("Shouldn't have found entry 100");
+ } catch (Bookie.NoEntryException e) {
+ // correct behaviour
+ }
+
+ b.shutdown();
+ }
+
+ /**
+ * Test that if the bookie crashes in the middle of writing
+ * the actual entry it can recover.
+ * In this case the entry will be available, but it will corrupt.
+ * This is ok, as the client will disregard the entry after looking
+ * at its checksum.
+ */
+ @Test
+ public void testTruncatedInEntryJournal() throws Exception {
+ File journalDir = File.createTempFile("bookie", "journal");
+ journalDir.delete();
+ journalDir.mkdir();
+
+ File ledgerDir = File.createTempFile("bookie", "ledger");
+ ledgerDir.delete();
+ ledgerDir.mkdir();
+
+ JournalChannel jc = writePostV2Journal(journalDir, 100);
+ ByteBuffer zeros = ByteBuffer.allocate(2048);
+
+ jc.fc.position(jc.getBufferedChannel().position() - 0x300);
+ jc.fc.write(zeros);
+ jc.fc.force(false);
+
+ writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
+
+ ServerConfiguration conf = new ServerConfiguration()
+ .setZkServers(null)
+ .setJournalDirName(journalDir.getPath())
+ .setLedgerDirNames(new String[] { ledgerDir.getPath() });
+
+ Bookie b = new Bookie(conf);
+ b.readEntry(1, 99);
+
+ // still able to read last entry, but it's junk
+ ByteBuffer buf = b.readEntry(1, 100);
+ assertEquals("Ledger Id is wrong", buf.getLong(), 1);
+ assertEquals("Entry Id is wrong", buf.getLong(), 100);
+ assertEquals("Last confirmed is wrong", buf.getLong(), 99);
+ assertEquals("Length is wrong", buf.getLong(), 100*1024);
+ buf.getLong(); // skip checksum
+ boolean allX = true;
+ for (int i = 0; i < 1024; i++) {
+ byte x = buf.get();
+ allX = allX && x == (byte)'X';
+ }
+ assertFalse("Some of buffer should have been zeroed", allX);
+
+ try {
+ b.readEntry(1, 101);
+ fail("Shouldn't have found entry 101");
+ } catch (Bookie.NoEntryException e) {
+ // correct behaviour
+ }
+
+ b.shutdown();
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java?rev=1241922&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
(added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
Wed Feb 8 14:54:29 2012
@@ -0,0 +1,30 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+public class ClientUtil {
+ public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed,
+ long length, byte[] data) {
+ CRC32DigestManager dm = new CRC32DigestManager(ledgerId);
+ return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length,
+ data, 0, data.length);
+ }
+}
\ No newline at end of file
|