hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r772098 - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/ src/contrib...
Date Wed, 06 May 2009 08:49:48 GMT
Author: fpj
Date: Wed May  6 08:49:48 2009
New Revision: 772098

URL: http://svn.apache.org/viewvc?rev=772098&view=rev
Log:
ZOOKEEPER-380. bookkeeper should have a streaming api so that it's easier to
store checkpoints/snapshots in bookkeeper. (mahadev via flavio)


Added:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed May  6 08:49:48 2009
@@ -113,6 +113,9 @@
 
   ZOOKEEPER-384. keeper exceptions missing path (phunt via mahadev)
 
+  ZOOKEEPER-380. bookkeeper should have a streaming api so that it's easier to
+  store checkpoints/snapshots in bookkeeper. (mahadev via flavio)
+
 NEW FEATURES:
 
   ZOOKEEPER-371. jdiff documentation included in build/release (giri via phunt)

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Wed May  6 08:49:48 2009
@@ -1,4 +1,4 @@
-package org.apache.bookkeeper.client;
+package org. apache.bookkeeper.client;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -23,12 +23,8 @@
 
 import java.io.IOException;
 import java.net.ConnectException;
-import java.io.ByteArrayOutputStream;
-import java.security.NoSuchAlgorithmException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.ArrayList;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
@@ -40,10 +36,6 @@
 import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.QuorumEngine.Operation;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
 import org.apache.log4j.Logger;
 
 import org.apache.zookeeper.data.Stat;
@@ -53,7 +45,6 @@
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.server.ZooKeeperServer;
 
 
 /**
@@ -63,12 +54,17 @@
  * There are three possible operations: start a new ledger, 
  * write to a ledger, and read from a ledger.
  * 
- *
  */
 
 public class BookKeeper 
