Author: mahadev
Date: Tue Jan 26 23:16:45 2010
New Revision: 903483
URL: http://svn.apache.org/viewvc?rev=903483&view=rev
Log:
ZOOKEEPER-507. BookKeeper client re-write (Utkarsh and ben via mahadev)
Added:
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
Removed:
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jan 26 23:16:45 2010
@@ -266,6 +266,8 @@
ZOOKEEPER-593. java client api does not allow client to access negotiated
session timeout (phunt via mahadev)
+ ZOOKEEPER-507. BookKeeper client re-write (Utkarsh and ben via mahadev)
+
NEW FEATURES:
ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,3 @@
-package org.apache.bookkeeper.bookie;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,19 +19,25 @@
*
*/
+package org.apache.bookkeeper.bookie;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.log4j.Logger;
@@ -45,10 +50,6 @@
public class Bookie extends Thread {
HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
static Logger LOG = Logger.getLogger(Bookie.class);
- /**
- * 4 byte signature followed by 2-byte major and 2-byte minor versions
- */
- private static byte ledgerHeader[] = { 0x42, 0x6f, 0x6f, 0x6b, 0, 0, 0, 0};
final File journalDirectory;
@@ -69,6 +70,7 @@
private long ledgerId;
private long entryId;
public NoEntryException(long ledgerId, long entryId) {
+ super("Entry " + entryId + " not found in " + ledgerId);
this.ledgerId = ledgerId;
this.entryId = entryId;
}
@@ -80,14 +82,124 @@
}
}
- public Bookie(File journalDirectory, File ledgerDirectories[]) {
+ EntryLogger entryLogger;
+ LedgerCache ledgerCache;
+ class SyncThread extends Thread {
+ volatile boolean running = true;
+ public SyncThread() {
+ super("SyncThread");
+ }
+ @Override
+ public void run() {
+ while(running) {
+ synchronized(this) {
+ try {
+ wait(100);
+ if (!entryLogger.testAndClearSomethingWritten()) {
+ continue;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ }
+ }
+ lastLogMark.markLog();
+ try {
+ ledgerCache.flushLedger(true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ entryLogger.flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ lastLogMark.rollLog();
+ }
+ }
+ }
+ SyncThread syncThread = new SyncThread();
+ public Bookie(File journalDirectory, File ledgerDirectories[]) throws IOException {
this.journalDirectory = journalDirectory;
this.ledgerDirectories = ledgerDirectories;
+ entryLogger = new EntryLogger(ledgerDirectories);
+ ledgerCache = new LedgerCache(ledgerDirectories);
+ lastLogMark.readLog();
+ final long markedLogId = lastLogMark.txnLogId;
+ if (markedLogId > 0) {
+ File logFiles[] = journalDirectory.listFiles();
+ ArrayList<Long> logs = new ArrayList<Long>();
+ for(File f: logFiles) {
+ String name = f.getName();
+ if (!name.endsWith(".txn")) {
+ continue;
+ }
+ String idString = name.split("\\.")[0];
+ long id = Long.parseLong(idString, 16);
+ if (id < markedLogId) {
+ continue;
+ }
+ logs.add(id);
+ }
+ Collections.sort(logs);
+ if (logs.size() == 0 || logs.get(0) != markedLogId) {
+ throw new IOException("Recovery log " + markedLogId + " is missing");
+ }
+ ByteBuffer lenBuff = ByteBuffer.allocate(4);
+ ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
+ for(Long id: logs) {
+ FileChannel recLog = openChannel(id);
+ while(true) {
+ lenBuff.clear();
+ fullRead(recLog, lenBuff);
+ if (lenBuff.remaining() != 0) {
+ break;
+ }
+ lenBuff.flip();
+ int len = lenBuff.getInt();
+ if (len == 0) {
+ break;
+ }
+ recBuff.clear();
+ if (recBuff.remaining() < len) {
+ recBuff = ByteBuffer.allocate(len);
+ }
+ recBuff.limit(len);
+ if (fullRead(recLog, recBuff) != len) {
+ // This seems scary, but it just means that this is where we
+ // left off writing
+ break;
+ }
+ recBuff.flip();
+ long ledgerId = recBuff.getLong();
+ // XXX we net to make sure we set the master keys appropriately!
+ LedgerDescriptor handle = getHandle(ledgerId, false);
+ try {
+ recBuff.rewind();
+ handle.addEntry(recBuff);
+ } finally {
+ putHandle(handle);
+ }
+ }
+ }
+ }
setDaemon(true);
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
start();
+ syncThread.start();
}
+ private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
+ int total = 0;
+ while(bb.remaining() > 0) {
+ int rc = fc.read(bb);
+ if (rc <= 0) {
+ return total;
+ }
+ total += rc;
+ }
+ return total;
+ }
private void putHandle(LedgerDescriptor handle) {
synchronized (ledgers) {
handle.decRef();
@@ -99,6 +211,9 @@
synchronized (ledgers) {
handle = ledgers.get(ledgerId);
if (handle == null) {
+ if (readonly) {
+ throw new NoLedgerException(ledgerId);
+ }
handle = createHandle(ledgerId, readonly);
ledgers.put(ledgerId, handle);
handle.setMasterKey(ByteBuffer.wrap(masterKey));
@@ -113,6 +228,9 @@
synchronized (ledgers) {
handle = ledgers.get(ledgerId);
if (handle == null) {
+ if (readonly) {
+ throw new NoLedgerException(ledgerId);
+ }
handle = createHandle(ledgerId, readonly);
ledgers.put(ledgerId, handle);
}
@@ -123,85 +241,9 @@
private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
- RandomAccessFile ledgerFile = null;
- RandomAccessFile ledgerIndexFile = null;
- String ledgerName = getLedgerName(ledgerId, false);
- String ledgerIndexName = getLedgerName(ledgerId, true);
- for (File d : ledgerDirectories) {
- File lf = new File(d, ledgerName);
- File lif = new File(d, ledgerIndexName);
- if (lf.exists()) {
- if (ledgerFile != null) {
- throw new IOException("Duplicate ledger file found for "
- + ledgerId);
- }
- ledgerFile = new RandomAccessFile(lf, "rw");
- }
- if (lif.exists()) {
- if (ledgerIndexFile != null) {
- throw new IOException(
- "Duplicate ledger index file found for " + ledgerId);
- }
- ledgerIndexFile = new RandomAccessFile(lif, "rw");
- }
- }
- if (ledgerFile == null && ledgerIndexFile == null) {
- if (readOnly) {
- throw new NoLedgerException(ledgerId);
- }
- File dirs[] = pickDirs(ledgerDirectories);
- File lf = new File(dirs[0], ledgerName);
- checkParents(lf);
- ledgerFile = new RandomAccessFile(lf, "rw");
- ledgerFile.write(ledgerHeader);
- File lif = new File(dirs[1], ledgerIndexName);
- checkParents(lif);
- ledgerIndexFile = new RandomAccessFile(lif, "rw");
- }
- if (ledgerFile != null && ledgerIndexFile != null) {
- return new LedgerDescriptor(ledgerId, ledgerFile.getChannel(),
- ledgerIndexFile.getChannel());
- }
- if (ledgerFile == null) {
- throw new IOException("Found index but no data for " + ledgerId);
- }
- throw new IOException("Found data but no index for " + ledgerId);
+ return new LedgerDescriptor(ledgerId, entryLogger, ledgerCache);
}
- static final private void checkParents(File f) throws IOException {
- File parent = f.getParentFile();
- if (parent.exists()) {
- return;
- }
- if (parent.mkdirs() == false) {
- throw new IOException("Counldn't mkdirs for " + parent);
- }
- }
-
- static final private Random rand = new Random();
-
- static final private File[] pickDirs(File dirs[]) {
- File rc[] = new File[2];
- rc[0] = dirs[rand.nextInt(dirs.length)];
- rc[1] = dirs[rand.nextInt(dirs.length)];
- return rc;
- }
-
- static final private String getLedgerName(long ledgerId, boolean isIndex) {
- int parent = (int) (ledgerId & 0xff);
- int grandParent = (int) ((ledgerId & 0xff00) >> 8);
- StringBuilder sb = new StringBuilder();
- sb.append(Integer.toHexString(grandParent));
- sb.append('/');
- sb.append(Integer.toHexString(parent));
- sb.append('/');
- sb.append(Long.toHexString(ledgerId));
- if (isIndex) {
- sb.append(".idx");
- }
- return sb.toString();
- }
-
static class QueueEntry {
QueueEntry(ByteBuffer entry, long ledgerId, long entryId,
WriteCallback cb, Object ctx) {
@@ -229,15 +271,76 @@
public final static ByteBuffer zeros = ByteBuffer.allocate(512);
+ class LastLogMark {
+ long txnLogId;
+ long txnLogPosition;
+ LastLogMark lastMark;
+ LastLogMark(long logId, long logPosition) {
+ this.txnLogId = logId;
+ this.txnLogPosition = logPosition;
+ }
+ synchronized void setLastLogMark(long logId, long logPosition) {
+ txnLogId = logId;
+ txnLogPosition = logPosition;
+ }
+ synchronized void markLog() {
+ lastMark = new LastLogMark(txnLogId, txnLogPosition);
+ }
+ synchronized void rollLog() {
+ byte buff[] = new byte[16];
+ ByteBuffer bb = ByteBuffer.wrap(buff);
+ bb.putLong(txnLogId);
+ bb.putLong(txnLogPosition);
+ for(File dir: ledgerDirectories) {
+ File file = new File(dir, "lastMark");
+ try {
+ FileOutputStream fos = new FileOutputStream(file);
+ fos.write(buff);
+ fos.getChannel().force(true);
+ fos.close();
+ } catch (IOException e) {
+ LOG.error("Problems writing to " + file, e);
+ }
+ }
+ }
+ synchronized void readLog() {
+ byte buff[] = new byte[16];
+ ByteBuffer bb = ByteBuffer.wrap(buff);
+ for(File dir: ledgerDirectories) {
+ File file = new File(dir, "lastMark");
+ try {
+ FileInputStream fis = new FileInputStream(file);
+ fis.read(buff);
+ fis.close();
+ bb.clear();
+ long i = bb.getLong();
+ long p = bb.getLong();
+ if (i > txnLogId) {
+ txnLogId = i;
+ }
+ if (p > txnLogPosition) {
+ txnLogPosition = p;
+ }
+ } catch (IOException e) {
+ LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
+ }
+ }
+ }
+ }
+
+ private LastLogMark lastLogMark = new LastLogMark(0, 0);
+
+ @Override
public void run() {
LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
ByteBuffer lenBuff = ByteBuffer.allocate(4);
try {
- FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
- Long.toHexString(System.currentTimeMillis()) + ".txn"),
- "rw").getChannel();
+ long logId = System.currentTimeMillis();
+ FileChannel logFile = openChannel(logId);
+ BufferedChannel bc = new BufferedChannel(logFile, 65536);
zeros.clear();
long nextPrealloc = preAllocSize;
+ long lastFlushPosition = 0;
logFile.write(zeros, nextPrealloc);
while (true) {
QueueEntry qe = null;
@@ -245,10 +348,13 @@
qe = queue.take();
} else {
qe = queue.poll();
- if (qe == null || toFlush.size() > 100) {
- logFile.force(false);
+ if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
+ //logFile.force(false);
+ bc.flush(true);
+ lastFlushPosition = bc.position();
+ lastLogMark.setLastLogMark(logId, lastFlushPosition);
for (QueueEntry e : toFlush) {
- e.cb.writeComplete(0, e.ledgerId, e.entryId, e.ctx);
+ e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
}
toFlush.clear();
}
@@ -259,8 +365,13 @@
lenBuff.clear();
lenBuff.putInt(qe.entry.remaining());
lenBuff.flip();
- logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
- if (logFile.position() > nextPrealloc) {
+ //
+ // we should be doing the following, but then we run out of
+ // direct byte buffers
+ // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+ bc.write(lenBuff);
+ bc.write(qe.entry);
+ if (bc.position() > nextPrealloc) {
nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
zeros.clear();
logFile.write(zeros, nextPrealloc);
@@ -272,9 +383,18 @@
}
}
+ private FileChannel openChannel(long logId) throws FileNotFoundException {
+ FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
+ Long.toHexString(logId) + ".txn"),
+ "rw").getChannel();
+ return logFile;
+ }
+
public void shutdown() throws InterruptedException {
this.interrupt();
this.join();
+ syncThread.running = false;
+ syncThread.join();
for(LedgerDescriptor d: ledgers.values()) {
d.close();
}
@@ -282,7 +402,6 @@
public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
throws IOException, BookieException {
-
long ledgerId = entry.getLong();
LedgerDescriptor handle = getHandle(ledgerId, false, masterKey);
@@ -318,7 +437,7 @@
static class CounterCallback implements WriteCallback {
int count;
- synchronized public void writeComplete(int rc, long l, long e, Object ctx) {
+ synchronized public void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
count--;
if (count == 0) {
notifyAll();
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Provides a buffering layer in front of a FileChannel.
+ */
+public class BufferedChannel
+{
+ ByteBuffer writeBuffer;
+ ByteBuffer readBuffer;
+ private FileChannel bc;
+ long position;
+ int capacity;
+ long readBufferStartPosition;
+ long writeBufferStartPosition;
+ BufferedChannel(FileChannel bc, int capacity) throws IOException {
+ this.bc = bc;
+ this.capacity = capacity;
+ position = bc.position();
+ writeBufferStartPosition = position;
+ }
+/* public void close() throws IOException {
+ bc.close();
+ }
+*/
+// public boolean isOpen() {
+// return bc.isOpen();
+// }
+
+ synchronized public int write(ByteBuffer src) throws IOException {
+ int copied = 0;
+ if (writeBuffer == null) {
+ writeBuffer = ByteBuffer.allocateDirect(capacity);
+ }
+ while(src.remaining() > 0) {
+ int truncated = 0;
+ if (writeBuffer.remaining() < src.remaining()) {
+ truncated = src.remaining() - writeBuffer.remaining();
+ src.limit(src.limit()-truncated);
+ }
+ copied += src.remaining();
+ writeBuffer.put(src);
+ src.limit(src.limit()+truncated);
+ if (writeBuffer.remaining() == 0) {
+ writeBuffer.flip();
+ bc.write(writeBuffer);
+ writeBuffer.clear();
+ writeBufferStartPosition = bc.position();
+ }
+ }
+ position += copied;
+ return copied;
+ }
+
+ public long position() {
+ return position;
+ }
+
+ public void flush(boolean sync) throws IOException {
+ synchronized(this) {
+ if (writeBuffer == null) {
+ return;
+ }
+ writeBuffer.flip();
+ bc.write(writeBuffer);
+ writeBuffer.clear();
+ writeBufferStartPosition = bc.position();
+ }
+ if (sync) {
+ bc.force(false);
+ }
+ }
+
+ /*public Channel getInternalChannel() {
+ return bc;
+ }*/
+ synchronized public int read(ByteBuffer buff, long pos) throws IOException {
+ if (readBuffer == null) {
+ readBuffer = ByteBuffer.allocateDirect(capacity);
+ readBufferStartPosition = Long.MIN_VALUE;
+ }
+ int rc = buff.remaining();
+ while(buff.remaining() > 0) {
+ // check if it is in the write buffer
+ if (writeBuffer != null && writeBufferStartPosition <= pos) {
+ long positionInBuffer = pos - writeBufferStartPosition;
+ long bytesToCopy = writeBuffer.position()-positionInBuffer;
+ if (bytesToCopy > buff.remaining()) {
+ bytesToCopy = buff.remaining();
+ }
+ if (bytesToCopy == 0) {
+ throw new IOException("Read past EOF");
+ }
+ ByteBuffer src = writeBuffer.duplicate();
+ src.position((int) positionInBuffer);
+ src.limit((int) (positionInBuffer+bytesToCopy));
+ buff.put(src);
+ pos+= bytesToCopy;
+ // first check if there is anything we can grab from the readBuffer
+ } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition+readBuffer.capacity()) {
+ long positionInBuffer = pos - readBufferStartPosition;
+ long bytesToCopy = readBuffer.capacity()-positionInBuffer;
+ if (bytesToCopy > buff.remaining()) {
+ bytesToCopy = buff.remaining();
+ }
+ ByteBuffer src = readBuffer.duplicate();
+ src.position((int) positionInBuffer);
+ src.limit((int) (positionInBuffer+bytesToCopy));
+ buff.put(src);
+ pos += bytesToCopy;
+ // let's read it
+ } else {
+ readBufferStartPosition = pos;
+ readBuffer.clear();
+ // make sure that we don't overlap with the write buffer
+ if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
+ readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
+ if (readBufferStartPosition < 0) {
+ readBuffer.put(LedgerEntryPage.zeroPage, 0, (int)-readBufferStartPosition);
+ }
+ }
+ while(readBuffer.remaining() > 0) {
+ if (bc.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
+ throw new IOException("Short read");
+ }
+ }
+ readBuffer.put(LedgerEntryPage.zeroPage, 0, readBuffer.remaining());
+ readBuffer.clear();
+ }
+ }
+ return rc;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class manages the writing of the bookkeeper entries. All the new
+ * entries are written to a common log. The LedgerCache will have pointers
+ * into files created by this class with offsets into the files to find
+ * the actual ledger entry. The entry log files created by this class are
+ * identified by a long.
+ */
+public class EntryLogger {
+ private static final Logger LOG = Logger.getLogger(EntryLogger.class);
+ private File dirs[];
+ private long logId;
+ /**
+ * The maximum size of a entry logger file.
+ */
+ final static long LOG_SIZE_LIMIT = 2*1024*1024*1024L;
+ private volatile BufferedChannel logChannel;
+ // The ledgers contained in this file, seems to be unsused right now
+ //private HashSet<Long> ledgerMembers = new HashSet<Long>();
+ /**
+ * The 1K block at the head of the entry logger file
+ * that contains the fingerprint and (future) meta-data
+ */
+ final static ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(1024);
+ static {
+ LOGFILE_HEADER.put("BKLO".getBytes());
+ }
+ // this indicates that a write has happened since the last flush
+ private volatile boolean somethingWritten = false;
+
+ /**
+ * Create an EntryLogger that stores it's log files in the given
+ * directories
+ */
+ public EntryLogger(File dirs[]) throws IOException {
+ this.dirs = dirs;
+ // Find the largest logId
+ for(File f: dirs) {
+ long lastLogId = getLastLogId(f);
+ if (lastLogId >= logId) {
+ logId = lastLogId+1;
+ }
+ }
+ createLogId(logId);
+ //syncThread = new SyncThread();
+ //syncThread.start();
+ }
+
+ /**
+ * Maps entry log files to open channels.
+ */
+ private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+
+ /**
+ * Creates a new log file with the given id.
+ */
+ private void createLogId(long logId) throws IOException {
+ List<File> list = Arrays.asList(dirs);
+ Collections.shuffle(list);
+ File firstDir = list.get(0);
+ if (logChannel != null) {
+ logChannel.flush(true);
+ }
+ logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
+ logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
+ channels.put(logId, logChannel);
+ for(File f: dirs) {
+ setLastLogId(f, logId);
+ }
+ }
+
+ /**
+ * writes the given id to the "lastId" file in the given directory.
+ */
+ private void setLastLogId(File dir, long logId) throws IOException {
+ FileOutputStream fos;
+ fos = new FileOutputStream(new File(dir, "lastId"));
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos));
+ try {
+ bw.write(Long.toHexString(logId) + "\n");
+ bw.flush();
+ } finally {
+ try {
+ fos.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ /**
+ * reads id from the "lastId" file in the given directory.
+ */
+ private long getLastLogId(File f) {
+ FileInputStream fis;
+ try {
+ fis = new FileInputStream(new File(f, "lastId"));
+ } catch (FileNotFoundException e) {
+ return -1;
+ }
+ BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+ try {
+ String lastIdString = br.readLine();
+ return Long.parseLong(lastIdString);
+ } catch (IOException e) {
+ return -1;
+ } catch(NumberFormatException e) {
+ return -1;
+ } finally {
+ try {
+ fis.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ private void openNewChannel() throws IOException {
+ createLogId(++logId);
+ }
+
+ synchronized void flush() throws IOException {
+ if (logChannel != null) {
+ logChannel.flush(true);
+ }
+ }
+ synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+ if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
+ openNewChannel();
+ }
+ ByteBuffer buff = ByteBuffer.allocate(4);
+ buff.putInt(entry.remaining());
+ buff.flip();
+ logChannel.write(buff);
+ long pos = logChannel.position();
+ logChannel.write(entry);
+ //logChannel.flush(false);
+ somethingWritten = true;
+ return (logId << 32L) | pos;
+ }
+
+ byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
+ long entryLogId = location >> 32L;
+ long pos = location & 0xffffffffL;
+ ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+ pos -= 4; // we want to get the ledgerId and length to check
+ BufferedChannel fc;
+ try {
+ fc = getChannelForLogId(entryLogId);
+ } catch (FileNotFoundException e) {
+ FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
+ newe.setStackTrace(e.getStackTrace());
+ throw newe;
+ }
+ if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+ throw new IOException("Short read from entrylog " + entryLogId);
+ }
+ pos += 4;
+ sizeBuff.flip();
+ int entrySize = sizeBuff.getInt();
+ // entrySize does not include the ledgerId
+ if (entrySize > 1024*1024) {
+ LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
+
+ }
+ byte data[] = new byte[entrySize];
+ ByteBuffer buff = ByteBuffer.wrap(data);
+ int rc = fc.read(buff, pos);
+ if ( rc != data.length) {
+ throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
+ }
+ buff.flip();
+ long thisLedgerId = buff.getLong();
+ if (thisLedgerId != ledgerId) {
+ throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
+ }
+ long thisEntryId = buff.getLong();
+ if (thisEntryId != entryId) {
+ throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position + " + pos + " entry is " + thisEntryId + " not " + entryId);
+ }
+
+ return data;
+ }
+
+ private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
+ BufferedChannel fc = channels.get(entryLogId);
+ if (fc != null) {
+ return fc;
+ }
+ File file = findFile(entryLogId);
+ FileChannel newFc = new RandomAccessFile(file, "rw").getChannel();
+ synchronized (channels) {
+ fc = channels.get(entryLogId);
+ if (fc != null){
+ newFc.close();
+ return fc;
+ }
+ fc = new BufferedChannel(newFc, 8192);
+ channels.put(entryLogId, fc);
+ return fc;
+ }
+ }
+
+ private File findFile(long logId) throws FileNotFoundException {
+ for(File d: dirs) {
+ File f = new File(d, Long.toHexString(logId)+".log");
+ if (f.exists()) {
+ return f;
+ }
+ }
+ throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
+ }
+
+ public void close() {
+ }
+
+ synchronized public boolean testAndClearSomethingWritten() {
+ try {
+ return somethingWritten;
+ } finally {
+ somethingWritten = false;
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * This is the file handle for a ledger's index file that maps entry ids to location.
+ * It is used by LedgerCache.
+ */
+class FileInfo {
+ private FileChannel fc;
+ /**
+ * The fingerprint of a ledger index file
+ */
+ private byte header[] = "BKLE\0\0\0\0".getBytes();
+ static final long START_OF_DATA = 1024;
+ private long size;
+ private int useCount;
+ private boolean isClosed;
+ public FileInfo(File lf) throws IOException {
+ fc = new RandomAccessFile(lf, "rws").getChannel();
+ size = fc.size();
+ if (size == 0) {
+ fc.write(ByteBuffer.wrap(header));
+ }
+ }
+
+ synchronized public long size() {
+ long rc = size-START_OF_DATA;
+ if (rc < 0) {
+ rc = 0;
+ }
+ return rc;
+ }
+
+ synchronized public int read(ByteBuffer bb, long position) throws IOException {
+ int total = 0;
+ while(bb.remaining() > 0) {
+ int rc = fc.read(bb, position+START_OF_DATA);
+ if (rc <= 0) {
+ throw new IOException("Short read");
+ }
+ total += rc;
+ }
+ return total;
+ }
+
+ synchronized public void close() throws IOException {
+ isClosed = true;
+ if (useCount == 0) {
+ fc.close();
+ }
+ }
+
+ synchronized public long write(ByteBuffer[] buffs, long position) throws IOException {
+ long total = 0;
+ try {
+ fc.position(position+START_OF_DATA);
+ while(buffs[buffs.length-1].remaining() > 0) {
+ long rc = fc.write(buffs);
+ if (rc <= 0) {
+ throw new IOException("Short write");
+ }
+ total += rc;
+ }
+ } finally {
+ long newsize = position+START_OF_DATA+total;
+ if (newsize > size) {
+ size = newsize;
+ }
+ }
+ return total;
+ }
+
+ synchronized public void use() {
+ useCount++;
+ }
+
+ synchronized public void release() {
+ useCount--;
+ if (isClosed && useCount == 0) {
+ try {
+ fc.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,454 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class maps a ledger entry number into a location (entrylogid, offset) in
+ * an entry log file. It does user level caching to more efficiently manage disk
+ * head scheduling.
+ */
+public class LedgerCache {
+ private final static Logger LOG = Logger.getLogger(LedgerDescriptor.class);
+
+ final File ledgerDirectories[];
+
+ public LedgerCache(File ledgerDirectories[]) {
+ this.ledgerDirectories = ledgerDirectories;
+ }
+ /**
+ * the list of potentially clean ledgers
+ */
+ LinkedList<Long> cleanLedgers = new LinkedList<Long>();
+
+ /**
+ * the list of potentially dirty ledgers
+ */
+ LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
+
+ HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
+
+ LinkedList<Long> openLedgers = new LinkedList<Long>();
+
+ static int OPEN_FILE_LIMIT = 900;
+ static {
+ if (System.getProperty("openFileLimit") != null) {
+ OPEN_FILE_LIMIT = Integer.parseInt(System.getProperty("openFileLimit"));
+ }
+ LOG.info("openFileLimit is " + OPEN_FILE_LIMIT);
+ }
+
+ // allocate half of the memory to the page cache
+ private static int pageLimit = (int)(Runtime.getRuntime().maxMemory() / 3) / LedgerEntryPage.PAGE_SIZE;
+ static {
+ LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
+ if (System.getProperty("pageLimit") != null) {
+ pageLimit = Integer.parseInt(System.getProperty("pageLimit"));
+ }
+ LOG.info("pageLimit is " + pageLimit);
+ }
+ // The number of pages that have actually been used
+ private int pageCount;
+ HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
+
+ private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
+ HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
+ if (map == null) {
+ map = new HashMap<Long, LedgerEntryPage>();
+ table.put(lep.getLedger(), map);
+ }
+ map.put(lep.getFirstEntry(), lep);
+ }
+
+ private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, Long ledger, Long firstEntry) {
+ HashMap<Long, LedgerEntryPage> map = table.get(ledger);
+ if (map != null) {
+ return map.get(firstEntry);
+ }
+ return null;
+ }
+
+ synchronized private LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+ LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
+ try {
+ if (onlyDirty && lep.isClean()) {
+ return null;
+ }
+ return lep;
+ } finally {
+ if (lep != null) {
+ lep.usePage();
+ }
+ }
+ }
+
+ public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
+ int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+ // find the id of the first entry of the page that has the entry
+ // we are looking for
+ long pageEntry = entry-offsetInPage;
+ LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+ if (lep == null) {
+ // find a free page
+ lep = grabCleanPage(ledger, pageEntry);
+ updatePage(lep);
+ synchronized(this) {
+ putIntoTable(pages, lep);
+ }
+ }
+ if (lep != null) {
+ lep.setOffset(offset, offsetInPage*8);
+ lep.releasePage();
+ return;
+ }
+ }
+
+ public long getEntryOffset(long ledger, long entry) throws IOException {
+ int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+ // find the id of the first entry of the page that has the entry
+ // we are looking for
+ long pageEntry = entry-offsetInPage;
+ LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+ try {
+ if (lep == null) {
+ lep = grabCleanPage(ledger, pageEntry);
+ synchronized(this) {
+ putIntoTable(pages, lep);
+ }
+ updatePage(lep);
+
+ }
+ return lep.getOffset(offsetInPage*8);
+ } finally {
+ if (lep != null) {
+ lep.releasePage();
+ }
+ }
+ }
+
+ static final private String getLedgerName(long ledgerId) {
+ int parent = (int) (ledgerId & 0xff);
+ int grandParent = (int) ((ledgerId & 0xff00) >> 8);
+ StringBuilder sb = new StringBuilder();
+ sb.append(Integer.toHexString(grandParent));
+ sb.append('/');
+ sb.append(Integer.toHexString(parent));
+ sb.append('/');
+ sb.append(Long.toHexString(ledgerId));
+ sb.append(".idx");
+ return sb.toString();
+ }
+
+ static final private void checkParents(File f) throws IOException {
+ File parent = f.getParentFile();
+ if (parent.exists()) {
+ return;
+ }
+ if (parent.mkdirs() == false) {
+ throw new IOException("Counldn't mkdirs for " + parent);
+ }
+ }
+
+ static final private Random rand = new Random();
+
+ static final private File pickDirs(File dirs[]) {
+ return dirs[rand.nextInt(dirs.length)];
+ }
+
+ FileInfo getFileInfo(Long ledger, boolean create) throws IOException {
+ synchronized(fileInfoCache) {
+ FileInfo fi = fileInfoCache.get(ledger);
+ if (fi == null) {
+ String ledgerName = getLedgerName(ledger);
+ File lf = null;
+ for(File d: ledgerDirectories) {
+ lf = new File(d, ledgerName);
+ if (lf.exists()) {
+ break;
+ }
+ lf = null;
+ }
+ if (lf == null) {
+ if (!create) {
+ throw new Bookie.NoLedgerException(ledger);
+ }
+ File dir = pickDirs(ledgerDirectories);
+ lf = new File(dir, ledgerName);
+ checkParents(lf);
+ }
+ if (openLedgers.size() > OPEN_FILE_LIMIT) {
+ fileInfoCache.remove(openLedgers.removeFirst()).close();
+ }
+ fi = new FileInfo(lf);
+ fileInfoCache.put(ledger, fi);
+ openLedgers.add(ledger);
+ }
+ if (fi != null) {
+ fi.use();
+ }
+ return fi;
+ }
+ }
+ private void updatePage(LedgerEntryPage lep) throws IOException {
+ if (!lep.isClean()) {
+ throw new IOException("Trying to update a dirty page");
+ }
+ FileInfo fi = null;
+ try {
+ fi = getFileInfo(lep.getLedger(), true);
+ long pos = lep.getFirstEntry()*8;
+ if (pos >= fi.size()) {
+ lep.zeroPage();
+ } else {
+ lep.readPage(fi);
+ }
+ } finally {
+ if (fi != null) {
+ fi.release();
+ }
+ }
+ }
+
+ void flushLedger(boolean doAll) throws IOException {
+ synchronized(dirtyLedgers) {
+ if (dirtyLedgers.isEmpty()) {
+ synchronized(this) {
+ for(Long l: pages.keySet()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Adding " + Long.toHexString(l) + " to dirty pages");
+ }
+ dirtyLedgers.add(l);
+ }
+ }
+ }
+ if (dirtyLedgers.isEmpty()) {
+ return;
+ }
+ while(!dirtyLedgers.isEmpty()) {
+ Long l = dirtyLedgers.removeFirst();
+ LinkedList<Long> firstEntryList;
+ synchronized(this) {
+ HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
+ if (pageMap == null || pageMap.isEmpty()) {
+ continue;
+ }
+ firstEntryList = new LinkedList<Long>();
+ for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+ LedgerEntryPage lep = entry.getValue();
+ if (lep.isClean()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Page is clean " + lep);
+ }
+ continue;
+ }
+ firstEntryList.add(lep.getFirstEntry());
+ }
+ }
+ // Now flush all the pages of a ledger
+ List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
+ FileInfo fi = null;
+ try {
+ for(Long firstEntry: firstEntryList) {
+ LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+ if (lep != null) {
+ entries.add(lep);
+ }
+ }
+ Collections.sort(entries, new Comparator<LedgerEntryPage>() {
+ @Override
+ public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
+ return (int)(o1.getFirstEntry()-o2.getFirstEntry());
+ }});
+ ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+ fi = getFileInfo(l, true);
+ int start = 0;
+ long lastOffset = -1;
+ for(int i = 0; i < entries.size(); i++) {
+ versions.add(i, entries.get(i).getVersion());
+ if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != LedgerEntryPage.ENTRIES_PER_PAGES) {
+ // send up a sequential list
+ int count = i - start;
+ if (count == 0) {
+ System.out.println("Count cannot possibly be zero!");
+ }
+ writeBuffers(l, entries, fi, start, count);
+ start = i;
+ }
+ lastOffset = entries.get(i).getFirstEntry();
+ }
+ if (entries.size()-start == 0 && entries.size() != 0) {
+ System.out.println("Nothing to write, but there were entries!");
+ }
+ writeBuffers(l, entries, fi, start, entries.size()-start);
+ synchronized(this) {
+ for(int i = 0; i < entries.size(); i++) {
+ LedgerEntryPage lep = entries.get(i);
+ lep.setClean(versions.get(i));
+ }
+ }
+ } finally {
+ for(LedgerEntryPage lep: entries) {
+ lep.releasePage();
+ }
+ if (fi != null) {
+ fi.release();
+ }
+ }
+ if (!doAll) {
+ break;
+ }
+ // Yeild. if we are doing all the ledgers we don't want to block other flushes that
+ // need to happen
+ try {
+ dirtyLedgers.wait(1);
+ } catch (InterruptedException e) {
+ // just pass it on
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ private void writeBuffers(Long ledger,
+ List<LedgerEntryPage> entries, FileInfo fi,
+ int start, int count) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing " + count + " buffers of " + Long.toHexString(ledger));
+ }
+ if (count == 0) {
+ //System.out.println("Count is zero!");
+ return;
+ }
+ ByteBuffer buffs[] = new ByteBuffer[count];
+ for(int j = 0; j < count; j++) {
+ buffs[j] = entries.get(start+j).getPageToWrite();
+ if (entries.get(start+j).getLedger() != ledger) {
+ throw new IOException("Writing to " + ledger + " but page belongs to " + entries.get(start+j).getLedger());
+ }
+ }
+ long totalWritten = 0;
+ while(buffs[buffs.length-1].remaining() > 0) {
+ long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
+ if (rc <= 0) {
+ throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
+ }
+ //System.out.println("Wrote " + rc + " to " + ledger);
+ totalWritten += rc;
+ }
+ if (totalWritten != count*LedgerEntryPage.PAGE_SIZE) {
+ throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count*LedgerEntryPage.PAGE_SIZE);
+ }
+ }
+ private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
+ if (entry % LedgerEntryPage.ENTRIES_PER_PAGES != 0) {
+ throw new IllegalArgumentException(entry + " is not a multiple of " + LedgerEntryPage.ENTRIES_PER_PAGES);
+ }
+ synchronized(this) {
+ if (pageCount < pageLimit) {
+ // let's see if we can allocate something
+ LedgerEntryPage lep = new LedgerEntryPage();
+ lep.setLedger(ledger);
+ lep.setFirstEntry(entry);
+ // note, this will not block since it is a new page
+ lep.usePage();
+ pageCount++;
+ return lep;
+ }
+ }
+
+ outerLoop:
+ while(true) {
+ synchronized(cleanLedgers) {
+ if (cleanLedgers.isEmpty()) {
+ flushLedger(false);
+ synchronized(this) {
+ for(Long l: pages.keySet()) {
+ cleanLedgers.add(l);
+ }
+ }
+ }
+ synchronized(this) {
+ Long cleanLedger = cleanLedgers.getFirst();
+ Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
+ if (map == null || map.isEmpty()) {
+ cleanLedgers.removeFirst();
+ continue;
+ }
+ Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
+ LedgerEntryPage lep = it.next().getValue();
+ while((lep.inUse() || !lep.isClean())) {
+ if (it.hasNext()) {
+ continue outerLoop;
+ }
+ lep = it.next().getValue();
+ }
+ it.remove();
+ if (map.isEmpty()) {
+ pages.remove(lep.getLedger());
+ }
+ lep.usePage();
+ lep.zeroPage();
+ lep.setLedger(ledger);
+ lep.setFirstEntry(entry);
+ return lep;
+ }
+ }
+ }
+ }
+
+ public long getLastEntry(long ledgerId) {
+ long lastEntry = 0;
+ // Find the last entry in the cache
+ synchronized(this) {
+ Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
+ if (map != null) {
+ for(LedgerEntryPage lep: map.values()) {
+ if (lep.getFirstEntry() + LedgerEntryPage.ENTRIES_PER_PAGES < lastEntry) {
+ continue;
+ }
+ lep.usePage();
+ long highest = lep.getLastEntry();
+ if (highest > lastEntry) {
+ lastEntry = highest;
+ }
+ lep.releasePage();
+ }
+ }
+ }
+
+ return lastEntry;
+ }
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,3 @@
-package org.apache.bookkeeper.bookie;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +19,10 @@
*
*/
+package org.apache.bookkeeper.bookie;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.log4j.Logger;
@@ -36,11 +34,12 @@
*
*/
public class LedgerDescriptor {
- Logger LOG = Logger.getLogger(LedgerDescriptor.class);
- LedgerDescriptor(long ledgerId, FileChannel ledger, FileChannel ledgerIndex) {
+ final static Logger LOG = Logger.getLogger(LedgerDescriptor.class);
+ LedgerCache ledgerCache;
+ LedgerDescriptor(long ledgerId, EntryLogger entryLogger, LedgerCache ledgerCache) {
this.ledgerId = ledgerId;
- this.ledger = ledger;
- this.ledgerIndex = ledgerIndex;
+ this.entryLogger = entryLogger;
+ this.ledgerCache = ledgerCache;
}
private ByteBuffer masterKey = null;
@@ -54,8 +53,7 @@
}
private long ledgerId;
- private FileChannel ledger;
- private FileChannel ledgerIndex;
+ EntryLogger entryLogger;
private int refCnt;
synchronized public void incRef() {
refCnt++;
@@ -66,100 +64,70 @@
synchronized public int getRefCnt() {
return refCnt;
}
- static private final long calcEntryOffset(long entryId) {
- return 8L*entryId;
- }
long addEntry(ByteBuffer entry) throws IOException {
- ByteBuffer offsetBuffer = ByteBuffer.wrap(new byte[8]);
long ledgerId = entry.getLong();
if (ledgerId != this.ledgerId) {
throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
}
- /*
- * Get entry id
- */
-
long entryId = entry.getLong();
entry.rewind();
/*
- * Set offset of entry id to be the current ledger position
+ * Log the entry
*/
- offsetBuffer.rewind();
- offsetBuffer.putLong(ledger.position());
- //LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
- offsetBuffer.flip();
-
- /*
- * Write on the index entry corresponding to entryId the position
- * of this entry.
- */
- ledgerIndex.write(offsetBuffer, calcEntryOffset(entryId));
- ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+ long pos = entryLogger.addEntry(ledgerId, entry);
- lenBuffer.putInt(entry.remaining());
- lenBuffer.flip();
-
/*
- * Write length of entry first, then the entry itself
+ * Set offset of entry id to be the current ledger position
*/
- ledger.write(lenBuffer);
- ledger.write(entry);
- //entry.position(24);
- //LOG.debug("Entry: " + entry.position() + ", " + new String(entry.array()));
-
+ ledgerCache.putEntryOffset(ledgerId, entryId, pos);
return entryId;
}
ByteBuffer readEntry(long entryId) throws IOException {
- ByteBuffer buffer = ByteBuffer.wrap(new byte[8]);
long offset;
/*
* If entryId is -1, then return the last written.
*/
if (entryId == -1) {
- offset = ledgerIndex.size()-8;
- } else {
- offset = calcEntryOffset(entryId);
+ long lastEntry = ledgerCache.getLastEntry(ledgerId);
+ FileInfo fi = null;
+ try {
+ fi = ledgerCache.getFileInfo(ledgerId, false);
+ long size = fi.size();
+ // we may not have the last entry in the cache
+ if (size > lastEntry*8) {
+ ByteBuffer bb = ByteBuffer.allocate(LedgerEntryPage.PAGE_SIZE);
+ long position = size-LedgerEntryPage.PAGE_SIZE;
+ if (position < 0) {
+ position = 0;
+ }
+ fi.read(bb, position);
+ bb.flip();
+ long startingEntryId = position/8;
+ for(int i = LedgerEntryPage.ENTRIES_PER_PAGES-1; i >= 0; i--) {
+ if (bb.getLong(i*8) != 0) {
+ if (lastEntry < startingEntryId+i) {
+ lastEntry = startingEntryId+i;
+ }
+ break;
+ }
+ }
+ }
+ } finally {
+ if (fi != null) {
+ fi.release();
+ }
+ }
+ entryId = lastEntry;
}
- int len = ledgerIndex.read(buffer, offset);
- buffer.flip();
- if (len != buffer.limit()) {
- throw new Bookie.NoEntryException(ledgerId, entryId);
- }
- offset = buffer.getLong();
+
+ offset = ledgerCache.getEntryOffset(ledgerId, entryId);
if (offset == 0) {
throw new Bookie.NoEntryException(ledgerId, entryId);
}
- LOG.debug("Offset: " + offset);
-
- buffer.limit(4);
- buffer.rewind();
- /*
- * Read the length
- */
- ledger.read(buffer, offset);
- buffer.flip();
- len = buffer.getInt();
- LOG.debug("Length of buffer: " + len);
- buffer = ByteBuffer.allocate(len);
- /*
- * Read the rest. We add 4 to skip the length
- */
- ledger.read(buffer, offset + 4);
- buffer.flip();
- return buffer;
+ return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
}
void close() {
- try {
- ledger.close();
- } catch (IOException e) {
- LOG.warn("Error closing ledger " + ledgerId, e);
- }
- try {
- ledgerIndex.close();
- } catch (IOException e) {
- LOG.warn("Error closing index for ledger " + ledgerId, e);
- }
}
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a page in the LedgerCache. It holds the locations
+ * (entrylogfile, offset) for entry ids.
+ */
+public class LedgerEntryPage {
+ public static final int PAGE_SIZE = 8192;
+ public static final int ENTRIES_PER_PAGES = PAGE_SIZE/8;
+ private long ledger = -1;
+ private long firstEntry = -1;
+ private ByteBuffer page = ByteBuffer.allocateDirect(PAGE_SIZE);
+ private boolean clean = true;
+ private boolean pinned = false;
+ private int useCount;
+ private int version;
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getLedger());
+ sb.append('@');
+ sb.append(getFirstEntry());
+ sb.append(clean ? " clean " : " dirty ");
+ sb.append(useCount);
+ return sb.toString();
+ }
+ synchronized public void usePage() {
+ useCount++;
+ }
+ synchronized public void pin() {
+ pinned = true;
+ }
+ synchronized public void unpin() {
+ pinned = false;
+ }
+ synchronized public boolean isPinned() {
+ return pinned;
+ }
+ synchronized public void releasePage() {
+ useCount--;
+ if (useCount < 0) {
+ throw new IllegalStateException("Use count has gone below 0");
+ }
+ }
+ synchronized private void checkPage() {
+ if (useCount <= 0) {
+ throw new IllegalStateException("Page not marked in use");
+ }
+ }
+ @Override
+ public boolean equals(Object other) {
+ LedgerEntryPage otherLEP = (LedgerEntryPage) other;
+ return otherLEP.getLedger() == getLedger() && otherLEP.getFirstEntry() == getFirstEntry();
+ }
+ @Override
+ public int hashCode() {
+ return (int)getLedger() ^ (int)(getFirstEntry());
+ }
+ void setClean(int versionOfCleaning) {
+ this.clean = (versionOfCleaning == version);
+ }
+ boolean isClean() {
+ return clean;
+ }
+ public void setOffset(long offset, int position) {
+ checkPage();
+ version++;
+ this.clean = false;
+ page.putLong(position, offset);
+ }
+ public long getOffset(int position) {
+ checkPage();
+ return page.getLong(position);
+ }
+ static final byte zeroPage[] = new byte[64*1024];
+ public void zeroPage() {
+ checkPage();
+ page.clear();
+ page.put(zeroPage, 0, page.remaining());
+ clean = true;
+ }
+ public void readPage(FileInfo fi) throws IOException {
+ checkPage();
+ page.clear();
+ while(page.remaining() != 0) {
+ if (fi.read(page, getFirstEntry()*8) <= 0) {
+ throw new IOException("Short page read of ledger " + getLedger() + " tried to get " + page.capacity() + " from position " + getFirstEntry()*8 + " still need " + page.remaining());
+ }
+ }
+ clean = true;
+ }
+ public ByteBuffer getPageToWrite() {
+ checkPage();
+ page.clear();
+ return page;
+ }
+ void setLedger(long ledger) {
+ this.ledger = ledger;
+ }
+ long getLedger() {
+ return ledger;
+ }
+ int getVersion() {
+ return version;
+ }
+ void setFirstEntry(long firstEntry) {
+ if (firstEntry % ENTRIES_PER_PAGES != 0) {
+ throw new IllegalArgumentException(firstEntry + " is not a multiple of " + ENTRIES_PER_PAGES);
+ }
+ this.firstEntry = firstEntry;
+ }
+ long getFirstEntry() {
+ return firstEntry;
+ }
+ public boolean inUse() {
+ return useCount > 0;
+ }
+ public long getLastEntry() {
+ for(int i = ENTRIES_PER_PAGES - 1; i >= 0; i--) {
+ if (getOffset(i*8) > 0) {
+ return i + firstEntry;
+ }
+ }
+ return 0;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This class is just a stub that can be used in collections with
+ * FileChannels
+ */
+public class MarkerFileChannel extends FileChannel {
+
+ @Override
+ public void force(boolean metaData) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public FileLock lock(long position, long size, boolean shared)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public MappedByteBuffer map(MapMode mode, long position, long size)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public long position() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public FileChannel position(long newPosition) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst, long position) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long read(ByteBuffer[] dsts, int offset, int length)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long size() throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long transferFrom(ReadableByteChannel src, long position, long count)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long transferTo(long position, long count, WritableByteChannel target)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public FileChannel truncate(long size) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FileLock tryLock(long position, long size, boolean shared)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int write(ByteBuffer src, long position) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Tue Jan 26 23:16:45 2010
@@ -1,81 +1,101 @@
package org.apache.bookkeeper.client;
+import java.util.Enumeration;
+
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
public interface AsyncCallback {
- public interface AddCallback {
- /**
- * Callback declaration
- *
- * @param rc return code
- * @param ledgerId ledger identifier
- * @param entryId entry identifier
- * @param ctx control object
- */
- void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
- }
-
- public interface CloseCallback {
- /**
- * Callback definition
- *
- * @param rc return code
- * @param ledgerId ledger identifier
- * @param ctx control object
- */
- void closeComplete(int rc, LedgerHandle lh, Object ctx);
- }
-
- public interface CreateCallback {
- /**
- * Declaration of callback method
- *
- * @param rc return status
- * @param lh ledger handle
- * @param ctx control object
- */
-
- void createComplete(int rc, LedgerHandle lh, Object ctx);
- }
-
- public interface OpenCallback {
- /**
- * Callback for asynchronous call to open ledger
- *
- * @param rc
- * @param lh
- * @param ctx
- */
-
- public void openComplete(int rc, LedgerHandle lh, Object ctx);
-
- }
-
- public interface ReadCallback {
- /**
- * Callback declaration
- *
- * @param rc return code
- * @param ledgerId ledger identifier
- * @param seq sequence of entries
- * @param ctx control object
- */
- void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
- }
+ public interface AddCallback {
+ /**
+ * Callback declaration
+ *
+ * @param rc
+ * return code
+ * @param ledgerId
+ * ledger identifier
+ * @param entryId
+ * entry identifier
+ * @param ctx
+ * control object
+ */
+ void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+ }
+
+ public interface CloseCallback {
+ /**
+ * Callback definition
+ *
+ * @param rc
+ * return code
+ * @param ledgerId
+ * ledger identifier
+ * @param ctx
+ * control object
+ */
+ void closeComplete(int rc, LedgerHandle lh, Object ctx);
+ }
+
+ public interface CreateCallback {
+ /**
+ * Declaration of callback method
+ *
+ * @param rc
+ * return status
+ * @param lh
+ * ledger handle
+ * @param ctx
+ * control object
+ */
+
+ void createComplete(int rc, LedgerHandle lh, Object ctx);
+ }
+
+ public interface OpenCallback {
+ /**
+ * Callback for asynchronous call to open ledger
+ *
+ * @param rc
+ * Return code
+ * @param lh
+ * ledger handle
+ * @param ctx
+ * control object
+ */
+
+ public void openComplete(int rc, LedgerHandle lh, Object ctx);
+
+ }
+
+ public interface ReadCallback {
+ /**
+ * Callback declaration
+ *
+ * @param rc
+ * return code
+ * @param ledgerId
+ * ledger identifier
+ * @param seq
+ * sequence of entries
+ * @param ctx
+ * control object
+ */
+
+ void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
+ Object ctx);
+ }
}
|