hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r903483 [3/6] - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/cont...
Date Tue, 26 Jan 2010 23:16:49 GMT
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=903483&r1=903482&r2=903483&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Tue Jan 26 23:16:45 2010
@@ -1,4 +1,5 @@
 package org.apache.bookkeeper.client;
+
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,825 +21,428 @@
  * 
  */
 
-
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.security.NoSuchAlgorithmException;
-import java.security.MessageDigest;
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.TreeMap;
-
-import org.apache.bookkeeper.client.BKDefs;
+import java.util.Enumeration;
+import java.util.Queue;
 import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookieHandle;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException.Code;
-import org.apache.bookkeeper.client.LedgerManagementProcessor.CloseLedgerOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
-import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.bookkeeper.util.StringUtils;
+
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
+
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
- * Ledger handle on the client side. Contains ledger metadata
- * used to access it. This api exposes the read and write 
- * to a ledger and also exposes a streaming api for the ledger.
+ * Ledger handle contains ledger metadata and is used to access the read and
+ * write operations to a ledger.
  */
-public class LedgerHandle implements ReadCallback, AddCallback {
-    /**
-     * the call stack looks like --
-     * ledgerhandle->write->bookeeper->quorumengine->bookiehandle
-     * ->bookieclient
-     */
-   static Logger LOG = Logger.getLogger(LedgerHandle.class);
-    
-    public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
-    
-    
-    private long ledger;
-    private volatile long last;
-    private volatile long lastAddConfirmed = 0;
-    private HashMap<Integer, Long> lastRecvCorrectly;
-    private volatile ArrayList<BookieHandle> bookies;
-    private ArrayList<InetSocketAddress> bookieAddrList;
-    private TreeMap<Long, ArrayList<BookieHandle> > bookieConfigMap;
-    private long[] entryChange;
-    private BookKeeper bk;
-    private QuorumEngine qe;
-    private int qSize;
-    private QMode qMode = QMode.VERIFIABLE;
-    private int lMode;
-
-    private int threshold;
-    private String digestAlg = "SHA1";
-    
-    private byte[] macKey;
-    private byte[] ledgerKey;
-    private byte[] passwd;
-    
-    /**
-     * @param bk the bookkeeper handle
-     * @param ledger the id for this ledger
-     * @param last the last id written 
-     * @param passwd the passwd to encode
-     * the entries
-     * @throws InterruptedException
-     */
-    LedgerHandle(BookKeeper bk, 
-            long ledger, 
-            long last,
-            byte[] passwd) throws InterruptedException {
-        this.bk = bk;
-        this.ledger = ledger;
-        this.last = last;
-        this.bookies = new ArrayList<BookieHandle>();
-        this.lastRecvCorrectly = new HashMap<Integer, Long>();
-        this.passwd = passwd;
-        genLedgerKey(passwd);
-        genMacKey(passwd);
-        this.qSize = (bookies.size() + 1)/2;
-        this.qe = new QuorumEngine(this);
-    }
-    
-    /**
-     * @param bk the bookkeeper handle
-     * @param ledger the id for this ledger
-     * @param last the last entree written
-     * @param qSize the queuing size 
-     * for this ledger
-     * @param mode the quueuing mode
-     * for this ledger
-     * @param passwd the passwd to encode
-     * @throws InterruptedException
-     */
-    LedgerHandle(BookKeeper bk, 
-            long ledger, 
-            long last,
-            int qSize, 
-            QMode mode,
-            byte[] passwd) throws InterruptedException {
-        this.bk = bk;
-        this.ledger = ledger;
-        this.last = last;
-        this.bookies = new ArrayList<BookieHandle>();
-        this.lastRecvCorrectly = new HashMap<Integer, Long>();
-
-
-        this.qSize = qSize;
-        this.qMode = mode;
-        this.passwd = passwd;
-        genLedgerKey(passwd);
-        genMacKey(passwd);
-        this.qe = new QuorumEngine(this);
-    }
-        
-    /**
-     * 
-     * @param bk the bookkeeper handle
-     * @param ledger the id for this ledger
-     * @param last the last entree written
-     * @param qSize the queuing size 
-     * for this ledger
-     * @param passwd the passwd to encode
-     * @throws InterruptedException
-     */
-    LedgerHandle(BookKeeper bk, 
-            long ledger, 
-            long last,
-            int qSize,
-            byte[] passwd) throws InterruptedException {
-        this.bk = bk;
-        this.ledger = ledger;
-        this.last = last;
-        this.bookies = new ArrayList<BookieHandle>();
-        this.lastRecvCorrectly = new HashMap<Integer, Long>();
-
-
-        this.qSize = qSize;
-        this.passwd = passwd;
-        genLedgerKey(passwd);
-        genMacKey(passwd);
-        this.qe = new QuorumEngine(this);
-    }
-    
-    private void setBookies(ArrayList<InetSocketAddress> bookies)
-    throws InterruptedException {
-    	try{
-    		for(InetSocketAddress a : bookies){
-    			LOG.debug("Opening bookieHandle: " + a);
-            
-    			//BookieHandle bh = new BookieHandle(this, a);
-    			this.bookies.add(bk.getBookieHandle(this, a));
-    		}
-    	} catch(ConnectException e){
-    		LOG.error(e);
-    		InetSocketAddress addr = bk.getNewBookie(bookies);
-    		if(addr != null){
-    		    bookies.add(addr);
-    		}
-    	} catch(IOException e) {
-    		LOG.error(e);
-    	}
-    }
-    
-    /**
-     * set the quorum engine
-     * @param qe the quorum engine
-     */
-    void setQuorumEngine(QuorumEngine qe) {
-        this.qe = qe;
-    }
-    
-    /** get the quorum engine
-     * @return return the quorum engine
-     */
-    QuorumEngine getQuorumEngine() {
-        return this.qe;
-    }
-    
-    /**
-     * Create bookie handle and add it to the list
-     * 
-     * @param addr	socket address
-     */
-    int addBookieForWriting(InetSocketAddress addr)
-    throws IOException {
-        LOG.debug("Bookie address: " + addr);
-        lMode = BKDefs.WRITE;
-        //BookieHandle bh = new BookieHandle(this, addr);
-        this.bookies.add(bk.getBookieHandle(this, addr));
-        if(bookies.size() > qSize) setThreshold();
-        return (this.bookies.size() - 1);
-    }
-    
-    /**
-     * Create bookie handle and add it to the list
-     * 
-     * @param addr  socket address
-     */
-    int addBookieForReading(InetSocketAddress addr)
-    throws IOException {
-        LOG.debug("Bookie address: " + addr);
-        lMode = BKDefs.READ;
-        //BookieHandle bh = new BookieHandle(this, addr);
-        try{
-            this.bookies.add(bk.getBookieHandle(this, addr));
-        } catch (IOException e){
-            LOG.info("Inserting a decoy bookie handle");
-            this.bookies.add(new BookieHandle(addr, false));
-        }
-        if(bookies.size() > qSize) setThreshold();
-        return (this.bookies.size() - 1);
-    }
+public class LedgerHandle implements ReadCallback, AddCallback, CloseCallback {
+  final static Logger LOG = Logger.getLogger(LedgerHandle.class);
 
-    
-    private void setThreshold() {
-        switch(qMode){
-        case GENERIC:
-            threshold = bookies.size() - qSize/2;
-            break;
-        case VERIFIABLE:
-            threshold = bookies.size() - qSize + 1;
-            break;
-        default:
-            threshold = bookies.size();
-        }
-        
-    }
-    
-    public int getThreshold() {
-        return threshold;
-    }
-    
-    
-    /**
-     * Writes to BookKeeper changes to the ensemble.
-     *         
-     * @param addr  Address of faulty bookie
-     * @param entry Last entry written before change of ensemble.
-     */
-    
-    void changeEnsemble(long entry){
-        String path = BKDefs.prefix + 
-        bk.getZKStringId(getId()) +  
-        BKDefs.quorumEvolution + "/" + 
-        String.format("%010d", entry);
-        
-        LOG.info("Report failure: " + String.format("%010d", entry));
-        try{
-            if(bk.getZooKeeper().exists(BKDefs.prefix + 
-                    bk.getZKStringId(getId()) +  
-                    BKDefs.quorumEvolution, false) == null)
-                bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(getId()) + 
-                        BKDefs.quorumEvolution, new byte[0], Ids.OPEN_ACL_UNSAFE, 
-                        CreateMode.PERSISTENT);
-        
-            boolean first = true;
-            String addresses = "";
-            for(BookieHandle bh : bookies){
-                if(first){ 
-                    addresses = bh.addr.toString();
-                    first = false;
-                }
-                else 
-                    addresses = addresses + " " + bh.addr.toString();
+  final byte[] ledgerKey;
+  final LedgerMetadata metadata;
+  final BookKeeper bk;
+  final long ledgerId;
+  long lastAddPushed;
+  long lastAddConfirmed;
+  final DigestManager macManager;
+  final DistributionSchedule distributionSchedule;
+
+  final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
+
+  LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
+      DigestType digestType, byte[] password) throws GeneralSecurityException {
+    this.bk = bk;
+    this.metadata = metadata;
+    if (metadata.isClosed()) {
+      lastAddConfirmed = lastAddPushed = metadata.close;
+    } else {
+      lastAddConfirmed = lastAddPushed = -1;
+    }
+
+    this.ledgerId = ledgerId;
+    macManager = DigestManager.instantiate(ledgerId, password, digestType);
+    this.ledgerKey = MacDigestManager.genDigest("ledger", password);
+    distributionSchedule = new RoundRobinDistributionSchedule(
+        metadata.quorumSize, metadata.ensembleSize);
+  }
+
+  /**
+   * Get the id of the current ledger
+   * 
+   * @return the id of this ledger
+   */
+  public long getId() {
+    return ledgerId;
+  }
+
+  /**
+   * Get the last confirmed entry id on this ledger
+   * 
+   * @return the id of the last entry whose add has been confirmed
+   */
+  public long getLastAddConfirmed() {
+    return lastAddConfirmed;
+  }
+
+  /**
+   * Get the entry id of the last entry that has been enqueued for addition (but
+   * may not yet have been persisted to the ledger)
+   * 
+   * @return the id of the last entry enqueued for addition
+   */
+  public long getLastAddPushed() {
+    return lastAddPushed;
+  }
+
+  void writeLedgerConfig(StatCallback callback, Object ctx) {
+    bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
+        metadata.serialize(), -1, callback, ctx);
+  }
+
+  /**
+   * Close this ledger synchronously.
+   * 
+   */
+  public void close() throws InterruptedException {
+    SyncCounter counter = new SyncCounter();
+    counter.inc();
+
+    asyncClose(this, counter);
+
+    counter.block(0);
+  }
+
+  /**
+   * Asynchronous close, any adds in flight will return errors
+   * 
+   * @param cb
+   *          callback implementation
+   * @param ctx
+   *          control object
+   * @throws InterruptedException
+   */
+  public void asyncClose(CloseCallback cb, Object ctx) {
+    asyncClose(cb, ctx, BKException.Code.LedgerClosedException);
+  }
+
+  /**
+   * Same as public version of asyncClose except that this one takes an
+   * additional parameter which is the return code to hand to all the pending
+   * add ops
+   * 
+   * @param cb
+   * @param ctx
+   * @param rc
+   */
+  private void asyncClose(final CloseCallback cb, final Object ctx, final int rc) {
+
+    bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+
+      @Override
+      public void safeRun() {
+        // Close operation is idempotent, so no need to check if we are
+        // already closed
+        metadata.close(lastAddConfirmed);
+        errorOutPendingAdds(rc);
+        lastAddPushed = lastAddConfirmed;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
+              + metadata.close);
+        }
+
+        writeLedgerConfig(new StatCallback() {
+          @Override
+          public void processResult(int rc, String path, Object subctx,
+              Stat stat) {
+            if (rc != KeeperException.Code.OK.intValue()) {
+              cb.closeComplete(BKException.Code.ZKException, LedgerHandle.this,
+                  ctx);
+            } else {
+              cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
             }
-            
-            bk.getZooKeeper() .create(path, addresses.getBytes(),
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch(Exception e){
-            LOG.error("Could not write to ZooKeeper: " + path + ", " + e);
-        }
-    }
-    
-    /**
-     * Replace bookie in the case of a failure 
-     */
-    void replaceBookie(int index) 
-    throws BKException {
-        InetSocketAddress addr = null;
-        try{
-            addr = bk.getNewBookie(bookieAddrList);
-        } catch(InterruptedException e){
-            LOG.error(e);
-        }
-        
-        if(addr == null){
-            throw BKException.create(Code.NoBookieAvailableException);
-        } else {           
-            try{
-                //BookieHandle bh = new BookieHandle(this, addr);
-                
-                /*
-                 * TODO: Read from current bookies, and write to this one
-                 */
-                
-                /*
-                 * If successful in writing to new bookie, add it to the set
-                 */
-                this.bookies.set(index, bk.getBookieHandle(this, addr));
-            } catch(ConnectException e){
-                bk.blackListBookie(addr);
-                LOG.error(e);
-            } catch(IOException e) {
-                bk.blackListBookie(addr);
-                LOG.error(e);
-            }
-        }
-    }
-    
-    /**
-     * This method is used when BK cannot find a bookie
-     * to replace the current faulty one. In such cases,
-     * we simply remove the bookie.
-     * 
-     * 
-     * @param BookieHandle
-     */
-    synchronized void removeBookie(BookieHandle bh){
-       if(lMode == BKDefs.WRITE){
-           LOG.info("Removing bookie: " + bh.addr);
-           int index = bookies.indexOf(bh);
-           if(index >= 0){
-               Long tmpLastRecv = lastRecvCorrectly.get(index);
-               bookies.remove(index);
-        
-               if(tmpLastRecv == null)
-                   changeEnsemble(0);
-               else
-                   changeEnsemble(tmpLastRecv);
-           }
-       }
-    }
-    
-    
-    /**
-     * Returns the ledger identifier
-     * @return long
-     */
-    public long getId(){
-        return ledger;
-    }
-    
-    /**
-     * Returns the last entry identifier submitted
-     * @return long
-     */
-    public long getLast(){
-        return last;   
-    }
-    
-    /**
-     * Returns the last entry identifier submitted and increments it.
-     * @return long
-     */
-    long incLast(){
-        return last++;
-    }
-    
-    /**
-     * Sets the last entry identifier submitted.
-     * 
-     * @param   last    last entry
-     * @return  long    returns the value just set
-     */
-    long setLast(long last){
-        this.last = last;
-        return this.last;
-    }
-    
-    /**
-     * Sets the value of the last add confirmed. This is used
-     * when adding new entries, since we use this value as a hint
-     * to recover from failures of the client.
-     */
-    void setAddConfirmed(long entryId){
-        if(entryId > lastAddConfirmed)
-            lastAddConfirmed = entryId;
-    }
-    
-    long getAddConfirmed(){
-        return lastAddConfirmed;
-    }
-    
-    void setLastRecvCorrectly(int sId, long entry){
-        //LOG.info("Setting last received correctly: " + entry);
-        lastRecvCorrectly.put(sId, entry);
-    }
-    
-    /**
-     * Returns the list of bookies
-     * @return ArrayList<BookieHandle>
-     */
-    ArrayList<BookieHandle> getBookies(){
-        return bookies;
-    }
-    
-    /**
-     * For reads, there might be multiple operations.
-     * 
-     * @param entry
-     * @return ArrayList<BookieHandle>  returns list of bookies
-     */
-    ArrayList<BookieHandle> getBookies(long entry){
-        return getConfig(entry);
-    }
-    
-    /**
-     * Returns the bookie handle corresponding to the addresses in the input.
-     * 
-     * @param addr
-     * @return
-     */
-    BookieHandle getBookieHandleDup(InetSocketAddress addr){
-        for(BookieHandle bh : bookies){
-            if(bh.addr.equals(addr))
-                return bh;
-        }
-        
-        return null;
-    }
-    
-    /**
-     * Sets a new bookie configuration corresponding to a failure during
-     * writes to the ledger. We have one configuration for every failure.
-     * 
-     * @param entry
-     * @param list
-     */
-    
-    void setNewBookieConfig(long entry, ArrayList<BookieHandle> list){
-        if(bookieConfigMap == null)
-            bookieConfigMap = new TreeMap<Long, ArrayList<BookieHandle> >();
-        
-        /*
-         * If initial config is not in the list, we include it.
-         */
-        if(!bookieConfigMap.containsKey(new Long(0))){
-            bookieConfigMap.put(new Long(0), bookies);
-        }
-        
-        LOG.info("Adding new entry: " + entry + ", " + bookies.size() + ", " + list.size());
-        bookieConfigMap.put(entry, list);
-    }
-    
-    /**
-     * Once we read all changes to the bookie configuration, we
-     * have to call this method to generate an array that we use
-     * to determine the bookie configuration for an entry.
-     * 
-     * Note that this array is a performance optimization and 
-     * it is not necessary for correctness. We could just use 
-     * bookieConfigMap but it would be slower.
-     */
-    
-    void prepareEntryChange(){
-        entryChange = new long[bookieConfigMap.size()];
-    
-        int counter = 0;
-        for(Long l : bookieConfigMap.keySet()){
-            entryChange[counter++] = l;
-        }
-    }
-    
-    /**
-     * Return the quorum size. By default, the size of a quorum is (n+1)/2, 
-     * where n is the size of the set of bookies.
-     * @return int
-     */
-    int getQuorumSize(){
-        return qSize;   
-    }
-    
-    
-    /**
-     *  Returns the config corresponding to the entry
-     *  
-     * @param entry
-     * @return
-     */
-    private ArrayList<BookieHandle> getConfig(long entry){
-        if(bookieConfigMap == null)
-            return bookies;
-        
-        int index = Arrays.binarySearch(entryChange, entry);
-        
-        /*
-         * If not on the map, binarySearch returns a negative value
-         */
-        int before = index;
-        index = index >= 0? index : ((-1) - index);
-
-        if(index == 0){
-            if((entry % 10) == 0){
-                LOG.info("Index: " + index + ", " + before + ", " + entry + ", " + bookieConfigMap.get(entryChange[index]).size());
+          }
+        }, null);
+
+      }
+    });
+  }
+
+  /**
+   * Read a sequence of entries synchronously.
+   * 
+   * @param firstEntry
+   *          id of first entry of sequence (included)
+   * @param lastEntry
+   *          id of last entry of sequence (included)
+   * 
+   */
+  public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry)
+      throws InterruptedException, BKException {
+    SyncCounter counter = new SyncCounter();
+    counter.inc();
+
+    asyncReadEntries(firstEntry, lastEntry, this, counter);
+
+    counter.block(0);
+    if (counter.getrc() != BKException.Code.OK) {
+      throw BKException.create(counter.getrc());
+    }
+
+    return counter.getSequence();
+  }
+
+  /**
+   * Read a sequence of entries asynchronously.
+   * 
+   * @param firstEntry
+   *          id of first entry of sequence
+   * @param lastEntry
+   *          id of last entry of sequence
+   * @param cb
+   *          object implementing read callback interface
+   * @param ctx
+   *          control object
+   */
+  public void asyncReadEntries(long firstEntry, long lastEntry,
+      ReadCallback cb, Object ctx) {
+    // Little sanity check
+    if (firstEntry < 0 || lastEntry > lastAddConfirmed
+        || firstEntry > lastEntry) {
+      cb.readComplete(BKException.Code.ReadException, this, null, ctx);
+      return;
+    }
+
+    new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
+
+  }
+
+  /**
+   * Add entry synchronously to an open ledger.
+   * 
+   * @param data
+   *         array of bytes to be written to the ledger
+   */
+
+  public long addEntry(byte[] data) throws InterruptedException, BKException {
+    LOG.debug("Adding entry " + data);
+    SyncCounter counter = new SyncCounter();
+    counter.inc();
+
+    asyncAddEntry(data, this, counter);
+    counter.block(0);
+
+    return counter.getrc();
+  }
+
+  /**
+   * Add entry asynchronously to an open ledger.
+   * 
+   * @param data
+   *          array of bytes to be written
+   * @param cb
+   *          object implementing callback interface
+   * @param ctx
+   *          some control object
+   */
+  public void asyncAddEntry(final byte[] data, final AddCallback cb,
+      final Object ctx) {
+    bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+      @Override
+      public void safeRun() {
+        if (metadata.isClosed()) {
+          LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+          cb.addComplete(BKException.Code.LedgerClosedException,
+              LedgerHandle.this, -1, ctx);
+          return;
+        }
+
+        long entryId = ++lastAddPushed;
+        PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx, entryId);
+        pendingAddOps.add(op);
+        ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
+            entryId, lastAddConfirmed, data);
+        op.initiate(toSend);
+
+      }
+    });
+  }
+
+  // close the ledger and send failures to all the adds in the pipeline
+  void handleUnrecoverableErrorDuringAdd(int rc) {
+    asyncClose(NoopCloseCallback.instance, null, rc);
+  }
+
+  void errorOutPendingAdds(int rc) {
+    PendingAddOp pendingAddOp;
+    while ((pendingAddOp = pendingAddOps.poll()) != null) {
+      pendingAddOp.submitCallback(rc);
+    }
+  }
+
+  void sendAddSuccessCallbacks() {
+    // Start from the head of the queue and proceed while there are
+    // entries that have had all their responses come back
+    PendingAddOp pendingAddOp;
+    while ((pendingAddOp = pendingAddOps.peek()) != null) {
+      if (pendingAddOp.numResponsesPending != 0) {
+        return;
+      }
+      pendingAddOps.remove();
+      lastAddConfirmed = pendingAddOp.entryId;
+      pendingAddOp.submitCallback(BKException.Code.OK);
+    }
+
+  }
+
+  void handleBookieFailure(InetSocketAddress addr, final int bookieIndex) {
+    InetSocketAddress newBookie;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling failure of bookie: " + addr + " index: "
+          + bookieIndex);
+    }
+
+    try {
+      newBookie = bk.bookieWatcher
+          .getAdditionalBookie(metadata.currentEnsemble);
+    } catch (BKNotEnoughBookiesException e) {
+      LOG
+          .error("Could not get additional bookie to remake ensemble, closing ledger: "
+              + ledgerId);
+      handleUnrecoverableErrorDuringAdd(e.getCode());
+      return;
+    }
+
+    final ArrayList<InetSocketAddress> newEnsemble = new ArrayList<InetSocketAddress>(
+        metadata.currentEnsemble);
+    newEnsemble.set(bookieIndex, newBookie);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Changing ensemble from: " + metadata.currentEnsemble + " to: "
+          + newEnsemble + " for ledger: " + ledgerId + " starting at entry: "
+          + (lastAddConfirmed + 1));
+    }
+
+    metadata.addEnsemble(lastAddConfirmed + 1, newEnsemble);
+
+    writeLedgerConfig(new StatCallback() {
+      @Override
+      public void processResult(final int rc, String path, Object ctx, Stat stat) {
+
+        bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+          @Override
+          public void safeRun() {
+            if (rc != KeeperException.Code.OK.intValue()) {
+              LOG
+                  .error("Could not persist ledger metadata while changing ensemble to: "
+                      + newEnsemble + " , closing ledger");
+              handleUnrecoverableErrorDuringAdd(BKException.Code.ZKException);
+              return;
             }
-            return bookieConfigMap.get(entryChange[index]); 
-        } else{
-            //LOG.warn("IndexDiff " + entry);
-            return bookieConfigMap.get(entryChange[index - 1]);
-        }
-    }
-    
-    /**
-     * Returns the quorum mode for this ledger: Verifiable or Generic
-     */
-    QMode getQMode(){
-        return qMode;   
-    }
-    
-    /**
-     * Sets message digest algorithm.
-     */
-    
-    void setDigestAlg(String alg){
-        this.digestAlg = alg;
-    }
-    
-    /**
-     * Get message digest algorithm.
-     */
-    
-    String getDigestAlg(){
-        return digestAlg;
-    }
-    
-    /**
-     * Generates and stores Ledger key.
-     * 
-     * @param passwd
-     */
-    
-    private void genLedgerKey(byte[] passwd){
-        try{
-            MessageDigest digest = MessageDigest.getInstance("SHA");
-            String pad = "ledger";
-            
-            byte[] toProcess = new byte[passwd.length + pad.length()];
-            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
-            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
-        
-            digest.update(toProcess);
-            this.ledgerKey = digest.digest();
-        } catch(NoSuchAlgorithmException e){
-            this.passwd = passwd;
-            LOG.error("Storing password as plain text because secure hash implementation does not exist");
-        }
-    }
-    
-    /**
-     * Generates and stores Mac key.
-     * 
-     * @param passwd
-     */
-    
-    private void genMacKey(byte[] passwd){
-        try{
-            MessageDigest digest = MessageDigest.getInstance("SHA");
-            String pad = "mac";
-            
-            byte[] toProcess = new byte[passwd.length + pad.length()];
-            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
-            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
-        
-            digest.update(toProcess);
-            this.macKey = digest.digest();
-        } catch(NoSuchAlgorithmException e){
-            this.passwd = passwd;
-            LOG.error("Storing password as plain text because secure hash implementation does not exist");
-        }
-    }
-    
-    /**
-     * Returns password in plain text
-     */
-    byte[] getPasswd(){
-    	return passwd;
-    }
-    
-    
-    /**
-     * Returns MAC key
-     * 
-     * @return byte[]
-     */
-    byte[] getMacKey(){
-       return macKey; 
-    }
-   
-    /**
-     * Returns Ledger key
-     * 
-     * @return byte[]
-     */
-    byte[] getLedgerKey(){
-       return ledgerKey; 
-    }
-    
-    void closeUp(){
-        ledger = -1;
-        last = -1;
-        bk.haltBookieHandles(this, bookies);
-    }
-    
-    /**
-     * Close ledger.
-     * 
-     */
-    public void close() 
-    throws KeeperException, InterruptedException, BKException {
-        //Set data on zookeeper
-        ByteBuffer last = ByteBuffer.allocate(8);
-        last.putLong(lastAddConfirmed);
-        LOG.info("Last saved on ZK is: " + lastAddConfirmed);
-        String closePath = BKDefs.prefix + bk.getZKStringId(getId()) + BKDefs.close; 
-        if(bk.getZooKeeper().exists(closePath, false) == null){
-           bk.getZooKeeper().create(closePath, 
-                   last.array(), 
-                   Ids.OPEN_ACL_UNSAFE, 
-                   CreateMode.PERSISTENT); 
-        } 
-        
-        closeUp();
-        StopOp sOp = new StopOp();
-        qe.sendOp(sOp);
-        LOG.info("##### CB worker queue size: " + qe.cbWorker.pendingOps.size());
-    }
-    
-    /**
-     * Asynchronous close
-     *
-     * @param cb    callback implementation
-     * @param ctx   control object
-     * @throws InterruptedException
-     */
-    public void asyncClose(CloseCallback cb, Object ctx)
-    throws InterruptedException {
-        CloseLedgerOp op = new CloseLedgerOp(this, cb, ctx);
-        LedgerManagementProcessor lmp = bk.getMngProcessor();
-        lmp.addOp(op);  
-    }
-       
-    /**
-     * Read a sequence of entries asynchronously.
-     * 
-     * @param firstEntry    id of first entry of sequence
-     * @param lastEntry     id of last entry of sequence
-     * @param cb    object implementing read callback interface
-     * @param ctx   control object 
-     */
-    public void asyncReadEntries(long firstEntry, 
-            long lastEntry, ReadCallback cb, Object ctx)
-    throws BKException, InterruptedException {
-        // Little sanity check
-        if((firstEntry > getLast()) || (firstEntry > lastEntry)) 
-            throw BKException.create(Code.ReadException);
-        
-        Operation r = new ReadOp(this, firstEntry, lastEntry, cb, ctx);
-        qe.sendOp(r); 
-        //qeMap.get(lh.getId()).put(r);
-    }
-    
-    
-    /**
-     * Read a sequence of entries synchronously.
-     * 
-     * @param firstEntry    id of first entry of sequence
-     * @param lastEntry     id of last entry of sequence
-     *
-     */
-    public LedgerSequence readEntries(long firstEntry, long lastEntry) 
-    throws InterruptedException, BKException {
-        // Little sanity check
-        if((firstEntry > getLast()) || (firstEntry > lastEntry))
-            throw BKException.create(Code.ReadException);
-        
-        RetCounter counter = new RetCounter();
-        counter.inc();
-     
-        Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
-        qe.sendOp(r);
-        
-        LOG.debug("Going to wait for read entries: " + counter.i);
-        counter.block(0);
-        LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
-        
-        if(counter.getSequence() == null){
-            LOG.error("Failed to read entries: " + firstEntry + ", " + lastEntry);
-            throw BKException.create(Code.ReadException);
-        }
-        return counter.getSequence();
-    }
-   
-    /**
-     * Add entry asynchronously to an open ledger.
-     * 
-     * @param data  array of bytes to be written
-     * @param cb    object implementing callbackinterface
-     * @param ctx   some control object
-     */
-    public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
-    throws InterruptedException, BKException {
-        AddOp r = new AddOp(this, data, cb, ctx);
-        qe.sendOp(r);
-    }
-    
-    
-    /**
-     * Add entry synchronously to an open ledger.
-     * 
-     * @param   data byte[]
-     */
-    
-    public long addEntry(byte[] data)
-    throws InterruptedException, BKException{
-        LOG.debug("Adding entry " + data);
-        RetCounter counter = new RetCounter();
-        counter.inc();
-        
-        Operation r = new AddOp(this, data, this, counter);
-        qe.sendOp(r);   
-        //qeMap.get(lh.getId()).put(r);
-        counter.block(0);
-        return counter.getrc();
-    }
-    
-    
-    /**
-     * Implementation of callback interface for synchronous read method.
-     * 
-     * @param rc    return code
-     * @param leder ledger identifier
-     * @param seq   sequence of entries
-     * @param ctx   control object
-     */
-    public void readComplete(int rc, 
-            LedgerHandle lh,
-            LedgerSequence seq,  
-            Object ctx){        
-        
-        RetCounter counter = (RetCounter) ctx;
-        counter.setSequence(seq);
-        LOG.debug("Read complete: " + seq.size() + ", " + counter.i);
-        counter.dec();
-    }
-    
-    /**
-     * Implementation of callback interface for synchronous read method.
-     * 
-     * @param rc    return code
-     * @param leder ledger identifier
-     * @param entry entry identifier
-     * @param ctx   control object
-     */
-    public void addComplete(int rc, 
-            LedgerHandle lh,
-            long entry, 
-            Object ctx){          
-        RetCounter counter = (RetCounter) ctx;
-        
-        counter.setrc(rc);
-        counter.dec();
-    }
-    
-    
-    
-    /**
-     * Implements objects to help with the synchronization of asynchronous calls
-     * 
-     */
-    
-    private static class RetCounter {
-        int i;
-        int rc;
-        int total;
-        LedgerSequence seq = null;
-        
-        synchronized void inc() {
-            i++;
-            total++;
-        }
-        synchronized void dec() {
-            i--;
-            notifyAll();
-        }
-        synchronized void block(int limit) throws InterruptedException {
-            while(i > limit) {
-                int prev = i;
-                wait(15000);
-                if(i == prev){
-                    break;
-                }
+
+            for (PendingAddOp pendingAddOp : pendingAddOps) {
+              pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex);
             }
-        }
-        synchronized int total() {
-            return total;
-        }
-        
-        void setrc(int rc){
-            this.rc = rc;
-        }
-        
-        int getrc(){
-            return rc;
-        }
-        
-        void setSequence(LedgerSequence seq){
-            this.seq = seq;
-        }
-        
-        LedgerSequence getSequence(){
-            return seq;
-        }
+          }
+        });
+
+      }
+    }, null);
+
+  }
+
+  void recover(GenericCallback<Void> cb) {
+    if (metadata.isClosed()) {
+      // We are already closed, nothing to do
+      cb.operationComplete(BKException.Code.OK, null);
+      return;
+    }
+
+    new LedgerRecoveryOp(this, cb).initiate();
+  }
+
+  static class NoopCloseCallback implements CloseCallback {
+    static NoopCloseCallback instance = new NoopCloseCallback();
+
+    @Override
+    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+      // noop
+    }
+  }
+
+  /**
+   * Implementation of callback interface for synchronous read method.
+   * 
+   * @param rc
+   *          return code
+   * @param lh
+   *          ledger handle
+   * @param seq
+   *          sequence of entries
+   * @param ctx
+   *          control object
+   */
+  public void readComplete(int rc, LedgerHandle lh,
+      Enumeration<LedgerEntry> seq, Object ctx) {
+
+    SyncCounter counter = (SyncCounter) ctx;
+    synchronized (counter) {
+      counter.setSequence(seq);
+      counter.setrc(rc);
+      counter.dec();
+      counter.notify();
+    }
+  }
+
+  /**
+   * Implementation of callback interface for synchronous add method.
+   * 
+   * @param rc
+   *          return code
+   * @param lh
+   *          ledger handle
+   * @param entry
+   *          entry identifier
+   * @param ctx
+   *          control object
+   */
+  public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
+    SyncCounter counter = (SyncCounter) ctx;
+
+    counter.setrc(rc);
+    counter.dec();
+  }
+
+  /**
+   * Close callback method
+   * 
+   * @param rc
+   * @param lh
+   * @param ctx
+   */
+  public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
+
+    SyncCounter counter = (SyncCounter) ctx;
+    counter.setrc(rc);
+    synchronized (counter) {
+      counter.dec();
+      counter.notify();
     }
+
+  }
 }

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,179 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * This class encapsulates all the ledger metadata that is persistently stored
+ * in zookeeper. It provides parsing and serialization methods of such metadata.
+ * 
+ */
+class LedgerMetadata {
+    static final Logger LOG = Logger.getLogger(LedgerMetadata.class);
+
+    private static final String closed = "CLOSED";
+    private static final String lSplitter = "\n";
+    private static final String tSplitter = "\t";
+
+    // can't use -1 for NOTCLOSED because that is reserved for a closed, empty
+    // ledger
+    public static final int NOTCLOSED = -101;
+    int ensembleSize;
+    int quorumSize;
+    long close;
+    private SortedMap<Long, ArrayList<InetSocketAddress>> ensembles = new TreeMap<Long, ArrayList<InetSocketAddress>>();
+    ArrayList<InetSocketAddress> currentEnsemble;
+
+    public LedgerMetadata(int ensembleSize, int quorumSize) {
+        this.ensembleSize = ensembleSize;
+        this.quorumSize = quorumSize;
+        this.close = NOTCLOSED;
+    };
+
+    private LedgerMetadata() {
+        this(0, 0);
+    }
+
+    boolean isClosed() {
+        return close != NOTCLOSED;
+    }
+
+    void close(long entryId) {
+        close = entryId;
+    }
+
+    void addEnsemble(long startEntryId, ArrayList<InetSocketAddress> ensemble) {
+        assert ensembles.isEmpty() || startEntryId >= ensembles.lastKey();
+
+        ensembles.put(startEntryId, ensemble);
+        currentEnsemble = ensemble;
+    }
+
+    ArrayList<InetSocketAddress> getEnsemble(long entryId) {
+        // the head map cannot be empty, since we insert an ensemble for
+        // entry-id 0, right when we start
+        return ensembles.get(ensembles.headMap(entryId + 1).lastKey());
+    }
+
+    /**
+     * Returns the smallest entry id greater than the given entry-id at which an
+     * ensemble change takes place (-1 if there are no further ensemble changes).
+     * 
+     * @param entryId
+     * @return
+     */
+    long getNextEnsembleChange(long entryId) {
+        SortedMap<Long, ArrayList<InetSocketAddress>> tailMap = ensembles.tailMap(entryId + 1);
+
+        if (tailMap.isEmpty()) {
+            return -1;
+        } else {
+            return tailMap.firstKey();
+        }
+    }
+
+    /**
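+     * Serializes this LedgerMetadata object into a byte array.
+     * 
+     * The text form is: quorum size, ensemble size, one line per ensemble
+     * change, and a final line marking the close entry if the ledger is closed.
+     * @return byte[]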
+     */
+    public byte[] serialize() {
+        StringBuilder s = new StringBuilder();
+        s.append(quorumSize).append(lSplitter).append(ensembleSize);
+
+        for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : ensembles.entrySet()) {
+            s.append(lSplitter).append(entry.getKey());
+            for (InetSocketAddress addr : entry.getValue()) {
+                s.append(tSplitter);
+                StringUtils.addrToString(s, addr);
+            }
+        }
+
+        if (close != NOTCLOSED) {
+            s.append(lSplitter).append(close).append(tSplitter).append(closed);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Serialized config: " + s.toString());
+        }
+
+        return s.toString().getBytes();
+    }
+
+    /**
+     * Parses a given byte array and transforms it into a LedgerMetadata object
+     * 
+     * @param bytes
+     *            byte array to parse
+     * @return LedgerMetadata
+     * @throws IOException
+     *             if the given byte[] cannot be parsed
+     */
+
+    static LedgerMetadata parseConfig(byte[] bytes) throws IOException {
+
+        LedgerMetadata lc = new LedgerMetadata();
+        String config = new String(bytes);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Parsing Config: " + config);
+        }
+
+        String lines[] = config.split(lSplitter);
+
+        if (lines.length < 2) {
+            throw new IOException("Quorum size or ensemble size absent from config: " + config);
+        }
+
+        try {
+            lc.quorumSize = new Integer(lines[0]);
+            lc.ensembleSize = new Integer(lines[1]);
+
+            for (int i = 2; i < lines.length; i++) {
+                String parts[] = lines[i].split(tSplitter);
+
+                if (parts[1].equals(closed)) {
+                    lc.close = new Long(parts[0]);
+                    break;
+                }
+
+                ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+                for (int j = 1; j < parts.length; j++) {
+                    addrs.add(StringUtils.parseAddr(parts[j]));
+                }
+                lc.addEnsemble(new Long(parts[0]), addrs);
+            }
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+        return lc;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,136 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Encapsulates the ledger open operation
+ * 
+ */
+class LedgerOpenOp implements DataCallback {
+    static final Logger LOG = Logger.getLogger(LedgerOpenOp.class);
+    
+    BookKeeper bk;
+    long ledgerId;
+    OpenCallback cb;
+    Object ctx;
+    LedgerHandle lh;
+    byte[] passwd;
+    DigestType digestType;
+    
+    /**
+     * Constructor.
+     * 
+     * @param bk
+     * @param ledgerId
+     * @param digestType
+     * @param passwd
+     * @param cb
+     * @param ctx
+     */
+    
+    public LedgerOpenOp(BookKeeper bk, long ledgerId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
+        this.bk = bk;
+        this.ledgerId = ledgerId;
+        this.passwd = passwd;
+        this.cb = cb;
+        this.ctx = ctx;
+        this.digestType = digestType;
+    }
+
+    /**
+     * Initiates the ledger open operation
+     */
+    public void initiate() {
+        /**
+         * Asynchronously read the ledger metadata node.
+         */
+
+        bk.getZkHandle().getData(StringUtils.getLedgerNodePath(ledgerId), false, this, ctx);
+
+    }
+
+    /**
+     * Implements ZooKeeper data callback.
+     * @see org.apache.zookeeper.AsyncCallback.DataCallback#processResult(int, String, Object, byte[], Stat)
+     */
+    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+
+        if (rc == KeeperException.Code.NONODE.intValue()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No such ledger: " + ledgerId, KeeperException.create(KeeperException.Code.get(rc), path));
+            }
+            cb.openComplete(BKException.Code.NoSuchLedgerExistsException, null, this.ctx);
+            return;
+        }
+        if (rc != KeeperException.Code.OK.intValue()) {
+            LOG.error("Could not read metadata for ledger: " + ledgerId, KeeperException.create(KeeperException.Code
+                    .get(rc), path));
+            cb.openComplete(BKException.Code.ZKException, null, this.ctx);
+            return;
+        }
+
+        LedgerMetadata metadata;
+        try {
+            metadata = LedgerMetadata.parseConfig(data);
+        } catch (IOException e) {
+            LOG.error("Could not parse ledger metadata for ledger: " + ledgerId, e);
+            cb.openComplete(BKException.Code.ZKException, null, this.ctx);
+            return;
+        }
+
+        try {
+            lh = new LedgerHandle(bk, ledgerId, metadata, digestType, passwd);
+        } catch (GeneralSecurityException e) {
+            LOG.error("Security exception while opening ledger: " + ledgerId, e);
+            cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
+            return;
+        }
+
+        if (metadata.close != LedgerMetadata.NOTCLOSED) {
+            // Ledger was closed properly
+            cb.openComplete(BKException.Code.OK, lh, this.ctx);
+            return;
+        }
+
+        lh.recover(new GenericCallback<Void>() {
+            @Override
+            public void operationComplete(int rc, Void result) {
+                if (rc != BKException.Code.OK) {
+                    cb.openComplete(BKException.Code.LedgerRecoveryException, null, LedgerOpenOp.this.ctx);
+                } else {
+                    cb.openComplete(BKException.Code.OK, lh, LedgerOpenOp.this.ctx);
+                }
+            }
+        });
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,167 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Enumeration;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.client.LedgerHandle.NoopCloseCallback;
+import org.apache.bookkeeper.client.DigestManager.RecoveryData;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This class encapsulates the ledger recovery operation. It first does a read
+ * with entry-id of -1 to all bookies. Then starting from the last confirmed
+ * entry (from hints in the ledger entries), it reads forward until it is not
+ * able to find a particular entry. It closes the ledger at that entry.
+ * 
+ */
+class LedgerRecoveryOp implements ReadEntryCallback, ReadCallback, AddCallback {
+    static final Logger LOG = Logger.getLogger(LedgerRecoveryOp.class);
+    LedgerHandle lh;
+    int numResponsesPending;
+    boolean proceedingWithRecovery = false;
+    long maxAddPushed = -1;
+    long maxAddConfirmed = -1;
+
+    GenericCallback<Void> cb;
+
+    public LedgerRecoveryOp(LedgerHandle lh, GenericCallback<Void> cb) {
+        this.cb = cb;
+        this.lh = lh;
+        numResponsesPending = lh.metadata.ensembleSize;
+    }
+
+    public void initiate() {
+        for (int i = 0; i < lh.metadata.currentEnsemble.size(); i++) {
+            lh.bk.bookieClient.readEntry(lh.metadata.currentEnsemble.get(i), lh.ledgerId, -1, this, i);
+        }
+    }
+
+    public synchronized void readEntryComplete(final int rc, final long ledgerId, final long entryId,
+            final ChannelBuffer buffer, final Object ctx) {
+
+        // Already proceeding with recovery, nothing to do
+        if (proceedingWithRecovery) {
+            return;
+        }
+
+        int bookieIndex = (Integer) ctx;
+
+        numResponsesPending--;
+
+        boolean heardValidResponse = false;
+
+        if (rc == BKException.Code.OK) {
+            try {
+                RecoveryData recoveryData = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
+                maxAddConfirmed = Math.max(maxAddConfirmed, recoveryData.lastAddConfirmed);
+                maxAddPushed = Math.max(maxAddPushed, recoveryData.entryId);
+                heardValidResponse = true;
+            } catch (BKDigestMatchException e) {
+                // Too bad, this bookie didn't give us a valid answer; we
+                // might still be able to recover, though, so
+                // continue
+                LOG.error("Mac mismatch while reading last entry from bookie: "
+                        + lh.metadata.currentEnsemble.get(bookieIndex));
+            }
+        }
+
+        if (rc == BKException.Code.NoSuchLedgerExistsException || rc == BKException.Code.NoSuchEntryException) {
+            // this still counts as a valid response, e.g., if the
+            // client
+            // crashed without writing any entry
+            heardValidResponse = true;
+        }
+
+        // other return codes don't count as valid responses
+        if (heardValidResponse && lh.distributionSchedule.canProceedWithRecovery(bookieIndex)) {
+            proceedingWithRecovery = true;
+            lh.lastAddPushed = lh.lastAddConfirmed = maxAddConfirmed;
+            doRecoveryRead();
+            return;
+        }
+
+        if (numResponsesPending == 0) {
+            // Have got all responses back but was still not enough to
+            // start
+            // recovery, just fail the operation
+            LOG.error("While recovering ledger: " + ledgerId + " did not hear success responses from all quorums");
+            cb.operationComplete(BKException.Code.LedgerRecoveryException, null);
+        }
+
+    }
+
+    /**
+     * Try to read past the last confirmed.
+     */
+    private void doRecoveryRead() {
+        lh.lastAddConfirmed++;
+        lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+
+    }
+
+    @Override
+    public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+        // get back to prev value
+        lh.lastAddConfirmed--;
+        if (rc == BKException.Code.OK) {
+            lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+            return;
+        }
+
+        if (rc == BKException.Code.NoSuchEntryException || rc == BKException.Code.NoSuchLedgerExistsException) {
+            lh.asyncClose(NoopCloseCallback.instance, null);
+            // we don't need to wait for the close to complete. Since we mark
+            // the
+            // ledger closed in memory, the application won't be able to add to
+            // it
+
+            cb.operationComplete(BKException.Code.OK, null);
+            return;
+        }
+
+        // otherwise, some other error we can't handle (NOTE(review): "+ 1" in the message below is string-concatenated, not added)
+        LOG.error("Failure " + BKException.getMessage(rc) + " while reading entry: " + lh.lastAddConfirmed + 1
+                + " ledger: " + lh.ledgerId + " while recovering ledger");
+        cb.operationComplete(rc, null);
+        return;
+    }
+
+    @Override
+    public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+        if (rc != BKException.Code.OK) {
+            // Give up, we can't recover from this error (NOTE(review): "+ 1" in the message below is string-concatenated, not added)
+
+            LOG.error("Failure " + BKException.getMessage(rc) + " while writing entry: " + lh.lastAddConfirmed + 1
+                    + " ledger: " + lh.ledgerId + " while recovering ledger");
+            cb.operationComplete(rc, null);
+            return;
+        }
+
+        doRecoveryRead();
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/MacDigestManager.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,67 @@
+package org.apache.bookkeeper.client;
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.security.GeneralSecurityException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+class MacDigestManager extends DigestManager {
+    public static String DIGEST_ALGORITHM = "SHA-1";
+    public static String KEY_ALGORITHM = "HmacSHA1";
+    Mac mac;
+
+    public MacDigestManager(long ledgerId, byte[] passwd) throws GeneralSecurityException {
+        super(ledgerId);
+        byte[] macKey = genDigest("mac", passwd);
+        SecretKeySpec keySpec = new SecretKeySpec(macKey, KEY_ALGORITHM);
+        mac = Mac.getInstance(KEY_ALGORITHM);
+        mac.init(keySpec);
+        
+        
+    }
+
+    static byte[] genDigest(String pad, byte[] passwd) throws NoSuchAlgorithmException {
+        MessageDigest digest = MessageDigest.getInstance(DIGEST_ALGORITHM);
+        digest.update(pad.getBytes());
+        digest.update(passwd);
+                return digest.digest();
+    }
+
+    @Override
+    int getMacCodeLength() {
+        return 20;
+    }
+
+    
+    @Override
+    byte[] getValueAndReset() {
+        return mac.doFinal();
+    }
+    
+    @Override
+    void update(byte[] data, int offset, int length) {
+        mac.update(data, offset, length);
+    }
+    
+    
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,137 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.net.InetSocketAddress;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * This represents a pending add operation. When it has received success from
+ * all bookies, it checks whether it is at the head of the pending adds queue
+ * and, if so, sends an ack back to the application. If a bookie fails, a
+ * replacement is made and placed at the same position in the ensemble. The
+ * pending adds are then re-replicated.
+ * 
+ * 
+ */
+class PendingAddOp implements WriteCallback {
+    final static Logger LOG = Logger.getLogger(PendingAddOp.class);
+
+    ChannelBuffer toSend;
+    AddCallback cb;
+    Object ctx;
+    long entryId;
+    boolean[] successesSoFar;
+    int numResponsesPending;
+    LedgerHandle lh;
+
+    PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx, long entryId) {
+        this.lh = lh;
+        this.cb = cb;
+        this.ctx = ctx;
+        this.entryId = entryId;
+        successesSoFar = new boolean[lh.metadata.quorumSize];
+        numResponsesPending = successesSoFar.length;
+    }
+
+    void sendWriteRequest(int bookieIndex, int arrayIndex) {
+        lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend,
+                this, arrayIndex);
+    }
+
+    void unsetSuccessAndSendWriteRequest(int bookieIndex) {
+        if (toSend == null) {
+            // this addOp hasn't yet had its mac computed. When the mac is
+            // computed, its write requests will be sent, so no need to send it
+            // now
+            return;
+        }
+
+        int replicaIndex = lh.distributionSchedule.getReplicaIndex(entryId, bookieIndex);
+        if (replicaIndex < 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Leaving unchanged, ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
+                        + bookieIndex);
+            }
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: "
+                    + bookieIndex);
+        }
+
+        // if we had already heard a success from this array index, need to
+        // increment our number of responses that are pending, since we are
+        // going to unset this success
+        if (successesSoFar[replicaIndex]) {
+            successesSoFar[replicaIndex] = false;
+            numResponsesPending++;
+        }
+        
+         sendWriteRequest(bookieIndex, replicaIndex);
+    }
+
+    void initiate(ChannelBuffer toSend) {
+        this.toSend = toSend;
+        for (int i = 0; i < successesSoFar.length; i++) {
+            int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, i);
+            sendWriteRequest(bookieIndex, i);
+        }
+    }
+
+    @Override
+    public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
+
+        Integer replicaIndex = (Integer) ctx;
+        int bookieIndex = lh.distributionSchedule.getBookieIndex(entryId, replicaIndex);
+
+        if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
+            // ensemble has already changed, failure of this addr is immaterial
+            LOG.warn("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it.");
+            return;
+        }
+        
+        if (rc != BKException.Code.OK) {
+            LOG.warn("Write did not succeed: " + ledgerId + ", " + entryId);
+            lh.handleBookieFailure(addr, bookieIndex);
+            return;
+        }
+
+
+        if (!successesSoFar[replicaIndex]) {
+            successesSoFar[replicaIndex] = true;
+            numResponsesPending--;
+            
+            // do some quick checks to see if some adds may have finished. All
+            // this will be checked under locks again
+            if (numResponsesPending == 0 && lh.pendingAddOps.peek() == this) {
+                lh.sendAddSuccessCallbacks();
+            }
+        } 
+    }
+
+    void submitCallback(final int rc) {
+        cb.addComplete(rc, lh, entryId, ctx);
+    }
+
+}
\ No newline at end of file

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,145 @@
+package org.apache.bookkeeper.client;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.log4j.Logger;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+
+/**
+ * Sequence of entries of a ledger that represents a pending read operation.
+ * When all the data read has come back, the application callback is called.
+ * This class could be improved because we could start pushing data to the
+ * application as soon as it arrives rather than waiting for the whole thing.
+ * 
+ */
+
+class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback {
+    Logger LOG = Logger.getLogger(PendingReadOp.class);
+
+    Queue<LedgerEntry> seq; // one LedgerEntry per requested id, added in ascending id order
+    ReadCallback cb;
+    Object ctx;
+    LedgerHandle lh;
+    long numPendingReads; // entries not yet received and digest-verified
+    long startEntryId;
+    long endEntryId;
+
+    PendingReadOp(LedgerHandle lh, long startEntryId, long endEntryId, ReadCallback cb, Object ctx) {
+        // the id range is inclusive at both ends, hence the + 1
+        seq = new ArrayDeque<LedgerEntry>((int) (endEntryId - startEntryId + 1));
+        this.cb = cb;
+        this.ctx = ctx;
+        this.lh = lh;
+        this.startEntryId = startEntryId;
+        this.endEntryId = endEntryId;
+        numPendingReads = endEntryId - startEntryId + 1;
+    }
+
+    public void initiate() {
+        long nextEnsembleChange = startEntryId, i = startEntryId;
+
+        ArrayList<InetSocketAddress> ensemble = null;
+        do {
+            // refresh the ensemble only when i reaches the next ensemble change
+            if (i == nextEnsembleChange) {
+                ensemble = lh.metadata.getEnsemble(i);
+                nextEnsembleChange = lh.metadata.getNextEnsembleChange(i);
+            }
+            LedgerEntry entry = new LedgerEntry(lh.ledgerId, i);
+            seq.add(entry);
+            i++;
+            sendRead(ensemble, entry, BKException.Code.ReadException);
+
+        } while (i <= endEntryId);
+
+    }
+
+    void sendRead(ArrayList<InetSocketAddress> ensemble, LedgerEntry entry, int lastErrorCode) {
+        if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) {
+            // we are done, the read has failed from all replicas, just fail the
+            // read
+            cb.readComplete(lastErrorCode, lh, null, ctx);
+            return;
+        }
+
+        int bookieIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom);
+        entry.nextReplicaIndexToReadFrom++;
+        lh.bk.bookieClient.readEntry(ensemble.get(bookieIndex), lh.ledgerId, entry.entryId, this, entry);
+    }
+
+    void logErrorAndReattemptRead(LedgerEntry entry, String errMsg, int rc) {
+        // sendRead already advanced nextReplicaIndexToReadFrom, so the replica that failed is the previous one
+        ArrayList<InetSocketAddress> ensemble = lh.metadata.getEnsemble(entry.entryId);
+        int bookieIndex = lh.distributionSchedule.getBookieIndex(entry.entryId, entry.nextReplicaIndexToReadFrom - 1);
+        LOG.error(errMsg + " while reading entry: " + entry.entryId + " ledgerId: " + lh.ledgerId + " from bookie: "
+                + ensemble.get(bookieIndex));
+        sendRead(ensemble, entry, rc);
+    }
+
+    @Override
+    public void readEntryComplete(int rc, long ledgerId, final long entryId, final ChannelBuffer buffer, Object ctx) {
+        final LedgerEntry entry = (LedgerEntry) ctx;
+
+        if (rc != BKException.Code.OK) {
+            logErrorAndReattemptRead(entry, "Error: " + BKException.getMessage(rc), rc);
+            return;
+        }
+
+        ChannelBufferInputStream is;
+        try {
+            is = lh.macManager.verifyDigestAndReturnData(entryId, buffer);
+        } catch (BKDigestMatchException e) {
+            logErrorAndReattemptRead(entry, "Mac mismatch", BKException.Code.DigestMatchException);
+            return;
+        }
+
+        entry.entryDataStream = is;
+        // count the entry as done only after its digest verified: a read that is
+        // retried after a digest mismatch must not be counted twice
+        numPendingReads--;
+        if (numPendingReads == 0) {
+            cb.readComplete(BKException.Code.OK, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+        }
+    }
+
+    public boolean hasMoreElements() {
+        return !seq.isEmpty();
+    }
+
+    public LedgerEntry nextElement() throws NoSuchElementException {
+        return seq.remove();
+    }
+
+    public int size() {
+        return seq.size();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,87 @@
+package org.apache.bookkeeper.client;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.bookkeeper.util.MathUtils;
+
+/**
+ * A specific {@link DistributionSchedule} that places entries in round-robin
+ * fashion. For ensemble size 3, and quorum size 2, Entry 0 goes to bookie 0 and
+ * 1, entry 1 goes to bookie 1 and 2, and entry 2 goes to bookie 2 and 0, and so
+ * on.
+ * 
+ */
+class RoundRobinDistributionSchedule implements DistributionSchedule {
+    int quorumSize;
+    int ensembleSize;
+
+    // covered[i] becomes true once a recovery reply has been counted for the
+    // quorum that starts at bookie index i; allocated lazily on first use
+    boolean[] covered = null;
+    int numQuorumsUncovered;
+
+    public RoundRobinDistributionSchedule(int quorumSize, int ensembleSize) {
+        this.ensembleSize = ensembleSize;
+        this.quorumSize = quorumSize;
+    }
+
+    @Override
+    public int getBookieIndex(long entryId, int replicaIndex) {
+        long position = (entryId + replicaIndex) % ensembleSize;
+        return (int) position;
+    }
+
+    @Override
+    public int getReplicaIndex(long entryId, int bookieIndex) {
+        // Java's % takes the sign of the dividend, so a plain modulus could be
+        // negative here; MathUtils.signSafeMod is used instead
+        int offset = MathUtils.signSafeMod(bookieIndex - entryId, ensembleSize);
+        if (offset >= quorumSize) {
+            return -1;
+        }
+        return offset;
+    }
+
+    /**
+     * Records a recovery reply from the given bookie index and reports whether
+     * every quorum has now been covered by at least one reply.
+     */
+    public synchronized boolean canProceedWithRecovery(int bookieIndexHeardFrom) {
+        if (covered == null) {
+            // first reply: no quorum has been covered yet
+            covered = new boolean[ensembleSize];
+            numQuorumsUncovered = ensembleSize;
+        }
+
+        // mark the quorums starting at the heard-from index and at the
+        // quorumSize - 1 indices before it; stop early once all are covered
+        for (int back = 0; back < quorumSize && numQuorumsUncovered > 0; back++) {
+            int quorumStartIndex = MathUtils.signSafeMod(bookieIndexHeardFrom - back, ensembleSize);
+            if (covered[quorumStartIndex]) {
+                continue;
+            }
+            covered[quorumStartIndex] = true;
+            numQuorumsUncovered--;
+        }
+
+        // true exactly when no quorum is left uncovered
+        return numQuorumsUncovered == 0;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java?rev=903483&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/SyncCounter.java Tue Jan 26 23:16:45 2010
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.Enumeration;
+
+/**
+ * Implements objects to help with the synchronization of asynchronous calls
+ * 
+ */
+
+class SyncCounter {
+    int i; // incremented by inc(), decremented by dec(); block() waits on it
+    int rc;
+    int total; // count of inc() calls; nothing in this class decrements it
+    Enumeration<LedgerEntry> seq = null;
+    LedgerHandle lh = null;
+
+    synchronized void inc() {
+        i++;
+        total++;
+    }
+
+    synchronized void dec() {
+        i--;
+        notifyAll(); // wake any thread waiting in block()
+    }
+
+    synchronized void block(int limit) throws InterruptedException { // waits while i > limit
+        while (i > limit) {
+            int prev = i;
+            wait();
+            if (i == prev) { // NOTE(review): woken with i unchanged; a spurious wakeup would also exit here - confirm intended
+                break;
+            }
+        }
+    }
+
+    synchronized int total() {
+        return total;
+    }
+
+    void setrc(int rc) { // NOTE(review): not synchronized, unlike inc()/dec() - confirm callers publish rc safely
+        this.rc = rc;
+    }
+
+    int getrc() {
+        return rc;
+    }
+
+    void setSequence(Enumeration<LedgerEntry> seq) {
+        this.seq = seq;
+    }
+
+    Enumeration<LedgerEntry> getSequence() {
+        return seq;
+    }
+
+    void setLh(LedgerHandle lh) {
+        this.lh = lh;
+    }
+
+    LedgerHandle getLh() {
+        return lh;
+    }
+}



Mime
View raw message