hadoop-zookeeper-commits mailing list archives

From: maha...@apache.org
Subject: svn commit: r903483 [1/6] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Date: Tue, 26 Jan 2010 23:16:49 GMT
Author: mahadev
Date: Tue Jan 26 23:16:45 2010
New Revision: 903483

URL: http://svn.apache.org/viewvc?rev=903483&view=rev
Log:
ZOOKEEPER-507. BookKeeper client re-write (Utkarsh and ben via mahadev)

Added:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieWatcher.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/CRC32DigestManager.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/MathUtils.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/SafeRunnable.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/StringUtils.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BaseTestCase.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
Removed:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKDefs.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerManagementProcessor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieFailureTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/NIOServerFactoryTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jan 26 23:16:45 2010
@@ -266,6 +266,8 @@
   ZOOKEEPER-593.  java client api does not allow client to access negotiated
   session timeout (phunt via mahadev)
 
+  ZOOKEEPER-507. BookKeeper client re-write (Utkarsh and ben via mahadev)
+
 NEW FEATURES:
   ZOOKEEPER-539. generate eclipse project via ant target. (phunt via mahadev)
 

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,3 @@
-package org.apache.bookkeeper.bookie;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,19 +19,25 @@
  * 
  */
 
+package org.apache.bookkeeper.bookie;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.log4j.Logger;
 
 
@@ -45,10 +50,6 @@
 public class Bookie extends Thread {
     HashMap<Long, LedgerDescriptor> ledgers = new HashMap<Long, LedgerDescriptor>();
     static Logger LOG = Logger.getLogger(Bookie.class);
-    /**
-     * 4 byte signature followed by 2-byte major and 2-byte minor versions
-     */
-    private static byte ledgerHeader[] =  { 0x42, 0x6f, 0x6f, 0x6b, 0, 0, 0, 0};
     
     final File journalDirectory;
 
@@ -69,6 +70,7 @@
         private long ledgerId;
         private long entryId;
         public NoEntryException(long ledgerId, long entryId) {
+            super("Entry " + entryId + " not found in " + ledgerId);
             this.ledgerId = ledgerId;
             this.entryId = entryId;
         }
@@ -80,14 +82,124 @@
         }
     }
 
-    public Bookie(File journalDirectory, File ledgerDirectories[]) {
+    EntryLogger entryLogger;
+    LedgerCache ledgerCache;
+    class SyncThread extends Thread {
+        volatile boolean running = true;
+        public SyncThread() {
+            super("SyncThread");
+        }
+        @Override
+        public void run() {
+            while(running) {
+                synchronized(this) {
+                    try {
+                        wait(100);
+                        if (!entryLogger.testAndClearSomethingWritten()) {
+                            continue;
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        continue;
+                    }
+                }
+                lastLogMark.markLog();
+                try {
+                    ledgerCache.flushLedger(true);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                try {
+                    entryLogger.flush();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                lastLogMark.rollLog();
+            }
+        }
+    }
+    SyncThread syncThread = new SyncThread();
+    public Bookie(File journalDirectory, File ledgerDirectories[]) throws IOException {
         this.journalDirectory = journalDirectory;
         this.ledgerDirectories = ledgerDirectories;
+        entryLogger = new EntryLogger(ledgerDirectories);
+        ledgerCache = new LedgerCache(ledgerDirectories);
+        lastLogMark.readLog();
+        final long markedLogId = lastLogMark.txnLogId;
+        if (markedLogId > 0) {
+            File logFiles[] = journalDirectory.listFiles();
+            ArrayList<Long> logs = new ArrayList<Long>();
+            for(File f: logFiles) {
+                String name = f.getName();
+                if (!name.endsWith(".txn")) {
+                    continue;
+                }
+                String idString = name.split("\\.")[0];
+                long id = Long.parseLong(idString, 16);
+                if (id < markedLogId) {
+                    continue;
+                }
+                logs.add(id);
+            }
+            Collections.sort(logs);
+            if (logs.size() == 0 || logs.get(0) != markedLogId) {
+                throw new IOException("Recovery log " + markedLogId + " is missing");
+            }
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            ByteBuffer recBuff = ByteBuffer.allocate(64*1024);
+            for(Long id: logs) {
+                FileChannel recLog = openChannel(id);
+                while(true) {
+                    lenBuff.clear();
+                    fullRead(recLog, lenBuff);
+                    if (lenBuff.remaining() != 0) {
+                        break;
+                    }
+                    lenBuff.flip();
+                    int len = lenBuff.getInt();
+                    if (len == 0) {
+                        break;
+                    }
+                    recBuff.clear();
+                    if (recBuff.remaining() < len) {
+                        recBuff = ByteBuffer.allocate(len);
+                    }
+                    recBuff.limit(len);
+                    if (fullRead(recLog, recBuff) != len) {
+                        // This seems scary, but it just means that this is where we
+                        // left off writing
+                        break;
+                    }
+                    recBuff.flip();
+                    long ledgerId = recBuff.getLong();
+                    // XXX we need to make sure we set the master keys appropriately!
+                    LedgerDescriptor handle = getHandle(ledgerId, false);
+                    try {
+                        recBuff.rewind();
+                        handle.addEntry(recBuff);
+                    } finally {
+                        putHandle(handle);
+                    }
+                }
+            }
+        }
         setDaemon(true);
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
         start();
+        syncThread.start();
     }
 
+    private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
+        int total = 0;
+        while(bb.remaining() > 0) {
+            int rc = fc.read(bb);
+            if (rc <= 0) {
+                return total;
+            }
+            total += rc;
+        }
+        return total;
+    }
     private void putHandle(LedgerDescriptor handle) {
         synchronized (ledgers) {
             handle.decRef();
@@ -99,6 +211,9 @@
         synchronized (ledgers) {
             handle = ledgers.get(ledgerId);
             if (handle == null) {
+                if (readonly) {
+                    throw new NoLedgerException(ledgerId);
+                }
                 handle = createHandle(ledgerId, readonly);
                 ledgers.put(ledgerId, handle);
                 handle.setMasterKey(ByteBuffer.wrap(masterKey));
@@ -113,6 +228,9 @@
         synchronized (ledgers) {
             handle = ledgers.get(ledgerId);
             if (handle == null) {
+                if (readonly) {
+                    throw new NoLedgerException(ledgerId);
+                }
                 handle = createHandle(ledgerId, readonly);
                 ledgers.put(ledgerId, handle);
             } 
@@ -123,85 +241,9 @@
     
 
     private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
-        RandomAccessFile ledgerFile = null;
-        RandomAccessFile ledgerIndexFile = null;
-        String ledgerName = getLedgerName(ledgerId, false);
-        String ledgerIndexName = getLedgerName(ledgerId, true);
-        for (File d : ledgerDirectories) {
-            File lf = new File(d, ledgerName);
-            File lif = new File(d, ledgerIndexName);
-            if (lf.exists()) {
-                if (ledgerFile != null) {
-                    throw new IOException("Duplicate ledger file found for "
-                            + ledgerId);
-                }
-                ledgerFile = new RandomAccessFile(lf, "rw");
-            }
-            if (lif.exists()) {
-                if (ledgerIndexFile != null) {
-                    throw new IOException(
-                            "Duplicate ledger index file found for " + ledgerId);
-                }
-                ledgerIndexFile = new RandomAccessFile(lif, "rw");
-            }
-        }
-        if (ledgerFile == null && ledgerIndexFile == null) {
-            if (readOnly) {
-                throw new NoLedgerException(ledgerId);
-            }
-            File dirs[] = pickDirs(ledgerDirectories);
-            File lf = new File(dirs[0], ledgerName);
-            checkParents(lf);
-            ledgerFile = new RandomAccessFile(lf, "rw");
-            ledgerFile.write(ledgerHeader);
-            File lif = new File(dirs[1], ledgerIndexName);
-            checkParents(lif);
-            ledgerIndexFile = new RandomAccessFile(lif, "rw");
-        }
-        if (ledgerFile != null && ledgerIndexFile != null) {
-            return new LedgerDescriptor(ledgerId, ledgerFile.getChannel(),
-                    ledgerIndexFile.getChannel());
-        }
-        if (ledgerFile == null) {
-            throw new IOException("Found index but no data for " + ledgerId);
-        }
-        throw new IOException("Found data but no index for " + ledgerId);
+        return new LedgerDescriptor(ledgerId, entryLogger, ledgerCache);
     }
     
-    static final private void checkParents(File f) throws IOException {
-        File parent = f.getParentFile();
-        if (parent.exists()) {
-            return;
-        }
-        if (parent.mkdirs() == false) {
-            throw new IOException("Counldn't mkdirs for " + parent);
-        }
-    }
-
-    static final private Random rand = new Random();
-
-    static final private File[] pickDirs(File dirs[]) {
-        File rc[] = new File[2];
-        rc[0] = dirs[rand.nextInt(dirs.length)];
-        rc[1] = dirs[rand.nextInt(dirs.length)];
-        return rc;
-    }
-
-    static final private String getLedgerName(long ledgerId, boolean isIndex) {
-        int parent = (int) (ledgerId & 0xff);
-        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
-        StringBuilder sb = new StringBuilder();
-        sb.append(Integer.toHexString(grandParent));
-        sb.append('/');
-        sb.append(Integer.toHexString(parent));
-        sb.append('/');
-        sb.append(Long.toHexString(ledgerId));
-        if (isIndex) {
-            sb.append(".idx");
-        }
-        return sb.toString();
-    }
-
     static class QueueEntry {
         QueueEntry(ByteBuffer entry, long ledgerId, long entryId, 
                 WriteCallback cb, Object ctx) {
@@ -229,15 +271,76 @@
     
     public final static ByteBuffer zeros = ByteBuffer.allocate(512);
     
+    class LastLogMark {
+        long txnLogId;
+        long txnLogPosition;
+        LastLogMark lastMark;
+        LastLogMark(long logId, long logPosition) {
+            this.txnLogId = logId;
+            this.txnLogPosition = logPosition;
+        }
+        synchronized void setLastLogMark(long logId, long logPosition) {
+            txnLogId = logId;
+            txnLogPosition = logPosition;
+        }
+        synchronized void markLog() {
+            lastMark = new LastLogMark(txnLogId, txnLogPosition);
+        }
+        synchronized void rollLog() {
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            bb.putLong(txnLogId);
+            bb.putLong(txnLogPosition);
+            for(File dir: ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                try {
+                    FileOutputStream fos = new FileOutputStream(file);
+                    fos.write(buff);
+                    fos.getChannel().force(true);
+                    fos.close();
+                } catch (IOException e) {
+                    LOG.error("Problems writing to " + file, e);
+                }
+            }
+        }
+        synchronized void readLog() {
+            byte buff[] = new byte[16];
+            ByteBuffer bb = ByteBuffer.wrap(buff);
+            for(File dir: ledgerDirectories) {
+                File file = new File(dir, "lastMark");
+                try {
+                    FileInputStream fis = new FileInputStream(file);
+                    fis.read(buff);
+                    fis.close();
+                    bb.clear();
+                    long i = bb.getLong();
+                    long p = bb.getLong();
+                    if (i > txnLogId) {
+                        txnLogId = i;
+                    }
+                    if (p > txnLogPosition) {
+                        txnLogPosition = p;
+                    }
+                } catch (IOException e) {
+                    LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie)");
+                }
+            }
+        }
+    }
+    
+    private LastLogMark lastLogMark = new LastLogMark(0, 0);
+    
+    @Override
     public void run() {
         LinkedList<QueueEntry> toFlush = new LinkedList<QueueEntry>();
         ByteBuffer lenBuff = ByteBuffer.allocate(4);
         try {
-            FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
-                    Long.toHexString(System.currentTimeMillis()) + ".txn"),
-                    "rw").getChannel();
+            long logId = System.currentTimeMillis();
+            FileChannel logFile = openChannel(logId);
+            BufferedChannel bc = new BufferedChannel(logFile, 65536);
             zeros.clear();
             long nextPrealloc = preAllocSize;
+            long lastFlushPosition = 0;
             logFile.write(zeros, nextPrealloc);
             while (true) {
                 QueueEntry qe = null;
@@ -245,10 +348,13 @@
                     qe = queue.take();
                 } else {
                     qe = queue.poll();
-                    if (qe == null || toFlush.size() > 100) {
-                        logFile.force(false);
+                    if (qe == null || bc.position() > lastFlushPosition + 512*1024) {
+                        //logFile.force(false);
+                        bc.flush(true);
+                        lastFlushPosition = bc.position();
+                        lastLogMark.setLastLogMark(logId, lastFlushPosition);
                         for (QueueEntry e : toFlush) {
-                            e.cb.writeComplete(0, e.ledgerId, e.entryId, e.ctx);
+                            e.cb.writeComplete(0, e.ledgerId, e.entryId, null, e.ctx);
                         }
                         toFlush.clear();
                     }
@@ -259,8 +365,13 @@
                 lenBuff.clear();
                 lenBuff.putInt(qe.entry.remaining());
                 lenBuff.flip();
-                logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
-                if (logFile.position() > nextPrealloc) {
+                //
+                // we should be doing the following, but then we run out of
+                // direct byte buffers
+                // logFile.write(new ByteBuffer[] { lenBuff, qe.entry });
+                bc.write(lenBuff);
+                bc.write(qe.entry);
+                if (bc.position() > nextPrealloc) {
                     nextPrealloc = (logFile.size() / preAllocSize + 1) * preAllocSize;
                     zeros.clear();
                     logFile.write(zeros, nextPrealloc);
@@ -272,9 +383,18 @@
         }
     }
 
+    private FileChannel openChannel(long logId) throws FileNotFoundException {
+        FileChannel logFile = new RandomAccessFile(new File(journalDirectory,
+                Long.toHexString(logId) + ".txn"),
+                "rw").getChannel();
+        return logFile;
+    }
+
     public void shutdown() throws InterruptedException {
         this.interrupt();
         this.join();
+        syncThread.running = false;
+        syncThread.join();
         for(LedgerDescriptor d: ledgers.values()) {
             d.close();
         }
@@ -282,7 +402,6 @@
     
     public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
             throws IOException, BookieException {
-        
         long ledgerId = entry.getLong();
         LedgerDescriptor handle = getHandle(ledgerId, false, masterKey);
         
@@ -318,7 +437,7 @@
     static class CounterCallback implements WriteCallback {
         int count;
 
-        synchronized public void writeComplete(int rc, long l, long e, Object ctx) {
+        synchronized public void writeComplete(int rc, long l, long e, InetSocketAddress addr, Object ctx) {
             count--;
             if (count == 0) {
                 notifyAll();

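The journal format that the rewritten run() loop writes, and that the new recovery code in the Bookie constructor replays, is a length-prefixed framing: a 4-byte length followed by the raw entry bytes, whose first long is the ledger id (the entry id follows it), with the tail of the file preallocated as zeros. A minimal sketch of that framing in plain java.nio, independent of the Bookie and BufferedChannel classes above (the class and method names here are invented for illustration):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    class JournalFraming {
        // Append one record: [int length][long ledgerId][long entryId][payload]
        static void writeRecord(FileChannel ch, long ledgerId, long entryId, byte[] payload)
                throws IOException {
            ByteBuffer entry = ByteBuffer.allocate(8 + 8 + payload.length);
            entry.putLong(ledgerId).putLong(entryId).put(payload);
            entry.flip();
            ByteBuffer len = ByteBuffer.allocate(4);
            len.putInt(entry.remaining());
            len.flip();
            ch.write(new ByteBuffer[] { len, entry });
        }

        // Read one record at the channel's current position. Returns null on a zero
        // length or a short read, which is how the recovery loop detects the point
        // where writing left off inside the preallocated, zero-filled region.
        static ByteBuffer readRecord(FileChannel ch) throws IOException {
            ByteBuffer len = ByteBuffer.allocate(4);
            while (len.hasRemaining()) {
                if (ch.read(len) <= 0) {
                    return null;
                }
            }
            len.flip();
            int size = len.getInt();
            if (size == 0) {
                return null;
            }
            ByteBuffer rec = ByteBuffer.allocate(size);
            while (rec.hasRemaining()) {
                if (ch.read(rec) <= 0) {
                    return null;
                }
            }
            rec.flip();
            return rec;   // first long is the ledger id, second the entry id
        }
    }
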
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BufferedChannel.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Provides a buffering layer in front of a FileChannel.
+ */
+public class BufferedChannel 
+{
+    ByteBuffer writeBuffer;
+    ByteBuffer readBuffer;
+    private FileChannel bc;
+    long position;
+    int capacity;
+    long readBufferStartPosition;
+    long writeBufferStartPosition;
+    BufferedChannel(FileChannel bc, int capacity) throws IOException {
+        this.bc = bc;
+        this.capacity = capacity;
+        position = bc.position();
+        writeBufferStartPosition = position;
+    }
+/*    public void close() throws IOException {
+        bc.close();
+    }
+*/
+//    public boolean isOpen() {
+//        return bc.isOpen();
+//    }
+
+    synchronized public int write(ByteBuffer src) throws IOException {
+        int copied = 0;
+        if (writeBuffer == null) {
+            writeBuffer = ByteBuffer.allocateDirect(capacity);
+        }
+        while(src.remaining() > 0) {
+            int truncated = 0;
+            if (writeBuffer.remaining() < src.remaining()) {
+                truncated = src.remaining() - writeBuffer.remaining();
+                src.limit(src.limit()-truncated);
+            }
+            copied += src.remaining();
+            writeBuffer.put(src);
+            src.limit(src.limit()+truncated);
+            if (writeBuffer.remaining() == 0) {
+                writeBuffer.flip();
+                bc.write(writeBuffer);
+                writeBuffer.clear();
+                writeBufferStartPosition = bc.position();
+            }
+        }
+        position += copied;
+        return copied;
+    }
+    
+    public long position() {
+        return position;
+    }
+    
+    public void flush(boolean sync) throws IOException {
+        synchronized(this) {
+            if (writeBuffer == null) {
+                return;
+            }
+            writeBuffer.flip();
+            bc.write(writeBuffer);
+            writeBuffer.clear();
+            writeBufferStartPosition = bc.position();
+        }
+        if (sync) {
+            bc.force(false);
+        }
+    }
+
+    /*public Channel getInternalChannel() {
+        return bc;
+    }*/
+    synchronized public int read(ByteBuffer buff, long pos) throws IOException {
+        if (readBuffer == null) {
+            readBuffer = ByteBuffer.allocateDirect(capacity);
+            readBufferStartPosition = Long.MIN_VALUE;
+        }
+        int rc = buff.remaining();
+        while(buff.remaining() > 0) {
+            // check if it is in the write buffer    
+            if (writeBuffer != null && writeBufferStartPosition <= pos) {
+                long positionInBuffer = pos - writeBufferStartPosition;
+                long bytesToCopy = writeBuffer.position()-positionInBuffer;
+                if (bytesToCopy > buff.remaining()) {
+                    bytesToCopy = buff.remaining();
+                }
+                if (bytesToCopy == 0) {
+                    throw new IOException("Read past EOF");
+                }
+                ByteBuffer src = writeBuffer.duplicate();
+                src.position((int) positionInBuffer);
+                src.limit((int) (positionInBuffer+bytesToCopy));
+                buff.put(src);
+                pos+= bytesToCopy;
+                // first check if there is anything we can grab from the readBuffer
+            } else if (readBufferStartPosition <= pos && pos < readBufferStartPosition+readBuffer.capacity()) {
+                long positionInBuffer = pos - readBufferStartPosition;
+                long bytesToCopy = readBuffer.capacity()-positionInBuffer;
+                if (bytesToCopy > buff.remaining()) {
+                    bytesToCopy = buff.remaining();
+                }
+                ByteBuffer src = readBuffer.duplicate();
+                src.position((int) positionInBuffer);
+                src.limit((int) (positionInBuffer+bytesToCopy));
+                buff.put(src);
+                pos += bytesToCopy;
+            // let's read it
+            } else {
+                readBufferStartPosition = pos;
+                readBuffer.clear();
+                // make sure that we don't overlap with the write buffer
+                if (readBufferStartPosition + readBuffer.capacity() >= writeBufferStartPosition) {
+                    readBufferStartPosition = writeBufferStartPosition - readBuffer.capacity();
+                    if (readBufferStartPosition < 0) {
+                        readBuffer.put(LedgerEntryPage.zeroPage, 0, (int)-readBufferStartPosition);
+                    }
+                }
+                while(readBuffer.remaining() > 0) {
+                    if (bc.read(readBuffer, readBufferStartPosition+readBuffer.position()) <= 0) {
+                        throw new IOException("Short read");
+                    }
+                }
+                readBuffer.put(LedgerEntryPage.zeroPage, 0, readBuffer.remaining());
+                readBuffer.clear();
+            }
+        }
+        return rc;
+    }
+}
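
A rough usage sketch for the class above: writes accumulate in a direct write buffer until it fills or flush() is called, and read() can be satisfied from the write buffer, the read buffer, or the file itself. The constructor is package-private, so this assumes caller code in the same package; the file name and buffer size are illustrative only.

    package org.apache.bookkeeper.bookie;   // same package assumed (package-private constructor)

    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class BufferedChannelExample {
        public static void main(String[] args) throws Exception {
            FileChannel fc = new RandomAccessFile("example.log", "rw").getChannel();
            BufferedChannel bc = new BufferedChannel(fc, 64 * 1024);

            byte[] payload = "hello bookie".getBytes();
            long writePos = bc.position();        // logical position, includes buffered bytes
            bc.write(ByteBuffer.wrap(payload));   // lands in the write buffer, not yet on disk

            // A read at that position is served straight out of the write buffer.
            ByteBuffer readBack = ByteBuffer.allocate(payload.length);
            bc.read(readBack, writePos);
            System.out.println(new String(readBack.array()));

            bc.flush(true);                       // write through and force() to stable storage
        }
    }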

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/EntryLogger.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class manages the writing of the bookkeeper entries. All the new
+ * entries are written to a common log. The LedgerCache will have pointers
+ * into files created by this class with offsets into the files to find
+ * the actual ledger entry. The entry log files created by this class are
+ * identified by a long.
+ */
+public class EntryLogger {
+    private static final Logger LOG = Logger.getLogger(EntryLogger.class);
+    private File dirs[];
+    private long logId;
+    /**
+     * The maximum size of a entry logger file.
+     * The maximum size of an entry logger file.
+    final static long LOG_SIZE_LIMIT = 2*1024*1024*1024L;
+    private volatile BufferedChannel logChannel;
+    // The ledgers contained in this file; seems to be unused right now
+    //private HashSet<Long> ledgerMembers = new HashSet<Long>();
+    /**
+     * The 1K block at the head of the entry logger file
+     * that contains the fingerprint and (future) meta-data
+     */
+    final static ByteBuffer LOGFILE_HEADER = ByteBuffer.allocate(1024);
+    static {
+        LOGFILE_HEADER.put("BKLO".getBytes());
+    }
+    // this indicates that a write has happened since the last flush
+    private volatile boolean somethingWritten = false;
+    
+    /**
+     * Create an EntryLogger that stores it's log files in the given
+     * Create an EntryLogger that stores its log files in the given
+     */
+    public EntryLogger(File dirs[]) throws IOException {
+        this.dirs = dirs;
+        // Find the largest logId
+        for(File f: dirs) {
+            long lastLogId = getLastLogId(f);
+            if (lastLogId >= logId) {
+                logId = lastLogId+1;
+            }
+        }
+        createLogId(logId);
+        //syncThread = new SyncThread();
+        //syncThread.start();
+    }
+    
+    /**
+     * Maps entry log files to open channels.
+     */
+    private ConcurrentHashMap<Long, BufferedChannel> channels = new ConcurrentHashMap<Long, BufferedChannel>();
+    
+    /**
+     * Creates a new log file with the given id.
+     */
+    private void createLogId(long logId) throws IOException {
+        List<File> list = Arrays.asList(dirs);
+        Collections.shuffle(list);
+        File firstDir = list.get(0);
+        if (logChannel != null) {
+            logChannel.flush(true);
+        }
+        logChannel = new BufferedChannel(new RandomAccessFile(new File(firstDir, Long.toHexString(logId)+".log"), "rw").getChannel(), 64*1024);
+        logChannel.write((ByteBuffer) LOGFILE_HEADER.clear());
+        channels.put(logId, logChannel);
+        for(File f: dirs) {
+            setLastLogId(f, logId);
+        }
+    }
+
+    /**
+     * writes the given id to the "lastId" file in the given directory.
+     */
+    private void setLastLogId(File dir, long logId) throws IOException {
+        FileOutputStream fos;
+        fos = new FileOutputStream(new File(dir, "lastId"));
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos));
+        try {
+            bw.write(Long.toHexString(logId) + "\n");
+            bw.flush();
+        } finally {
+            try {
+                fos.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+    
+    /**
+     * reads id from the "lastId" file in the given directory.
+     */
+    private long getLastLogId(File f) {
+        FileInputStream fis;
+        try {
+            fis = new FileInputStream(new File(f, "lastId"));
+        } catch (FileNotFoundException e) {
+            return -1;
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+        try {
+            String lastIdString = br.readLine();
+            return Long.parseLong(lastIdString);
+        } catch (IOException e) {
+            return -1;
+        } catch(NumberFormatException e) {
+            return -1;
+        } finally {
+            try {
+                fis.close();
+            } catch (IOException e) {
+            }
+        }
+    }
+    
+    private void openNewChannel() throws IOException {
+        createLogId(++logId);
+    }
+    
+    synchronized void flush() throws IOException {
+        if (logChannel != null) {
+            logChannel.flush(true);
+        }
+    }
+    synchronized long addEntry(long ledger, ByteBuffer entry) throws IOException {
+        if (logChannel.position() + entry.remaining() + 4 > LOG_SIZE_LIMIT) {
+            openNewChannel();
+        }
+        ByteBuffer buff = ByteBuffer.allocate(4);
+        buff.putInt(entry.remaining());
+        buff.flip();
+        logChannel.write(buff);
+        long pos = logChannel.position();
+        logChannel.write(entry);
+        //logChannel.flush(false);
+        somethingWritten = true;
+        return (logId << 32L) | pos;
+    }
+    
+    byte[] readEntry(long ledgerId, long entryId, long location) throws IOException {
+        long entryLogId = location >> 32L;
+        long pos = location & 0xffffffffL;
+        ByteBuffer sizeBuff = ByteBuffer.allocate(4);
+        pos -= 4; // we want to get the ledgerId and length to check
+        BufferedChannel fc;
+        try {
+            fc = getChannelForLogId(entryLogId);
+        } catch (FileNotFoundException e) {
+            FileNotFoundException newe = new FileNotFoundException(e.getMessage() + " for " + ledgerId + " with location " + location);
+            newe.setStackTrace(e.getStackTrace());
+            throw newe;
+        }
+        if (fc.read(sizeBuff, pos) != sizeBuff.capacity()) {
+            throw new IOException("Short read from entrylog " + entryLogId);
+        }
+        pos += 4;
+        sizeBuff.flip();
+        int entrySize = sizeBuff.getInt();
+        // entrySize does not include the ledgerId
+        if (entrySize > 1024*1024) {
+            LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in " + entryLogId);
+            
+        }
+        byte data[] = new byte[entrySize];
+        ByteBuffer buff = ByteBuffer.wrap(data);
+        int rc = fc.read(buff, pos);
+        if ( rc != data.length) {
+            throw new IOException("Short read for " + ledgerId + "@" + entryId + " in " + entryLogId + "@" + pos + "("+rc+"!="+data.length+")");
+        }
+        buff.flip();
+        long thisLedgerId = buff.getLong();
+        if (thisLedgerId != ledgerId) {
+            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position " + pos + " entry belongs to " + thisLedgerId + " not " + ledgerId);
+        }
+        long thisEntryId = buff.getLong();
+        if (thisEntryId != entryId) {
+            throw new IOException("problem found in " + entryLogId + "@" + entryId + " at position " + pos + " entry is " + thisEntryId + " not " + entryId);
+        }
+        
+        return data;
+    }
+    
+    private BufferedChannel getChannelForLogId(long entryLogId) throws IOException {
+        BufferedChannel fc = channels.get(entryLogId);
+        if (fc != null) {
+            return fc;
+        }
+        File file = findFile(entryLogId);
+        FileChannel newFc = new RandomAccessFile(file, "rw").getChannel();
+        synchronized (channels) {
+            fc = channels.get(entryLogId);
+            if (fc != null){
+                newFc.close();
+                return fc;
+            }
+            fc = new BufferedChannel(newFc, 8192);
+            channels.put(entryLogId, fc);
+            return fc;
+        }
+    }
+
+    private File findFile(long logId) throws FileNotFoundException {
+        for(File d: dirs) {
+            File f = new File(d, Long.toHexString(logId)+".log");
+            if (f.exists()) {
+                return f;
+            }
+        }
+        throw new FileNotFoundException("No file for log " + Long.toHexString(logId));
+    }
+    
+    public void close() {
+    }
+
+    synchronized public boolean testAndClearSomethingWritten() {
+        try {
+            return somethingWritten;
+        } finally {
+            somethingWritten = false;
+        }
+    }
+
+}
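
The long returned by addEntry() above packs the entry log id into the high 32 bits and the byte offset within that log into the low 32 bits, which readEntry() splits apart again; this is consistent with LOG_SIZE_LIMIT capping each log at 2GB so the offset fits in the low half. A small self-contained illustration of the packing (the helper class and method names are invented for the example):

    public class EntryLocation {
        // Mirrors addEntry(): high 32 bits = entry log id, low 32 bits = offset in that log.
        static long encode(long logId, long pos) {
            return (logId << 32L) | pos;
        }

        static long logId(long location) {
            return location >> 32L;            // which entry log file
        }

        static long offset(long location) {
            return location & 0xffffffffL;     // byte offset inside that file
        }

        public static void main(String[] args) {
            long loc = encode(0x2b, 1048576);
            System.out.println(logId(loc) + " @ " + offset(loc));   // prints: 43 @ 1048576
        }
    }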

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/FileInfo.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * This is the file handle for a ledger's index file that maps entry ids to location.
+ * It is used by LedgerCache.
+ */
+class FileInfo {
+    private FileChannel fc;
+    /**
+     * The fingerprint of a ledger index file
+     */
+    private byte header[] = "BKLE\0\0\0\0".getBytes();
+    static final long START_OF_DATA = 1024;
+    private long size;
+    private int useCount;
+    private boolean isClosed;
+    public FileInfo(File lf) throws IOException {
+        fc = new RandomAccessFile(lf, "rws").getChannel();
+        size = fc.size();
+        if (size == 0) {
+            fc.write(ByteBuffer.wrap(header));
+        }
+    }
+
+    synchronized public long size() {
+        long rc = size-START_OF_DATA;
+        if (rc < 0) {
+            rc = 0;
+        }
+        return rc;
+    }
+
+    synchronized public int read(ByteBuffer bb, long position) throws IOException {
+        int total = 0;
+        while(bb.remaining() > 0) {
+            int rc = fc.read(bb, position+START_OF_DATA);
+            if (rc <= 0) {
+                throw new IOException("Short read");
+            }
+            total += rc;
+        }
+        return total;
+    }
+
+    synchronized public void close() throws IOException {
+        isClosed = true;
+        if (useCount == 0) {
+            fc.close();
+        }
+    }
+
+    synchronized public long write(ByteBuffer[] buffs, long position) throws IOException {
+        long total = 0;
+        try {
+            fc.position(position+START_OF_DATA);
+            while(buffs[buffs.length-1].remaining() > 0) {
+                long rc = fc.write(buffs);
+                if (rc <= 0) {
+                    throw new IOException("Short write");
+                }
+                total += rc;
+            }
+        } finally {
+            long newsize = position+START_OF_DATA+total;
+            if (newsize > size) {
+                size = newsize;
+            }
+        }
+        return total;
+    }
+
+    synchronized public void use() {
+        useCount++;
+    }
+    
+    synchronized public void release() {
+        useCount--;
+        if (isClosed && useCount == 0) {
+            try {
+                fc.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
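
FileInfo keeps a use count so that close() only releases the underlying channel once the last caller has finished; LedgerCache (below) relies on that when it evicts handles past OPEN_FILE_LIMIT. A minimal sketch of the intended calling pattern, assuming caller code in the same package (the class is package-private) and an illustrative index file path:

    package org.apache.bookkeeper.bookie;   // same package assumed (FileInfo is package-private)

    import java.io.File;
    import java.nio.ByteBuffer;

    public class FileInfoUsage {
        public static void main(String[] args) throws Exception {
            FileInfo fi = new FileInfo(new File("/tmp/ledger.idx"));   // illustrative path
            fi.use();                               // bracket every access with use()/release()
            try {
                ByteBuffer page = ByteBuffer.allocate(8 * 1024);
                fi.write(new ByteBuffer[] { page }, 0);   // offsets are relative to START_OF_DATA
            } finally {
                fi.release();                       // if close() already ran, the channel shuts here
            }
            fi.close();                             // use count is zero, so the channel closes now
        }
    }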

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerCache.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,454 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class maps a ledger entry number into a location (entrylogid, offset) in
+ * an entry log file. It does user level caching to more efficiently manage disk
+ * head scheduling.
+ */
+public class LedgerCache {
+    private final static Logger LOG = Logger.getLogger(LedgerDescriptor.class);
+    
+    final File ledgerDirectories[];
+
+    public LedgerCache(File ledgerDirectories[]) {
+        this.ledgerDirectories = ledgerDirectories;
+    }
+    /**
+     * the list of potentially clean ledgers
+     */
+    LinkedList<Long> cleanLedgers = new LinkedList<Long>();
+    
+    /**
+     * the list of potentially dirty ledgers
+     */
+    LinkedList<Long> dirtyLedgers = new LinkedList<Long>();
+    
+    HashMap<Long, FileInfo> fileInfoCache = new HashMap<Long, FileInfo>();
+    
+    LinkedList<Long> openLedgers = new LinkedList<Long>();
+    
+    static int OPEN_FILE_LIMIT = 900;
+    static {
+        if (System.getProperty("openFileLimit") != null) {
+            OPEN_FILE_LIMIT = Integer.parseInt(System.getProperty("openFileLimit"));
+        }
+        LOG.info("openFileLimit is " + OPEN_FILE_LIMIT);
+    }
+    
+    // reserve a portion of the heap (maxMemory / 3) for the page cache
+    private static int pageLimit = (int)(Runtime.getRuntime().maxMemory() / 3) / LedgerEntryPage.PAGE_SIZE;
+    static {
+        LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
+        if (System.getProperty("pageLimit") != null) {
+            pageLimit = Integer.parseInt(System.getProperty("pageLimit"));
+        }
+        LOG.info("pageLimit is " + pageLimit);
+    }
+    // The number of pages that have actually been used
+    private int pageCount;
+    HashMap<Long, HashMap<Long,LedgerEntryPage>> pages = new HashMap<Long, HashMap<Long,LedgerEntryPage>>();
+    
+    private void putIntoTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, LedgerEntryPage lep) {
+        HashMap<Long, LedgerEntryPage> map = table.get(lep.getLedger());
+        if (map == null) {
+            map = new HashMap<Long, LedgerEntryPage>();
+            table.put(lep.getLedger(), map);
+        }
+        map.put(lep.getFirstEntry(), lep);
+    }
+    
+    private static LedgerEntryPage getFromTable(HashMap<Long, HashMap<Long,LedgerEntryPage>> table, Long ledger, Long firstEntry) {
+        HashMap<Long, LedgerEntryPage> map = table.get(ledger);
+        if (map != null) {
+            return map.get(firstEntry);
+        }
+        return null;
+    }
+    
+   synchronized private LedgerEntryPage getLedgerEntryPage(Long ledger, Long firstEntry, boolean onlyDirty) {
+        LedgerEntryPage lep = getFromTable(pages, ledger, firstEntry);
+        try {
+            if (onlyDirty && lep.isClean()) {
+                return null;
+            }
+            return lep;
+        } finally {
+            if (lep != null) {
+                lep.usePage();
+            }
+        }
+    }
+
+   public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
+        int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+        // find the id of the first entry of the page that has the entry
+        // we are looking for
+        long pageEntry = entry-offsetInPage;
+        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+        if (lep == null) {
+            // find a free page
+            lep = grabCleanPage(ledger, pageEntry);
+            updatePage(lep);
+            synchronized(this) {
+                putIntoTable(pages, lep);
+            }
+        }
+        if (lep != null) {
+            lep.setOffset(offset, offsetInPage*8);
+            lep.releasePage();
+            return;
+        }
+    }
+    
+    public long getEntryOffset(long ledger, long entry) throws IOException {
+        int offsetInPage = (int) (entry%LedgerEntryPage.ENTRIES_PER_PAGES);
+        // find the id of the first entry of the page that has the entry
+        // we are looking for
+        long pageEntry = entry-offsetInPage;
+        LedgerEntryPage lep = getLedgerEntryPage(ledger, pageEntry, false);
+        try {
+            if (lep == null) {
+                lep = grabCleanPage(ledger, pageEntry);
+                synchronized(this) {
+                    putIntoTable(pages, lep);
+                }
+                updatePage(lep);
+                
+            }
+            return lep.getOffset(offsetInPage*8);
+        } finally {
+            if (lep != null) {
+                lep.releasePage();
+            }
+        }
+    }
+    
+    static final private String getLedgerName(long ledgerId) {
+        int parent = (int) (ledgerId & 0xff);
+        int grandParent = (int) ((ledgerId & 0xff00) >> 8);
+        StringBuilder sb = new StringBuilder();
+        sb.append(Integer.toHexString(grandParent));
+        sb.append('/');
+        sb.append(Integer.toHexString(parent));
+        sb.append('/');
+        sb.append(Long.toHexString(ledgerId));
+        sb.append(".idx");
+        return sb.toString();
+    }
+    
+    static final private void checkParents(File f) throws IOException {
+        File parent = f.getParentFile();
+        if (parent.exists()) {
+            return;
+        }
+        if (parent.mkdirs() == false) {
+            throw new IOException("Couldn't mkdirs for " + parent);
+        }
+    }
+    
+    static final private Random rand = new Random();
+
+    static final private File pickDirs(File dirs[]) {
+        return dirs[rand.nextInt(dirs.length)];
+    }
+
+    FileInfo getFileInfo(Long ledger, boolean create) throws IOException {
+        synchronized(fileInfoCache) {
+            FileInfo fi = fileInfoCache.get(ledger);
+            if (fi == null) {
+                String ledgerName = getLedgerName(ledger);
+                File lf = null;
+                for(File d: ledgerDirectories) {
+                    lf = new File(d, ledgerName);
+                    if (lf.exists()) {
+                        break;
+                    }
+                    lf = null;
+                }
+                if (lf == null) {
+                    if (!create) {
+                        throw new Bookie.NoLedgerException(ledger);
+                    }
+                    File dir = pickDirs(ledgerDirectories);
+                    lf = new File(dir, ledgerName);
+                    checkParents(lf);
+                }
+                if (openLedgers.size() > OPEN_FILE_LIMIT) {
+                    fileInfoCache.remove(openLedgers.removeFirst()).close();
+                }
+                fi = new FileInfo(lf);
+                fileInfoCache.put(ledger, fi);
+                openLedgers.add(ledger);
+            }
+            if (fi != null) {
+                fi.use();
+            }
+            return fi;
+        }
+    }
+    private void updatePage(LedgerEntryPage lep) throws IOException {
+        if (!lep.isClean()) {
+            throw new IOException("Trying to update a dirty page");
+        }
+        FileInfo fi = null;
+        try {
+            fi = getFileInfo(lep.getLedger(), true);
+            long pos = lep.getFirstEntry()*8;
+            if (pos >= fi.size()) {
+                lep.zeroPage();
+            } else {
+                lep.readPage(fi);
+            }
+        } finally {
+            if (fi != null) {
+                fi.release();
+            }
+        }
+    }
+
+    void flushLedger(boolean doAll) throws IOException {
+        synchronized(dirtyLedgers) {
+            if (dirtyLedgers.isEmpty()) {
+                synchronized(this) {
+                    for(Long l: pages.keySet()) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Adding " + Long.toHexString(l) + " to dirty ledgers");
+                        }
+                        dirtyLedgers.add(l);
+                    }
+                }
+            }
+            if (dirtyLedgers.isEmpty()) {
+                return;
+            }
+            while(!dirtyLedgers.isEmpty()) {
+                Long l = dirtyLedgers.removeFirst();
+                LinkedList<Long> firstEntryList;
+                synchronized(this) {
+                    HashMap<Long, LedgerEntryPage> pageMap = pages.get(l);
+                    if (pageMap == null || pageMap.isEmpty()) {
+                        continue;
+                    }
+                    firstEntryList = new LinkedList<Long>();
+                    for(Map.Entry<Long, LedgerEntryPage> entry: pageMap.entrySet()) {
+                        LedgerEntryPage lep = entry.getValue();
+                        if (lep.isClean()) {
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Page is clean " + lep);
+                            }
+                            continue;
+                        }
+                        firstEntryList.add(lep.getFirstEntry());
+                    }
+                }
+                // Now flush all the pages of a ledger
+                List<LedgerEntryPage> entries = new ArrayList<LedgerEntryPage>(firstEntryList.size());
+                FileInfo fi = null;
+                try {
+                    for(Long firstEntry: firstEntryList) {
+                        LedgerEntryPage lep = getLedgerEntryPage(l, firstEntry, true);
+                        if (lep != null) {
+                            entries.add(lep);
+                        }
+                    }
+                    Collections.sort(entries, new Comparator<LedgerEntryPage>() {
+                        @Override
+                        public int compare(LedgerEntryPage o1, LedgerEntryPage o2) {
+                            return (int)(o1.getFirstEntry()-o2.getFirstEntry());
+                        }});
+                    ArrayList<Integer> versions = new ArrayList<Integer>(entries.size());
+                    fi = getFileInfo(l, true);
+                    int start = 0;
+                    long lastOffset = -1;
+                    for(int i = 0; i < entries.size(); i++) {
+                        versions.add(i, entries.get(i).getVersion());
+                        if (lastOffset != -1 && (entries.get(i).getFirstEntry() - lastOffset) != LedgerEntryPage.ENTRIES_PER_PAGES) {
+                            // send up a sequential list
+                            int count = i - start;
+                            if (count == 0) {
+                                LOG.error("Count cannot possibly be zero!");
+                            }
+                            writeBuffers(l, entries, fi, start, count);
+                            start = i;
+                        }
+                        lastOffset = entries.get(i).getFirstEntry();
+                    }
+                    if (entries.size()-start == 0 && entries.size() != 0) {
+                        LOG.error("Nothing to write, but there were entries!");
+                    }
+                    writeBuffers(l, entries, fi, start, entries.size()-start);
+                    synchronized(this) {
+                        for(int i = 0; i < entries.size(); i++) {
+                            LedgerEntryPage lep = entries.get(i);
+                            lep.setClean(versions.get(i));
+                        }
+                    }
+                } finally {
+                    for(LedgerEntryPage lep: entries) {
+                        lep.releasePage();
+                    }
+                    if (fi != null) {
+                        fi.release();
+                    }
+                }
+                if (!doAll) {
+                    break;
+                }
+                // Yield: if we are doing all the ledgers we don't want to block other flushes
+                // that need to happen
+                try {
+                    dirtyLedgers.wait(1);
+                } catch (InterruptedException e) {
+                    // just pass it on
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+    
+    private void writeBuffers(Long ledger,
+            List<LedgerEntryPage> entries, FileInfo fi,
+            int start, int count) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Writing " + count + " buffers of " + Long.toHexString(ledger));
+        }
+        if (count == 0) {
+            //System.out.println("Count is zero!");
+            return;
+        }
+        ByteBuffer buffs[] = new ByteBuffer[count];
+        for(int j = 0; j < count; j++) {
+            buffs[j] = entries.get(start+j).getPageToWrite();
+            if (entries.get(start+j).getLedger() != ledger) {
+                throw new IOException("Writing to " + ledger + " but page belongs to " + entries.get(start+j).getLedger());
+            }
+        }
+        long totalWritten = 0;
+        while(buffs[buffs.length-1].remaining() > 0) {
+            long rc = fi.write(buffs, entries.get(start+0).getFirstEntry()*8);
+            if (rc <= 0) {
+                throw new IOException("Short write to ledger " + ledger + " rc = " + rc);
+            }
+            //System.out.println("Wrote " + rc + " to " + ledger);
+            totalWritten += rc;
+        }
+        if (totalWritten != count*LedgerEntryPage.PAGE_SIZE) {
+            throw new IOException("Short write to ledger " + ledger + " wrote " + totalWritten + " expected " + count*LedgerEntryPage.PAGE_SIZE);
+        }
+    }
+    private LedgerEntryPage grabCleanPage(long ledger, long entry) throws IOException {
+        if (entry % LedgerEntryPage.ENTRIES_PER_PAGES != 0) {
+            throw new IllegalArgumentException(entry + " is not a multiple of " + LedgerEntryPage.ENTRIES_PER_PAGES);
+        }
+        synchronized(this) {
+            if (pageCount  < pageLimit) {
+                // let's see if we can allocate something
+                LedgerEntryPage lep = new LedgerEntryPage();
+                lep.setLedger(ledger);
+                lep.setFirstEntry(entry);
+                // note, this will not block since it is a new page
+                lep.usePage();
+                pageCount++;
+                return lep;
+            }
+        }
+        
+        outerLoop:
+        while(true) {
+            synchronized(cleanLedgers) {
+                if (cleanLedgers.isEmpty()) {
+                    flushLedger(false);
+                    synchronized(this) {
+                        for(Long l: pages.keySet()) {
+                            cleanLedgers.add(l);
+                        }
+                    }
+                }
+                synchronized(this) {
+                    Long cleanLedger = cleanLedgers.getFirst();
+                    Map<Long, LedgerEntryPage> map = pages.get(cleanLedger);
+                    if (map == null || map.isEmpty()) {
+                        cleanLedgers.removeFirst();
+                        continue;
+                    }
+                    Iterator<Map.Entry<Long, LedgerEntryPage>> it = map.entrySet().iterator();
+                    LedgerEntryPage lep = it.next().getValue();
+                    while(lep.inUse() || !lep.isClean()) {
+                        if (!it.hasNext()) {
+                            // no clean, unused page left in this ledger, move on to the next one
+                            continue outerLoop;
+                        }
+                        lep = it.next().getValue();
+                    }
+                    it.remove();
+                    if (map.isEmpty()) {
+                        pages.remove(lep.getLedger());
+                    }
+                    lep.usePage();
+                    lep.zeroPage();
+                    lep.setLedger(ledger);
+                    lep.setFirstEntry(entry);
+                    return lep;
+                }
+            }
+        }
+    }
+
+    public long getLastEntry(long ledgerId) {
+        long lastEntry = 0;
+        // Find the last entry in the cache
+        synchronized(this) {
+            Map<Long, LedgerEntryPage> map = pages.get(ledgerId);
+            if (map != null) {
+                for(LedgerEntryPage lep: map.values()) {
+                    if (lep.getFirstEntry() + LedgerEntryPage.ENTRIES_PER_PAGES < lastEntry) {
+                        continue;
+                    }
+                    lep.usePage();
+                    long highest = lep.getLastEntry();
+                    if (highest > lastEntry) {
+                        lastEntry = highest;
+                    }
+                    lep.releasePage();
+                }
+            }
+        }
+        
+        return lastEntry;
+    }
+}
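
For reference, the index kept by LedgerCache above is an array of 8-byte entry-log offsets keyed by entry id, split into 8K pages of ENTRIES_PER_PAGES slots each. A minimal, standalone sketch of that arithmetic follows; PAGE_SIZE and ENTRIES_PER_PAGES mirror LedgerEntryPage, while the class and helper names are illustrative only and not part of this patch.

    // Sketch of the index-page arithmetic used by the ledger cache.
    public class IndexMathSketch {
        static final int PAGE_SIZE = 8192;
        static final int ENTRIES_PER_PAGES = PAGE_SIZE / 8;

        // First entry id covered by the page that holds entryId.
        static long pageFirstEntry(long entryId) {
            return entryId - (entryId % ENTRIES_PER_PAGES);
        }

        // Byte position of entryId's 8-byte offset slot within the index file.
        static long filePosition(long entryId) {
            return entryId * 8;
        }

        // Byte position of entryId's slot within its page buffer.
        static int positionInPage(long entryId) {
            return (int) (entryId % ENTRIES_PER_PAGES) * 8;
        }

        public static void main(String[] args) {
            long entryId = 3000;
            System.out.println(pageFirstEntry(entryId));   // 2048
            System.out.println(filePosition(entryId));     // 24000
            System.out.println(positionInPage(entryId));   // 7616
        }
    }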

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,3 @@
-package org.apache.bookkeeper.bookie;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,11 +19,10 @@
  * 
  */
 
+package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 import org.apache.log4j.Logger;
 
@@ -36,11 +34,12 @@
  *
  */
 public class LedgerDescriptor {
-    Logger LOG = Logger.getLogger(LedgerDescriptor.class);
-    LedgerDescriptor(long ledgerId, FileChannel ledger, FileChannel ledgerIndex) {
+    final static Logger LOG = Logger.getLogger(LedgerDescriptor.class);
+    LedgerCache ledgerCache;
+    LedgerDescriptor(long ledgerId, EntryLogger entryLogger, LedgerCache ledgerCache) {
         this.ledgerId = ledgerId;
-        this.ledger = ledger;
-        this.ledgerIndex = ledgerIndex;
+        this.entryLogger = entryLogger;
+        this.ledgerCache = ledgerCache;
     }
     
     private ByteBuffer masterKey = null;
@@ -54,8 +53,7 @@
     }
     
     private long ledgerId;
-    private FileChannel ledger;
-    private FileChannel ledgerIndex;
+    EntryLogger entryLogger;
     private int refCnt;
     synchronized public void incRef() {
         refCnt++;
@@ -66,100 +64,70 @@
     synchronized public int getRefCnt() {
         return refCnt;
     }
-    static private final long calcEntryOffset(long entryId) {
-        return 8L*entryId;
-    }
     long addEntry(ByteBuffer entry) throws IOException {
-        ByteBuffer offsetBuffer = ByteBuffer.wrap(new byte[8]);
         long ledgerId = entry.getLong();
         if (ledgerId != this.ledgerId) {
             throw new IOException("Entry for ledger " + ledgerId + " was sent to " + this.ledgerId);
         }
-        /*
-         * Get entry id
-         */
-                
         long entryId = entry.getLong();
         entry.rewind();
         
         /*
-         * Set offset of entry id to be the current ledger position
+         * Log the entry
          */
-        offsetBuffer.rewind();
-        offsetBuffer.putLong(ledger.position());
-        //LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
-        offsetBuffer.flip();
-        
-        /*
-         * Write on the index entry corresponding to entryId the position
-         * of this entry.
-         */
-        ledgerIndex.write(offsetBuffer, calcEntryOffset(entryId));
-        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+        long pos = entryLogger.addEntry(ledgerId, entry);
         
         
-        lenBuffer.putInt(entry.remaining());
-        lenBuffer.flip();
-        
         /*
-         * Write length of entry first, then the entry itself
+         * Record the position of this entry in the entry log against its entry id
          */
-        ledger.write(lenBuffer);
-        ledger.write(entry);
-        //entry.position(24);
-        //LOG.debug("Entry: " + entry.position() + ", " + new String(entry.array()));
-     
+        ledgerCache.putEntryOffset(ledgerId, entryId, pos);
         return entryId;
     }
     ByteBuffer readEntry(long entryId) throws IOException {
-        ByteBuffer buffer = ByteBuffer.wrap(new byte[8]);
         long offset;
         /*
          * If entryId is -1, then return the last written.
          */
         if (entryId == -1) {
-            offset = ledgerIndex.size()-8; 
-        } else {
-            offset = calcEntryOffset(entryId);
+            long lastEntry = ledgerCache.getLastEntry(ledgerId);
+            FileInfo fi = null;
+            try {
+                fi = ledgerCache.getFileInfo(ledgerId, false);
+                long size = fi.size();
+                // we may not have the last entry in the cache
+                if (size > lastEntry*8) {
+                    ByteBuffer bb = ByteBuffer.allocate(LedgerEntryPage.PAGE_SIZE);
+                    long position = size-LedgerEntryPage.PAGE_SIZE;
+                    if (position < 0) {
+                        position = 0;
+                    }
+                    fi.read(bb, position);
+                    bb.flip();
+                    long startingEntryId = position/8;
+                    for(int i = LedgerEntryPage.ENTRIES_PER_PAGES-1; i >= 0; i--) {
+                        if (bb.getLong(i*8) != 0) {
+                            if (lastEntry < startingEntryId+i) {
+                                lastEntry = startingEntryId+i;
+                            }
+                            break;
+                        }
+                    }
+                }
+            } finally {
+                if (fi != null) {
+                    fi.release();
+                }
+            }
+            entryId = lastEntry;
         }
-        int len = ledgerIndex.read(buffer, offset);
-        buffer.flip();
-        if (len != buffer.limit()) {
-            throw new Bookie.NoEntryException(ledgerId, entryId);
-        }
-        offset = buffer.getLong();
+        
+        offset = ledgerCache.getEntryOffset(ledgerId, entryId);
         if (offset == 0) {
             throw new Bookie.NoEntryException(ledgerId, entryId);
         }
-        LOG.debug("Offset: " + offset);
-
-        buffer.limit(4);
-        buffer.rewind();
-        /*
-         * Read the length
-         */
-        ledger.read(buffer, offset);
-        buffer.flip();
-        len = buffer.getInt();
-        LOG.debug("Length of buffer: " + len);
-        buffer = ByteBuffer.allocate(len);
-        /*
-         * Read the rest. We add 4 to skip the length
-         */
-        ledger.read(buffer, offset + 4);
-        buffer.flip();
-        return buffer;
+        return ByteBuffer.wrap(entryLogger.readEntry(ledgerId, entryId, offset));
     }
     void close() {
-        try {
-            ledger.close();
-        } catch (IOException e) {
-            LOG.warn("Error closing ledger " + ledgerId, e);
-        }
-        try {
-            ledgerIndex.close();
-        } catch (IOException e) {
-            LOG.warn("Error closing index for ledger " + ledgerId, e);
-        }
     }
 }
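
The net effect of the LedgerDescriptor change above: addEntry no longer writes a length-prefixed record into a per-ledger file and index; it appends the entry to the shared entry log and records the returned position in the ledger cache, and readEntry does the reverse lookup. A hedged sketch of that flow, using stand-in interfaces rather than the real EntryLogger and LedgerCache classes:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Stand-ins for EntryLogger and LedgerCache; illustrative only.
    interface EntryLog {
        long append(long ledgerId, ByteBuffer entry) throws IOException; // returns log offset
        byte[] read(long ledgerId, long entryId, long offset) throws IOException;
    }

    interface IndexCache {
        void putEntryOffset(long ledgerId, long entryId, long offset) throws IOException;
        long getEntryOffset(long ledgerId, long entryId) throws IOException;
    }

    class DescriptorFlowSketch {
        private final long ledgerId;
        private final EntryLog log;
        private final IndexCache index;

        DescriptorFlowSketch(long ledgerId, EntryLog log, IndexCache index) {
            this.ledgerId = ledgerId;
            this.log = log;
            this.index = index;
        }

        long addEntry(ByteBuffer entry) throws IOException {
            long lid = entry.getLong();      // entry starts with the ledger id ...
            long entryId = entry.getLong();  // ... followed by the entry id
            entry.rewind();
            if (lid != ledgerId) {
                throw new IOException("Entry for ledger " + lid + " was sent to " + ledgerId);
            }
            long pos = log.append(lid, entry);          // append to the shared entry log
            index.putEntryOffset(lid, entryId, pos);    // remember where it landed
            return entryId;
        }

        ByteBuffer readEntry(long entryId) throws IOException {
            long offset = index.getEntryOffset(ledgerId, entryId);
            if (offset == 0) {
                throw new IOException("No entry " + entryId + " in ledger " + ledgerId);
            }
            return ByteBuffer.wrap(log.read(ledgerId, entryId, offset));
        }
    }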

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerEntryPage.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This is a page in the LedgerCache. It holds the locations
+ * (entrylogfile, offset) for entry ids.
+ */
+public class LedgerEntryPage {
+    public static final int PAGE_SIZE = 8192;
+    public static final int ENTRIES_PER_PAGES = PAGE_SIZE/8;
+    private long ledger = -1;
+    private long firstEntry = -1;
+    private ByteBuffer page = ByteBuffer.allocateDirect(PAGE_SIZE);
+    private boolean clean = true;
+    private boolean pinned = false;
+    private int useCount;
+    private int version;
+    
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(getLedger());
+        sb.append('@');
+        sb.append(getFirstEntry());
+        sb.append(clean ? " clean " : " dirty ");
+        sb.append(useCount);
+        return sb.toString();
+    }
+    synchronized public void usePage() {
+        useCount++;
+    }
+    synchronized public void pin() {
+        pinned = true;
+    }
+    synchronized public void unpin() {
+        pinned = false;
+    }
+    synchronized public boolean isPinned() {
+        return pinned;
+    }
+    synchronized public void releasePage() {
+        useCount--;
+        if (useCount < 0) {
+            throw new IllegalStateException("Use count has gone below 0");
+        }
+    }
+    synchronized private void checkPage() {
+        if (useCount <= 0) {
+            throw new IllegalStateException("Page not marked in use");
+        }
+    }
+    @Override
+    public boolean equals(Object other) {
+        LedgerEntryPage otherLEP = (LedgerEntryPage) other;
+        return otherLEP.getLedger() == getLedger() && otherLEP.getFirstEntry() == getFirstEntry();
+    }
+    @Override
+    public int hashCode() {
+        return (int)getLedger() ^ (int)(getFirstEntry());
+    }
+    void setClean(int versionOfCleaning) {
+        this.clean = (versionOfCleaning == version);
+    }
+    boolean isClean() {
+        return clean;
+    }
+    public void setOffset(long offset, int position) {
+        checkPage();
+        version++;
+        this.clean = false;
+        page.putLong(position, offset);
+    }
+    public long getOffset(int position) {
+        checkPage();
+        return page.getLong(position);
+    }
+    static final byte zeroPage[] = new byte[64*1024];
+    public void zeroPage() {
+        checkPage();
+        page.clear();
+        page.put(zeroPage, 0, page.remaining());
+        clean = true;
+    }
+    public void readPage(FileInfo fi) throws IOException {
+        checkPage();
+        page.clear();
+        while(page.remaining() != 0) {
+            if (fi.read(page, getFirstEntry()*8) <= 0) {
+                throw new IOException("Short page read of ledger " + getLedger() + " tried to get " + page.capacity() + " from position " + getFirstEntry()*8 + " still need " + page.remaining());
+            }
+        }
+        clean = true;
+    }
+    public ByteBuffer getPageToWrite() {
+        checkPage();
+        page.clear();
+        return page;
+    }
+    void setLedger(long ledger) {
+        this.ledger = ledger;
+    }
+    long getLedger() {
+        return ledger;
+    }
+    int getVersion() {
+        return version;
+    }
+    void setFirstEntry(long firstEntry) {
+        if (firstEntry % ENTRIES_PER_PAGES != 0) {
+            throw new IllegalArgumentException(firstEntry + " is not a multiple of " + ENTRIES_PER_PAGES);
+        }
+        this.firstEntry = firstEntry;
+    }
+    long getFirstEntry() {
+        return firstEntry;
+    }
+    public boolean inUse() {
+        return useCount > 0;
+    }
+    public long getLastEntry() {
+        for(int i = ENTRIES_PER_PAGES - 1; i >= 0; i--) {
+            if (getOffset(i*8) > 0) {
+                return i + firstEntry;
+            }
+        }
+        return 0;
+    }
+}
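
A short usage sketch for the page class above, showing the use/release discipline and the byte positions passed to setOffset/getOffset. The class below is hypothetical; it would have to live in the org.apache.bookkeeper.bookie package because setLedger and setFirstEntry are package-private, and the entry-log offset is a made-up value.

    package org.apache.bookkeeper.bookie;

    public class PageUsageSketch {
        public static void main(String[] args) {
            LedgerEntryPage lep = new LedgerEntryPage();
            lep.usePage();                 // mark the page in use before touching it
            lep.setLedger(1L);
            lep.setFirstEntry(0L);         // must be a multiple of ENTRIES_PER_PAGES
            lep.zeroPage();                // start from an all-zero, clean page

            long entryId = 5;
            long entryLogOffset = 1234L;   // made-up offset into the entry log
            int posInPage = (int) (entryId % LedgerEntryPage.ENTRIES_PER_PAGES) * 8;

            lep.setOffset(entryLogOffset, posInPage);      // marks the page dirty
            System.out.println(lep.getOffset(posInPage));  // prints 1234
            System.out.println(lep.getLastEntry());        // prints 5
            lep.releasePage();
        }
    }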

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/MarkerFileChannel.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This class is just a stub that can be used in collections with
+ * FileChannels
+ */
+public class MarkerFileChannel extends FileChannel {
+
+    @Override
+    public void force(boolean metaData) throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public FileLock lock(long position, long size, boolean shared)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public MappedByteBuffer map(MapMode mode, long position, long size)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public long position() throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public FileChannel position(long newPosition) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public int read(ByteBuffer dst, long position) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long read(ByteBuffer[] dsts, int offset, int length)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long size() throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long transferFrom(ReadableByteChannel src, long position, long count)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long transferTo(long position, long count, WritableByteChannel target)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public FileChannel truncate(long size) throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public FileLock tryLock(long position, long size, boolean shared)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public int write(ByteBuffer src, long position) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public long write(ByteBuffer[] srcs, int offset, int length)
+            throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    protected void implCloseChannel() throws IOException {
+        // TODO Auto-generated method stub
+
+    }
+
+}
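
As the class comment says, MarkerFileChannel exists only as a sentinel to place in collections that hold FileChannels. A hedged sketch of that pattern; the map and the reserve helper are illustrative and not part of this patch.

    import java.nio.channels.FileChannel;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.bookkeeper.bookie.MarkerFileChannel;

    public class SentinelSketch {
        // Hypothetical map of ledger id to its open file channel.
        private static final ConcurrentMap<Long, FileChannel> channels =
                new ConcurrentHashMap<Long, FileChannel>();
        private static final FileChannel MARKER = new MarkerFileChannel();

        // Reserve a slot for a ledger; the caller that wins the race later
        // replaces the marker with a real channel once the file is opened.
        static boolean reserve(long ledgerId) {
            return channels.putIfAbsent(ledgerId, MARKER) == null;
        }
    }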

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Tue Jan 26 23:16:45 2010
@@ -1,81 +1,101 @@
 package org.apache.bookkeeper.client;
 
+import java.util.Enumeration;
+
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 public interface AsyncCallback {
-    public interface AddCallback {
-        /**
-         * Callback declaration
-         * 
-         * @param rc    return code
-         * @param ledgerId  ledger identifier
-         * @param entryId   entry identifier
-         * @param ctx   control object
-         */
-        void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
-    }
-    
-    public interface CloseCallback {
-        /**
-         * Callback definition
-         * 
-         * @param rc    return code
-         * @param ledgerId  ledger identifier
-         * @param ctx   control object
-         */
-        void closeComplete(int rc, LedgerHandle lh, Object ctx);
-    }
-    
-    public interface CreateCallback {
-        /**
-         * Declaration of callback method
-         * 
-         * @param rc    return status
-         * @param lh    ledger handle
-         * @param ctx   control object
-         */
-        
-        void createComplete(int rc, LedgerHandle lh, Object ctx);
-    }
-    
-    public interface OpenCallback {
-        /**
-         * Callback for asynchronous call to open ledger
-         * 
-         * @param rc
-         * @param lh
-         * @param ctx
-         */
-        
-        public void openComplete(int rc, LedgerHandle lh, Object ctx);
-        
-    }
-    
-    public interface ReadCallback {
-        /**
-         * Callback declaration
-         * 
-         * @param rc    return code
-         * @param ledgerId  ledger identifier
-         * @param seq   sequence of entries
-         * @param ctx   control object
-         */
-        void readComplete(int rc, LedgerHandle lh, LedgerSequence seq, Object ctx);
-    }
+  public interface AddCallback {
+    /**
+     * Callback declaration
+     * 
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param entryId
+     *          entry identifier
+     * @param ctx
+     *          control object
+     */
+    void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
+  }
+
+  public interface CloseCallback {
+    /**
+     * Callback definition
+     * 
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param ctx
+     *          control object
+     */
+    void closeComplete(int rc, LedgerHandle lh, Object ctx);
+  }
+
+  public interface CreateCallback {
+    /**
+     * Declaration of callback method
+     * 
+     * @param rc
+     *          return status
+     * @param lh
+     *          ledger handle
+     * @param ctx
+     *          control object
+     */
+
+    void createComplete(int rc, LedgerHandle lh, Object ctx);
+  }
+
+  public interface OpenCallback {
+    /**
+     * Callback for asynchronous call to open ledger
+     * 
+     * @param rc
+     *          Return code
+     * @param lh
+     *          ledger handle
+     * @param ctx
+     *          control object
+     */
+
+    public void openComplete(int rc, LedgerHandle lh, Object ctx);
+
+  }
+
+  public interface ReadCallback {
+    /**
+     * Callback declaration
+     * 
+     * @param rc
+     *          return code
+     * @param lh
+     *          ledger handle
+     * @param seq
+     *          sequence of entries
+     * @param ctx
+     *          control object
+     */
+
+    void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
+        Object ctx);
+  }
 }
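
For client code, the visible change in this file is that ReadCallback now delivers an Enumeration<LedgerEntry> instead of a LedgerSequence. A hedged sketch of implementing the revised callback; treating rc == 0 as success and ignoring the entry payloads are assumptions made for illustration.

    import java.util.Enumeration;

    import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
    import org.apache.bookkeeper.client.LedgerEntry;
    import org.apache.bookkeeper.client.LedgerHandle;

    public class CountingReadCallback implements ReadCallback {
        public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
                Object ctx) {
            if (rc != 0) {                 // assumed: non-zero rc signals an error
                System.err.println("read failed, rc=" + rc);
                return;
            }
            int count = 0;
            while (seq.hasMoreElements()) {
                seq.nextElement();         // payload handling omitted
                count++;
            }
            System.out.println("read " + count + " entries");
        }
    }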