-implements ReadCallback, AddCallback, Watcher {
-    
+implements Watcher {
+    /**
+     * the chain of classes to get a client 
+     * request from the bookkeeper class to the 
+     * server is 
+     * bookkeeper->quorumengine->bookiehandle->bookieclient
+     * 
+     */
     Logger LOG = Logger.getLogger(BookKeeper.class);
 
     static public final String prefix = "/ledgers/L";
@@ -79,7 +75,6 @@
     
     ZooKeeper zk = null;
     QuorumEngine engine = null;
-    HashMap<Long, QuorumEngine> engines;
     HashSet<InetSocketAddress> bookieBlackList;
     
     LedgerSequence responseRead;
@@ -93,7 +88,6 @@
         
         //Create hashmap for quorum engines
         //this.qeMap = new HashMap<Long, ArrayBlockingQueue<Operation> >();
-        this.engines = new HashMap<Long, QuorumEngine >();
         //List to enable clients to blacklist bookies
         this.bookieBlackList = new HashSet<InetSocketAddress>();
     }
@@ -102,68 +96,27 @@
      * Watcher method. 
      */
     synchronized public void process(WatchedEvent event) {
-        LOG.info("Process: " + event.getType() + " " + event.getPath());
+        LOG.debug("Process: " + event.getType() + " " + event.getPath());
     }
     
-    
-    /**
-     * Implements objects to help with the synchronization of asynchronous calls
-     * 
-     */
-    
-    private static class RetCounter {
-        int i;
-        int rc;
-        int total;
-        LedgerSequence seq = null;
-        
-        synchronized void inc() {
-            i++;
-            total++;
-        }
-        synchronized void dec() {
-            i--;
-            notifyAll();
-        }
-        synchronized void block(int limit) throws InterruptedException {
-            while(i > limit) {
-                int prev = i;
-                wait(15000);
-                if(i == prev){
-                    break;
-                }
-            }
-        }
-        synchronized int total() {
-            return total;
-        }
-        
-        void setrc(int rc){
-            this.rc = rc;
-        }
-        
-        int getrc(){
-            return rc;
-        }
-        
-        void setSequence(LedgerSequence seq){
-            this.seq = seq;
-        }
-        
-        LedgerSequence getSequence(){
-            return seq;
-        }
-    }
+
     
     /**
      * Formats ledger ID according to ZooKeeper rules
      * 
      * @param id	znode id
      */
-    private String getZKStringId(long id){
+    String getZKStringId(long id){
         return String.format("%010d", id);        
     }
     
+    /**
+     * return the zookeeper instance
+     * @return return the zookeeper instance
+     */
+    ZooKeeper getZooKeeper() {
+        return zk;
+    }
     
     /**
      * Creates a new ledger. To create a ledger, we need to specify the ensemble
@@ -179,8 +132,8 @@
      * @param passwd    password
      */
     public LedgerHandle createLedger(int ensSize, int qSize, QMode mode,  byte passwd[])
-    throws KeeperException, InterruptedException, 
-    IOException, BKException {
+        throws KeeperException, InterruptedException, 
+        IOException, BKException {
         // Check that quorum size follows the minimum
         long t;
         switch(mode){
@@ -201,37 +154,29 @@
         case FREEFORM:
             break;
         }
-        
         /*
          * Create ledger node on ZK.
          * We get the id from the sequence number on the node.
          */
-        
         String path = zk.create(prefix, new byte[0], 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
-        
         /* 
          * Extract ledger id.
          */
         String parts[] = path.split("/");
         String subparts[] = parts[2].split("L");
-        System.out.println("SubPath: " + subparts[0]);
         long lId = Long.parseLong(subparts[1]);
-               
         /* 
          * Get children from "/ledgers/available" on zk
          */
         List<String> list = 
             zk.getChildren("/ledgers/available", false);
         ArrayList<InetSocketAddress> lBookies = new ArrayList<InetSocketAddress>();
-        
         /* 
          * Select ensSize servers to form the ensemble
          */
-        System.out.println("create: " + (prefix + getZKStringId(lId) + ensemble));
         path = zk.create(prefix + getZKStringId(lId) + ensemble, new byte[0], 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        
         /* 
          * Add quorum size to ZK metadata
          */
@@ -239,7 +184,6 @@
         bb.putInt(qSize);
         zk.create(prefix + getZKStringId(lId) + quorumSize, bb.array(), 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        
         /* 
          * Quorum mode
          */
@@ -247,15 +191,11 @@
         bb.putInt(mode.ordinal());
         zk.create(prefix + getZKStringId(lId) + quorumMode, bb.array(), 
                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        
         /* 
          * Create QuorumEngine
          */
         LedgerHandle lh = new LedgerHandle(this, lId, 0, qSize, mode, passwd);
-        //ArrayBlockingQueue<Operation> queue = new ArrayBlockingQueue<Operation>(200);
-        engines.put(lh.getId(), new QuorumEngine(lh)); //queue));
         //qeMap.put(lId, queue);
-        
         /*
          * Adding bookies to ledger handle
          */
@@ -269,7 +209,6 @@
         	    index = 0;
         	else {
         	    LOG.error("Not enough bookies available");
-        	    engines.remove(lh.getId());
         	    
         	    return null;
         	}
@@ -290,7 +229,6 @@
         	    i--;
         	} 
         }
-      
         LOG.debug("Created new ledger");
         // Return ledger handler
         return lh; 
@@ -377,7 +315,6 @@
          *  Create QuorumEngine
          */
         LedgerHandle lh = new LedgerHandle(this, lId, last, qSize, qMode, passwd);
-        engines.put(lh.getId(), new QuorumEngine(lh));// queue));
         
         /*
          * Get children of "/ledgers/id/ensemble" 
@@ -422,124 +359,7 @@
         return addr;
     }
     
-    /**
-     * Add entry synchronously to an open ledger.
-     * 
-     * @param	lh	LedgerHandle
-     * @param	data byte[]
-     */
-    
-    public long addEntry(LedgerHandle lh, byte[] data)
-    throws InterruptedException{
-        LOG.debug("Adding entry " + data);
-        RetCounter counter = new RetCounter();
-        counter.inc();
-        
-        if(lh != null){ 
-        	Operation r = new AddOp(lh, data, this, counter);
-        	engines.get(lh.getId()).sendOp(r);
-        	//qeMap.get(lh.getId()).put(r);
-        
-        	counter.block(0);
-        
-        	return counter.getrc();
-        } else return -1;
-    }
-    
-    /**
-     * Add entry asynchronously to an open ledger.
-     * 
-     * @param lh	ledger handle returned with create
-     * @param data	array of bytes to be written
-     * @param cb	object implementing callbackinterface
-     * @param ctx	some control object
-     */
-    public void asyncAddEntry(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
-    throws InterruptedException {
-       
-        if(lh != null){
-            AddOp r = new AddOp(lh, data, cb, ctx);
-            engines.get(lh.getId()).sendOp(r);
-        }
-    }
-    
-    
-    /**
-     * Read a sequence of entries synchronously.
-     * 
-     * @param lh	ledger handle returned with create
-     * @param firstEntry	id of first entry of sequence
-     * @param lastEntry		id of last entry of sequence
-     *
-     */
-    public LedgerSequence readEntries(LedgerHandle lh, long firstEntry, long lastEntry) 
-    throws InterruptedException, BKException {
-        // Little sanity check
-        if((firstEntry > lh.getLast()) || (firstEntry > lastEntry))
-            throw BKException.create(Code.ReadException);
-        
-        RetCounter counter = new RetCounter();
-        counter.inc();
-        
-        Operation r = new ReadOp(lh, firstEntry, lastEntry, this, counter);
-        engines.get(lh.getId()).sendOp(r);
-        
-        LOG.debug("Going to wait for read entries: " + counter.i);
-        counter.block(0);
-        LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
-        
-        if(counter.getSequence() == null) throw BKException.create(Code.ReadException);
-        return counter.getSequence();
-    }
-    
-    /**
-     * Read a sequence of entries asynchronously.
-     * 
-     * @param lh	ledger handle
-     * @param firstEntry	id of first entry of sequence
-     * @param lastEntry		id of last entry of sequence
-     * @param cb	object implementing read callback interface
-     * @param ctx	control object 
-     */
-    public void asyncReadEntries(LedgerHandle lh, long firstEntry, long lastEntry, ReadCallback cb, Object ctx)
-    throws BKException, InterruptedException {
-        // Little sanity check
-        if((firstEntry > lh.getLast()) || (firstEntry > lastEntry)) 
-            throw BKException.create(Code.ReadException);
-        
-        Operation r = new ReadOp(lh, firstEntry, lastEntry, cb, ctx);
-        engines.get(lh.getId()).sendOp(r); 
-        //qeMap.get(lh.getId()).put(r);
-    }
-    
-    /**
-     * Close ledger.
-     * 
-     * @param lh	handle of ledger to close
-     */
-    public void closeLedger(LedgerHandle lh) 
-    throws KeeperException, InterruptedException {
-        //Set data on zookeeper
-        ByteBuffer last = ByteBuffer.allocate(8);
-        last.putLong(lh.getLast());
-        LOG.info("Last saved on ZK is: " + lh.getLast());
-        String closePath = prefix + getZKStringId(lh.getId()) + close; 
-        if(zk.exists(closePath, false) == null){
-           zk.create(closePath, 
-                   last.array(), 
-                   Ids.OPEN_ACL_UNSAFE, 
-                   CreateMode.PERSISTENT); 
-        } else {
-            zk.setData(closePath, 
-                last.array(), -1);
-        }
-        lh.close();
-        for(QuorumEngine qe : engines.values()){
-        	StopOp sOp = new StopOp();
-        	qe.sendOp(sOp);
-        }
-    }
-    
+ 
     /**
      * Check if close node exists. 
      * 
@@ -619,7 +439,7 @@
      * 
      * @param addrList	list of bookies to replace
      */
-    public InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
+    InetSocketAddress getNewBookie(ArrayList<InetSocketAddress> addrList)
     throws InterruptedException {
         try{
             // Get children from "/ledgers/available" on zk
@@ -681,40 +501,5 @@
         bookieBlackList.add(addr);
     }
     
-    /**
-     * Implementation of callback interface for synchronous read method.
-     * 
-     * @param rc	return code
-     * @param leder	ledger identifier
-     * @param seq	sequence of entries
-     * @param ctx	control object
-     */
-    public void readComplete(int rc, 
-            long ledger, 
-            LedgerSequence seq,  
-            Object ctx){        
-        
-        RetCounter counter = (RetCounter) ctx;
-        counter.setSequence(seq);
-        LOG.debug("Read complete: " + seq.size() + ", " + counter.i);
-        counter.dec();
-    }
-    
-    /**
-     * Implementation of callback interface for synchronous read method.
-     * 
-     * @param rc	return code
-     * @param leder	ledger identifier
-     * @param entry	entry identifier
-     * @param ctx	control object
-     */
-    public void addComplete(int rc, 
-            long ledger, 
-            long entry, 
-            Object ctx){          
-        RetCounter counter = (RetCounter) ctx;
-        
-        counter.setrc(rc);
-        counter.dec();
-    }
+   
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Wed May  6 08:49:48 2009
@@ -35,12 +35,9 @@
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
 import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.ReadEntryCallback;
-import org.apache.bookkeeper.proto.WriteCallback;
 import org.apache.log4j.Logger;
 
 
@@ -50,7 +47,7 @@
  * 
  */
 
-public class BookieHandle extends Thread{
+class BookieHandle extends Thread{
     Logger LOG = Logger.getLogger(BookieClient.class);
     
     boolean stop = false;
@@ -64,7 +61,7 @@
      * Objects of this class are queued waiting to be
      * processed.
      */
-    class ToSend {
+    private static class ToSend {
     	LedgerHandle lh;
         long entry = -1;
         Object ctx;
@@ -79,7 +76,8 @@
     }
     
     /**
-     * @param addr	address
+     * @param addr	address of the bookkeeper server that this
+     * handle should connect to.
      */
     BookieHandle(InetSocketAddress addr) throws IOException {
         this.client = new BookieClient(addr, recvTimeout);

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Wed May  6 08:49:48 2009
@@ -23,6 +23,7 @@
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.QuorumEngine.Operation;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
@@ -41,6 +42,7 @@
     static ClientCBWorker instance = null;
     
     private boolean stop = false;
+    private static int instanceCounter= 0;
     
     ArrayBlockingQueue<Operation> pendingOps;
     QuorumOpMonitor monitor;
@@ -50,6 +52,7 @@
         if(instance == null){
             instance = new ClientCBWorker();
         }
+        instanceCounter++;
         
         return instance;
     }
@@ -82,9 +85,11 @@
      * 
      */
     synchronized void shutdown(){
-        stop = true;
-        instance = null;
-        LOG.debug("Shutting down");
+        if((--instanceCounter) == 0){
+            stop = true;
+            instance = null;
+            LOG.debug("Shutting down");
+        }
     }
     
     

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Wed May  6 08:49:48 2009
@@ -32,18 +32,26 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
 import org.apache.log4j.Logger;
-
-
-
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 /**
  * Ledger handle on the client side. Contains ledger metadata
- * used to access it.
- * 
+ * used to access it. This api exposes the read and write 
+ * to a ledger and also exposes a streaming api for the ledger.
  */
-
-public class LedgerHandle {
+public class LedgerHandle implements ReadCallback, AddCallback {
+    /**
+     * the call stack looks like --
+     * ledgerhandle->write->bookkeeper->quorumengine->bookiehandle
+     * ->bookieclient
+     */
     Logger LOG = Logger.getLogger(LedgerHandle.class);
     
     public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
@@ -55,7 +63,7 @@
     private ArrayList<BookieHandle> bookies;
     private ArrayList<InetSocketAddress> bookieAddrList;
     private BookKeeper bk;
-
+    private QuorumEngine qe;
     private int qSize;
     private QMode qMode = QMode.VERIFIABLE;
 
@@ -66,6 +74,14 @@
     private byte[] ledgerKey;
     private byte[] passwd;
     
+    /**
+     * @param bk the bookkeeper handle
+     * @param ledger the id for this ledger
+     * @param last the last id written 
+     * @param passwd the passwd to encode
+     * the entries
+     * @throws InterruptedException
+     */
     LedgerHandle(BookKeeper bk, 
             long ledger, 
             long last,
@@ -77,10 +93,21 @@
         this.passwd = passwd;
         genLedgerKey(passwd);
         genMacKey(passwd);
-
         this.qSize = (bookies.size() + 1)/2;
+        this.qe = new QuorumEngine(this);
     }
     
+    /**
+     * @param bk the bookkeeper handle
+     * @param ledger the id for this ledger
+     * @param last the last entry written
+     * @param qSize the quorum size 
+     * for this ledger
+     * @param mode the quorum mode
+     * for this ledger
+     * @param passwd the passwd to encode
+     * @throws InterruptedException
+     */
     LedgerHandle(BookKeeper bk, 
             long ledger, 
             long last,
@@ -97,9 +124,19 @@
         this.passwd = passwd;
         genLedgerKey(passwd);
         genMacKey(passwd);
+        this.qe = new QuorumEngine(this);
     }
         
-        
+    /**
+     * 
+     * @param bk the bookkeeper handle
+     * @param ledger the id for this ledger
+     * @param last the last entry written
+     * @param qSize the quorum size 
+     * for this ledger
+     * @param passwd the passwd to encode
+     * @throws InterruptedException
+     */
     LedgerHandle(BookKeeper bk, 
             long ledger, 
             long last,
@@ -114,6 +151,7 @@
         this.passwd = passwd;
         genLedgerKey(passwd);
         genMacKey(passwd);
+        this.qe = new QuorumEngine(this);
     }
     
     private void setBookies(ArrayList<InetSocketAddress> bookies)
@@ -127,18 +165,29 @@
     		}
     	} catch(ConnectException e){
     		LOG.error(e);
-                
     		InetSocketAddress addr = bk.getNewBookie(bookies);
-                
-                if(addr != null){
-                	bookies.add(addr);
-                }
+    		if(addr != null){
+    		    bookies.add(addr);
+    		}
     	} catch(IOException e) {
     		LOG.error(e);
     	}
     }
     
+    /**
+     * set the quorum engine
+     * @param qe the quorum engine
+     */
+    void setQuorumEngine(QuorumEngine qe) {
+        this.qe = qe;
+    }
     
+    /** get the quorum engine
+     * @return return the quorum engine
+     */
+    QuorumEngine getQuorumEngine(QuorumEngine qe) {
+        return this.qe;
+    }
     
     /**
      * Create bookie handle and add it to the list
@@ -147,16 +196,14 @@
      */
     int addBookie(InetSocketAddress addr)
     throws IOException {
-        LOG.info("My address: " + addr.toString());
+        LOG.debug("Bookie address: " + addr);
         //BookieHandle bh = new BookieHandle(this, addr);
         this.bookies.add(bk.getBookieHandle(addr));
-        
         if(bookies.size() > qSize) setThreshold();
-        
         return (this.bookies.size() - 1);
     }
     
-    private void setThreshold(){
+    private void setThreshold() {
         switch(qMode){
         case GENERIC:
             threshold = bookies.size() - qSize/2;
@@ -170,14 +217,13 @@
         
     }
     
-    public int getThreshold(){
+    public int getThreshold() {
         return threshold;
     }
     
     /**
      * Replace bookie in the case of a failure 
      */
-    
     void replaceBookie(int index) 
     throws BKException {
         InetSocketAddress addr = null;
@@ -222,7 +268,7 @@
         bookies.remove(index);
     }
     
-    void close(){
+    void closeUp(){
         ledger = -1;
         last = -1;
         bk.haltBookieHandles(bookies);
@@ -390,4 +436,198 @@
        return ledgerKey; 
     }
     
+    /**
+     * Close ledger.
+     * 
+     */
+    public void close() 
+    throws KeeperException, InterruptedException {
+        //Set data on zookeeper
+        ByteBuffer last = ByteBuffer.allocate(8);
+        last.putLong(getLast());
+        LOG.info("Last saved on ZK is: " + getLast());
+        String closePath = BookKeeper.prefix + bk.getZKStringId(getId()) + BookKeeper.close; 
+        if(bk.getZooKeeper().exists(closePath, false) == null){
+           bk.getZooKeeper().create(closePath, 
+                   last.array(), 
+                   Ids.OPEN_ACL_UNSAFE, 
+                   CreateMode.PERSISTENT); 
+        } else {
+            bk.getZooKeeper().setData(closePath, 
+                last.array(), -1);
+        }
+        closeUp();
+        StopOp sOp = new StopOp();
+        qe.sendOp(sOp);
+    }
+    
+    /**
+     * Read a sequence of entries asynchronously.
+     * 
+     * @param firstEntry    id of first entry of sequence
+     * @param lastEntry     id of last entry of sequence
+     * @param cb    object implementing read callback interface
+     * @param ctx   control object 
+     */
+    public void asyncReadEntries(long firstEntry, 
+            long lastEntry, ReadCallback cb, Object ctx)
+    throws BKException, InterruptedException {
+        // Little sanity check
+        if((firstEntry > getLast()) || (firstEntry > lastEntry)) 
+            throw BKException.create(Code.ReadException);
+        
+        Operation r = new ReadOp(this, firstEntry, lastEntry, cb, ctx);
+        qe.sendOp(r); 
+        //qeMap.get(lh.getId()).put(r);
+    }
+    
+    
+    /**
+     * Read a sequence of entries synchronously.
+     * 
+     * @param firstEntry    id of first entry of sequence
+     * @param lastEntry     id of last entry of sequence
+     *
+     */
+    public LedgerSequence readEntries(long firstEntry, long lastEntry) 
+    throws InterruptedException, BKException {
+        // Little sanity check
+        if((firstEntry > getLast()) || (firstEntry > lastEntry))
+            throw BKException.create(Code.ReadException);
+        
+        RetCounter counter = new RetCounter();
+        counter.inc();
+        
+        Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
+        qe.sendOp(r);
+        
+        LOG.debug("Going to wait for read entries: " + counter.i);
+        counter.block(0);
+        LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
+        
+        if(counter.getSequence() == null) throw BKException.create(Code.ReadException);
+        return counter.getSequence();
+    }
+   
+    /**
+     * Add entry asynchronously to an open ledger.
+     * 
+     * @param data  array of bytes to be written
+     * @param cb    object implementing callbackinterface
+     * @param ctx   some control object
+     */
+    public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
+    throws InterruptedException {
+        AddOp r = new AddOp(this, data, cb, ctx);
+        qe.sendOp(r);
+    }
+    
+    
+    /**
+     * Add entry synchronously to an open ledger.
+     * 
+     * @param   data byte[]
+     */
+    
+    public long addEntry(byte[] data)
+    throws InterruptedException{
+        LOG.debug("Adding entry " + data);
+        RetCounter counter = new RetCounter();
+        counter.inc();
+        
+        Operation r = new AddOp(this, data, this, counter);
+        qe.sendOp(r);   
+        //qeMap.get(lh.getId()).put(r);
+        counter.block(0);
+        return counter.getrc();
+    }
+    
+    
+    /**
+     * Implementation of callback interface for synchronous read method.
+     * 
+     * @param rc    return code
+     * @param ledger ledger identifier
+     * @param seq   sequence of entries
+     * @param ctx   control object
+     */
+    public void readComplete(int rc, 
+            long ledger, 
+            LedgerSequence seq,  
+            Object ctx){        
+        
+        RetCounter counter = (RetCounter) ctx;
+        counter.setSequence(seq);
+        LOG.debug("Read complete: " + seq.size() + ", " + counter.i);
+        counter.dec();
+    }
+    
+    /**
+     * Implementation of callback interface for synchronous add method.
+     * 
+     * @param rc    return code
+     * @param ledger ledger identifier
+     * @param entry entry identifier
+     * @param ctx   control object
+     */
+    public void addComplete(int rc, 
+            long ledger, 
+            long entry, 
+            Object ctx){          
+        RetCounter counter = (RetCounter) ctx;
+        
+        counter.setrc(rc);
+        counter.dec();
+    }
+    
+    
+    
+    /**
+     * Implements objects to help with the synchronization of asynchronous calls
+     * 
+     */
+    
+    private static class RetCounter {
+        int i;
+        int rc;
+        int total;
+        LedgerSequence seq = null;
+        
+        synchronized void inc() {
+            i++;
+            total++;
+        }
+        synchronized void dec() {
+            i--;
+            notifyAll();
+        }
+        synchronized void block(int limit) throws InterruptedException {
+            while(i > limit) {
+                int prev = i;
+                wait(15000);
+                if(i == prev){
+                    break;
+                }
+            }
+        }
+        synchronized int total() {
+            return total;
+        }
+        
+        void setrc(int rc){
+            this.rc = rc;
+        }
+        
+        int getrc(){
+            return rc;
+        }
+        
+        void setSequence(LedgerSequence seq){
+            this.seq = seq;
+        }
+        
+        LedgerSequence getSequence(){
+            return seq;
+        }
+    }
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Wed May  6 08:49:48 2009
@@ -34,7 +34,6 @@
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerSequence;
-import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerHandle.QMode;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.ReadEntryCallback;
@@ -136,7 +135,6 @@
          */
         
         LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
-        self.engines.put(lh.getId(), new QuorumEngine(lh));
         for(InetSocketAddress addr : bookies){
             lh.addBookie(addr);
         }
@@ -151,14 +149,14 @@
                 while(hasMore){
                     hasMore = false;
                     LOG.debug("Recovering: " + lh.getLast());
-                    LedgerSequence ls = self.readEntries(lh, lh.getLast(), lh.getLast());
+                    LedgerSequence ls = lh.readEntries(lh.getLast(), lh.getLast());
                     //if(ls == null) throw BKException.create(Code.ReadException);
                     LOG.debug("Received entry for: " + lh.getLast());
                     
                     byte[] le = ls.nextElement().getEntry();
                     if(le != null){
                         if(notLegitimate) notLegitimate = false;
-                        self.addEntry(lh, le);
+                        lh.addEntry(le);
                         //lh.incLast();
                         hasMore = true;
                     }
@@ -171,12 +169,12 @@
          */
         if(!notLegitimate){
             //lh.setLast(readCounter);
-            self.closeLedger(lh);
+            lh.close();
             
             return true;
         } else {
         	lh.setLast(0);
-        	self.closeLedger(lh);
+        	lh.close();
         	
         	return false;
         }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java Wed May  6 08:49:48 2009
@@ -1,104 +0,0 @@
-package org.apache.bookkeeper.client;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.util.List;
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
-import org.apache.log4j.Logger;
-
-/**
- * Sequence of entries of a ledger. Used to return a sequence of entries
- * upon an asynchornous read call.
- * 
- * This is feature is under construction.
- * 
- */
-
-public class LedgerStream {
-    Logger LOG = Logger.getLogger(LedgerStream.class);
-    
-    private ArrayList<LedgerEntry> pending;
-    private ArrayList<LedgerEntry> toDeliver;
-    private long index;
-    
-    
-    /**
-     * Constructor takes the first entry id expected.
-     * 
-     *  @param first    long
-     */
-    public LedgerStream(long first){
-        pending = new ArrayList<LedgerEntry>();
-        toDeliver = new ArrayList<LedgerEntry>();
-        index = first;
-    }
-    
-    /**
-     * Read the next entry if available. It blocks if the next entry
-     * is not yet available.
-     */
-    
-    public LedgerEntry readEntry(){
-        synchronized(toDeliver){
-            if(toDeliver.size() == 0){
-                try{
-                    toDeliver.wait();
-                } catch(InterruptedException e){
-                    LOG.info("Received an interrupted exception", e);
-                }
-            }
-            return toDeliver.get(0);
-        }
-    }
-    
-    /**
-     * Invoked upon reception of a new ledger entry.
-     * 
-     * @param le    a new ledger entry to deliver.
-     */
-    
-    public void addEntry(LedgerEntry le){
-        synchronized(toDeliver){
-            if(index == le.getEntryId()){
-                toDeliver.add(le);
-                index++;
-                
-                boolean noMore = false;
-                while(!noMore){
-                    noMore = true;
-                    for(int i = 0; i < pending.size(); i++){
-                        if(pending.get(i).getEntryId() == index){
-                            toDeliver.add(pending.get(i));
-                            index++;
-                            noMore = false;
-                        }
-                    }   
-                }
-                toDeliver.notify();
-            } else {
-                pending.add(le);
-            }
-        }
-    }
-}

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Wed May  6 08:49:48 2009
@@ -22,16 +22,10 @@
 
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.io.ByteArrayOutputStream;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.ClientCBWorker;
-import org.apache.bookkeeper.client.ErrorCodes;
 import org.apache.bookkeeper.client.QuorumOpMonitor;
 import org.apache.bookkeeper.client.QuorumOpMonitor.PendingOp;
 import org.apache.bookkeeper.client.QuorumOpMonitor.PendingReadOp;
@@ -53,7 +47,6 @@
     ClientCBWorker cbWorker;
     LedgerHandle lh;
     int qRef = 0;
-    boolean stop = false;
     
     /**
      * Operation descriptor: Requests generated by BookKeeper.java
@@ -263,7 +256,6 @@
             //qRef = (qRef + 1) % n;
             break;
                 case Operation.STOP:
-                    stop = true;
                     cbWorker.shutdown();
                     break;
         }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Wed May  6 08:49:48 2009
@@ -22,19 +22,16 @@
 
 
 import java.io.IOException;
-import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentHashMap;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.InvalidKeyException;
 import javax.crypto.Mac; 
-import javax.crypto.spec.SecretKeySpec;
 
 
 import org.apache.bookkeeper.client.BookieHandle;
@@ -42,8 +39,6 @@
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.ErrorCodes;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerHandle.QMode;
-import org.apache.bookkeeper.client.QuorumEngine.Operation;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
 import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
 import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
@@ -150,8 +145,6 @@
     
     public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){ 
         //PendingAddOp pOp;
-        String logmsg;
-        
         //synchronized(pendingAdds){
         //pOp = pendingAdds.get(entryId);
         //}
@@ -165,8 +158,6 @@
         }
         
         ArrayList<BookieHandle> list = lh.getBookies();
-        int n = list.size();
-         
         if(rc == 0){
             // Everything went ok with this op
             synchronized(pOp){ 
@@ -319,7 +310,7 @@
                     }
             
                     
-                    long diff = rOp.lastEntry - rOp.firstEntry;
+                    //long diff = rOp.lastEntry - rOp.firstEntry;
                     //LOG.debug("Counter: " + rOp.counter + ", " + diff);
                 }
             }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Wed May  6 08:49:48 2009
@@ -23,14 +23,11 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Semaphore;
-import java.util.Arrays;
 import java.security.NoSuchAlgorithmException;
 import java.security.InvalidKeyException;
 import java.security.MessageDigest;
@@ -46,7 +43,7 @@
 /**
  * Implements the client-side part of the BookKeeper protocol. 
  * 
- */
+ */    
 public class BookieClient extends Thread {
 	Logger LOG = Logger.getLogger(BookieClient.class);
     SocketChannel sock;
@@ -74,13 +71,11 @@
         }
 
         T cb;
-
         Object ctx;
     }
 
     private static class CompletionKey {
         long ledgerId;
-
         long entryId;
 
         CompletionKey(long ledgerId, long entryId) {
@@ -104,11 +99,12 @@
 
     }
 
-    ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
-    ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions = new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
+    ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = 
+        new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
+    
+    ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions =
+        new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
     
-    //Object writeLock = new Object();
-    //Object readLock = new Object();
     
     /*
      * Use this semaphore to control the number of completion key in both addCompletions
@@ -282,8 +278,8 @@
                     bb.get(data);
                     ByteBuffer entryData = ByteBuffer.wrap(data);
                     //ByteBuffer entryData = bb;
-                    //LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
-          
+                    //LOG.info("Received entry: " + ledgerId + ", " + entryId
+                    // + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());          
                     
                     CompletionKey key = new CompletionKey(ledgerId, entryId);
                     Completion<ReadEntryCallback> c;

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java Wed May  6 08:49:48 2009
@@ -18,10 +18,7 @@
 
 package org.apache.bookkeeper.proto;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
@@ -34,7 +31,6 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java?rev=772098&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerInputStream.java Wed May  6 08:49:48 2009
@@ -0,0 +1,164 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.bookkeeper.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.log4j.Logger;
+
+public class LedgerInputStream extends InputStream {
+    Logger LOG = Logger.getLogger(LedgerInputStream.class);
+    private LedgerHandle lh;
+    private ByteBuffer bytebuff;
+    byte[] bbytes;
+    long lastEntry =0;
+    int increment = 50;
+    int defaultSize = 1024 * 1024; // 1MB default size
+    LedgerSequence ledgerSeq = null;
+    
+    /**
+     * construct an input stream from a ledger handle
+     * @param lh ledger handle
+     * @throws {@link BKException}, {@link InterruptedException}
+     */
+    public LedgerInputStream(LedgerHandle lh) throws BKException, InterruptedException {
+        this.lh = lh;
+        bbytes = new byte[defaultSize];
+        this.bytebuff = ByteBuffer.wrap(bbytes);
+        this.bytebuff.position(this.bytebuff.limit());
+        lastEntry = Math.max(lh.getLast(), increment);
+        ledgerSeq = lh.readEntries(0, lastEntry);
+    }
+
+    /**
+     * construct an input stream from a ledger handle
+     * @param lh the ledger handle
+     * @param size the size of the buffer
+     * @throws {@link BKException}, {@link InterruptedException}
+     */
+    public LedgerInputStream(LedgerHandle lh, int size) throws BKException, InterruptedException {
+        this.lh = lh;
+        bbytes = new byte[size];
+        this.bytebuff = ByteBuffer.wrap(bbytes);
+        this.bytebuff.position(this.bytebuff.limit());
+        lastEntry = Math.max(lh.getLast(), increment);
+        ledgerSeq = lh.readEntries(0, lastEntry);
+    }
+    
+    
+    @Override
+    public void close() {
+        // do nothing
+        // let the application
+        // close the ledger
+    }
+    
+    /**
+     * refill the buffer, we 
+     * need to read more bytes
+     * @return if we can refill or not
+     */
+    private synchronized boolean refill() throws IOException {
+        bytebuff.clear();
+        if (!ledgerSeq.hasMoreElements() && lastEntry >= lh.getLast()) {
+            return false;
+        }
+        if (!ledgerSeq.hasMoreElements()) {
+            //do refill 
+            long last = Math.max( lastEntry + increment, lh.getLast());
+            try {
+                ledgerSeq = lh.readEntries(lastEntry + 1, last);
+            } catch(BKException bk) {
+                IOException ie = new IOException(bk.getMessage());
+                ie.initCause(bk);
+                throw ie;
+            } catch(InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+            lastEntry = last;
+        }
+        LedgerEntry le = ledgerSeq.nextElement();
+        bbytes = le.getEntry();
+        bytebuff = ByteBuffer.wrap(bbytes);
+        return true;
+    }
+    
+    @Override
+    public synchronized int read() throws IOException {
+        boolean toread = true;
+        if (bytebuff.remaining() == 0) {
+            // there are no remaining bytes
+            toread = refill();
+        }
+        if (toread) {
+            int ret = 0xFF & bytebuff.get();
+            return ret;
+        }
+        return -1;
+    }
+    
+    @Override
+    public synchronized int read(byte[] b) throws IOException {
+        // be smart ... just copy the bytes 
+        // once and return the size
+        // user will call it again
+        boolean toread = true;
+        if (bytebuff.remaining() == 0) {
+            toread = refill();
+        }
+        if (toread) {
+            int bcopied = bytebuff.remaining();
+            int tocopy = Math.min(bcopied, b.length);
+            //cannot use gets because of
+            // the underflow/overflow exceptions
+            System.arraycopy(bbytes, bytebuff.position(), b,0, tocopy);
+            bytebuff.position(bytebuff.position() + tocopy);
+            return tocopy;
+        }
+        return -1;
+    }
+    
+    @Override
+    public synchronized int read(byte[] b, int off, int len) throws IOException {
+        //again, we don't need to fully
+        // fill b, just return 
+        // what we have and let the application call read
+        // again
+        boolean toread = true;
+        if (bytebuff.remaining() == 0) {
+            toread = refill();
+        }
+        if (toread) {
+            int bcopied = bytebuff.remaining();
+            int tocopy = Math.min(bcopied, len);
+            System.arraycopy(bbytes, bytebuff.position(), b, off, tocopy);
+            bytebuff.position(bytebuff.position() + tocopy);
+            return tocopy;
+        }
+        return -1;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java?rev=772098&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/streaming/LedgerOutputStream.java Wed May  6 08:49:48 2009
@@ -0,0 +1,143 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.bookkeeper.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.log4j.Logger;
+
+
+/**
+ * this class provides a streaming api 
+ * to get an output stream from a ledger
+ * handle and write to it as a stream of 
+ * bytes. This is built on top of ledgerhandle
+ * api and uses a buffer to cache the data
+ * written to it and writes out the entry 
+ * to the ledger.
+ */
+public class LedgerOutputStream extends OutputStream {
+    Logger LOG = Logger.getLogger(LedgerOutputStream.class);
+    private LedgerHandle lh;
+    private ByteBuffer bytebuff;
+    byte[] bbytes;
+    int defaultSize = 1024 * 1024; // 1MB default size
+    
+    /**
+     * construct an output stream from a ledger handle
+     * @param lh ledger handle
+     */
+    public LedgerOutputStream(LedgerHandle lh) {
+        this.lh = lh;
+        bbytes = new byte[defaultSize];
+        this.bytebuff = ByteBuffer.wrap(bbytes);
+    }
+    
+    /**
+     * construct an output stream from a ledger handle
+     * @param lh the ledger handle
+     * @param size the size of the buffer
+     */
+    public LedgerOutputStream(LedgerHandle lh, int size) {
+        this.lh = lh;
+        bbytes = new byte[size];
+        this.bytebuff = ByteBuffer.wrap(bbytes);
+    }
+    
+    @Override
+    public void close() {
+        //flush everything
+        // we have
+        flush();
+    }
+    
+    @Override
+    public synchronized void flush() {
+        // lets flush all the data 
+        // into the ledger entry
+        if (bytebuff.position() > 0) {
+            //copy the bytes into 
+            // a new byte buffer and send it out
+            byte[] b = new byte[bytebuff.position()];
+            LOG.info("Comment: flushing with params " + " " + bytebuff.position());
+            System.arraycopy(bbytes, 0, b, 0, bytebuff.position());
+            try {
+                lh.addEntry(b);
+            } catch(InterruptedException ie) {
+                LOG.warn("Interrupted while flusing " + ie);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+    
+    /**
+     * make space for len bytes to be written
+     * to the buffer. 
+     * @param len
+     * @return if true then we can make space for len
+     * if false we cannot
+     */
+    private boolean makeSpace(int len) {
+        if (bytebuff.remaining() < len) {
+            flush();
+            bytebuff.clear();
+            if (bytebuff.capacity() < len) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    @Override
+    public synchronized void write(byte[] b) {
+        if (makeSpace(b.length)) {
+            bytebuff.put(b);
+        }
+        else {
+            try {
+                lh.addEntry(b);
+            } catch(InterruptedException ie) {
+                LOG.warn("Interrupted while writing", ie);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+    
+    @Override
+    public synchronized void write(byte[] b, int off, int len) {
+        if (!makeSpace(len)) {
+            //lets try making the buffer bigger
+            bbytes = new byte[len];
+            bytebuff = ByteBuffer.wrap(bbytes);
+        }
+        bytebuff.put(b, off, len);
+    }
+    
+    @Override
+    public synchronized void write(int b) throws IOException {
+        makeSpace(1);
+        byte oneB = (byte) (b & 0xFF);
+        bytebuff.put(oneB);
+    }
+}
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Wed May  6 08:49:48 2009
@@ -26,8 +26,6 @@
 import java.util.Arrays;
 
 import org.junit.Test;
-import org.apache.bookkeeper.bookie.Bookie;
-import org.apache.bookkeeper.client.AddCallback;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookieServer;

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Wed May  6 08:49:48 2009
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Random;
 import java.util.Set;
@@ -37,6 +36,8 @@
 import org.apache.bookkeeper.client.LedgerSequence;
 import org.apache.bookkeeper.client.ReadCallback;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.streaming.LedgerInputStream;
+import org.apache.bookkeeper.streaming.LedgerOutputStream;
 import org.apache.bookkeeper.util.ClientBase;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
@@ -48,7 +49,6 @@
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.ServerStats;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -111,6 +111,63 @@
 		}    	
     }
     
+    /**
+     * test the streaming api for reading
+     * and writing
+     * @throws {@link IOException}, {@link KeeperException}
+     */
+    @Test
+    public void testStreamingClients() throws IOException,
+        KeeperException, BKException, InterruptedException {
+        bkc = new BookKeeper("127.0.0.1");
+        lh = bkc.createLedger(ledgerPassword);
+        //write a string so that we can
+        // create a buffer of a single byte
+        // and check for corner cases
+        String toWrite = "we need to check for this string to match " +
+        		"and for the record mahadev is the best";
+        LedgerOutputStream lout = new LedgerOutputStream(lh , 1);
+        byte[] b = toWrite.getBytes();
+        lout.write(b);
+        lout.close();
+        long lId = lh.getId();
+        lh.close();
+        //check for sanity
+        lh = bkc.openLedger(lId, ledgerPassword);
+        LedgerInputStream lin = new LedgerInputStream(lh,  1);
+        byte[] bread = new byte[b.length];
+        int read = 0;
+        while (read < b.length) { 
+            read = read + lin.read(bread, read, b.length);
+        }
+        
+        String newString = new String(bread);
+        assertTrue("these two should same", toWrite.equals(newString));
+        lin.close();
+        lh.close();
+        //create another ledger to write one byte at a time
+        lh = bkc.createLedger(ledgerPassword);
+        lout = new LedgerOutputStream(lh);
+        for (int i=0; i < b.length;i++) {
+            lout.write(b[i]);
+        }
+        lout.close();
+        lId = lh.getId();
+        lh.close();
+        lh = bkc.openLedger(lId, ledgerPassword);
+        lin = new LedgerInputStream(lh);
+        bread = new byte[b.length];
+        read= 0;
+        while (read < b.length) {
+            read = read + lin.read(bread, read, b.length);
+        }
+        newString = new String(bread);
+        assertTrue("these two should be same ", toWrite.equals(newString));
+        lin.close();
+        lh.close();
+    }
+        
+    
     @Test
 	public void testReadWriteAsyncSingleClient() throws IOException{
 		try {
@@ -127,7 +184,7 @@
 				
 				entries.add(entry.array());
 				entriesSize.add(entry.array().length);
-				bkc.asyncAddEntry(lh, entry.array(), this, sync);
+				lh.asyncAddEntry(entry.array(), this, sync);
 			}
 			
 			// wait for all entries to be acknowledged
@@ -140,7 +197,7 @@
 			
 			LOG.debug("*** WRITE COMPLETE ***");
 			// close ledger 
-			bkc.closeLedger(lh);
+			lh.close();
 			
 			//*** WRITING PART COMPLETE // READ PART BEGINS ***
 			
@@ -150,7 +207,7 @@
 			assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);		
 			
 			//read entries
-			bkc.asyncReadEntries(lh, 0, numEntriesToWrite - 1, this, (Object) sync);
+			lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
 			
 			synchronized (sync) {
 				while(sync.value == false){
@@ -178,7 +235,7 @@
 				assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
 				i++;
 			}
-			bkc.closeLedger(lh);
+			lh.close();
 		} catch (KeeperException e) {
 			e.printStackTrace();
 		} catch (BKException e) {
@@ -207,7 +264,7 @@
 				int randomInt = rng.nextInt(maxInt);
 				byte[] entry = new String(Integer.toString(randomInt)).getBytes(charset);
 				entries.add(entry);
-				bkc.asyncAddEntry(lh, entry, this, sync);
+				lh.asyncAddEntry(entry, this, sync);
 			}
 			
 			// wait for all entries to be acknowledged
@@ -220,7 +277,7 @@
 			
 			LOG.debug("*** ASYNC WRITE COMPLETE ***");
 			// close ledger 
-			bkc.closeLedger(lh);
+			lh.close();
 			
 			//*** WRITING PART COMPLETED // READ PART BEGINS ***
 			
@@ -230,7 +287,7 @@
 			assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);		
 			
 			//read entries			
-			ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+			ls = lh.readEntries(0, numEntriesToWrite - 1);
 			
 			assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
 			
@@ -253,7 +310,7 @@
 				
 				assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
 			}
-			bkc.closeLedger(lh);
+			lh.close();
 		} catch (KeeperException e) {
 			e.printStackTrace();
 		} catch (BKException e) {
@@ -280,14 +337,14 @@
 				entry.putInt(rng.nextInt(maxInt));
 				entry.position(0);
 				entries.add(entry.array());				
-				bkc.addEntry(lh, entry.array());
+				lh.addEntry(entry.array());
 			}
-			bkc.closeLedger(lh);
+			lh.close();
 			lh = bkc.openLedger(ledgerId, ledgerPassword);
 			LOG.debug("Number of entries written: " + lh.getLast());
 			assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);		
 			
-			ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+			ls = lh.readEntries(0, numEntriesToWrite - 1);
 			int i = 0;
 			while(ls.hasMoreElements()){
 			    ByteBuffer origbb = ByteBuffer.wrap(entries.get(i++));
@@ -300,7 +357,7 @@
 				LOG.debug("Retrieved entry: " + retrEntry);
 				assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
 			}
-			bkc.closeLedger(lh);
+			lh.close();
 		} catch (KeeperException e) {
 			e.printStackTrace();
 		} catch (BKException e) {
@@ -322,7 +379,7 @@
 			ledgerId = lh.getId();
 			LOG.info("Ledger ID: " + lh.getId());
 			for(int i = 0; i < numEntriesToWrite; i++){				
-				bkc.addEntry(lh, new byte[0]);
+			lh.addEntry(new byte[0]);
 			}
 			
 			/*
@@ -332,14 +389,14 @@
 			entry.putInt(rng.nextInt(maxInt));
 			entry.position(0);
 			entries.add(entry.array());				
-			bkc.addEntry(lh, entry.array());
+			lh.addEntry( entry.array());
 			
-			bkc.closeLedger(lh);
+			lh.close();
 			lh = bkc.openLedger(ledgerId, ledgerPassword);
 			LOG.debug("Number of entries written: " + lh.getLast());
 			assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite + 1));		
 			
-			ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+			ls = lh.readEntries(0, numEntriesToWrite - 1);
 			int i = 0;
 			while(ls.hasMoreElements()){
 				ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
@@ -347,7 +404,7 @@
 				
 				assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
 			}
-			bkc.closeLedger(lh);
+			lh.close();
 		} catch (KeeperException e) {
 			e.printStackTrace();
 		} catch (BKException e) {
@@ -373,13 +430,13 @@
             //bkc.initMessageDigest("SHA1");
             LOG.info("Ledger ID 1: " + lh.getId() + ", Ledger ID 2: " + lh2.getId());
             for(int i = 0; i < numEntriesToWrite; i++){             
-                bkc.addEntry(lh, new byte[0]);
-                bkc.addEntry(lh2, new byte[0]);
+                lh.addEntry( new byte[0]);
+                lh2.addEntry(new byte[0]);
             }
             
-            bkc.closeLedger(lh);
-            bkc.closeLedger(lh2);
-            
+            lh.close();
+            lh2.close();
+                
             lh = bkc.openLedger(ledgerId, ledgerPassword);
             lh2 = bkc.openLedger(ledgerId2, ledgerPassword);
             
@@ -387,7 +444,7 @@
             assertTrue("Verifying number of entries written lh (" + lh.getLast() + ")" , lh.getLast() == numEntriesToWrite);
             assertTrue("Verifying number of entries written lh2 (" + lh2.getLast() + ")", lh2.getLast() == numEntriesToWrite);
             
-            ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+            ls = lh.readEntries(0, numEntriesToWrite - 1);
             int i = 0;
             while(ls.hasMoreElements()){
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
@@ -395,9 +452,8 @@
                 
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
-            bkc.closeLedger(lh);
-            
-            ls = bkc.readEntries(lh2, 0, numEntriesToWrite - 1);
+            lh.close();
+            ls = lh2.readEntries( 0, numEntriesToWrite - 1);
             i = 0;
             while(ls.hasMoreElements()){
                 ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
@@ -406,8 +462,7 @@
                 assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
             }
             
-            bkc.closeLedger(lh2);
-            
+            lh2.close();
         } catch (KeeperException e) {
             e.printStackTrace();
         } catch (BKException e) {

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java?rev=772098&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/CloseTest.java Wed May  6 08:49:48 2009
@@ -0,0 +1,301 @@
+package org.apache.bookkeeper.test;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import static org.apache.bookkeeper.util.ClientBase.CONNECTION_TIMEOUT;
+
+import java.lang.InterruptedException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+import org.junit.*;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.util.ClientBase;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+
+
+/**
+ * This unit test tests closing ledgers sequentially. 
+ * It creates 4 ledgers, then writes 1000 entries to each 
+ * ledger and closes it.
+ * 
+ */
+
+public class CloseTest 
+extends TestCase 
+implements Watcher {
+    static Logger LOG = Logger.getLogger(CloseTest.class);
+    
+    BookieServer bs1, bs2, bs3;
+    File tmpDir1, tmpDir2, tmpDir3, tmpDirZK;
+    private static final String HOSTPORT = "127.0.0.1:33299";
+    private NIOServerCnxn.Factory serverFactory;
+    
+    private static String BOOKIEADDR1 = "127.0.0.1:33300";
+    private static String BOOKIEADDR2 = "127.0.0.1:33301";
+    private static String BOOKIEADDR3 = "127.0.0.1:33302";
+    
+    private static void recursiveDelete(File dir) {
+        File children[] = dir.listFiles();
+        if (children != null) {
+            for(File child: children) {
+                recursiveDelete(child);
+            }
+        }
+        dir.delete();
+    }
+    
+    protected void setUp() throws Exception {
+        /*
+         * Creates 3 BookieServers
+         */
+        
+        
+        tmpDir1 = File.createTempFile("bookie1", "test");
+        tmpDir1.delete();
+        tmpDir1.mkdir();
+        
+        final int PORT1 = Integer.parseInt(BOOKIEADDR1.split(":")[1]);
+        bs1 = new BookieServer(PORT1, tmpDir1, new File[] { tmpDir1 });
+        bs1.start();
+        
+        tmpDir2 = File.createTempFile("bookie2", "test");
+        tmpDir2.delete();
+        tmpDir2.mkdir();
+        
+        final int PORT2 = Integer.parseInt(BOOKIEADDR2.split(":")[1]);
+        bs2 = new BookieServer(PORT2, tmpDir2, new File[] { tmpDir2 });
+        bs2.start();
+        
+        tmpDir3 = File.createTempFile("bookie3", "test");
+        tmpDir3.delete();
+        tmpDir3.mkdir();
+        
+        final int PORT3 = Integer.parseInt(BOOKIEADDR3.split(":")[1]);
+        bs3 = new BookieServer(PORT3, tmpDir3, new File[] { tmpDir3 });
+        bs3.start();
+        
+        /*
+         * Instantiates a ZooKeeper server. This is a blind copy
+         * of setUp from SessionTest.java.
+         */
+        LOG.info("STARTING " + getName());
+
+        //ServerStats.registerAsConcrete();
+
+        tmpDirZK = ClientBase.createTmpDir();
+
+        ClientBase.setupTestEnv();
+        ZooKeeperServer zs = new ZooKeeperServer(tmpDirZK, tmpDirZK, 3000);
+        
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        serverFactory = new NIOServerCnxn.Factory(PORT);
+        serverFactory.startup(zs);
+
+        assertTrue("waiting for server up",
+                   ClientBase.waitForServerUp(HOSTPORT,
+                                              CONNECTION_TIMEOUT));
+        
+        /*
+         * Creating necessary znodes
+         */
+        try{
+            ZooKeeper zk = new ZooKeeper(HOSTPORT, 3000, this);
+            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create("/ledgers/available/" + BOOKIEADDR1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+            zk.create("/ledgers/available/" + BOOKIEADDR2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+            zk.create("/ledgers/available/" + BOOKIEADDR3, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT );
+        } catch (KeeperException ke) {
+            LOG.error(ke);
+            fail("Couldn't execute ZooKeeper start procedure");
+        }
+        
+    }
+    
+    /**
+     * Watcher callback: logs the type and path of the event it is handed.
+     */
+    synchronized public void process(WatchedEvent event) {
+        LOG.info("Process: " + event.getType() + " " + event.getPath());
+    }
+    
+    protected void tearDown() throws Exception {
+        LOG.info("### Tear down ###");
+        bs1.shutdown();
+        recursiveDelete(tmpDir1);
+        
+        bs2.shutdown();
+        recursiveDelete(tmpDir2);
+        
+        bs3.shutdown();
+        recursiveDelete(tmpDir3);
+        
+        serverFactory.shutdown();
+        assertTrue("waiting for server down",
+                   ClientBase.waitForServerDown(HOSTPORT,
+                                                CONNECTION_TIMEOUT));
+
+        //ServerStats.unregister();
+        recursiveDelete(tmpDirZK);
+        LOG.info("FINISHED " + getName());
+    }
+
+    @Test
+    public void testClose(){
+        /*
+         * Instantiate BookKeeper object.
+         */
+        BookKeeper bk = null;
+        try{
+            bk = new BookKeeper(HOSTPORT);
+        } catch (KeeperException ke){
+            LOG.error("Error instantiating BookKeeper", ke);
+            fail("ZooKeeper error");
+        } catch (IOException ioe){
+            LOG.error(ioe);
+            fail("Failure due to IOException");
+        }
+        
+        /*
+         * Create 4 ledgers.
+         */
+        LedgerHandle lh1 = null;
+        LedgerHandle lh2 = null;
+        LedgerHandle lh3 = null;
+        LedgerHandle lh4 = null;
+        
+        try{
+            lh1 = bk.createLedger("".getBytes());
+            lh2 = bk.createLedger("".getBytes());
+            lh3 = bk.createLedger("".getBytes());
+            lh4 = bk.createLedger("".getBytes());
+        } catch (KeeperException ke){
+            LOG.error("Error creating a ledger", ke);
+            fail("ZooKeeper error");            
+        } catch (BKException bke){
+            LOG.error("BookKeeper error");
+            fail("BookKeeper error");
+        } catch (InterruptedException ie) {
+            LOG.error(ie);
+            fail("Failure due to interrupted exception");
+        } catch (IOException ioe) {
+            LOG.error(ioe);
+            fail("Failure due to IO exception");
+        }
+        
+        /*
+         * Write a 1000 entries to lh1.
+         */
+        try{
+            String tmp = "BookKeeper is cool!";
+            for(int i = 0; i < 1000; i++){
+                lh1.addEntry(tmp.getBytes());
+            }
+        } catch(InterruptedException e){
+            LOG.error("Interrupted when adding entry", e);
+            fail("Couldn't finish adding entries");
+        }
+        
+        try{
+            lh1.close();
+        } catch(Exception e) {
+            LOG.error(e);
+            fail("Exception while closing ledger 1");
+        }
+        /*
+         * Write a 1000 entries to lh2.
+         */
+        try{
+            String tmp = "BookKeeper is cool!";
+            for(int i = 0; i < 1000; i++){
+                lh2.addEntry(tmp.getBytes());
+            }
+        } catch(InterruptedException e){
+            LOG.error("Interrupted when adding entry", e);
+            fail("Couldn't finish adding entries");
+        }
+        
+        try{
+            lh2.close();
+        } catch(Exception e){
+            LOG.error(e);
+            fail("Exception while closing ledger 2");
+        }
+        
+        /*
+         * Write a 1000 entries to lh3 and lh4.
+         */
+        try{
+            String tmp = "BookKeeper is cool!";
+            for(int i = 0; i < 1000; i++){
+                lh3.addEntry(tmp.getBytes());
+                lh4.addEntry(tmp.getBytes());
+            }
+        } catch(InterruptedException e){
+            LOG.error("Interrupted when adding entry", e);
+            fail("Couldn't finish adding entries");
+        }
+        
+        try{
+            lh3.close();
+            lh4.close();
+        } catch(Exception e){
+            LOG.error(e);
+            fail("Exception while closing ledger 4");
+        }
+        /*
+            LedgerHandle afterlh = bk.openLedger(beforelh.getId(), "".getBytes());
+            
+        } catch (KeeperException e) {
+            LOG.error("Error when opening ledger", e);
+            fail("Couldn't open ledger");
+        } /* catch (InterruptedException ie) {
+            LOG.error("Interrupted exception", ie);
+            fail("Failure due to interrupted exception");
+        } catch (IOException ioe) {
+            LOG.error("IO Exception", ioe);
+            fail("Failure due to IO exception");
+        } catch (BKException bke){
+            LOG.error("BookKeeper error", bke);
+            fail("BookKeeper error");
+        }*/
+    }      
+}
+    
+    
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java?rev=772098&r1=772097&r2=772098&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LedgerRecoveryTest.java Wed May  6 08:49:48 2009
@@ -217,7 +217,7 @@
         try{
             String tmp = "BookKeeper is cool!";
             for(int i = 0; i < 1000; i++){
-                bk.addEntry(beforelh, tmp.getBytes());
+                beforelh.addEntry(tmp.getBytes());
             }
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);
@@ -302,7 +302,7 @@
         try{
             String tmp = "BookKeeper is cool!";
             for(int i = 0; i < 1; i++){
-                bk.addEntry(beforelh, tmp.getBytes());
+                beforelh.addEntry(tmp.getBytes());
             }
         } catch(InterruptedException e){
             LOG.error("Interrupted when adding entry", e);



Mime
View raw message