hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r739388 [3/4] - in /hadoop/zookeeper/trunk: ./ src/contrib/ src/contrib/bookkeeper/ src/contrib/bookkeeper/benchmark/ src/contrib/bookkeeper/benchmark/org/ src/contrib/bookkeeper/benchmark/org/apache/ src/contrib/bookkeeper/benchmark/org/ap...
Date Fri, 30 Jan 2009 19:30:28 GMT
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,179 @@
+package org.apache.bookkeeper.proto;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements the server-side part of the BookKeeper protocol.
+ *
+ */
+public class BookieServer implements NIOServerFactory.PacketProcessor, AddCallback {
+    int port;
+    NIOServerFactory nioServerFactory;
+    Bookie bookie;
+    static Logger LOG = Logger.getLogger(BookieServer.class);
+    
+    /**
+     * Creates a bookie server for the given port. Nothing is bound until
+     * {@link #start()} is called.
+     *
+     * @param port port later handed to the NIOServerFactory
+     * @param journalDirectory journal directory passed to the Bookie
+     * @param ledgerDirectories ledger directories passed to the Bookie
+     */
+    public BookieServer(int port, File journalDirectory, File ledgerDirectories[]) {
+        Bookie newBookie = new Bookie(journalDirectory, ledgerDirectories);
+        this.bookie = newBookie;
+        this.port = port;
+    }
+    public void start() throws IOException {
+        nioServerFactory = new NIOServerFactory(port, this);
+    }
+    /**
+     * Stops the network layer (if it was started) and then shuts down the
+     * bookie.
+     *
+     * @throws InterruptedException declared for callers; propagated from the
+     *         bookie shutdown
+     */
+    public void shutdown() throws InterruptedException {
+        try {
+            // start() may never have been called, or may have failed; in that
+            // case there is no factory to stop but the bookie must still be
+            // shut down.
+            if (nioServerFactory != null) {
+                nioServerFactory.shutdown();
+            }
+        } finally {
+            bookie.shutdown();
+        }
+    }
+    /**
+     * Blocks until the NIOServerFactory thread terminates.
+     *
+     * @throws InterruptedException if the wait is interrupted
+     */
+    public void join() throws InterruptedException {
+        this.nioServerFactory.join();
+    }
+    /**
+     * Command-line entry point. Parses the arguments, logs a greeting, starts
+     * a BookieServer and waits for its network thread to finish.
+     *
+     * @param args port, journal directory, then one or more ledger directories
+     * @throws IOException if the server cannot be started
+     * @throws InterruptedException if waiting on the server thread is interrupted
+     */
+    public static void main(String[] args) throws IOException, InterruptedException {
+        if (args.length < 3) {
+            System.err.println("USAGE: BookieServer port journalDirectory ledgerDirectory [ledgerDirectory]*");
+            return;
+        }
+        final int port = Integer.parseInt(args[0]);
+        final File journalDirectory = new File(args[1]);
+
+        // Everything after the first two arguments is a ledger directory.
+        final int ledgerCount = args.length - 2;
+        final File[] ledgerDirectory = new File[ledgerCount];
+        final StringBuilder ledgerList = new StringBuilder();
+        for (int idx = 0; idx < ledgerCount; idx++) {
+            File dir = new File(args[idx + 2]);
+            ledgerDirectory[idx] = dir;
+            if (idx > 0) {
+                ledgerList.append(',');
+            }
+            ledgerList.append(dir);
+        }
+
+        LOG.info(String.format("Hello, I'm your bookie, listening on port %1$s. Journals are in %2$s. Ledgers are stored in %3$s.", port, journalDirectory, ledgerList));
+
+        BookieServer server = new BookieServer(port, journalDirectory, ledgerDirectory);
+        server.start();
+        server.join();
+    }
+
+   
+    public void processPacket(ByteBuffer packet, Cnxn src) {
+        int type = packet.getInt();
+        switch(type) {
+        case BookieProtocol.ADDENTRY:
+            try {
+                bookie.addEntry(packet.slice(), this, src);
+            } catch(IOException e) {
+                if (LOG.isTraceEnabled()) {
+                    ByteBuffer bb = packet.duplicate();
+                    long ledgerId = bb.getLong();
+                    long entryId = bb.getLong();
+                    LOG.trace("Error reading " + entryId + "@" + ledgerId, e);
+                }
+                ByteBuffer eio = ByteBuffer.allocate(8);
+                eio.putInt(type);
+                eio.putInt(BookieProtocol.EIO);
+                eio.flip();
+                src.sendResponse(new ByteBuffer[] {eio});
+            }
+            break;
+        case BookieProtocol.READENTRY:
+            ByteBuffer[] rsp = new ByteBuffer[2];
+            ByteBuffer rc = ByteBuffer.allocate(8+8+8);
+            rsp[0] = rc;
+            rc.putInt(type);
+            
+            long ledgerId = packet.getLong();
+            long entryId = packet.getLong();
+            LOG.debug("Received new read request: " + ledgerId + ", " + entryId);
+            try {
+                rsp[1] = bookie.readEntry(ledgerId, entryId);
+                LOG.debug("##### Read entry ##### " + rsp[1].remaining());
+                rc.putInt(BookieProtocol.EOK);
+            } catch(Bookie.NoLedgerException e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.error("Error reading " + entryId + "@" + ledgerId, e);
+                }
+                rc.putInt(BookieProtocol.ENOLEDGER);
+            } catch(Bookie.NoEntryException e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.error("Error reading " + entryId + "@" + ledgerId, e);
+                }
+                rc.putInt(BookieProtocol.ENOENTRY);
+            } catch(IOException e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.error("Error reading " + entryId + "@" + ledgerId, e);
+                }
+                rc.putInt(BookieProtocol.EIO);
+            }
+            rc.putLong(ledgerId);
+            rc.putLong(entryId);
+            rc.flip();
+            if (LOG.isTraceEnabled()) {
+                int rcCode = rc.getInt();
+                rc.rewind();
+                LOG.trace("Read entry rc = " + rcCode + " for " + entryId + "@" + ledgerId);
+            }
+            if (rsp[1] == null) {
+                // We haven't filled in entry data, so we have to send back
+                // the ledger and entry ids here
+                rsp[1] = ByteBuffer.allocate(16);
+                rsp[1].putLong(ledgerId);
+                rsp[1].putLong(entryId);
+                rsp[1].flip();
+            }
+            LOG.debug("Sending response for: " + entryId + ", " + new String(rsp[1].array()));
+            src.sendResponse(rsp);
+            break;
+        default:
+            ByteBuffer badType = ByteBuffer.allocate(8);
+            badType.putInt(type);
+            badType.putInt(BookieProtocol.EBADREQ);
+            badType.flip();
+            src.sendResponse(new ByteBuffer[] {packet});
+        }
+    }
+    
+    public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+        Cnxn src = (Cnxn)ctx;
+        ByteBuffer bb = ByteBuffer.allocate(24);
+        bb.putInt(BookieProtocol.ADDENTRY);
+        bb.putInt(rc);
+        bb.putLong(ledgerId);
+        bb.putLong(entryId);
+        bb.flip();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Add entry rc = " + rc + " for " + entryId + "@" + ledgerId);
+        }
+        src.sendResponse(new ByteBuffer[] {bb});
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/NIOServerFactory.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.proto;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This class handles communication with clients using NIO. There is one Cnxn
+ * per client, but only one thread doing the communication.
+ */
+public class NIOServerFactory extends Thread {
+
+    /**
+     * Receiver of complete request packets read by a connection.
+     */
+    public interface PacketProcessor {
+        /**
+         * Handles one packet.
+         *
+         * @param packet the packet contents, without the 4-byte length prefix
+         * @param src the connection the packet was read from
+         */
+        void processPacket(ByteBuffer packet, Cnxn src);
+    }
+    ServerStats stats = new ServerStats();
+    
+    Logger LOG = Logger.getLogger(NIOServerFactory.class);
+
+    ServerSocketChannel ss;
+
+    Selector selector = Selector.open();
+
+    /**
+     * We use this buffer to do efficient socket I/O. Since there is a single
+     * sender thread per NIOServerCnxn instance, we can use a member variable to
+     * only allocate it once.
+     */
+    ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
+
+    HashSet<Cnxn> cnxns = new HashSet<Cnxn>();
+
+    int outstandingLimit = 2000;
+
+    PacketProcessor processor;
+
+    long minLatency = 99999999;
+
+    /**
+     * Opens a non-blocking server socket on the given port, registers it with
+     * the selector and starts this factory as a daemon thread.
+     *
+     * @param port port to bind the server socket to
+     * @param processor receiver of the packets read from client connections
+     * @throws IOException if the socket cannot be opened, bound or registered
+     */
+    public NIOServerFactory(int port, PacketProcessor processor) throws IOException {
+        super("NIOServerFactory");
+        setDaemon(true);
+        this.processor = processor;
+        boolean started = false;
+        try {
+            this.ss = ServerSocketChannel.open();
+            ss.socket().bind(new InetSocketAddress(port));
+            ss.configureBlocking(false);
+            ss.register(selector, SelectionKey.OP_ACCEPT);
+            start();
+            started = true;
+        } finally {
+            if (!started) {
+                // Construction failed (e.g. port already in use): release the
+                // channel and the selector, since no caller will ever hold a
+                // reference on which to call shutdown().
+                if (ss != null) {
+                    try {
+                        ss.close();
+                    } catch (IOException ignored) {
+                        // the original failure is the one worth reporting
+                    }
+                }
+                try {
+                    selector.close();
+                } catch (IOException ignored) {
+                    // the original failure is the one worth reporting
+                }
+            }
+        }
+    }
+
+    public InetSocketAddress getLocalAddress() {
+        return (InetSocketAddress) ss.socket().getLocalSocketAddress();
+    }
+
+    /** Records a new connection in the connection set, under the set's lock. */
+    private void addCnxn(Cnxn cnxn) {
+        synchronized (this.cnxns) {
+            this.cnxns.add(cnxn);
+        }
+    }
+
+    /**
+     * Selector loop: until the server socket is closed, waits up to one second
+     * for ready keys, then accepts new connections and performs I/O on
+     * existing ones. Closes all connections when the loop ends.
+     */
+    public void run() {
+        while (!ss.socket().isClosed()) {
+            try {
+                selector.select(1000);
+                Set<SelectionKey> selected;
+                synchronized (this) {
+                    selected = selector.selectedKeys();
+                }
+                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
+                        selected);
+                // Reset the selected-key set right after taking the snapshot,
+                // so a failure while handling one key cannot leave stale keys
+                // behind to be handled again on every later pass.
+                selected.clear();
+                // Shuffle so that no connection is systematically served first
+                Collections.shuffle(selectedList);
+                for (SelectionKey k : selectedList) {
+                    try {
+                        handleKey(k);
+                    } catch (Exception e) {
+                        // One bad key must not starve the rest of the batch
+                        LOG.warn("Exception while handling selection key", e);
+                    }
+                }
+            } catch (Exception e) {
+                LOG.warn("Exception in selector loop", e);
+            }
+        }
+        LOG.debug("NIOServerCnxn factory exitedloop.");
+        clear();
+    }
+
+    /**
+     * Handles one ready key: accepts a pending connection, or runs the
+     * read/write step of the connection attached to the key.
+     */
+    private void handleKey(SelectionKey k) throws IOException, InterruptedException {
+        if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
+            SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+            if (sc == null) {
+                // Non-blocking accept() returns null when nothing is pending
+                return;
+            }
+            boolean registered = false;
+            try {
+                sc.configureBlocking(false);
+                SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
+                Cnxn cnxn = new Cnxn(sc, sk);
+                sk.attach(cnxn);
+                addCnxn(cnxn);
+                registered = true;
+            } finally {
+                if (!registered) {
+                    // Setup failed part-way: do not leak the accepted channel
+                    try {
+                        sc.close();
+                    } catch (IOException ignored) {
+                        // the setup failure is the one being propagated
+                    }
+                }
+            }
+        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+            Cnxn c = (Cnxn) k.attachment();
+            c.doIO(k);
+        }
+    }
+
+    /**
+     * Wakes up the selector and closes every connection currently tracked by
+     * this factory, removing each from the connection set.
+     */
+    synchronized public void clear() {
+        selector.wakeup();
+        synchronized (cnxns) {
+            // Remove through the iterator before closing: Cnxn.close() also
+            // removes itself from cnxns, which must not invalidate this
+            // iteration.
+            for (Iterator<Cnxn> it = cnxns.iterator(); it.hasNext();) {
+                Cnxn cnxn = it.next();
+                it.remove();
+                try {
+                    cnxn.close();
+                } catch (Exception e) {
+                    // Best effort: keep closing the remaining connections,
+                    // but leave a trace of what went wrong.
+                    LOG.debug("Ignoring exception while closing connection", e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Closes the server socket and all connections, then interrupts this
+     * thread and waits for it to exit. Failures are logged, not thrown.
+     */
+    public void shutdown() {
+        try {
+            ss.close();
+            clear();
+            this.interrupt();
+            this.join();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted", e);
+            // Restore the flag so the caller can still observe the interrupt
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            LOG.error("Unexpected exception", e);
+        }
+    }
+
+    /**
+     * Marker buffer: when it reaches the head of a connection's outgoing
+     * queue during a send, the connection is closed.
+     */
+    static final ByteBuffer closeConn = ByteBuffer.allocate(0);
+
+    public class Cnxn {
+
+        private SocketChannel sock;
+        
+        private SelectionKey sk;
+
+        boolean initialized;
+
+        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+
+        ByteBuffer incomingBuffer = lenBuffer;
+
+        LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
+
+        int sessionTimeout;
+
+        int packetsSent;
+
+        int packetsReceived;
+        
+        /**
+         * Performs the non-blocking read and/or write that the given key is
+         * ready for.
+         * <p>
+         * Reading fills either the 4-byte length buffer or the packet buffer
+         * allocated from it; a complete packet is handed to the processor.
+         * Writing copies queued buffers into the shared direct buffer, writes
+         * it, and then drops or advances the queued buffers by the number of
+         * bytes actually sent. An IOException or a cancelled key closes the
+         * connection.
+         *
+         * @param k the selection key that became ready
+         * @throws InterruptedException declared for callers; nothing in this
+         *         method throws it directly
+         */
+        void doIO(SelectionKey k) throws InterruptedException {
+            try {
+                if (sock == null) {
+                    // already closed
+                    return;
+                }
+                if (k.isReadable()) {
+                    int rc = sock.read(incomingBuffer);
+                    if (rc < 0) {
+                        throw new IOException("Read error");
+                    }
+                    if (incomingBuffer.remaining() == 0) {
+                        incomingBuffer.flip();
+                        if (incomingBuffer == lenBuffer) {
+                            readLength(k);
+                        } else {
+                            cnxnStats.packetsReceived++;
+                            stats.incrementPacketsReceived();
+                            try {
+                                readRequest();
+                            } finally {
+                                // Go back to reading a length prefix, even if
+                                // the processor failed on this packet
+                                lenBuffer.clear();
+                                incomingBuffer = lenBuffer;
+                            }
+                        }
+                    }
+                }
+                if (k.isWritable()) {
+                    if (outgoingBuffers.size() > 0) {
+                        /*
+                         * This is going to reset the buffer position to 0 and
+                         * the limit to the size of the buffer, so that we can
+                         * fill it with data from the non-direct buffers that we
+                         * need to send.
+                         */
+                        directBuffer.clear();
+
+                        for (ByteBuffer b : outgoingBuffers) {
+                            if (directBuffer.remaining() < b.remaining()) {
+                                /*
+                                 * When we call put later, if the directBuffer
+                                 * is too small to hold everything, nothing will
+                                 * be copied, so we've got to slice the buffer
+                                 * if it's too big.
+                                 */
+                                b = (ByteBuffer) b.slice().limit(
+                                        directBuffer.remaining());
+                            }
+                            /*
+                             * put() is going to modify the positions of both
+                             * buffers, but we don't want to change the position
+                             * of the source buffers (we'll do that after the
+                             * send, if needed), so we save and reset the
+                             * position after the copy
+                             */
+                            int p = b.position();
+                            directBuffer.put(b);
+                            b.position(p);
+                            if (directBuffer.remaining() == 0) {
+                                break;
+                            }
+                        }
+                        /*
+                         * Do the flip: limit becomes position, position gets
+                         * set to 0. This sets us up for the write.
+                         */
+                        directBuffer.flip();
+
+                        int sent = sock.write(directBuffer);
+                        ByteBuffer bb;
+
+                        // Remove the buffers that we have sent
+                        while (outgoingBuffers.size() > 0) {
+                            bb = outgoingBuffers.peek();
+                            if (bb == closeConn) {
+                                throw new IOException("closing");
+                            }
+                            int left = bb.remaining() - sent;
+                            if (left > 0) {
+                                /*
+                                 * We only partially sent this buffer, so we
+                                 * update the position and exit the loop.
+                                 */
+                                bb.position(bb.position() + sent);
+                                break;
+                            }
+                            cnxnStats.packetsSent++;
+                            /* We've sent the whole buffer, so drop the buffer */
+                            sent -= bb.remaining();
+                            // Count on the same ServerStats instance that the
+                            // receive path uses, so both totals live together
+                            stats.incrementPacketsSent();
+                            outgoingBuffers.remove();
+                        }
+                    }
+                    synchronized (this) {
+                        if (outgoingBuffers.size() == 0) {
+                            if (!initialized
+                                    && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
+                                throw new IOException("Responded to info probe");
+                            }
+                            // Nothing left to send: stop selecting for write
+                            sk.interestOps(sk.interestOps()
+                                    & (~SelectionKey.OP_WRITE));
+                        } else {
+                            sk.interestOps(sk.interestOps()
+                                    | SelectionKey.OP_WRITE);
+                        }
+                    }
+                }
+            } catch (CancelledKeyException e) {
+                close();
+            } catch (IOException e) {
+                // Includes the normal end-of-stream and "closing" cases, so
+                // this is only worth a debug line
+                LOG.debug("Closing connection after I/O exception", e);
+                close();
+            }
+        }
+
+        /**
+         * Hands the packet that was just read to the packet processor.
+         *
+         * @throws IOException declared for callers
+         */
+        private void readRequest() throws IOException {
+            ByteBuffer payload = incomingBuffer.slice();
+            incomingBuffer = payload;
+            processor.processPacket(payload, this);
+        }
+
+        /**
+         * Stops selecting this connection for reads. Does nothing if the
+         * selection key is no longer valid, mirroring {@link #enableRecv()}.
+         */
+        public void disableRecv() {
+            // interestOps() throws CancelledKeyException on a cancelled key
+            if (sk.isValid()) {
+                sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
+            }
+        }
+
+        /**
+         * Starts selecting this connection for reads again, if the key is
+         * still valid and read interest is not already set.
+         */
+        public void enableRecv() {
+            if (!sk.isValid()) {
+                return;
+            }
+            final int current = sk.interestOps();
+            final boolean alreadyReading = (current & SelectionKey.OP_READ) != 0;
+            if (!alreadyReading) {
+                sk.interestOps(current | SelectionKey.OP_READ);
+            }
+        }
+
+        private void readLength(SelectionKey k) throws IOException {
+            // Read the length, now get the buffer
+            int len = lenBuffer.getInt();
+            if (len < 0 || len > 0xfffff) {
+                throw new IOException("Len error " + len);
+            }
+            incomingBuffer = ByteBuffer.allocate(len);
+        }
+
+        /**
+         * The number of requests that have been submitted but not yet responded
+         * to.
+         */
+        int outstandingRequests;
+
+        /**
+         * Returns the session timeout field of this connection. Nothing inside
+         * this class assigns that field.
+         *
+         * @return the current value of {@code sessionTimeout}
+         */
+        public int getSessionTimeout() {
+            return this.sessionTimeout;
+        }
+
+        String peerName;
+        
+        /**
+         * Wraps an accepted channel and its selection key, configures the
+         * socket (TCP no-delay, 2 second linger) and selects for reads.
+         *
+         * @param sock the accepted client channel
+         * @param sk the channel's key on the factory's selector
+         * @throws IOException if the socket options cannot be set
+         */
+        public Cnxn(SocketChannel sock, SelectionKey sk)
+                throws IOException {
+            // Start out expecting a 4-byte length prefix
+            lenBuffer.clear();
+            incomingBuffer = lenBuffer;
+
+            this.sock = sock;
+            this.sk = sk;
+            sock.socket().setTcpNoDelay(true);
+            sock.socket().setSoLinger(true, 2);
+            sk.interestOps(SelectionKey.OP_READ);
+
+            // The peer name is only needed for trace output
+            if (LOG.isTraceEnabled()) {
+                peerName = sock.socket().toString();
+            }
+        }
+
+        public String toString() {
+            return "NIOServerCnxn object with sock = " + sock + " and sk = "
+                    + sk;
+        }
+
+        boolean closed;
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.zookeeper.server.ServerCnxnIface#close()
+         */
+        public void close() {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            synchronized (cnxns) {
+                cnxns.remove(this);
+            }
+            LOG.debug("close  NIOServerCnxn: " + sock);
+            try {
+                /*
+                 * The following sequence of code is stupid! You would think
+                 * that only sock.close() is needed, but alas, it doesn't work
+                 * that way. If you just do sock.close() there are cases where
+                 * the socket doesn't actually close...
+                 */
+                sock.socket().shutdownOutput();
+            } catch (IOException e) {
+                // This is a relatively common exception that we can't avoid
+            }
+            try {
+                sock.socket().shutdownInput();
+            } catch (IOException e) {
+            }
+            try {
+                sock.socket().close();
+            } catch (IOException e) {
+                LOG.error("FIXMSG", e);
+            }
+            try {
+                sock.close();
+                // XXX The next line doesn't seem to be needed, but some posts
+                // to forums suggest that it is needed. Keep in mind if errors
+                // in
+                // this section arise.
+                // factory.selector.wakeup();
+            } catch (IOException e) {
+                LOG.error("FIXMSG", e);
+            }
+            sock = null;
+            if (sk != null) {
+                try {
+                    // need to cancel this selection key from the selector
+                    sk.cancel();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+        /**
+         * Wakes the selector and adds write interest to the given key, if it
+         * is still valid. Runtime failures are logged and rethrown.
+         *
+         * @param sk the key to mark as interested in writes
+         */
+        private void makeWritable(SelectionKey sk) {
+            try {
+                selector.wakeup();
+                if (!sk.isValid()) {
+                    return;
+                }
+                int ops = sk.interestOps();
+                sk.interestOps(ops | SelectionKey.OP_WRITE);
+            } catch (RuntimeException e) {
+                LOG.error("Problem setting writable", e);
+                throw e;
+            }
+        }
+        
+        /**
+         * Queues a response for sending: a 4-byte length prefix holding the
+         * total remaining bytes of the non-null buffers, followed by those
+         * buffers, and then requests write interest on the key.
+         *
+         * @param bb buffers making up the response; null elements are skipped
+         */
+        private void sendBuffers(ByteBuffer bb[]) {
+            int total = 0;
+            for (ByteBuffer part : bb) {
+                if (part != null) {
+                    total += part.remaining();
+                }
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.debug("Sending response of size " + total + " to " + peerName);
+            }
+
+            ByteBuffer prefix = ByteBuffer.allocate(4);
+            prefix.putInt(total);
+            prefix.flip();
+            outgoingBuffers.add(prefix);
+            for (ByteBuffer part : bb) {
+                if (part != null) {
+                    outgoingBuffers.add(part);
+                }
+            }
+            makeWritable(sk);
+        }
+        
+        synchronized public void sendResponse(ByteBuffer bb[]) {
+            if (closed) {
+                return;
+            }
+            sendBuffers(bb);
+            synchronized (NIOServerFactory.this) {
+                outstandingRequests--;
+                // check throttling
+                if (outstandingRequests < outstandingLimit) {
+                    sk.selector().wakeup();
+                    enableRecv();
+                }
+            }
+        }
+
+        public InetSocketAddress getRemoteAddress() {
+            return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
+        }
+
+        /**
+         * Per-connection counters, plus a one-line textual summary.
+         */
+        private class CnxnStats {
+            long packetsReceived;
+
+            long packetsSent;
+
+            /**
+             * @return the enclosing connection's count of requests that have
+             *         been submitted but not yet responded to
+             */
+            public long getOutstandingRequests() {
+                return outstandingRequests;
+            }
+
+            /** @return number of packets received on this connection */
+            public long getPacketsReceived() {
+                return this.packetsReceived;
+            }
+
+            /** @return number of packets sent on this connection */
+            public long getPacketsSent() {
+                return this.packetsSent;
+            }
+
+            /**
+             * @return remote address, interest ops (hex) and the three
+             *         counters on one line, or an empty string if the key's
+             *         channel is not a SocketChannel
+             */
+            public String toString() {
+                Channel channel = sk.channel();
+                if (!(channel instanceof SocketChannel)) {
+                    return "";
+                }
+                SocketChannel client = (SocketChannel) channel;
+                return " " + client.socket().getRemoteSocketAddress()
+                        + "[" + Integer.toHexString(sk.interestOps())
+                        + "](queued=" + getOutstandingRequests()
+                        + ",recved=" + getPacketsReceived()
+                        + ",sent=" + getPacketsSent()
+                        + ")\n";
+            }
+        }
+
+        private CnxnStats cnxnStats = new CnxnStats();
+
+        /** @return the counters object of this connection */
+        public CnxnStats getStats() {
+            return this.cnxnStats;
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ReadEntryCallback.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,35 @@
+package org.apache.bookkeeper.proto;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.nio.ByteBuffer;
+
+/**
+ * Declaration of a callback implementation for calls from BookieClient
+ * objects. Such calls are for replies of read operations (operations to
+ * read an entry from a ledger).
+ *
+ */
+
+public interface ReadEntryCallback {
+    void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx);
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ServerStats.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.proto;
+// Holder for packet counters and request-latency statistics, shared through a
+// static instance. Counter and latency fields are only touched in synchronized methods.
+public class ServerStats {
+    private static ServerStats instance= new ServerStats();
+    private long packetsSent;
+    private long packetsReceived;
+    private long maxLatency;
+    private long minLatency = Long.MAX_VALUE; // sentinel meaning "no latency sample yet"
+    private long totalLatency = 0;
+    private long count = 0; // number of samples added to totalLatency
+    // Optional source of the outstanding-request count and last processed zxid.
+    public interface Provider{
+        public long getOutstandingRequests();
+        public long getLastProcessedZxid();
+    }
+    private Provider provider=null;
+    private Object mutex=new Object(); // held by setStatsProvider and the two provider-backed getters
+    // Static instance management.
+    static public ServerStats getInstance(){
+        return instance;
+    }
+    static public void registerAsConcrete() {
+        setInstance(new ServerStats());
+    }
+    static synchronized public void unregister() {
+        instance=null;
+    }
+    static synchronized protected void setInstance(ServerStats newInstance){
+        assert instance==null; // instance starts non-null, so with assertions on unregister() must run first
+        instance = newInstance;
+    }
+    protected ServerStats(){}
+    
+    // getters
+    synchronized public long getMinLatency() {
+        return (minLatency == Long.MAX_VALUE) ? 0 : minLatency; // report 0 until a sample exists
+    }
+
+    synchronized public long getAvgLatency() {
+        if(count!=0)
+            return totalLatency / count; // integer division: the average is truncated
+        return 0;
+    }
+
+    synchronized public long getMaxLatency() {
+        return maxLatency;
+    }
+
+    public long getOutstandingRequests() {
+        synchronized(mutex){
+            return (provider!=null)?provider.getOutstandingRequests():-1; // -1 when no provider is set
+        }
+    }
+    public long getLastProcessedZxid(){
+        synchronized(mutex){
+            return (provider!=null)?provider.getLastProcessedZxid():-1; // -1 when no provider is set
+        }
+    }
+    synchronized public long getPacketsReceived() {
+        return packetsReceived;
+    }
+
+    synchronized public long getPacketsSent() {
+        return packetsSent;
+    }
+
+    public String getServerState(){
+        return "standalone";
+    }
+    // Multi-line text report; the Outstanding/Zxid lines appear only when a provider is set.
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("Latency min/avg/max: " + getMinLatency() + "/"
+                + getAvgLatency() + "/" + getMaxLatency() + "\n");
+        sb.append("Received: " + getPacketsReceived() + "\n");
+        sb.append("Sent: " + getPacketsSent() + "\n");
+        if (provider != null) { // read here without holding mutex; the getters below re-check under it
+            sb.append("Outstanding: " + getOutstandingRequests() + "\n");
+            sb.append("Zxid: 0x"+ Long.toHexString(getLastProcessedZxid())+ "\n");
+        }
+        sb.append("Mode: "+getServerState()+"\n");
+        return sb.toString();
+    }
+    // mutators
+    public void setStatsProvider(Provider zk){
+        synchronized(mutex){
+            provider=zk;
+        }
+    }
+    synchronized void updateLatency(long requestCreateTime) { // package-private
+        long latency = System.currentTimeMillis() - requestCreateTime; // elapsed ms since requestCreateTime
+        totalLatency += latency;
+        count++;
+        if (latency < minLatency) {
+            minLatency = latency;
+        }
+        if (latency > maxLatency) {
+            maxLatency = latency;
+        }
+    }
+    synchronized public void resetLatency(){
+        totalLatency=count=maxLatency=0;
+        minLatency=Long.MAX_VALUE; // back to the "no sample yet" sentinel
+    }
+    synchronized public void resetMaxLatency(){
+        maxLatency=getMinLatency(); // max restarts from the current minimum (0 if no samples)
+    }
+    synchronized public void incrementPacketsReceived() {
+        packetsReceived++;
+    }
+    synchronized public void incrementPacketsSent() {
+        packetsSent++;
+    }
+    synchronized public void resetRequestCounters(){
+        packetsReceived=packetsSent=0;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/WriteCallback.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,32 @@
+package org.apache.bookkeeper.proto;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+/**
+ * Callback contract for replies to write operations (operations that add
+ * an entry to a ledger) issued through BookieClient objects. Presumably rc
+ * is a result code, ledgerId/entryId identify the entry that was written
+ * and ctx is the object the caller supplied with the request.
+ */
+
+public interface WriteCallback {
+    void writeComplete(int rc, long ledgerId, long entryId, Object ctx);
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/ClientBase.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/ClientBase.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/ClientBase.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,343 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+/**
+ * Base class for tests: setUp() starts a ZooKeeper server, tearDown() stops it.
+ */
+
+public abstract class ClientBase extends TestCase {
+    protected static final Logger LOG = Logger.getLogger(ClientBase.class);
+
+    public static final int CONNECTION_TIMEOUT = 30000; // milliseconds
+    static final File BASETEST =
+        new File(System.getProperty("build.test.dir", "build"));
+
+    protected String hostPort = "127.0.0.1:33221";
+    protected NIOServerCnxn.Factory serverFactory = null;
+    protected File tmpDir = null;
+
+    public ClientBase() {
+        super();
+    }
+
+    public ClientBase(String name) {
+        super(name);
+    }
+
+    /**
+     * In general don't use this. Only use in the special case that you
+     * want to ignore results (for whatever reason) in your test. Don't
+     * use empty watchers in real code!
+     *
+     */
+    protected class NullWatcher implements Watcher {
+        public void process(WatchedEvent event) { /* nada */ }
+    }
+    // Watcher whose latch is released by the first SyncConnected event it receives.
+    protected static class CountdownWatcher implements Watcher {
+        volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+        public void process(WatchedEvent event) {
+            if (event.getState() == Event.KeeperState.SyncConnected) {
+                clientConnected.countDown();
+            }
+        }
+    }
+    // Connects a new client to the default hostPort.
+    protected ZooKeeper createClient()
+        throws IOException, InterruptedException
+    {
+        return createClient(hostPort);
+    }
+
+    protected ZooKeeper createClient(String hp)
+        throws IOException, InterruptedException
+    {
+        CountdownWatcher watcher = new CountdownWatcher();
+        return createClient(watcher, hp);
+    }
+    // Fails the test unless the watcher sees SyncConnected within CONNECTION_TIMEOUT ms.
+    protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
+        throws IOException, InterruptedException
+    {
+        ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
+        if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+                TimeUnit.MILLISECONDS))
+        {
+            fail("Unable to connect to server");
+        }
+        return zk;
+    }
+    // Polls hp ("host:port") with "stat" every 250 ms; true once a "Zookeeper version:" line is read.
+    public static boolean waitForServerUp(String hp, long timeout) {
+        long start = System.currentTimeMillis();
+        String split[] = hp.split(":");
+        String host = split[0];
+        int port = Integer.parseInt(split[1]);
+        while (true) {
+            try {
+                Socket sock = new Socket(host, port);
+                BufferedReader reader = null;
+                try {
+                    OutputStream outstream = sock.getOutputStream();
+                    outstream.write("stat".getBytes());
+                    outstream.flush();
+
+                    reader =
+                        new BufferedReader(
+                                new InputStreamReader(sock.getInputStream()));
+                    String line = reader.readLine(); // only the first reply line is inspected
+                    if (line != null && line.startsWith("Zookeeper version:")) {
+                    	LOG.info("Server UP");
+                        return true;
+                    }
+                } finally {
+                    sock.close();
+                    if (reader != null) {
+                        reader.close();
+                    }
+                }
+            } catch (IOException e) {
+                // ignore as this is expected
+                LOG.info("server " + hp + " not up " + e);
+            }
+
+            if (System.currentTimeMillis() > start + timeout) { // timeout is in milliseconds
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore (NOTE(review): the interrupt status is not restored)
+            }
+        }
+        return false;
+    }
+    public static boolean waitForServerDown(String hp, long timeout) { // true once connecting or writing to hp fails
+        long start = System.currentTimeMillis();
+        String split[] = hp.split(":");
+        String host = split[0];
+        int port = Integer.parseInt(split[1]);
+        while (true) {
+            try {
+                Socket sock = new Socket(host, port);
+                try {
+                    OutputStream outstream = sock.getOutputStream();
+                    outstream.write("stat".getBytes());
+                    outstream.flush();
+                } finally {
+                    sock.close();
+                }
+            } catch (IOException e) {
+                return true; // an I/O failure is taken to mean the server is down
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore (NOTE(review): the interrupt status is not restored)
+            }
+        }
+        return false;
+    }
+    // Joins thread for up to millis ms; logs its stack trace and fails the test if it is still alive.
+    static void verifyThreadTerminated(Thread thread, long millis)
+        throws InterruptedException
+    {
+        thread.join(millis);
+        if (thread.isAlive()) {
+            LOG.error("Thread " + thread.getName() + " : "
+                    + Arrays.toString(thread.getStackTrace()));
+            assertFalse("thread " + thread.getName() 
+                    + " still alive after join", true);
+        }
+    }
+
+    // Creates a fresh temporary directory under BASETEST.
+    public static File createTmpDir() throws IOException {
+        return createTmpDir(BASETEST);
+    }
+    // Creates a directory named "<temp file>.dir" under parentDir; the temp file itself is kept.
+    static File createTmpDir(File parentDir) throws IOException {
+        File tmpFile = File.createTempFile("test", ".junit", parentDir);
+        // don't delete tmpFile - this ensures we don't attempt to create
+        // a tmpDir with a duplicate name
+        
+        File tmpDir = new File(tmpFile + ".dir");
+        assertFalse(tmpDir.exists()); // never true if tmpfile does its job
+        assertTrue(tmpDir.mkdirs());
+        
+        return tmpDir;
+    }
+    // Starts a ZooKeeperServer on the port parsed from hostPort (creating the factory if null) and waits for it.
+    static NIOServerCnxn.Factory createNewServerInstance(File dataDir,
+            NIOServerCnxn.Factory factory, String hostPort)
+        throws IOException, InterruptedException 
+    {
+        ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+        final int PORT = Integer.parseInt(hostPort.split(":")[1]);
+        if (factory == null) {
+            factory = new NIOServerCnxn.Factory(PORT);
+        }
+        factory.startup(zks);
+
+        assertTrue("waiting for server up",
+                   ClientBase.waitForServerUp("127.0.0.1:" + PORT,
+                                              CONNECTION_TIMEOUT));
+
+        return factory;
+    }
+    // Shuts the factory down (if non-null) and asserts that waitForServerDown succeeds for its port.
+    static void shutdownServerInstance(NIOServerCnxn.Factory factory,
+            String hostPort)
+    {
+        if (factory != null) {
+            factory.shutdown();
+            final int PORT = Integer.parseInt(hostPort.split(":")[1]);
+
+            assertTrue("waiting for server down",
+                       ClientBase.waitForServerDown("127.0.0.1:" + PORT,
+                                                    CONNECTION_TIMEOUT));
+        }
+    }
+    
+    /**
+     * Test specific setup
+     */
+    public static void setupTestEnv() {
+        // during the tests we run with 100K prealloc in the logs.
+        // on windows systems prealloc of 64M was seen to take ~15seconds
+        // resulting in test failure (client timeout on first session).
+        // set env and directly in order to handle static init/gc issues
+        System.setProperty("zookeeper.preAllocSize", "100");
+        FileTxnLog.setPreallocSize(100);
+    }
+    
+    @Override
+    protected void setUp() throws Exception {
+        LOG.info("STARTING " + getName());
+
+        //ServerStats.registerAsConcrete();
+
+        tmpDir = createTmpDir(BASETEST);
+        
+        setupTestEnv();
+        serverFactory =
+            createNewServerInstance(tmpDir, serverFactory, hostPort);
+        
+        LOG.info("Client test setup finished");
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        LOG.info("tearDown starting");
+
+        shutdownServerInstance(serverFactory, hostPort);
+        
+        if (tmpDir != null) {
+            //assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
+            // FIXME see ZOOKEEPER-121 replace following line with previous
+            recursiveDelete(tmpDir);
+        }
+
+        //ServerStats.unregister();
+
+        LOG.info("FINISHED " + getName());
+    }
+
+    private static boolean recursiveDelete(File d) { // returns the result of deleting d itself
+        if (d.isDirectory()) {
+            File children[] = d.listFiles(); // NOTE(review): listFiles() may return null on an I/O error; not checked
+            for (File f : children) {
+                //assertTrue("delete " + f.toString(), recursiveDelete(f));
+                // FIXME see ZOOKEEPER-121 replace following line with previous
+                recursiveDelete(f);
+            }
+        }
+        return d.delete();
+    }
+
+    /*
+     * Verify that all of the servers see the same number of nodes
+     * at the root
+     */
+    void verifyRootOfAllServersMatch(String hostPort)
+        throws InterruptedException, KeeperException, IOException
+    {
+        String parts[] = hostPort.split(","); // one "host:port" entry per server
+
+        // run through till the counts no longer change on each server
+        // max 100 tries, with a 10 second sleep after each non-matching try
+        int[] counts = new int[parts.length];
+        for (int j = 0; j < 100; j++) {
+            int newcounts[] = new int[parts.length];
+            int i = 0;
+            for (String hp : parts) {
+                ZooKeeper zk = createClient(hp);
+                try {
+                    newcounts[i++] = zk.getChildren("/", false).size();
+                } finally {
+                    zk.close();
+                }
+            }
+
+            if (Arrays.equals(newcounts, counts)) { // stable: same counts as the previous pass
+                LOG.info("Found match with array:"
+                        + Arrays.toString(newcounts));
+                counts = newcounts;
+                break;
+            } else {
+                counts = newcounts;
+                Thread.sleep(10000);
+            }
+        }
+
+        // verify all the servers reporting same number of nodes
+        for (int i = 1; i < parts.length; i++) {
+            assertEquals("node count not consistent", counts[i-1], counts[i]);
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/LocalBookKeeper.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,164 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+// Starts a local ZooKeeper server plus numberOfBookies BookieServer instances in one JVM (see main).
+public class LocalBookKeeper {
+	Logger LOG;
+	ConsoleAppender ca;
+	int numberOfBookies;
+	// Configures the root logger (console appender, INFO level); defaults to 3 bookies.
+	public LocalBookKeeper() {
+		LOG = Logger.getRootLogger();
+		ca = new ConsoleAppender(new PatternLayout());
+		LOG.addAppender(ca);
+		LOG.setLevel(Level.INFO);
+		numberOfBookies = 3;
+	}
+	// Same setup as the default constructor, with an explicit number of bookies.
+	public LocalBookKeeper(int numberOfBookies){
+		this();
+		this.numberOfBookies = numberOfBookies;
+		LOG.info("Running " + this.numberOfBookies + " bookie(s).");
+	}
+	
+	private final String HOSTPORT = "127.0.0.1:2181"; // host:port of the local ZooKeeper server
+	NIOServerCnxn.Factory serverFactory;
+	ZooKeeperServer zks;
+	ZooKeeper zkc;
+	int ZooKeeperDefaultPort = 2181;
+	File ZkTmpDir;
+
+	//BookKeeper variables
+	File tmpDirs[];
+	BookieServer bs[];
+	Integer initialPort = 5000; // bookie i listens on initialPort + i
+
+	// Server tick time and client session timeout in milliseconds; same values as ClientBase uses.
+	private static final int ZK_TICK_TIME = 3000;
+	private static final int ZK_SESSION_TIMEOUT = 20000;
+	// Starts the ZooKeeper server in a fresh temp directory and waits until it answers on HOSTPORT.
+	private void runZookeeper() throws IOException{
+		// create a ZooKeeper server(dataDir, dataLogDir, tickTime)
+		LOG.info("Starting ZK server");
+		//ServerStats.registerAsConcrete();
+		//ClientBase.setupTestEnv();
+		ZkTmpDir = File.createTempFile("zookeeper", "test");
+        ZkTmpDir.delete(); // swap the temp file for a directory of the same name
+        ZkTmpDir.mkdir();
+		    
+		try {
+			zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZK_TICK_TIME); // third argument is a tick time, not the port
+			serverFactory =  new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+			serverFactory.startup(zks);
+		} catch (IOException e1) {
+			// best effort: report and carry on; the check below logs whether the server came up
+			e1.printStackTrace();
+		} catch (InterruptedException e) {
+			// best effort: report and carry on; the check below logs whether the server came up
+			e.printStackTrace();
+		}
+        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+        LOG.debug("ZooKeeper server up: " + b);
+	}
+	// Connects a client and creates /ledgers, /ledgers/available and one child node per bookie.
+	private void initializeZookeper(){
+		LOG.info("Instantiate ZK Client");
+		//initialize the zk client with values
+		try {
+			zkc = new ZooKeeper(HOSTPORT, ZK_SESSION_TIMEOUT, new emptyWatcher()); // (connect string, session timeout, watcher)
+			zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+			zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+			// create an entry for each requested bookie
+			for(int i = 0; i < numberOfBookies; i++){
+				zkc.create("/ledgers/available/127.0.0.1:" + 
+					Integer.toString(initialPort + i), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+			}
+		} catch (KeeperException e) {
+			// best effort: report and continue
+			e.printStackTrace();
+		} catch (InterruptedException e) {
+			// best effort: report and continue
+			e.printStackTrace();
+		} catch (IOException e) {
+			// best effort: report and continue
+			e.printStackTrace();
+		}		
+	}
+	private void runBookies() throws IOException{ // starts numberOfBookies BookieServer instances
+		LOG.info("Starting Bookie(s)");
+		// Create one BookieServer per requested bookie, each with its own temp directory
+		
+		tmpDirs = new File[numberOfBookies];		
+		bs = new BookieServer[numberOfBookies];
+		
+		for(int i = 0; i < numberOfBookies; i++){
+			tmpDirs[i] = File.createTempFile("bookie" + Integer.toString(i), "test");
+			tmpDirs[i].delete(); // swap the temp file for a directory of the same name
+			tmpDirs[i].mkdir();
+			
+			bs[i] = new BookieServer(initialPort + i, tmpDirs[i], new File[]{tmpDirs[i]});
+			bs[i].start();
+		}		
+	}
+	// Entry point: args[0] is the number of bookies; starts everything and then sleeps forever.
+	public static void main(String[] args) throws IOException, InterruptedException {
+		if(args.length < 1){
+			usage();
+			System.exit(-1);
+		}
+		LocalBookKeeper lb = new LocalBookKeeper(Integer.parseInt(args[0]));
+		lb.runZookeeper();
+		lb.initializeZookeper();
+		lb.runBookies();
+		while (true){ // keep the JVM (and the servers it hosts) running
+			Thread.sleep(5000);
+		}
+	}
+
+	private static void usage() {
+		System.err.println("Usage: LocalBookKeeper number-of-bookies");	
+	}
+
+	/* Watcher that ignores every event; used for testing purposes */
+	class emptyWatcher implements Watcher{
+		public void process(WatchedEvent event) {}
+	}
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/util/Main.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,55 @@
+package org.apache.bookkeeper.util;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieServer;
+
+// Command-line dispatcher: forwards to BookieServer.main or BookieClient.main based on args[0].
+public class Main {
+
+    static void usage() {
+        System.err.println("USAGE: bookeeper client|bookie");
+    }
+    /** Prints usage and returns unless args[0] is "client" or "bookie"; otherwise delegates.
+     * @param args args[0] selects the mode; the remaining elements are passed on unchanged
+     * @throws InterruptedException 
+     * @throws IOException 
+     */
+    public static void main(String[] args) throws IOException, InterruptedException {
+        if (args.length < 1 || !(args[0].equals("client") || 
+                args[0].equals("bookie"))) {
+            usage();
+            return;
+        }
+        String newArgs[] = new String[args.length-1]; // args without the leading mode word
+        System.arraycopy(args, 1, newArgs, 0, newArgs.length);
+        if (args[0].equals("bookie")) {
+            BookieServer.main(newArgs);
+        } else {
+            BookieClient.main(newArgs);
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,202 @@
+package org.apache.bookkeeper.test;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+import junit.framework.TestCase;
+// Test that issues BookieClient add/read requests against a BookieServer started in setUp().
+public class BookieClientTest extends TestCase {
+    static Logger LOG = Logger.getLogger(BookieClientTest.class);
+    BookieServer bs;
+    File tmpDir;
+    int port = 13645;
+    protected void setUp() throws Exception { // starts a BookieServer on port, backed by a fresh temp directory
+        tmpDir = File.createTempFile("bookie", "test");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        bs = new BookieServer(port, tmpDir, new File[] { tmpDir });
+        bs.start();
+    }
+    protected void tearDown() throws Exception { // stops the server and removes its directory
+        bs.shutdown();
+        recursiveDelete(tmpDir);
+    }
+    private static void recursiveDelete(File dir) { // deletes children first, then dir; results are ignored
+        File children[] = dir.listFiles();
+        if (children != null) {
+            for(File child: children) {
+                recursiveDelete(child);
+            }
+        }
+        dir.delete();
+    }
+    // Carries a read reply from the callback to the waiting test thread; also used as the monitor.
+    static class ResultStruct {
+        int rc;
+        ByteBuffer entry;
+    }
+    ReadEntryCallback recb = new ReadEntryCallback() {
+        // Stores rc and the payload slice in the ResultStruct passed as ctx, then notifies waiters.
+        public void readEntryComplete(int rc, long ledgerId, long entryId,
+                ByteBuffer bb, Object ctx) {
+            ResultStruct rs = (ResultStruct)ctx;
+            synchronized(rs) {
+                LOG.info("Capacity " + bb.capacity() + ", " + bb.position());
+                rs.rc = rc;
+                bb.position(bb.position()+16); // skip 16 bytes (presumably a ledgerId/entryId header — confirm)
+                //if (bb.remaining() >=4) {
+                //    // Skip the len
+                //    bb.position(bb.position()+4);
+                //}
+                rs.entry = bb.slice();
+                LOG.info("Received " + bb.remaining());
+                rs.notifyAll();
+            }
+        }
+        
+    };
+    // Notifies any thread waiting on ctx when a write completes; a null ctx is ignored.
+    WriteCallback wrcb = new WriteCallback() {
+        public void writeComplete(int rc, long ledgerId, long entryId,
+                Object ctx) {
+            if (ctx != null) {
+                synchronized(ctx) {
+                    ctx.notifyAll();
+                }
+            }
+        }
+    };
+
+    @Test
+    public void testWriteGaps() throws Exception { // writes entries 1,2,3,5,7,11 of ledger 1, then reads present and absent ids
+        final Object notifyObject = new Object();
+        BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
+        ByteBuffer bb;
+        bb = createByteBuffer(1);
+        bc.addEntry(1, 1, bb, wrcb, null);
+        bb = createByteBuffer(2);
+        bc.addEntry(1, 2, bb, wrcb, null);
+        bb = createByteBuffer(3);
+        bc.addEntry(1, 3, bb, wrcb, null);
+        bb = createByteBuffer(5);
+        bc.addEntry(1, 5, bb, wrcb, null);
+        bb = createByteBuffer(7);
+        bc.addEntry(1, 7, bb, wrcb, null);
+        synchronized(notifyObject) {
+            bb = createByteBuffer(11);
+            bc.addEntry(1, 11, bb, wrcb, notifyObject);
+            notifyObject.wait(); // NOTE(review): no timeout and no completion flag guarding this wait
+        }
+        ResultStruct arc = new ResultStruct();
+        synchronized(arc) {
+            bc.readEntry(1, 6, recb, arc);
+            arc.wait(1000); // NOTE(review): a timeout is not detected; arc.rc would keep its previous value
+            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 7, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(7, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 1, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(1, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 2, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(2, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 3, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(3, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 4, recb, arc);
+            arc.wait(1000);
+            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 11, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(11, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 5, recb, arc);
+            arc.wait(1000);
+            assertEquals(0, arc.rc);
+            assertEquals(5, arc.entry.getInt());
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 10, recb, arc);
+            arc.wait(1000);
+            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 12, recb, arc);
+            arc.wait(1000);
+            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+        }
+        synchronized(arc) {
+            bc.readEntry(1, 13, recb, arc);
+            arc.wait(1000);
+            assertEquals(BookieProtocol.ENOENTRY, arc.rc);
+        }
+    }
+    private ByteBuffer createByteBuffer(int i) { // 4-byte buffer holding i, flipped so it is ready to be read
+        ByteBuffer bb;
+        bb = ByteBuffer.allocate(4);
+        bb.putInt(i);
+        bb.flip();
+        return bb;
+    }
+    @Test
+    public void testNoLedger() throws Exception { // reading from ledger 2, which this test never writes, must give ENOLEDGER
+        ResultStruct arc = new ResultStruct();
+        BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
+        synchronized(arc) {
+            bc.readEntry(2, 13, recb, arc);
+            arc.wait(1000);
+            assertEquals(BookieProtocol.ENOLEDGER, arc.rc);
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,497 @@
+package org.apache.bookkeeper.test;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.bookkeeper.client.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.ReadCallback;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.util.ClientBase;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Test;
+
+/**
+ * This test tests read and write, synchronous and
+ * asynchronous, strings and integers for a BookKeeper client.
+ * The test deployment uses a ZooKeeper server
+ * and three bookie servers, all started by {@link #setUp()}
+ * on 127.0.0.1 and stopped by {@link #tearDown()}.
+ *
+ * The class acts as its own {@link AddCallback} and
+ * {@link ReadCallback}; the callbacks communicate with the test
+ * thread through the {@link SyncObj} stored in {@link #sync}.
+ */
+public class BookieReadWriteTest
+    extends junit.framework.TestCase
+    implements AddCallback, ReadCallback {
+
+    // Depending on taste, select the amount of logging
+    // by uncommenting one of the two lines below
+    //static Logger LOG = Logger.getRootLogger();
+    static Logger LOG = Logger.getLogger(BookieReadWriteTest.class);
+
+    static ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
+
+    // ZooKeeper related variables
+    private static final String HOSTPORT = "127.0.0.1:2181";
+    static Integer ZooKeeperDefaultPort = 2181;
+    ZooKeeperServer zks;
+    ZooKeeper zkc; // zookeeper client
+    NIOServerCnxn.Factory serverFactory;
+    File ZkTmpDir;
+
+    // BookKeeper
+    File tmpDirB1, tmpDirB2, tmpDirB3;
+    BookieServer bs1, bs2, bs3;
+    Integer initialPort = 5000;
+    BookKeeper bkc; // bookkeeper client
+    byte[] ledgerPassword = "aaa".getBytes();
+    LedgerHandle lh;
+    long ledgerId;
+    LedgerSequence ls;
+
+    // test related variables
+    int numEntriesToWrite = 20;
+    int maxInt = Integer.MAX_VALUE;
+    Random rng; // Random Number Generator
+    ArrayList<byte[]> entries; // generated entries
+    ArrayList<Integer> entriesSize;
+
+    // Synchronization
+    SyncObj sync;
+    Set<Object> syncObjs; // not used by any method of this class
+
+    /**
+     * Monitor shared between the test thread and the callbacks:
+     * {@code counter} counts completed adds, {@code value} is set once
+     * a read has completed.
+     */
+    class SyncObj {
+        int counter;
+        boolean value;
+        public SyncObj() {
+            counter = 0;
+            value = false;
+        }
+    }
+
+    /**
+     * Writes random integers asynchronously, reads them back
+     * asynchronously and compares value and size of every entry.
+     */
+    @Test
+    public void testReadWriteAsyncSingleClient() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                bkc.asyncAddEntry(lh, entry.array(), this, sync);
+            }
+
+            // wait for all entries to be acknowledged
+            waitForAdds(numEntriesToWrite);
+
+            LOG.debug("*** WRITE COMPLETED ***");
+            // close ledger
+            bkc.closeLedger(lh);
+
+            //*** WRITING PART COMPLETED // READ PART BEGINS ***
+
+            // open ledger
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
+
+            // read entries
+            bkc.asyncReadEntries(lh, 0, numEntriesToWrite - 1, this, (Object) sync);
+
+            synchronized (sync) {
+                while (!sync.value) {
+                    sync.wait();
+                }
+            }
+
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+
+            LOG.debug("*** READ COMPLETED ***");
+
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertEquals("Checking entry " + i + " for equality", origEntry, retrEntry);
+                assertEquals("Checking entry " + i + " for size",
+                        entriesSize.get(i).intValue(), entry.length);
+                i++;
+            }
+            bkc.closeLedger(lh);
+        } catch (KeeperException e) {
+            failOnException(e);
+        } catch (BKException e) {
+            failOnException(e);
+        } catch (InterruptedException e) {
+            failOnException(e);
+        } catch (NoSuchAlgorithmException e) {
+            failOnException(e);
+        }
+    }
+
+    /**
+     * Writes random integers rendered as UTF-8 strings asynchronously,
+     * reads them back synchronously and compares the decoded strings.
+     */
+    @Test
+    public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
+        LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
+        String charset = "utf-8";
+        LOG.debug("Default charset: "  + Charset.defaultCharset());
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                int randomInt = rng.nextInt(maxInt);
+                byte[] entry = Integer.toString(randomInt).getBytes(charset);
+                entries.add(entry);
+                bkc.asyncAddEntry(lh, entry, this, sync);
+            }
+
+            // wait for all entries to be acknowledged
+            waitForAdds(numEntriesToWrite);
+
+            LOG.debug("*** ASYNC WRITE COMPLETED ***");
+            // close ledger
+            bkc.closeLedger(lh);
+
+            //*** WRITING PART COMPLETED // READ PART BEGINS ***
+
+            // open ledger
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
+
+            // read entries
+            ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+
+            assertTrue("Checking number of read entries", ls.size() == numEntriesToWrite);
+
+            LOG.debug("*** SYNC READ COMPLETED ***");
+
+            // at this point, LedgerSequence ls is filled with the returned values
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                byte[] origEntryBytes = entries.get(i);
+                byte[] retrEntryBytes = ls.nextElement().getEntry();
+
+                LOG.debug("Original byte entry size: " + origEntryBytes.length);
+                LOG.debug("Saved byte entry size: " + retrEntryBytes.length);
+
+                String origEntry = new String(origEntryBytes, charset);
+                String retrEntry = new String(retrEntryBytes, charset);
+
+                LOG.debug("Original entry: " + origEntry);
+                LOG.debug("Retrieved entry: " + retrEntry);
+
+                assertEquals("Checking entry " + i + " for equality", origEntry, retrEntry);
+                // advance only after the message has been built so that it
+                // names the entry that was actually compared
+                i++;
+            }
+            bkc.closeLedger(lh);
+        } catch (KeeperException e) {
+            failOnException(e);
+        } catch (BKException e) {
+            failOnException(e);
+        } catch (InterruptedException e) {
+            failOnException(e);
+        } catch (NoSuchAlgorithmException e) {
+            failOnException(e);
+        }
+    }
+
+    /**
+     * Writes random integers synchronously, reads them back
+     * synchronously and compares every value.
+     */
+    @Test
+    public void testReadWriteSyncSingleClient() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+                entries.add(entry.array());
+                bkc.addEntry(lh, entry.array());
+            }
+            bkc.closeLedger(lh);
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == numEntriesToWrite);
+
+            ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertEquals("Checking entry " + i + " for equality", origEntry, retrEntry);
+                i++;
+            }
+            bkc.closeLedger(lh);
+        } catch (KeeperException e) {
+            failOnException(e);
+        } catch (BKException e) {
+            failOnException(e);
+        } catch (InterruptedException e) {
+            failOnException(e);
+        } catch (NoSuchAlgorithmException e) {
+            failOnException(e);
+        }
+    }
+
+    /**
+     * Writes zero-length entries followed by one 4-byte entry, then
+     * reads the zero-length entries back and checks they are empty.
+     */
+    @Test
+    public void testReadWriteZero() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(ledgerPassword);
+            bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            for (int i = 0; i < numEntriesToWrite; i++) {
+                bkc.addEntry(lh, new byte[0]);
+            }
+
+            /*
+             * Write a non-zero entry
+             */
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(maxInt));
+            entry.position(0);
+            entries.add(entry.array());
+            bkc.addEntry(lh, entry.array());
+
+            bkc.closeLedger(lh);
+            lh = bkc.openLedger(ledgerId, ledgerPassword);
+            LOG.debug("Number of entries written: " + lh.getLast());
+            assertTrue("Verifying number of entries written", lh.getLast() == (numEntriesToWrite + 1));
+
+            // only the zero-length entries are read back here
+            ls = bkc.readEntries(lh, 0, numEntriesToWrite - 1);
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                ByteBuffer result = ByteBuffer.wrap(ls.nextElement().getEntry());
+                LOG.debug("Length of result: " + result.capacity());
+
+                assertTrue("Checking if entry " + i + " has zero bytes", result.capacity() == 0);
+                i++;
+            }
+            bkc.closeLedger(lh);
+        } catch (KeeperException e) {
+            failOnException(e);
+        } catch (BKException e) {
+            failOnException(e);
+        } catch (InterruptedException e) {
+            failOnException(e);
+        } catch (NoSuchAlgorithmException e) {
+            failOnException(e);
+        }
+    }
+
+    /**
+     * Blocks until {@link #addComplete} has been invoked at least
+     * {@code expected} times on {@link #sync}.
+     *
+     * The condition is re-checked in a loop because every completed add
+     * issues its own notification, so a single wake-up does not mean
+     * that all adds have been acknowledged.
+     */
+    private void waitForAdds(int expected) throws InterruptedException {
+        synchronized (sync) {
+            while (sync.counter < expected) {
+                LOG.debug("Entries counter = " + sync.counter);
+                sync.wait();
+            }
+        }
+    }
+
+    /**
+     * Logs an exception raised inside a test body and fails the test,
+     * so that an error is never reported as a passing run.
+     */
+    private void failOnException(Exception e) {
+        LOG.error("Unexpected exception during test", e);
+        fail("Unexpected exception: " + e);
+    }
+
+    /**
+     * Add callback: counts one more completed add on the
+     * {@link SyncObj} passed as context and wakes up a waiter.
+     *
+     * NOTE(review): the return code {@code rc} is not inspected, so a
+     * failed add is counted like a successful one - confirm whether it
+     * should be checked.
+     */
+    public void addComplete(int rc, long ledgerId, long entryId, Object ctx) {
+        SyncObj x = (SyncObj) ctx;
+        synchronized (x) {
+            x.counter++;
+            x.notify();
+        }
+    }
+
+    /**
+     * Read callback: stores the returned sequence in {@link #ls},
+     * raises the {@code value} flag of {@link #sync} and wakes up a
+     * waiter. The context argument is not used.
+     */
+    public void readComplete(int rc, long ledgerId, LedgerSequence seq,
+            Object ctx) {
+        synchronized (sync) {
+            // publish the result under the same lock as the flag
+            ls = seq;
+            sync.value = true;
+            sync.notify();
+        }
+    }
+
+    /**
+     * Starts a ZooKeeper server, registers three bookies under
+     * /ledgers/available, starts the three bookie servers and
+     * initializes the per-test state.
+     *
+     * @throws IOException if a temporary directory cannot be created
+     *         or one of the servers or znodes cannot be set up
+     */
+    @Override
+    protected void setUp() throws IOException {
+        LOG.addAppender(ca);
+        LOG.setLevel(Level.DEBUG);
+
+        // create a ZooKeeper server(dataDir, dataLogDir, port)
+        LOG.debug("Running ZK server");
+        //ServerStats.registerAsConcrete();
+        ClientBase.setupTestEnv();
+        ZkTmpDir = createTempDir("zookeeper");
+
+        try {
+            zks = new ZooKeeperServer(ZkTmpDir, ZkTmpDir, ZooKeeperDefaultPort);
+            serverFactory = new NIOServerCnxn.Factory(ZooKeeperDefaultPort);
+            serverFactory.startup(zks);
+        } catch (InterruptedException e) {
+            throw asIOException("Interrupted while starting the ZooKeeper server", e);
+        }
+        boolean b = ClientBase.waitForServerUp(HOSTPORT, ClientBase.CONNECTION_TIMEOUT);
+
+        LOG.debug("Server up: " + b);
+        assertTrue("waiting for server up", b);
+
+        // create a zookeeper client
+        LOG.debug("Instantiate ZK Client");
+        // NOTE(review): the second constructor argument appears to be the
+        // session timeout in milliseconds rather than a port, so the port
+        // value is being used as a timeout here - confirm against the
+        // ZooKeeper API and pass HOSTPORT plus an explicit timeout.
+        zkc = new ZooKeeper("127.0.0.1", ZooKeeperDefaultPort, new emptyWatcher());
+
+        // initialize the zk client with values
+        try {
+            zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 1), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zkc.create("/ledgers/available/127.0.0.1:" + Integer.toString(initialPort + 2), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+            throw asIOException("Could not create the bookie znodes", e);
+        } catch (InterruptedException e) {
+            throw asIOException("Interrupted while creating the bookie znodes", e);
+        }
+
+        // Create Bookie Servers (B1, B2, B3)
+        tmpDirB1 = createTempDir("bookie1");
+        bs1 = new BookieServer(initialPort, tmpDirB1, new File[]{tmpDirB1});
+        bs1.start();
+
+        tmpDirB2 = createTempDir("bookie2");
+        bs2 = new BookieServer(initialPort + 1, tmpDirB2, new File[]{tmpDirB2});
+        bs2.start();
+
+        tmpDirB3 = createTempDir("bookie3");
+        bs3 = new BookieServer(initialPort + 2, tmpDirB3, new File[]{tmpDirB3});
+        bs3.start();
+
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random Number Generator
+        entries = new ArrayList<byte[]>(); // initialize the entries list
+        entriesSize = new ArrayList<Integer>();
+        sync = new SyncObj(); // initialize the synchronization data structure
+    }
+
+    /**
+     * Creates an empty temporary directory by replacing a freshly
+     * created temporary file with a directory of the same name.
+     *
+     * @param prefix prefix of the directory name
+     * @throws IOException if the file cannot be created, removed or
+     *         replaced by a directory
+     */
+    private static File createTempDir(String prefix) throws IOException {
+        File dir = File.createTempFile(prefix, "test");
+        if (!dir.delete() || !dir.mkdir()) {
+            throw new IOException("Could not create temporary directory " + dir);
+        }
+        return dir;
+    }
+
+    /** Wraps a set-up failure in an IOException, keeping the cause. */
+    private static IOException asIOException(String message, Exception cause) {
+        IOException wrapped = new IOException(message);
+        wrapped.initCause(cause);
+        return wrapped;
+    }
+
+    /**
+     * Stops the bookies, the ZooKeeper client and the ZooKeeper server
+     * and removes their temporary directories. Every step tolerates
+     * resources that were never created, so it can run after a
+     * partially failed {@link #setUp()}.
+     */
+    @Override
+    protected void tearDown() {
+        LOG.debug("TearDown");
+        boolean interrupted = false;
+
+        // shutdown bookie servers; each one is stopped independently so
+        // that an interruption does not leave the remaining ones running
+        for (BookieServer bs : new BookieServer[]{bs1, bs2, bs3}) {
+            if (bs == null) {
+                continue;
+            }
+            try {
+                bs.shutdown();
+            } catch (InterruptedException e) {
+                interrupted = true;
+                LOG.warn("Interrupted while shutting down a bookie", e);
+            }
+        }
+        cleanUpDir(tmpDirB1);
+        cleanUpDir(tmpDirB2);
+        cleanUpDir(tmpDirB3);
+
+        // close the ZK client
+        if (zkc != null) {
+            try {
+                zkc.close();
+            } catch (InterruptedException e) {
+                interrupted = true;
+                LOG.warn("Interrupted while closing the ZooKeeper client", e);
+            }
+        }
+
+        // shutdown ZK server
+        try {
+            if (serverFactory != null) {
+                serverFactory.shutdown();
+                assertTrue("waiting for server down",
+                        ClientBase.waitForServerDown(HOSTPORT,
+                                                     ClientBase.CONNECTION_TIMEOUT));
+            }
+        } finally {
+            //ServerStats.unregister();
+            cleanUpDir(ZkTmpDir);
+            // the interrupt status is restored only once all blocking
+            // clean-up calls are done
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Clean up a directory recursively.
+     *
+     * @param dir file or directory to delete; {@code null} is accepted
+     *        and treated as nothing to do
+     * @return {@code true} if everything was deleted (or there was
+     *         nothing to delete), {@code false} as soon as one
+     *         deletion fails
+     */
+    protected boolean cleanUpDir(File dir) {
+        if (dir == null) {
+            return true;
+        }
+        if (dir.isDirectory()) {
+            System.err.println("Cleaning up " + dir.getName());
+            // list() returns null when the directory cannot be read
+            String[] children = dir.list();
+            if (children != null) {
+                for (String child : children) {
+                    if (!cleanUpDir(new File(dir, child))) {
+                        return false;
+                    }
+                }
+            }
+        }
+        // The directory is now empty so delete it
+        return dir.delete();
+    }
+
+    /* Used for testing purposes: a watcher that ignores every event */
+    class emptyWatcher implements Watcher {
+        public void process(WatchedEvent event) {}
+    }
+
+}



Mime
View raw message