hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r739388 [2/4] - in /hadoop/zookeeper/trunk: ./ src/contrib/ src/contrib/bookkeeper/ src/contrib/bookkeeper/benchmark/ src/contrib/bookkeeper/benchmark/org/ src/contrib/bookkeeper/benchmark/org/apache/ src/contrib/bookkeeper/benchmark/org/ap...
Date Fri, 30 Jan 2009 19:30:28 GMT
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,250 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.security.NoSuchAlgorithmException;
+import java.security.MessageDigest;
+
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Maintains a queue of request to a given bookie. For verifiable
+ * ledgers, it computes the digest.
+ * 
+ */
+
+public class BookieHandle extends Thread{
+    Logger LOG = Logger.getLogger(BookieClient.class);
+    
+    boolean stop = false;
+    LedgerHandle self;
+    BookieClient client;
+    InetSocketAddress addr;
+    static int recvTimeout = 2000;
+    ArrayBlockingQueue<ToSend> incomingQueue;
+    
+    /**
+     * Objects of this class are queued waiting to be
+     * processed.
+     */
+    class ToSend {
+        long entry = -1;
+        Object ctx;
+        int type;
+        
+        ToSend(SubOp sop, long entry){
+            this.type = sop.op.type;
+            this.entry = entry;
+            this.ctx = sop;
+        }
+    }
+    
+    /**
+     * @param lh	ledger handle
+     * @param addr	address
+     */
+    BookieHandle(LedgerHandle lh, InetSocketAddress addr) throws IOException {
+        this.client = new BookieClient(addr, recvTimeout);
+        this.self = lh;
+        this.addr = addr;
+        this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
+        
+        start();
+    }
+    
+    /**
+     * Restart BookieClient if can't talk to bookie
+     * 
+     * @return
+     * @throws IOException
+     */
+    void restart() throws IOException {
+        this.client = new BookieClient(addr, recvTimeout);
+    }
+
+    /**
+     * Sending add operation to bookie
+     * 
+     * @param r
+     * @param cb
+     * @param ctx
+     * @throws IOException
+     */
+    public void sendAdd(SubAddOp r, long entry)
+    throws IOException {
+        try{
+            incomingQueue.put(new ToSend(r, entry));
+        } catch(InterruptedException e){
+            e.printStackTrace();
+        }
+        //client.addEntry(self.getId(), 
+        //        r.entry, 
+        //        ByteBuffer.wrap(r.data), 
+        //        cb,
+        //        ctx);
+    }
+    
+    /**
+     * Message disgest instance
+     * 
+     */
+    MessageDigest digest = null;
+    
+    /** 
+     * Get digest instance if there is none.
+     * 
+     */
+    MessageDigest getDigestInstance(String alg)
+    throws NoSuchAlgorithmException {
+        if(digest == null){
+            digest = MessageDigest.getInstance(alg);
+        }
+        
+        return digest;
+    }
+    
+    /**
+     * Computes the digest for a given ByteBuffer.
+     * 
+     */
+    
+    public ByteBuffer addDigest(long entryId, ByteBuffer data)
+    throws NoSuchAlgorithmException, IOException {
+        if(digest == null)
+            getDigestInstance(self.getDigestAlg());
+        
+        ByteBuffer bb = ByteBuffer.allocate(8 + 8);
+        bb.putLong(self.getId());
+        bb.putLong(entryId);
+        
+        byte[] msgDigest;
+        
+        // synchronized(LedgerHandle.digest){
+        digest.update(self.getPasswdHash());
+        digest.update(bb.array());
+        digest.update(data.array());
+            
+        //baos.write(data);
+        //baos.write(Operation.digest.digest());
+        msgDigest = digest.digest();
+        //}
+        ByteBuffer extendedData = ByteBuffer.allocate(data.capacity() + msgDigest.length);
+        data.rewind();
+        extendedData.put(data);
+        extendedData.put(msgDigest);
+        
+        //LOG.debug("Data length (" + self.getId() + ", " + entryId + "): " + data.capacity());
+        //LOG.debug("Digest: " + new String(msgDigest));
+        
+        return extendedData;
+    }
+    
+    /**
+     * Sending read operation to bookie
+     * 
+     * @param r
+     * @param entry
+     * @param cb
+     * @param ctx
+     * @throws IOException
+     */
+    public void sendRead(SubReadOp r, long entry)
+    throws IOException {
+        //LOG.debug("readEntry: " + entry);
+        try{
+            incomingQueue.put(new ToSend(r, entry));
+        } catch(InterruptedException e){
+            e.printStackTrace();
+        }
+    }
+    
+    public void run(){
+        while(!stop){
+            try{
+                ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+                if(ts != null){
+                    switch(ts.type){
+                    case Operation.ADD:
+                        SubAddOp aOp = (SubAddOp) ts.ctx;
+                        AddOp op = ((AddOp) aOp.op);
+                        
+                        /*
+                         * TODO: Really add the confirmed add to the op
+                         */
+                        long confirmed = self.getAddConfirmed();
+                        //LOG.info("Confirmed: " + confirmed);
+                        ByteBuffer extendedData = ByteBuffer.allocate(op.data.length + 8);
+                        extendedData.putLong(confirmed);
+                        extendedData.put(op.data);
+                        extendedData.rewind();
+                        
+                        if(self.getQMode() == QMode.VERIFIABLE){
+                            extendedData = addDigest(ts.entry, extendedData);
+                        }
+                        
+                        //LOG.debug("Extended data: " + extendedData.capacity());
+                        client.addEntry(self.getId(), 
+                            ts.entry, 
+                            extendedData, 
+                            aOp.wcb,
+                            ts.ctx);
+                        break;
+                    case Operation.READ:
+                        client.readEntry(self.getId(),
+                            ts.entry,
+                            ((SubReadOp) ts.ctx).rcb,
+                            ts.ctx);
+                        break;
+                    }
+                }
+            } catch (InterruptedException e){
+                LOG.error(e);
+            } catch (IOException e){
+                LOG.error(e);
+            } catch (NoSuchAlgorithmException e){
+                LOG.error(e);
+            }
+        }
+    }
+    
+    void halt(){
+        stop = true;
+    }
+}
+
+    

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,112 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.log4j.Logger;
+
+/**
+ * Thread responsible for delivering results to clients. This thread
+ * basically isolates the application from the remainder of the
+ * BookKeeper client. 
+ * 
+ */
+
+class ClientCBWorker extends Thread{
+    Logger LOG = Logger.getLogger(ClientCBWorker.class);
+    static ClientCBWorker instance = null;
+    
+    private boolean stop = false;
+    
+    ArrayBlockingQueue<Operation> pendingOps;
+    QuorumOpMonitor monitor;
+    
+    
+    static synchronized ClientCBWorker getInstance(){
+        if(instance == null){
+            instance = new ClientCBWorker();
+        }
+        
+        return instance;
+    }
+    
+    ClientCBWorker(){
+       pendingOps = new ArrayBlockingQueue<Operation>(4000);  
+       start();
+       LOG.debug("Have started cbWorker");
+    }
+    
+    void addOperation(Operation op) 
+    throws InterruptedException {
+        pendingOps.put(op);
+        LOG.debug("Added operation to queue of pending");
+    }
+    
+    synchronized void shutdown(){
+        stop = true;
+        instance = null;
+        LOG.debug("Shutting down");
+    }
+    
+    public void run(){
+        try{
+            while(!stop){
+                LOG.debug("Going to sleep on queue");
+                Operation op = pendingOps.poll(1000, TimeUnit.MILLISECONDS);
+                if(op != null){
+                    synchronized(op){
+                        while(!op.isReady()){
+                            op.wait();
+                        }
+                    }
+                    LOG.debug("Request ready");
+
+                    switch(op.type){
+                    case Operation.ADD:
+                        AddOp aOp = (AddOp) op;
+                    
+                        aOp.cb.addComplete(aOp.getErrorCode(),
+                            aOp.getLedger().getId(), aOp.entry, 
+                            aOp.ctx);
+                        aOp.getLedger().setAddConfirmed(aOp.entry);
+                        break;
+                    case Operation.READ:
+                        ReadOp rOp = (ReadOp) op;
+                        LOG.debug("Got one message from the queue: " + rOp.firstEntry);
+                        rOp.cb.readComplete(rOp.getErrorCode(), 
+                            rOp.getLedger().getId(), 
+                            new LedgerSequence(rOp.seq), 
+                            rOp.ctx);
+                        break;
+                    }
+                }
+            }
+        } catch (InterruptedException e){
+           LOG.error("Exception while waiting on queue or operation"); 
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,29 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+/**
+ * Numeric error codes used by the BookKeeper client.
+ */
+public class ErrorCodes {
+
+    /** Code -2; the name suggests no bookie was available (unconfirmed). */
+    static final int ENOAVAILABLEBOOKIE = -2;
+
+    /** Code -1; the name suggests retries were exhausted (unconfirmed). */
+    static final int ENUMRETRIES = -1;
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,63 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+
+import org.apache.log4j.Logger;
+
+/**
+ * Ledger entry. Currently only holds the necessary
+ * fields to identify a ledger entry, and the entry
+ * content.
+ * 
+ */
+
+
+public class LedgerEntry {
+    Logger LOG = Logger.getLogger(LedgerEntry.class);
+    
+    private final long lId;
+    private final long eId;
+    private final byte[] entry;
+    
+    /**
+     * @param lId    ledger identifier
+     * @param eId    entry identifier
+     * @param entry  entry content; may be null
+     */
+    LedgerEntry(long lId, long eId, byte[] entry){
+        this.lId = lId;
+        this.eId = eId;
+        this.entry = entry;
+        // Build the message only when debug is on: it decodes the whole
+        // payload into a String, which is wasted work otherwise.
+        if(LOG.isDebugEnabled()){
+            if(entry != null)
+                LOG.debug("Entry: " + entry.length + " , " + new String(entry));
+            else
+                LOG.debug("Entry is null");
+        }
+    }
+    
+    /** Returns the identifier of the ledger this entry belongs to. */
+    public long getLedgerId(){
+        return lId;
+    }
+    
+    /** Returns the identifier of this entry within its ledger. */
+    public long getEntryId(){
+        return eId;
+    }
+    
+    /** Returns the entry content (the internal array, not a copy); may be null. */
+    public byte[] getEntry(){
+        return entry;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,349 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.log4j.Logger;
+
+
+
+
+/**
+ * Ledger handle on the client side. Contains ledger metadata
+ * used to access it.
+ * 
+ */
+
+public class LedgerHandle {
+    Logger LOG = Logger.getLogger(LedgerHandle.class);
+    
+    public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
+    
+    
+    private long ledger;
+    private volatile long last;
+    private volatile long lastAddConfirmed = 0;
+    private ArrayList<BookieHandle> bookies;
+    /*
+     * Addresses of every bookie this handle has connected to, including ones
+     * later replaced. Passed to BookKeeper.getNewBookie() when a replacement
+     * is needed (presumably so those addresses are not picked again).
+     */
+    private ArrayList<InetSocketAddress> bookieAddrList =
+        new ArrayList<InetSocketAddress>();
+    private BookKeeper bk;
+
+    private int qSize;
+    /*
+     * True when the caller gave no quorum size. The default (n+1)/2 can only
+     * be computed once bookies are known, so addBookie() recomputes it.
+     */
+    private boolean defaultQSize = false;
+    private QMode qMode = QMode.VERIFIABLE;
+
+    private int threshold;
+    private String digestAlg = "SHA1";
+    
+    private byte[] passwdHash;
+    private byte[] passwd;
+    
+    /**
+     * Creates a handle that uses the default quorum size, (n+1)/2, where n
+     * is the number of bookies added through addBookie().
+     */
+    LedgerHandle(BookKeeper bk, 
+            long ledger, 
+            long last,
+            byte[] passwd) throws InterruptedException {
+        this.bk = bk;
+        this.ledger = ledger;
+        this.last = last;
+        this.bookies = new ArrayList<BookieHandle>();
+        this.passwd = passwd;
+        genPasswdHash(passwd);
+
+        // No bookies yet, so this is 0 until addBookie() recomputes it.
+        this.defaultQSize = true;
+        this.qSize = (bookies.size() + 1)/2;
+    }
+    
+    /**
+     * Creates a handle with an explicit quorum size and quorum mode.
+     */
+    LedgerHandle(BookKeeper bk, 
+            long ledger, 
+            long last,
+            int qSize, 
+            QMode mode,
+            byte[] passwd) throws InterruptedException {
+        this.bk = bk;
+        this.ledger = ledger;
+        this.last = last;
+        this.bookies = new ArrayList<BookieHandle>();
+
+        this.qSize = qSize;
+        this.qMode = mode;
+        this.passwd = passwd;
+        genPasswdHash(passwd);
+    }
+        
+    /**
+     * Creates a handle with an explicit quorum size and the default
+     * (VERIFIABLE) quorum mode.
+     */
+    LedgerHandle(BookKeeper bk, 
+            long ledger, 
+            long last,
+            int qSize,
+            byte[] passwd) throws InterruptedException {
+        this.bk = bk;
+        this.ledger = ledger;
+        this.last = last;
+        this.bookies = new ArrayList<BookieHandle>();
+
+        this.qSize = qSize;
+        this.passwd = passwd;
+        genPasswdHash(passwd);
+    }
+    
+    /**
+     * Opens a BookieHandle for each address. When a bookie refuses the
+     * connection, a replacement is requested from BookKeeper and appended to
+     * {@code addrs}, so it is tried later in the same loop.
+     */
+    private void setBookies(ArrayList<InetSocketAddress> addrs)
+    throws InterruptedException {
+        // Index loop on purpose: the list may grow while we iterate, which
+        // would make a for-each throw ConcurrentModificationException.
+        for(int i = 0; i < addrs.size(); i++){
+            InetSocketAddress a = addrs.get(i);
+            LOG.debug("Opening bookieHandle: " + a);
+            try{
+                addBookie(a);
+            } catch(ConnectException e){
+                LOG.error(e + "(bookie: " + a + ")");
+                
+                InetSocketAddress addr = bk.getNewBookie(addrs);
+                
+                // Skip duplicates so a repeated answer cannot loop forever.
+                if(addr != null && !addrs.contains(addr)){
+                    addrs.add(addr);
+                }
+            } catch(IOException e) {
+                LOG.error(e);
+            }
+        }
+    }
+    
+    
+    /**
+     * Create bookie handle and add it to the list. Also records the address
+     * and, when the default quorum size is in use, recomputes it.
+     * 
+     * @param addr	socket address
+     * @throws IOException if the bookie handle cannot be created
+     */
+    void addBookie(InetSocketAddress addr)
+    throws IOException {
+        BookieHandle bh = new BookieHandle(this, addr);
+        this.bookies.add(bh);
+        this.bookieAddrList.add(addr);
+        
+        if(defaultQSize) qSize = (bookies.size() + 1)/2;
+        if(bookies.size() > qSize) setThreshold();
+    }
+    
+    /**
+     * Derives the threshold from the number of bookies, the quorum size and
+     * the quorum mode.
+     */
+    private void setThreshold(){
+        switch(qMode){
+        case GENERIC:
+            threshold = bookies.size() - qSize/2;
+            break;
+        case VERIFIABLE:
+            threshold = bookies.size() - qSize + 1;
+            break;
+        default:
+            threshold = bookies.size();
+        }
+        
+    }
+    
+    public int getThreshold(){
+        return threshold;
+    }
+    
+    /**
+     * Replace bookie in the case of a failure. If the new bookie cannot be
+     * reached, it is blacklisted and the old handle stays in place.
+     * 
+     * @param index position of the bookie to replace
+     * @throws BKException if no replacement bookie is available
+     */
+    
+    void replaceBookie(int index) 
+    throws BKException {
+        InetSocketAddress addr = null;
+        try{
+            addr = bk.getNewBookie(bookieAddrList);
+        } catch(InterruptedException e){
+            LOG.error(e);
+            // Restore the interrupt; addr stays null and is reported below.
+            Thread.currentThread().interrupt();
+        }
+        
+        if(addr == null){
+            throw BKException.create(Code.NoBookieAvailableException);
+        } else {           
+            try{
+                BookieHandle bh = new BookieHandle(this, addr);
+                
+                /*
+                 * TODO: Read from current bookies, and write to this one
+                 */
+                
+                /*
+                 * If successful in writing to new bookie, add it to the set
+                 */
+                this.bookies.set(index, bh);
+                this.bookieAddrList.add(addr);
+            } catch(IOException e) {
+                // Also covers ConnectException.
+                bk.blackListBookie(addr);
+                LOG.error(e);
+            }
+        }
+    }
+    
+    /**
+     * This method is used when BK cannot find a bookie
+     * to replace the current faulty one. In such cases,
+     * we simply remove the bookie.
+     * 
+     * @param index
+     */
+    void removeBookie(int index){
+        bookies.remove(index);
+    }
+    
+    /**
+     * Invalidates the ledger id and last entry (both set to -1) and halts
+     * every bookie handle.
+     */
+    void close(){
+        ledger = -1;
+        last = -1;
+        for(BookieHandle bh : bookies){
+            bh.halt();
+        }
+    }
+    
+    
+    /**
+     * Returns the ledger identifier
+     * @return long
+     */
+    public long getId(){
+        return ledger;
+    }
+    
+    /**
+     * Returns the last entry identifier submitted
+     * @return long
+     */
+    public long getLast(){
+        return last;   
+    }
+    
+    /**
+     * Returns the last entry identifier submitted and increments it.
+     * @return long
+     */
+    long incLast(){
+        return last++;
+    }
+    
+    /**
+     * Sets the last entry identifier and returns the new value.
+     * @return long
+     */
+    long setLast(long last){
+        this.last = last;
+        return this.last;
+    }
+    
+    /**
+     * Sets the value of the last add confirmed. This is used
+     * when adding new entries, since we use this value as a hint
+     * to recover from failures of the client. The value never decreases.
+     */
+    void setAddConfirmed(long entryId){
+        if(entryId > lastAddConfirmed)
+            lastAddConfirmed = entryId;
+    }
+    
+    long getAddConfirmed(){
+        return lastAddConfirmed;
+    }
+    
+    /**
+     * Returns the list of bookies
+     * @return ArrayList<BookieHandle>
+     */
+    ArrayList<BookieHandle> getBookies(){
+        return bookies;
+    }
+    
+    /**
+     * Return the quorum size. By default, the size of a quorum is (n+1)/2, 
+     * where n is the size of the set of bookies.
+     * @return int
+     */
+    int getQuorumSize(){
+        return qSize;   
+    }
+    
+    /**
+     * Returns the quorum mode for this ledger: Verifiable, Generic or Freeform
+     */
+    QMode getQMode(){
+        return qMode;   
+    }
+    
+    /**
+     * Sets message digest algorithm.
+     */
+    
+    void setDigestAlg(String alg){
+        this.digestAlg = alg;
+    }
+    
+    /**
+     * Get message digest algorithm.
+     */
+    
+    String getDigestAlg(){
+        return digestAlg;
+    }
+    
+    /**
+     * Generates and stores the MD5 hash of the password. If MD5 is
+     * unavailable, the plain password is used as the "hash" instead.
+     * 
+     * @param passwd
+     */
+    
+    private void genPasswdHash(byte[] passwd){
+        try{
+            MessageDigest digest = MessageDigest.getInstance("MD5");
+        
+            digest.update(passwd);
+            this.passwdHash = digest.digest();
+        } catch(NoSuchAlgorithmException e){
+            // Fall back to the plain password, as the log message states,
+            // so that getPasswdHash() does not return null.
+            this.passwd = passwd;
+            this.passwdHash = passwd;
+            LOG.error("Storing password as plain text because secure hash implementation does not exist");
+        }
+    }
+    
+    
+    /**
+     * Returns password in plain text
+     */
+    byte[] getPasswd(){
+        return passwd;
+    }
+    
+    
+    /**
+     * Returns password hash
+     * 
+     * @return byte[]
+     */
+    byte[] getPasswdHash(){
+       return passwdHash; 
+    }
+   
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,213 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.lang.Math;
+import java.lang.InterruptedException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implements the mechanism to recover a ledger that was not closed 
+ * correctly. It reads entries from the ledger using the hint field
+ * until it finds the last entry written. It then writes to ZooKeeper. 
+ * 
+ */
+
+class LedgerRecoveryMonitor implements ReadEntryCallback{
+    Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
+    
+    BookKeeper self;
+    long lId;
+    int qSize;
+    QMode qMode;
+    ArrayList<InetSocketAddress> bookies;
+    ArrayList<BookieClient> clients;
+    // votes and hints are filled by read callbacks; guarded by 'counter'.
+    HashMap<Long, ArrayList<ByteBuffer> > votes;
+    TreeMap<Long, Integer > hints;
+    AtomicInteger counter;
+    
+    // Upper bound, in ms, on how long recover() waits for enough responses.
+    private static final long RESPONSE_WAIT_MS = 5000;
+    
+    private int minimum;
+    
+    LedgerRecoveryMonitor(BookKeeper self,
+            long lId, 
+            int qSize, 
+            ArrayList<InetSocketAddress> bookies, 
+            QMode qMode){
+        this.self = self;
+        this.lId = lId;
+        this.qSize = qSize;
+        this.qMode = qMode;
+        this.bookies = bookies;
+        this.clients = new ArrayList<BookieClient>();
+        this.votes = new HashMap<Long, ArrayList<ByteBuffer> >();
+        this.hints = new TreeMap<Long, Integer>();
+        this.counter = new AtomicInteger(0);
+        
+        // Number of responses recover() waits for, per quorum mode.
+        this.minimum = bookies.size();
+        if(qMode == QMode.VERIFIABLE){
+            this.minimum += 1 - qSize; 
+        } else if(qMode == QMode.GENERIC){
+            // Integer division already rounds down.
+            this.minimum -= qSize/2;
+        } 
+        
+    }
+    
+    /**
+     * Reads the hints from the bookies, finds the last entry that can be
+     * read, and closes the ledger.
+     * 
+     * @param passwd ledger password
+     * @return true if at least one entry was read starting from some hint;
+     *         false otherwise (the ledger is then closed with last = 0)
+     */
+    boolean recover(byte[] passwd) throws 
+    IOException, InterruptedException, BKException, KeeperException {
+        /*
+         * Create BookieClient objects and send a request to each one
+         * (entry id -1).
+         */
+        
+        for(InetSocketAddress s : bookies){
+            LOG.info(s);
+            BookieClient client = new BookieClient(s, 3000);
+            clients.add(client);
+            client.readEntry(lId,
+                    -1,
+                    this,
+                    null);
+        }        
+        
+        /*
+         * Wait until enough responses arrived, for at most RESPONSE_WAIT_MS
+         * in total. The loop re-checks the count because wait() can return
+         * before the minimum is reached (early notify, spurious wakeup).
+         */
+        synchronized(counter){
+            LOG.info("Counter: " + counter.get() + ", " + minimum + ", " + qMode);
+            long deadline = System.currentTimeMillis() + RESPONSE_WAIT_MS;
+            long remaining = RESPONSE_WAIT_MS;
+            while(counter.get() < minimum && remaining > 0){
+                LOG.info("Waiting...");
+                counter.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+        }
+        
+        LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
+        self.engines.put(lh.getId(), new QuorumEngine(lh));
+        for(InetSocketAddress addr : bookies){
+            lh.addBookie(addr);
+        }
+        
+        /*
+         * Try hints from largest to smallest. For each one, read forward one
+         * entry at a time starting at (hint - 1) while entries are non-null.
+         * The first hint that yields at least one entry ends the search.
+         */
+        boolean notLegitimate = true;
+        long readCounter = 0;
+        while(notLegitimate){
+            readCounter = getNextHint();
+            if(readCounter != -1){
+                lh.setLast(readCounter - 1);
+                boolean hasMore = true;
+                while(hasMore){
+                    hasMore = false;
+                    LOG.debug("Recovering: " + lh.getLast());
+                    LedgerSequence ls = self.readEntries(lh, lh.getLast(), lh.getLast());
+                    LOG.debug("Received entry for: " + lh.getLast());
+                    
+                    if(ls.nextElement().getEntry() != null){
+                        if(notLegitimate) notLegitimate = false;
+                        lh.incLast();
+                        hasMore = true;
+                    }
+                }
+            } else break;   
+        }
+        
+        /*
+         * Close the ledger: with the recovered last entry if one was found,
+         * otherwise with last set to 0.
+         */
+        if(!notLegitimate){
+            self.closeLedger(lh);
+            
+            return true;
+        } else {
+            lh.setLast(0);
+            self.closeLedger(lh);
+            
+            return false;
+        }
+                
+    }
+    
+    /**
+     * Read callback. On success it:
+     * <ul>
+     * <li>records the buffer as a vote for {@code entryId};</li>
+     * <li>reads the hint (a long at byte offset 16) and counts it;</li>
+     * <li>wakes recover() once at least {@code minimum} responses arrived.</li>
+     * </ul>
+     * A non-zero rc is only logged. This is presumably invoked from
+     * BookieClient threads, so shared state is updated under
+     * {@code counter}.
+     */
+    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
+        if(rc == 0){
+            bb.rewind();
+            
+            /*
+             * Extract hint
+             */
+            bb.position(16);
+            long hint = bb.getLong();
+            
+            LOG.info("Received a response: " + rc + ", " + entryId + ", " + hint);
+            
+            synchronized(counter){
+                /*
+                 * Collect new vote
+                 */
+                if(!votes.containsKey(entryId)){            
+                    votes.put(entryId, new ArrayList<ByteBuffer>());
+                }
+                votes.get(entryId).add(bb);
+                
+                if(!hints.containsKey(hint)){
+                    hints.put(hint, 0);
+                }
+                hints.put(hint, hints.get(hint) + 1);
+                
+                // Wake the waiter only once enough responses have arrived.
+                if(counter.incrementAndGet() >= minimum){
+                    counter.notify();
+                }
+            }
+        } else {
+            LOG.debug("rc != 0");
+        }
+        
+    }
+    
+    /**
+     * Removes and returns the largest hint collected so far, or -1 if none
+     * remain.
+     */
+    private long getNextHint(){
+        synchronized(counter){
+            if(hints.size() == 0) return -1;
+            
+            long hint = hints.lastKey();
+            hints.remove(hint);
+            
+            return hint;
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,65 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Sequence of entries of a ledger. Used to return a sequence of entries
+ * upon an asynchornous read call.
+ *
+ */
+
+
+public class LedgerSequence 
+implements Enumeration<LedgerEntry> {
+    Logger LOG = Logger.getLogger(LedgerSequence.class);
+    
+    int index = 0;
+    List<LedgerEntry> seq;
+    
+    LedgerSequence(LedgerEntry[] seq){
+        this.seq = Arrays.asList(seq);
+        LOG.debug("Sequence provided: " + this.seq.size());
+    }
+    
+    public boolean hasMoreElements(){
+        if(index < seq.size())
+            return true;
+        else
+            return false;
+    }
+    
+    public LedgerEntry nextElement() throws NoSuchElementException{
+        LOG.debug("Next element of sequence: " + seq.size() + ", " + index);
+        return seq.get(index++);
+    }
+    
+    public int size(){
+        return seq.size();
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,104 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Delivers ledger entries to a reader in increasing entry-id order,
+ * starting from the id given to the constructor. Entries that arrive
+ * ahead of the next expected id are parked until the gap is filled.
+ * 
+ * This feature is under construction.
+ */
+
+public class LedgerStream {
+    Logger LOG = Logger.getLogger(LedgerStream.class);
+    
+    /** Entries received ahead of the next expected id; guarded by toDeliver. */
+    private ArrayList<LedgerEntry> pending;
+    /** In-order entries waiting for readEntry(); also serves as the lock. */
+    private ArrayList<LedgerEntry> toDeliver;
+    /** Id of the next entry expected in sequence; guarded by toDeliver. */
+    private long index;
+    
+    
+    /**
+     * Constructor takes the first entry id expected.
+     * 
+     *  @param first    id of the first entry to deliver
+     */
+    public LedgerStream(long first){
+        pending = new ArrayList<LedgerEntry>();
+        toDeliver = new ArrayList<LedgerEntry>();
+        index = first;
+    }
+    
+    /**
+     * Removes and returns the next in-order entry, blocking until one is
+     * available.
+     *
+     * @return the next entry, or null if the calling thread is interrupted
+     *         while waiting (its interrupt status is restored)
+     */
+    public LedgerEntry readEntry(){
+        synchronized(toDeliver){
+            // Re-check after every wakeup: wait() may return spuriously.
+            while(toDeliver.isEmpty()){
+                try{
+                    toDeliver.wait();
+                } catch(InterruptedException e){
+                    LOG.info("Received an interrupted exception", e);
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
+            // Consume the entry so that the next call returns the following one.
+            return toDeliver.remove(0);
+        }
+    }
+    
+    /**
+     * Invoked upon reception of a new ledger entry. If it is the next
+     * expected entry, it and any parked entries that now follow contiguously
+     * are made available to readers; otherwise it is parked.
+     * 
+     * @param le    a new ledger entry to deliver.
+     */
+    public void addEntry(LedgerEntry le){
+        synchronized(toDeliver){
+            if(index != le.getEntryId()){
+                pending.add(le);
+                return;
+            }
+            toDeliver.add(le);
+            index++;
+            
+            // Drain parked entries that are now contiguous; remove them from
+            // pending so the list does not keep already-delivered entries.
+            boolean progress = true;
+            while(progress){
+                progress = false;
+                int i = 0;
+                while(i < pending.size()){
+                    if(pending.get(i).getEntryId() == index){
+                        toDeliver.add(pending.remove(i));
+                        index++;
+                        progress = true;
+                    } else {
+                        i++;
+                    }
+                }
+            }
+            toDeliver.notifyAll();
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,265 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.ClientCBWorker;
+import org.apache.bookkeeper.client.ErrorCodes;
+import org.apache.bookkeeper.client.QuorumOpMonitor;
+import org.apache.bookkeeper.client.QuorumOpMonitor.PendingOp;
+import org.apache.bookkeeper.client.QuorumOpMonitor.PendingReadOp;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements the quorum protocol. It handles requests coming from
+ * BookKeeper and forwards them to the appropriate BookieHandle objects.
+ */
+
+public class QuorumEngine {
+    Logger LOG = Logger.getLogger(QuorumEngine.class);
+
+    QuorumOpMonitor opMonitor;
+    ClientCBWorker cbWorker;
+    LedgerHandle lh;
+    int qRef = 0;
+    boolean stop = false;
+    
+    /**
+     * Requests generated by BookKeeper.java upon client calls.
+     * There are three types of requests: READ, ADD, STOP.
+     */
+    
+    public static class Operation {
+        public static final int READ = 0;
+        public static final int ADD = 1;
+        public static final int STOP = 2;
+        
+        int type;
+        LedgerHandle ledger;
+        int rc = 0;
+        boolean ready = false;
+        
+        /**
+         * Request to append data to a ledger. The entry id is reserved at
+         * construction time by calling ledger.incLast().
+         */
+        public static class AddOp extends Operation {
+            AddCallback cb;
+            Object ctx;
+            byte[] data;
+            long entry;
+            
+            public AddOp(LedgerHandle ledger, byte[] data, AddCallback cb, Object ctx){
+                type = Operation.ADD;
+            
+                this.data = data;
+                this.entry = ledger.incLast(); 
+                this.cb = cb;
+                this.ctx = ctx;
+                
+                this.ledger = ledger;
+            }
+        }
+        
+        /**
+         * Request to read entries firstEntry..lastEntry (inclusive). Results
+         * are collected into seq, which has one slot per requested entry.
+         */
+        public static class ReadOp extends Operation {
+            ReadCallback cb;
+            Object ctx;
+            long firstEntry;
+            long lastEntry;
+            LedgerEntry[] seq;
+            AtomicInteger counter;
+            HashMap<Long, AtomicInteger> nacks;
+            
+            /**
+             * @throws IllegalArgumentException if lastEntry < firstEntry
+             */
+            public ReadOp(LedgerHandle ledger, long firstEntry, long lastEntry, ReadCallback cb, Object ctx){
+                if(lastEntry < firstEntry){
+                    throw new IllegalArgumentException("lastEntry (" + lastEntry
+                            + ") is smaller than firstEntry (" + firstEntry + ")");
+                }
+                type = READ;
+            
+                this.firstEntry = firstEntry;
+                this.lastEntry = lastEntry;
+                this.cb = cb;
+                this.ctx = ctx;
+                this.seq = new LedgerEntry[(int) (lastEntry - firstEntry + 1)];
+                this.counter = new AtomicInteger(0);
+                this.nacks = new HashMap<Long, AtomicInteger>();
+                
+                this.ledger = ledger;
+            }
+        }
+        
+        /** Request to shut the engine's callback worker down. */
+        public static class StopOp extends Operation {
+            public StopOp(){
+                type = STOP;
+            }
+        }
+        
+        void setErrorCode(int rc){
+            this.rc = rc;
+        }
+        
+        int getErrorCode(){
+            return this.rc;
+        }
+        
+        synchronized boolean isReady(){
+            return ready;
+        }
+        
+        /** Marks the operation complete and wakes every thread waiting on it. */
+        synchronized void setReady(){    
+            ready = true;
+            this.notifyAll();
+        }
+        
+        LedgerHandle getLedger(){
+            return ledger;
+        }
+    }
+    
+    /**
+     * Per-bookie piece of an Operation: bIndex is the index of the target
+     * bookie in the ledger's bookie list.
+     */
+    public static class SubOp{
+        int bIndex;   
+        Operation op;
+     
+        public static class SubAddOp extends SubOp{
+            PendingOp pOp;
+            WriteCallback wcb;
+        
+            SubAddOp(Operation op, 
+                    PendingOp pOp, 
+                    int bIndex,
+                    WriteCallback wcb){
+                this.op = op;
+                this.pOp = pOp;
+                this.bIndex = bIndex;
+                this.wcb = wcb;
+            }
+        }
+    
+        public static class SubReadOp extends SubOp{
+            PendingReadOp pOp;
+            ReadEntryCallback rcb;
+         
+            SubReadOp(Operation op, 
+                    PendingReadOp pOp, 
+                    int bIndex, 
+                    ReadEntryCallback rcb){
+                this.op = op;
+                this.pOp = pOp;
+                this.bIndex = bIndex;
+                this.rcb = rcb;
+            }
+        }
+    }
+    
+    public QuorumEngine(LedgerHandle lh){ 
+        this.lh = lh;
+        this.opMonitor = QuorumOpMonitor.getInstance(lh);
+        LOG.debug("Creating cbWorker");
+        this.cbWorker = ClientCBWorker.getInstance();
+        LOG.debug("Created cbWorker");
+    }
+    
+    /**
+     * Dispatches an operation. READ and ADD are registered with the callback
+     * worker and sent to bookies; STOP shuts the callback worker down.
+     *
+     * @param r operation to dispatch
+     * @throws InterruptedException as declared; presumably propagated from
+     *         cbWorker.addOperation (TODO confirm)
+     */
+    void sendOp(Operation r)
+    throws InterruptedException {
+        switch(r.type){
+        case Operation.READ: {
+            Operation.ReadOp rOp = (Operation.ReadOp) r;
+            LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
+            cbWorker.addOperation(r);
+            
+            for(long entry = rOp.firstEntry; entry <= rOp.lastEntry; entry++){
+                ArrayList<BookieHandle> bookies = lh.getBookies();
+                //Send requests to bookies
+                for(int i = 0; i < bookies.size(); i++){
+                    try{
+                        // NOTE(review): each sub-read gets its own PendingReadOp, so
+                        // QuorumOpMonitor cannot pool votes from different bookies
+                        // for the same entry; vote modes that need more than one
+                        // response (e.g. GENERIC with quorum >= 3) appear unable to
+                        // complete. Sharing one per entry also needs synchronization
+                        // in the monitor.
+                        SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp, 
+                                new PendingReadOp(lh), 
+                                i,
+                                opMonitor);
+                        bookies.get(i).sendRead(sRead, entry);
+                    } catch(IOException e){
+                        LOG.error(e);
+                    }
+                }  
+            }
+            break;
+        }
+        case Operation.ADD: {
+            LOG.debug("Adding add operation to opMonitor");
+            cbWorker.addOperation(r);
+            Operation.AddOp aOp = (Operation.AddOp) r;
+            PendingOp pOp = new PendingOp();
+            long counter = 0;
+            // Send the entry to quorum-size consecutive bookies starting at entry % n.
+            while(counter < lh.getQuorumSize()){
+                // Re-read the size on every pass: removeBookie() below may change
+                // the list, and a stale n could index past its end.
+                int n = lh.getBookies().size();
+                if(n == 0){
+                    LOG.error("No bookies available for entry " + aOp.entry + " of ledger " + lh.getId());
+                    aOp.setErrorCode(ErrorCodes.ENUMRETRIES);
+                    aOp.setReady();
+                    break;
+                }
+                int index = (int)((aOp.entry + counter++) % n);
+                
+                try{
+                    SubOp.SubAddOp sAdd = new SubOp.SubAddOp(aOp, 
+                            pOp, 
+                            index,
+                            opMonitor);
+                    lh.getBookies().get(index).sendAdd(sAdd, aOp.entry);
+                } catch (IOException io) {
+                    LOG.error(io);
+                    try{
+                        /*
+                         * Before getting a new bookie, try to reconnect
+                         */
+                        lh.getBookies().get(index).restart();
+                    } catch (IOException nio){
+                        lh.removeBookie(index);
+                    }
+                }
+            }
+            break;
+        }
+        case Operation.STOP:
+            stop = true;
+            cbWorker.shutdown();
+            break;
+        }
+    }
+    
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,496 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.ErrorCodes;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Monitors reponses from bookies to requests of a client. It implements 
+ * two interfaces of the proto package that correspond to callbacks from
+ * BookieClient objects.
+ * 
+ */
+public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
+    Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
+    
+    LedgerHandle lh;
+    
+    static final int MAXRETRIES = 2;
+    static HashMap<Long, QuorumOpMonitor> instances = 
+        new HashMap<Long, QuorumOpMonitor>();
+    
+    public static QuorumOpMonitor getInstance(LedgerHandle lh){
+        if(instances.get(lh.getId()) == null) {
+            instances.put(lh.getId(), new QuorumOpMonitor(lh));
+        }
+        
+        return instances.get(lh.getId());
+    }
+    
+    /**
+     * Message disgest instance
+     * 
+     */
+    MessageDigest digest = null;
+    
+    /** 
+     * Get digest instance if there is none.
+     * 
+     */
+    MessageDigest getDigestInstance(String alg)
+    throws NoSuchAlgorithmException {
+        if(digest == null){
+            digest = MessageDigest.getInstance(alg);
+        }
+        
+        return digest;
+    }
+    
+    public static class PendingOp{
+        //Operation op = null;
+        HashSet<Integer> bookieIdSent;
+        HashSet<Integer> bookieIdRecv;
+        int retries = 0;
+      
+        PendingOp(){
+            this.bookieIdSent = new HashSet<Integer>();
+            this.bookieIdRecv = new HashSet<Integer>();
+        }
+
+        //PendingOp(Operation op){
+        //    this.op = op;
+        //    bookieIdSent = new HashSet<Integer>();
+        //    bookieIdRecv = new HashSet<Integer>();
+        //}
+        
+        //void setOp(Operation op){
+        //    this.op = op;
+        //}
+        
+        //Operation getOp(){
+        //    return this.op;
+        //}
+        
+    };
+    
+    /**
+     * Objects of this type are used to keep track of the status of
+     * a given write request.
+     * 
+     *
+     */
+    //public static class PendingAddOp extends PendingOp{
+    //    AddOp op;  
+        
+    //    PendingAddOp(LedgerHandle lh, AddOp op){
+    //        this.op = op;
+    //    }
+    //}
+    
+    /**
+     * Objects of this type are used to keep track of the status of
+     * a given read request.
+     * 
+     */
+    
+    public static class PendingReadOp extends PendingOp{
+        /*
+         * Values for ongoing reads
+         */
+        ConcurrentHashMap<Long, ArrayList<ByteBuffer>> proposedValues;
+        
+        /*
+         * Bookies from which received a response
+         */
+        //ConcurrentHashMap<Long, HashSet<Integer>> received;
+        
+        
+        PendingReadOp(LedgerHandle lh){
+            this.proposedValues = 
+                new ConcurrentHashMap<Long, ArrayList<ByteBuffer>>();
+            //this.received = 
+            //    new ConcurrentHashMap<Long, HashSet<Integer>>();
+        }    
+      
+    }
+    
+    QuorumOpMonitor(LedgerHandle lh){
+        this.lh = lh;
+    }
+    
+    
+    
+    
+    /**
+     * Callback method for write operations. There is one callback for
+     * each write to a server.
+     * 
+     */
+    
+    public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){ 
+        //PendingAddOp pOp;
+        String logmsg;
+        
+        //synchronized(pendingAdds){
+        //pOp = pendingAdds.get(entryId);
+        //}
+        SubAddOp sAdd = (SubAddOp) ctx;
+        PendingOp pOp = sAdd.pOp;
+        Integer sId = sAdd.bIndex;
+        
+        if(pOp == null){
+            LOG.error("No such an entry ID: " + entryId + "(" + ledgerId + ")");
+            return;
+        }
+        
+        ArrayList<BookieHandle> list = lh.getBookies();
+        int n = list.size();
+         
+        if(rc == 0){
+            // Everything went ok with this op
+            synchronized(pOp){ 
+                //pOp.bookieIdSent.add(sId);
+                pOp.bookieIdRecv.add(sId);
+                if(pOp.bookieIdRecv.size() == lh.getQuorumSize()){
+                    //pendingAdds.remove(entryId);
+                    //sAdd.op.cb.addComplete(sAdd.op.getErrorCode(),
+                    //        ledgerId, entryId, sAdd.op.ctx);
+                    sAdd.op.setReady();     
+                }
+            }
+        } else {
+            LOG.error("Error sending write request: " + rc + " : " + ledgerId);
+            HashSet<Integer> ids;
+              
+            synchronized(pOp){
+                pOp.bookieIdSent.add(sId);
+                ids = pOp.bookieIdSent;                
+                //Check if we tried all possible bookies already
+                if(ids.size() == lh.getBookies().size()){
+                    if(pOp.retries++ >= MAXRETRIES){
+                        //Call back with error code
+                        //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
+                        //        ledgerId, entryId, sAdd.op.ctx);
+                        sAdd.op.setErrorCode(ErrorCodes.ENUMRETRIES);
+                        sAdd.op.setReady();
+                        return;
+                    }
+                    
+                    ids.clear();
+                }
+                // Select another bookie that we haven't contacted yet
+                for(int i = 0; i < lh.getBookies().size(); i++){
+                    if(!ids.contains(Integer.valueOf(i))){
+                        // and send it to new bookie
+                        try{
+                            list.get(i).sendAdd(new SubAddOp(sAdd.op, 
+                                    pOp, 
+                                    i, 
+                                    this), ((AddOp) sAdd.op).entry);
+                            pOp.bookieIdRecv.add(sId.intValue());
+                                
+                            break;
+                        } catch(IOException e){
+                            LOG.error(e);
+                        }
+                    }
+                }       
+            }
+        }
+    }
+    
+    /**
+     * Callback method for read operations. There is one callback for
+     * each entry of a read request.
+     * 
+     * TODO: We might want to change the way a client application specify
+     * the quorum size. It is really loose now, and it allows an application
+     * to set any quorum size the client wants.
+     */
+    
+    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
+        /*
+         * Collect responses, and reply when there are sufficient 
+         * answers.
+         */
+        LOG.debug("New response: " + rc);
+        if(rc == 0){
+            SubReadOp sRead = (SubReadOp) ctx;
+            ReadOp rOp = (ReadOp) sRead.op;
+            PendingReadOp pOp = sRead.pOp;
+            if(pOp != null){
+                HashSet<Integer> received = pOp.bookieIdRecv;
+                //if(!received.containsKey(entryId)){
+                //    received.put(entryId, new HashSet<Integer>());
+                //}
+                boolean result = received.add(sRead.bIndex);
+                int counter = -1;
+                if(result){
+
+                    if(!pOp.proposedValues.containsKey(entryId)){
+                        pOp.proposedValues.put(entryId, new ArrayList<ByteBuffer>());
+                    }
+                    ArrayList<ByteBuffer> list = pOp.proposedValues.get(entryId);
+                    list.add(bb);
+        
+                    switch(lh.getQMode()){
+                        case VERIFIABLE:
+                            if(list.size() >= 1){
+                                try{
+                                    ByteBuffer voted = voteVerifiable(list);
+                                    if(voted != null){
+                                        LOG.debug("Voted: " + new String(voted.array()));
+                                    
+                                        MessageDigest md = getDigestInstance(lh.getDigestAlg());
+                                        int dlength = md.getDigestLength();
+                                        if(voted.capacity() - dlength > 0){
+                                            byte[] data = new byte[voted.capacity() - dlength - 24];
+                                            LOG.info("Digest length: " + dlength + ", " + data.length);
+                                            voted.position(24);
+                                            voted.get(data, 0, data.length);
+                                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+                                        } else {
+                                            LOG.error("Short message: " + voted.capacity());
+                                        }
+                                    }
+                                } catch(NoSuchAlgorithmException e){
+                                    LOG.error("Problem with message digest: " + e);
+                                } catch(BKException bke) {
+                                    LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
+                                    countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+                                }
+                            }
+                            break;
+                        case GENERIC:
+                            if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
+                                ByteBuffer voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
+                                if(voted != null){
+                                    LOG.debug("Voted: " + voted.array());
+                                    byte[] data = new byte[voted.capacity() - 24];
+                                    voted.position(24);
+                                    voted.get(data, 0, data.length);
+                                    counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+                                }
+                            }
+                            break;
+                        case FREEFORM:
+                        	if(list.size() == lh.getQuorumSize()){
+                        		ByteBuffer voted = voteFree(list);
+                                if(voted != null){
+                                    LOG.debug("Voted: " + voted.array());
+                                    byte[] data = new byte[voted.capacity() - 24];
+                                    voted.position(24);
+                                    voted.get(data, 0, data.length);
+                                    counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+                                }
+                        	}
+                    }
+                }    
+        
+                if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
+                        !sRead.op.isReady()){
+                    
+                    sRead.op.setReady();
+                    //sRead.op.cb.readComplete(0, ledgerId, new LedgerSequence(sRead.op.seq), sRead.op.ctx);
+                    //sRead.op.complete = true;
+                }
+            
+                LOG.debug("Counter: " + rOp.counter);
+            }
+        } else {
+            /*
+             * Have to count the number of negative responses
+             */
+            countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+            
+        }
+    }
+    
+    
+    /**
+     * Counts negative responses
+     * 
+     * @param   rOp read operation
+     * @param   sRead   specific read sub-operation
+     */
+    
+    synchronized void countNacks(ReadOp rOp, SubReadOp sRead, long ledgerId, long entryId){
+        
+        if(!rOp.nacks.containsKey(entryId)){
+            rOp.nacks.put(entryId, new AtomicInteger(0));
+        }
+        
+        if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
+            int counter = -1;
+            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
+            
+            if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) && 
+                    !sRead.op.isReady()){
+                
+                sRead.op.setReady();
+            }
+        }
+    }
+    
+    /**
+     * Verify if the set of votes in the list can produce a correct answer
+     * for verifiable data.
+     * 
+     * @param list
+     * @return
+     */
+    
+    private ByteBuffer voteVerifiable(ArrayList<ByteBuffer> list) 
+    throws NoSuchAlgorithmException, BKException{
+        /*
+         * Check if checksum matches
+         */
+        ByteBuffer bb = list.get(0);
+        list.remove(0);
+        
+        MessageDigest md = getDigestInstance(lh.getDigestAlg());
+        int dlength = md.getDigestLength();
+       
+        /*
+         * TODO: The if check below is legitimate, but in reality it should never happen,
+         * bt it showed up a few times in experiments. Have to check why it is happening.
+         */
+        if(bb.capacity() <= dlength){
+        	LOG.warn("Something wrong with this entry, length smaller than digest length");
+        	return null;
+        }
+        
+        byte[] data = new byte[bb.capacity() - dlength];
+        bb.get(data, 0, bb.capacity() - dlength);
+        
+        byte[] sig = new byte[dlength];
+        bb.position(bb.capacity() - dlength);
+        bb.get(sig, 0, dlength);
+        
+        bb.rewind();
+        
+        //LOG.warn("Data: " + data.toString() + ", Signature: " + sig.toString());
+        md.update(lh.getPasswdHash());
+        md.update(data);
+        if(MessageDigest.isEqual(md.digest(), sig)){
+            return bb;
+        } else {
+            throw BKException.create(Code.DigestMatchException);
+        }
+    }
+    
+    /**
+     * Verify if the set of votes in the list can produce a correct answer
+     * for generic data.
+     * 
+     * @param list
+     * @return
+     */
+        
+    private ByteBuffer voteGeneric(ArrayList<ByteBuffer> list, int threshold){  
+        HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
+        for(ByteBuffer bb : list){
+            if(!map.containsKey(bb)){
+                map.put(bb, Integer.valueOf(0));
+            }
+            
+            map.put(bb, map.get(bb) + 1);
+            
+            if(map.get(bb) >= threshold)
+                return bb;
+        }
+        
+        return null;   
+    }
+
+    /**
+     * Verify if the set of votes in the list can produce a correct answer
+     * for generic data.
+     * 
+     * @param list
+     * @return
+     */
+        
+    private ByteBuffer voteFree(ArrayList<ByteBuffer> list){  
+        HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
+        for(ByteBuffer bb : list){
+            if(!map.containsKey(bb)){
+                map.put(bb, Integer.valueOf(0));
+            }
+            map.put(bb, map.get(bb) + 1);
+            
+            if(map.get(bb) == list.size())
+                return bb;
+        }
+        
+        return null;   
+    }
+    
+    /**
+     * Add new entry to the list of received. 
+     * 
+     * @param le	ledger entry to add to list
+     * @param op	read operation metadata
+     */
+    
+    private int addNewEntry(LedgerEntry le, ReadOp op){
+        long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
+        if(op.seq[(int) index] == null){
+            op.seq[(int) index] = le;
+            
+            if(le.getEntry() != null)
+                LOG.debug("Adding entry: " + le.getEntryId() + ", " + le.getEntry().length);
+            else
+                LOG.debug("Entry is null: " + le.getEntryId());
+            
+            return op.counter.incrementAndGet();
+        }
+        
+        return -1;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.client;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+/**
+ * Callback notified when a read operation on a ledger completes.
+ */
+public interface ReadCallback {
+    /**
+     * Invoked with the outcome of a read operation.
+     *
+     * @param rc        return code
+     * @param ledgerId  ledger identifier
+     * @param seq       sequence of entries
+     * @param ctx       control object supplied by the caller
+     */
+    public void readComplete(int rc, long ledgerId, LedgerSequence seq, Object ctx);
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,342 @@
+package org.apache.bookkeeper.proto;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements the client-side part of the BookKeeper protocol. 
+ * 
+ */
+public class BookieClient extends Thread {
+	Logger LOG = Logger.getLogger(BookieClient.class);
+    SocketChannel sock;
+    int myCounter = 0;
+
+    public BookieClient(InetSocketAddress addr, int recvTimeout)
+    throws IOException, ConnectException {
+        sock = SocketChannel.open(addr);
+        setDaemon(true);
+        //sock.configureBlocking(false);
+        sock.socket().setSoTimeout(recvTimeout);
+        sock.socket().setTcpNoDelay(true);
+        start();
+    }
+    
+    public BookieClient(String host, int port, int recvTimeout)
+    throws IOException, ConnectException {
+        this(new InetSocketAddress(host, port), recvTimeout);
+    }
+    
+    private static class Completion<T> {
+        Completion(T cb, Object ctx) {
+            this.cb = cb;
+            this.ctx = ctx;
+        }
+
+        T cb;
+
+        Object ctx;
+    }
+
+    private static class CompletionKey {
+        long ledgerId;
+
+        long entryId;
+
+        CompletionKey(long ledgerId, long entryId) {
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof CompletionKey) || obj == null) {
+                return false;
+            }
+            CompletionKey that = (CompletionKey) obj;
+            return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+        }
+
+        @Override
+        public int hashCode() {
+            return ((int) ledgerId << 16) ^ ((int) entryId);
+        }
+
+    }
+
+    ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
+    ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions = new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
+    
+    Object writeLock = new Object();
+    Object readLock = new Object();
+    
+    /*
+     * Use this semaphore to control the number of completion key in both addCompletions
+     * and readCompletions. This is more of a problem for readCompletions because one
+     * readEntries opertion is expanded into individual operations to read entries.
+     */
+    Semaphore completionSemaphore = new Semaphore(1000);
+    
+   
+    /**
+     * Send addEntry operation to bookie.
+     * 
+     * @param ledgerId	ledger identifier
+     * @param entryId 	entry identifier
+     * @param cb		object implementing callback method
+     * @param ctx		control object
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public void addEntry(long ledgerId, long entryId,
+            ByteBuffer entry, WriteCallback cb, Object ctx) 
+    throws IOException, InterruptedException {
+        
+        //LOG.info("Data length: " + entry.capacity());
+    	completionSemaphore.acquire();
+        addCompletions.put(new CompletionKey(ledgerId, entryId),
+                new Completion<WriteCallback>(cb, ctx));
+        //entry = entry.duplicate();
+        entry.position(0);
+        
+        ByteBuffer tmpEntry = ByteBuffer.allocate(entry.capacity() + 8 + 8 + 8);
+
+        tmpEntry.position(4);
+        tmpEntry.putInt(BookieProtocol.ADDENTRY);
+        tmpEntry.putLong(ledgerId);
+        tmpEntry.putLong(entryId);
+        tmpEntry.put(entry);
+        tmpEntry.position(0);
+        
+        //ByteBuffer len = ByteBuffer.allocate(4);
+        // 4 bytes for the message type
+        tmpEntry.putInt(tmpEntry.remaining() - 4);
+        tmpEntry.position(0);
+        synchronized(writeLock) {
+            //sock.write(len);
+            //len.clear();
+            //len.putInt(BookieProtocol.ADDENTRY);
+            //len.flip();
+            //sock.write(len);
+            sock.write(tmpEntry);
+        }
+        //LOG.debug("addEntry:finished");
+    }
+    
+    /**
+     * Send readEntry operation to bookie.
+     * 
+     * @param ledgerId	ledger identifier
+     * @param entryId	entry identifier
+     * @param cb		object implementing callback method
+     * @param ctx		control object
+     * @throws IOException
+     */
+    public void readEntry(long ledgerId, long entryId,
+            ReadEntryCallback cb, Object ctx) 
+    throws IOException, InterruptedException {
+    	
+    	completionSemaphore.acquire();
+        readCompletions.put(new CompletionKey(ledgerId, entryId),
+                new Completion<ReadEntryCallback>(cb, ctx));
+        ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8);
+        tmpEntry.putLong(ledgerId);
+        tmpEntry.putLong(entryId);
+        tmpEntry.position(0);
+        
+        ByteBuffer len = ByteBuffer.allocate(4);
+        len.putInt(tmpEntry.remaining() + 4);
+        len.flip();
+        //LOG.debug("readEntry: Writing to socket");
+        synchronized(readLock) {
+            sock.write(len);
+            len.clear();
+            len.putInt(BookieProtocol.READENTRY);
+            len.flip();
+            sock.write(len);
+            sock.write(tmpEntry);
+        }
+        //LOG.error("Size of readCompletions: " + readCompletions.size());
+    }
+    
+    private void readFully(ByteBuffer bb) throws IOException {
+        while(bb.remaining() > 0) {
+            sock.read(bb);
+        }
+    }
+    
+    public void run() {
+        int len = -1;
+        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+        int type = -1, rc = -1;
+        try {
+            while(sock.isConnected()) {
+                lenBuffer.clear();
+                readFully(lenBuffer);
+                lenBuffer.flip();
+                len = lenBuffer.getInt();
+                ByteBuffer bb = ByteBuffer.allocate(len);
+                readFully(bb);
+                bb.flip();
+                type = bb.getInt();
+                rc = bb.getInt();
+ 
+                switch(type) {
+                case BookieProtocol.ADDENTRY:
+                {
+                    long ledgerId = bb.getLong();
+                    long entryId = bb.getLong();
+                    Completion<WriteCallback> ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+                    completionSemaphore.release();
+                    
+                    if (ac != null) {
+                        ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
+                    } else {
+                        LOG.error("Callback object null: " + ledgerId + " : " + entryId);
+                    }
+                    break;
+                }
+                case BookieProtocol.READENTRY:
+                {
+                    //ByteBuffer entryData = bb.slice();
+                    long ledgerId = bb.getLong();
+                    long entryId = bb.getLong();
+                    
+                    bb.position(24);
+                    byte[] data = new byte[bb.capacity() - 24];
+                    bb.get(data);
+                    ByteBuffer entryData = ByteBuffer.wrap(data);
+                    
+                    LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
+          
+                    
+                    CompletionKey key = new CompletionKey(ledgerId, entryId);
+                    Completion<ReadEntryCallback> c;
+                    
+                    if(readCompletions.containsKey(key)){
+                        c = readCompletions.remove(key);
+                        //LOG.error("Found key");
+                    }
+                    else{    
+                        /*
+                         * This is a special case. When recovering a ledger, a client submits
+                         * a read request with id -1, and receives a response with a different
+                         * entry id.
+                         */
+                        c = readCompletions.remove(new CompletionKey(ledgerId, -1));
+                    }
+                    completionSemaphore.release();
+                    
+                    if (c != null) {
+                        c.cb.readEntryComplete(rc, 
+                                ledgerId, 
+                                entryId, 
+                                entryData, 
+                                c.ctx);
+                    }
+                    break;
+                }
+                default:
+                    System.err.println("Got error " + rc + " for type " + type);
+                }
+            }
+        } catch(Exception e) {
+            LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
+            e.printStackTrace();
+        }
+    }
+
+    private static class Counter {
+        int i;
+        int total;
+        synchronized void inc() {
+            i++;
+            total++;
+        }
+        synchronized void dec() {
+            i--;
+            notifyAll();
+        }
+        synchronized void wait(int limit) throws InterruptedException {
+            while(i > limit) {
+                wait();
+            }
+        }
+        synchronized int total() {
+            return total;
+        }
+    }
+    /**
+     * @param args
+     * @throws IOException 
+     * @throws NumberFormatException 
+     * @throws InterruptedException 
+     */
+    public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
+        if (args.length != 3) {
+            System.err.println("USAGE: BookieClient bookieHost port ledger#");
+            return;
+        }
+        WriteCallback cb = new WriteCallback() {
+
+            public void writeComplete(int rc, long ledger, long entry, Object ctx) {
+                Counter counter = (Counter)ctx;
+                counter.dec();
+                if (rc != 0) {
+                    System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
+                }
+            }
+        };
+        Counter counter = new Counter();
+        byte hello[] = "hello".getBytes();
+        long ledger = Long.parseLong(args[2]);
+        BookieClient bc = new BookieClient(args[0], Integer.parseInt(args[1]), 5000);
+        for(int i = 0; i < 100000; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(100);
+            entry.putLong(ledger);
+            entry.putLong(i);
+            entry.putInt(0);
+            entry.put(hello);
+            entry.flip();
+            counter.inc();
+            bc.addEntry(ledger, i, entry, cb, counter);
+        }
+        counter.wait(0);
+        System.out.println("Total = " + counter.total());
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,70 @@
+package org.apache.bookkeeper.proto;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+/**
+ * The packets of the Bookie protocol all have a 4-byte integer
+ * indicating the type of request or response at the very beginning
+ * of the packet followed by a payload.
+ *
+ * <p>Every error code must differ from {@link #EOK}; otherwise a failure
+ * reported by the server is indistinguishable from success on the client.
+ */
+public interface BookieProtocol {
+    /**
+     * The Add entry request payload will be a ledger entry exactly
+     * as it should be logged. The response payload will be a 4-byte
+     * integer that has the error code followed by the 8-byte
+     * ledger number and 8-byte entry number of the entry written.
+     */
+    public static final int ADDENTRY = 1;
+    /**
+     * The Read entry request payload will be the ledger number and
+     * entry number to read. (The ledger number is an 8-byte integer
+     * and the entry number is an 8-byte integer.) The
+     * response payload will be a 4-byte integer representing an
+     * error code and a ledger entry if the error code is EOK, otherwise
+     * it will be the 8-byte ledger number and the 8-byte entry number
+     * requested. (Note that the first sixteen bytes of the entry happen
+     * to be the ledger number and entry number as well.)
+     */
+    public static final int READENTRY = 2;
+
+    /**
+     * The error code that indicates success
+     */
+    public static final int EOK = 0;
+    /**
+     * The error code that indicates that the ledger does not exist
+     */
+    public static final int ENOLEDGER = 1;
+    /**
+     * The error code that indicates that the requested entry does not exist
+     */
+    public static final int ENOENTRY = 2;
+    /**
+     * The error code that indicates an invalid request type
+     */
+    public static final int EBADREQ = 100;
+    /**
+     * General error occurred at the server.
+     * Must not equal EOK: with the value 0 it was indistinguishable from success.
+     */
+    public static final int EIO = 101;
+}



Mime
View raw message