Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,250 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.security.NoSuchAlgorithmException;
+import java.security.MessageDigest;
+
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Maintains a queue of request to a given bookie. For verifiable
+ * ledgers, it computes the digest.
+ *
+ */
+
+public class BookieHandle extends Thread{
+ Logger LOG = Logger.getLogger(BookieClient.class);
+
+ boolean stop = false;
+ LedgerHandle self;
+ BookieClient client;
+ InetSocketAddress addr;
+ static int recvTimeout = 2000;
+ ArrayBlockingQueue<ToSend> incomingQueue;
+
+ /**
+ * Objects of this class are queued waiting to be
+ * processed.
+ */
+ class ToSend {
+ long entry = -1;
+ Object ctx;
+ int type;
+
+ ToSend(SubOp sop, long entry){
+ this.type = sop.op.type;
+ this.entry = entry;
+ this.ctx = sop;
+ }
+ }
+
+ /**
+ * @param lh ledger handle
+ * @param addr address
+ */
+ BookieHandle(LedgerHandle lh, InetSocketAddress addr) throws IOException {
+ this.client = new BookieClient(addr, recvTimeout);
+ this.self = lh;
+ this.addr = addr;
+ this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
+
+ start();
+ }
+
+ /**
+ * Restart BookieClient if can't talk to bookie
+ *
+ * @return
+ * @throws IOException
+ */
+ void restart() throws IOException {
+ this.client = new BookieClient(addr, recvTimeout);
+ }
+
+ /**
+ * Sending add operation to bookie
+ *
+ * @param r
+ * @param cb
+ * @param ctx
+ * @throws IOException
+ */
+ public void sendAdd(SubAddOp r, long entry)
+ throws IOException {
+ try{
+ incomingQueue.put(new ToSend(r, entry));
+ } catch(InterruptedException e){
+ e.printStackTrace();
+ }
+ //client.addEntry(self.getId(),
+ // r.entry,
+ // ByteBuffer.wrap(r.data),
+ // cb,
+ // ctx);
+ }
+
+ /**
+ * Message disgest instance
+ *
+ */
+ MessageDigest digest = null;
+
+ /**
+ * Get digest instance if there is none.
+ *
+ */
+ MessageDigest getDigestInstance(String alg)
+ throws NoSuchAlgorithmException {
+ if(digest == null){
+ digest = MessageDigest.getInstance(alg);
+ }
+
+ return digest;
+ }
+
+ /**
+ * Computes the digest for a given ByteBuffer.
+ *
+ */
+
+ public ByteBuffer addDigest(long entryId, ByteBuffer data)
+ throws NoSuchAlgorithmException, IOException {
+ if(digest == null)
+ getDigestInstance(self.getDigestAlg());
+
+ ByteBuffer bb = ByteBuffer.allocate(8 + 8);
+ bb.putLong(self.getId());
+ bb.putLong(entryId);
+
+ byte[] msgDigest;
+
+ // synchronized(LedgerHandle.digest){
+ digest.update(self.getPasswdHash());
+ digest.update(bb.array());
+ digest.update(data.array());
+
+ //baos.write(data);
+ //baos.write(Operation.digest.digest());
+ msgDigest = digest.digest();
+ //}
+ ByteBuffer extendedData = ByteBuffer.allocate(data.capacity() + msgDigest.length);
+ data.rewind();
+ extendedData.put(data);
+ extendedData.put(msgDigest);
+
+ //LOG.debug("Data length (" + self.getId() + ", " + entryId + "): " + data.capacity());
+ //LOG.debug("Digest: " + new String(msgDigest));
+
+ return extendedData;
+ }
+
+ /**
+ * Sending read operation to bookie
+ *
+ * @param r
+ * @param entry
+ * @param cb
+ * @param ctx
+ * @throws IOException
+ */
+ public void sendRead(SubReadOp r, long entry)
+ throws IOException {
+ //LOG.debug("readEntry: " + entry);
+ try{
+ incomingQueue.put(new ToSend(r, entry));
+ } catch(InterruptedException e){
+ e.printStackTrace();
+ }
+ }
+
+ public void run(){
+ while(!stop){
+ try{
+ ToSend ts = incomingQueue.poll(1000, TimeUnit.MILLISECONDS);
+ if(ts != null){
+ switch(ts.type){
+ case Operation.ADD:
+ SubAddOp aOp = (SubAddOp) ts.ctx;
+ AddOp op = ((AddOp) aOp.op);
+
+ /*
+ * TODO: Really add the confirmed add to the op
+ */
+ long confirmed = self.getAddConfirmed();
+ //LOG.info("Confirmed: " + confirmed);
+ ByteBuffer extendedData = ByteBuffer.allocate(op.data.length + 8);
+ extendedData.putLong(confirmed);
+ extendedData.put(op.data);
+ extendedData.rewind();
+
+ if(self.getQMode() == QMode.VERIFIABLE){
+ extendedData = addDigest(ts.entry, extendedData);
+ }
+
+ //LOG.debug("Extended data: " + extendedData.capacity());
+ client.addEntry(self.getId(),
+ ts.entry,
+ extendedData,
+ aOp.wcb,
+ ts.ctx);
+ break;
+ case Operation.READ:
+ client.readEntry(self.getId(),
+ ts.entry,
+ ((SubReadOp) ts.ctx).rcb,
+ ts.ctx);
+ break;
+ }
+ }
+ } catch (InterruptedException e){
+ LOG.error(e);
+ } catch (IOException e){
+ LOG.error(e);
+ } catch (NoSuchAlgorithmException e){
+ LOG.error(e);
+ }
+ }
+ }
+
+ void halt(){
+ stop = true;
+ }
+}
+
+
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,112 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.log4j.Logger;
+
+/**
+ * Thread responsible for delivering results to clients. This thread
+ * basically isolates the application from the remainder of the
+ * BookKeeper client.
+ *
+ */
+
+class ClientCBWorker extends Thread{
+ Logger LOG = Logger.getLogger(ClientCBWorker.class);
+ static ClientCBWorker instance = null;
+
+ private boolean stop = false;
+
+ ArrayBlockingQueue<Operation> pendingOps;
+ QuorumOpMonitor monitor;
+
+
+ static synchronized ClientCBWorker getInstance(){
+ if(instance == null){
+ instance = new ClientCBWorker();
+ }
+
+ return instance;
+ }
+
+ ClientCBWorker(){
+ pendingOps = new ArrayBlockingQueue<Operation>(4000);
+ start();
+ LOG.debug("Have started cbWorker");
+ }
+
+ void addOperation(Operation op)
+ throws InterruptedException {
+ pendingOps.put(op);
+ LOG.debug("Added operation to queue of pending");
+ }
+
+ synchronized void shutdown(){
+ stop = true;
+ instance = null;
+ LOG.debug("Shutting down");
+ }
+
+ public void run(){
+ try{
+ while(!stop){
+ LOG.debug("Going to sleep on queue");
+ Operation op = pendingOps.poll(1000, TimeUnit.MILLISECONDS);
+ if(op != null){
+ synchronized(op){
+ while(!op.isReady()){
+ op.wait();
+ }
+ }
+ LOG.debug("Request ready");
+
+ switch(op.type){
+ case Operation.ADD:
+ AddOp aOp = (AddOp) op;
+
+ aOp.cb.addComplete(aOp.getErrorCode(),
+ aOp.getLedger().getId(), aOp.entry,
+ aOp.ctx);
+ aOp.getLedger().setAddConfirmed(aOp.entry);
+ break;
+ case Operation.READ:
+ ReadOp rOp = (ReadOp) op;
+ LOG.debug("Got one message from the queue: " + rOp.firstEntry);
+ rOp.cb.readComplete(rOp.getErrorCode(),
+ rOp.getLedger().getId(),
+ new LedgerSequence(rOp.seq),
+ rOp.ctx);
+ break;
+ }
+ }
+ }
+ } catch (InterruptedException e){
+ LOG.error("Exception while waiting on queue or operation");
+ }
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ErrorCodes.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,29 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * Integer error-code constants for the BookKeeper client package.
+ */
+public class ErrorCodes {
+
+    /** NOTE(review): presumably "number of retries exhausted" — confirm. */
+    static final int ENUMRETRIES = -1;
+
+    /** NOTE(review): presumably "no bookie available" — confirm. */
+    static final int ENOAVAILABLEBOOKIE = -2;
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,63 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+
+import org.apache.log4j.Logger;
+
+/**
+ * Ledger entry. Currently only holds the necessary
+ * fields to identify a ledger entry, and the entry
+ * content.
+ */
+
+public class LedgerEntry {
+    // Shared by all instances: an entry object is created per entry, so a
+    // per-instance logger lookup is wasted work.
+    static final Logger LOG = Logger.getLogger(LedgerEntry.class);
+
+    private long lId;
+    private long eId;
+    private byte[] entry;
+
+    /**
+     * @param lId ledger identifier
+     * @param eId entry identifier
+     * @param entry entry content; may be null. Stored by reference, not copied.
+     */
+    LedgerEntry(long lId, long eId, byte[] entry){
+        this.lId = lId;
+        this.eId = eId;
+        this.entry = entry;
+        // Guarded because building the message decodes the whole payload
+        // into a String, which is wasted work when debug logging is off.
+        if(LOG.isDebugEnabled()){
+            if(entry != null)
+                LOG.debug("Entry: " + entry.length + " , " + new String(entry));
+            else
+                LOG.debug("Entry is null");
+        }
+    }
+
+    /** Returns the ledger identifier. */
+    public long getLedgerId(){
+        return lId;
+    }
+
+    /** Returns the entry identifier. */
+    public long getEntryId(){
+        return eId;
+    }
+
+    /** Returns the entry content (the internal array itself, possibly null). */
+    public byte[] getEntry(){
+        return entry;
+    }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,349 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.log4j.Logger;
+
+
+
+
+/**
+ * Ledger handle on the client side. Contains ledger metadata
+ * used to access it.
+ */
+
+public class LedgerHandle {
+    Logger LOG = Logger.getLogger(LedgerHandle.class);
+
+    /** Quorum mode of a ledger; consulted by setThreshold() and BookieHandle. */
+    public enum QMode {VERIFIABLE, GENERIC, FREEFORM}
+
+    private long ledger;
+    private volatile long last;
+    private volatile long lastAddConfirmed = 0;
+    private ArrayList<BookieHandle> bookies;
+    private BookKeeper bk;
+
+    private int qSize;
+    private QMode qMode = QMode.VERIFIABLE;
+
+    private int threshold;
+    private String digestAlg = "SHA1";
+
+    private byte[] passwdHash;
+    private byte[] passwd;
+
+    /**
+     * Creates a handle in VERIFIABLE mode with the default quorum size.
+     *
+     * The documented default is (n+1)/2 for n bookies, but no bookies have
+     * been added at construction time, so the quorum size used here is 0.
+     * NOTE(review): confirm whether it should be recomputed as bookies are
+     * added.
+     */
+    LedgerHandle(BookKeeper bk,
+            long ledger,
+            long last,
+            byte[] passwd) throws InterruptedException {
+        this(bk, ledger, last, 0, QMode.VERIFIABLE, passwd);
+    }
+
+    /**
+     * Creates a handle with an explicit quorum size and mode.
+     *
+     * @param bk BookKeeper client that owns this handle
+     * @param ledger ledger identifier
+     * @param last last entry identifier submitted so far
+     * @param qSize quorum size
+     * @param mode quorum mode
+     * @param passwd ledger password; its MD5 hash is computed and stored too
+     */
+    LedgerHandle(BookKeeper bk,
+            long ledger,
+            long last,
+            int qSize,
+            QMode mode,
+            byte[] passwd) throws InterruptedException {
+        this.bk = bk;
+        this.ledger = ledger;
+        this.last = last;
+        this.bookies = new ArrayList<BookieHandle>();
+
+        this.qSize = qSize;
+        this.qMode = mode;
+        this.passwd = passwd;
+        genPasswdHash(passwd);
+    }
+
+    /**
+     * Creates a handle in VERIFIABLE mode with an explicit quorum size.
+     */
+    LedgerHandle(BookKeeper bk,
+            long ledger,
+            long last,
+            int qSize,
+            byte[] passwd) throws InterruptedException {
+        this(bk, ledger, last, qSize, QMode.VERIFIABLE, passwd);
+    }
+
+    /**
+     * Opens a BookieHandle for each address. When a bookie refuses the
+     * connection, a replacement address is requested from BookKeeper and
+     * appended to {@code bookies}, so it is tried later in the same pass.
+     * Note: this private method is not called anywhere in this class.
+     */
+    private void setBookies(ArrayList<InetSocketAddress> bookies)
+    throws InterruptedException {
+        // Index-based on purpose: replacements are appended while the list
+        // is being walked, which would make a for-each iterator throw
+        // ConcurrentModificationException.
+        for(int i = 0; i < bookies.size(); i++){
+            InetSocketAddress a = bookies.get(i);
+            LOG.debug("Opening bookieHandle: " + a);
+            try{
+                BookieHandle bh = new BookieHandle(this, a);
+                this.bookies.add(bh);
+            } catch(ConnectException e){
+                LOG.error(e + "(bookie: " + a + ")");
+
+                InetSocketAddress addr = bk.getNewBookie(bookies);
+                if(addr != null){
+                    bookies.add(addr);
+                }
+            } catch(IOException e) {
+                LOG.error(e);
+            }
+        }
+    }
+
+    /**
+     * Create bookie handle and add it to the list. Once more than qSize
+     * bookies are present, the threshold is recomputed.
+     *
+     * @param addr socket address
+     * @throws IOException propagated from the BookieHandle constructor
+     */
+    void addBookie(InetSocketAddress addr)
+    throws IOException {
+        BookieHandle bh = new BookieHandle(this, addr);
+        this.bookies.add(bh);
+
+        if(bookies.size() > qSize) setThreshold();
+    }
+
+    /**
+     * Recomputes the threshold from the number of bookies n:
+     * GENERIC: n - qSize/2; VERIFIABLE: n - qSize + 1; otherwise n.
+     */
+    private void setThreshold(){
+        switch(qMode){
+        case GENERIC:
+            threshold = bookies.size() - qSize/2;
+            break;
+        case VERIFIABLE:
+            threshold = bookies.size() - qSize + 1;
+            break;
+        default:
+            threshold = bookies.size();
+        }
+    }
+
+    public int getThreshold(){
+        return threshold;
+    }
+
+    /** Returns the addresses of the bookie handles currently held. */
+    private ArrayList<InetSocketAddress> currentBookieAddresses(){
+        ArrayList<InetSocketAddress> addrs =
+            new ArrayList<InetSocketAddress>(bookies.size());
+        for(BookieHandle bh : bookies){
+            addrs.add(bh.addr);
+        }
+        return addrs;
+    }
+
+    /**
+     * Replace bookie in the case of a failure. The current bookie addresses
+     * are passed to BookKeeper so it can choose a different one.
+     *
+     * If the new bookie cannot be reached it is blacklisted and the old
+     * handle is left in place.
+     *
+     * @param index position of the bookie to replace
+     * @throws BKException NoBookieAvailableException when no replacement is
+     *         available (or the lookup was interrupted)
+     */
+    void replaceBookie(int index)
+    throws BKException {
+        InetSocketAddress addr = null;
+        try{
+            addr = bk.getNewBookie(currentBookieAddresses());
+        } catch(InterruptedException e){
+            Thread.currentThread().interrupt();
+            LOG.error(e);
+        }
+
+        if(addr == null){
+            throw BKException.create(Code.NoBookieAvailableException);
+        }
+
+        try{
+            BookieHandle bh = new BookieHandle(this, addr);
+
+            /*
+             * TODO: Read from current bookies, and write to this one
+             */
+
+            /*
+             * If successful in writing to new bookie, add it to the set.
+             * The replaced handle's worker thread is stopped so it does not
+             * leak.
+             */
+            BookieHandle old = this.bookies.set(index, bh);
+            old.halt();
+        } catch(IOException e) {
+            // Also covers ConnectException, a subclass of IOException.
+            bk.blackListBookie(addr);
+            LOG.error(e);
+        }
+    }
+
+    /**
+     * This method is used when BK cannot find a bookie
+     * to replace the current faulty one. In such cases,
+     * we simply remove the bookie and stop its worker thread.
+     *
+     * @param index position of the bookie to remove
+     */
+    void removeBookie(int index){
+        BookieHandle removed = bookies.remove(index);
+        removed.halt();
+    }
+
+    /**
+     * Invalidates the ledger and last-entry ids (sets both to -1) and stops
+     * every bookie handle's worker thread.
+     */
+    void close(){
+        ledger = -1;
+        last = -1;
+        for(BookieHandle bh : bookies){
+            bh.halt();
+        }
+    }
+
+    /**
+     * Returns the ledger identifier
+     * @return long
+     */
+    public long getId(){
+        return ledger;
+    }
+
+    /**
+     * Returns the last entry identifier submitted
+     * @return long
+     */
+    public long getLast(){
+        return last;
+    }
+
+    /**
+     * Returns the last entry identifier submitted and increments it.
+     * NOTE(review): last++ on a volatile field is not atomic; confirm this
+     * is only called from one thread at a time.
+     * @return long
+     */
+    long incLast(){
+        return last++;
+    }
+
+    /**
+     * Sets the last entry identifier and returns the new value.
+     * @return long
+     */
+    long setLast(long last){
+        this.last = last;
+        return this.last;
+    }
+
+    /**
+     * Sets the value of the last add confirmed, ignoring values that are not
+     * larger than the current one. This is used when adding new entries,
+     * since we use this value as a hint to recover from failures of the
+     * client.
+     */
+    void setAddConfirmed(long entryId){
+        if(entryId > lastAddConfirmed)
+            lastAddConfirmed = entryId;
+    }
+
+    long getAddConfirmed(){
+        return lastAddConfirmed;
+    }
+
+    /**
+     * Returns the list of bookies (the internal list, not a copy)
+     * @return ArrayList<BookieHandle>
+     */
+    ArrayList<BookieHandle> getBookies(){
+        return bookies;
+    }
+
+    /**
+     * Returns the quorum size given at construction (0 for the constructor
+     * without an explicit size).
+     * @return int
+     */
+    int getQuorumSize(){
+        return qSize;
+    }
+
+    /**
+     * Returns the quorum mode for this ledger: Verifiable, Generic or Freeform
+     */
+    QMode getQMode(){
+        return qMode;
+    }
+
+    /**
+     * Sets message digest algorithm.
+     */
+    void setDigestAlg(String alg){
+        this.digestAlg = alg;
+    }
+
+    /**
+     * Get message digest algorithm (default "SHA1").
+     */
+    String getDigestAlg(){
+        return digestAlg;
+    }
+
+    /**
+     * Generates and stores the MD5 hash of the password. If MD5 is not
+     * available, the plain password is stored in its place, so that
+     * getPasswdHash() never returns null (MessageDigest.update rejects null).
+     *
+     * @param passwd
+     */
+    private void genPasswdHash(byte[] passwd){
+        try{
+            MessageDigest digest = MessageDigest.getInstance("MD5");
+
+            digest.update(passwd);
+            this.passwdHash = digest.digest();
+        } catch(NoSuchAlgorithmException e){
+            this.passwdHash = passwd;
+            LOG.error("Storing password as plain text because secure hash implementation does not exist");
+        }
+    }
+
+    /**
+     * Returns password in plain text
+     */
+    byte[] getPasswd(){
+        return passwd;
+    }
+
+    /**
+     * Returns password hash
+     *
+     * @return byte[]
+     */
+    byte[] getPasswdHash(){
+        return passwdHash;
+    }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,213 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.lang.Math;
+import java.lang.InterruptedException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
+import java.util.TreeMap;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerSequence;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implements the mechanism to recover a ledger that was not closed
+ * correctly. It reads entries from the ledger using the hint field
+ * until it finds the last entry written. It then writes to ZooKeeper.
+ *
+ */
+
+class LedgerRecoveryMonitor implements ReadEntryCallback{
+ Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
+
+ BookKeeper self;
+ long lId;
+ int qSize;
+ QMode qMode;
+ ArrayList<InetSocketAddress> bookies;
+ ArrayList<BookieClient> clients;
+ HashMap<Long, ArrayList<ByteBuffer> > votes;
+ TreeMap<Long, Integer > hints;
+ AtomicInteger counter;
+
+ private int minimum;
+
+ LedgerRecoveryMonitor(BookKeeper self,
+ long lId,
+ int qSize,
+ ArrayList<InetSocketAddress> bookies,
+ QMode qMode){
+ this.self = self;
+ this.lId = lId;
+ this.qSize = qSize;
+ this.qMode = qMode;
+ this.bookies = bookies;
+ this.clients = new ArrayList<BookieClient>();
+ this.votes = new HashMap<Long, ArrayList<ByteBuffer> >();
+ this.hints = new TreeMap<Long, Integer>();
+ this.counter = new AtomicInteger(0);
+
+ this.minimum = bookies.size();
+ if(qMode == QMode.VERIFIABLE){
+ this.minimum += 1 - qSize;
+ } else if(qMode == QMode.GENERIC){
+ this.minimum -= Math.floor(qSize/2);
+ }
+
+ }
+
+ boolean recover(byte[] passwd) throws
+ IOException, InterruptedException, BKException, KeeperException {
+ /*
+ * Create BookieClient objects and send a request to each one.
+ */
+
+ for(InetSocketAddress s : bookies){
+ LOG.info(s);
+ BookieClient client = new BookieClient(s, 3000);
+ clients.add(client);
+ client.readEntry(lId,
+ -1,
+ this,
+ null);
+ }
+
+ /*
+ * Wait until I have received enough responses
+ */
+ synchronized(counter){
+ LOG.info("Counter: " + counter.get() + ", " + minimum + ", " + qMode);
+ if(counter.get() < minimum){
+ LOG.info("Waiting...");
+ counter.wait(5000);
+ }
+ }
+
+ /*
+ * Obtain largest hint
+ */
+
+ LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
+ self.engines.put(lh.getId(), new QuorumEngine(lh));
+ for(InetSocketAddress addr : bookies){
+ lh.addBookie(addr);
+ }
+
+ boolean notLegitimate = true;
+ long readCounter = 0;
+ while(notLegitimate){
+ readCounter = getNextHint();
+ if(readCounter != -1){
+ lh.setLast(readCounter - 1);
+ boolean hasMore = true;
+ while(hasMore){
+ hasMore = false;
+ LOG.debug("Recovering: " + lh.getLast());
+ LedgerSequence ls = self.readEntries(lh, lh.getLast(), lh.getLast());
+ //if(ls == null) throw BKException.create(Code.ReadException);
+ LOG.debug("Received entry for: " + lh.getLast());
+
+ if(ls.nextElement().getEntry() != null){
+ if(notLegitimate) notLegitimate = false;
+ lh.incLast();
+ hasMore = true;
+ }
+ }
+ } else break;
+ }
+
+ /*
+ * Write counter as the last entry of ledger
+ */
+ if(!notLegitimate){
+ //lh.setLast(readCounter);
+ self.closeLedger(lh);
+
+ return true;
+ } else {
+ lh.setLast(0);
+ self.closeLedger(lh);
+
+ return false;
+ }
+
+ }
+
+
+ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
+ if(rc == 0){
+ bb.rewind();
+
+ /*
+ * Collect new vote
+ */
+ if(!votes.containsKey(entryId)){
+ votes.put(entryId, new ArrayList<ByteBuffer>());
+ }
+ votes.get(entryId).add(bb);
+
+ /*
+ * Extract hint
+ */
+
+ bb.position(16);
+ long hint = bb.getLong();
+
+ LOG.info("Received a response: " + rc + ", " + entryId + ", " + hint);
+
+ if(!hints.containsKey(hint)){
+ hints.put(hint, 0);
+ }
+ hints.put(hint, hints.get(hint) + 1);
+
+ synchronized(counter){
+ if(counter.incrementAndGet() >= minimum);
+ counter.notify();
+ }
+ } else {
+ LOG.debug("rc != 0");
+ }
+
+ }
+
+ private long getNextHint(){
+ if(hints.size() == 0) return -1;
+
+ long hint = hints.lastKey();
+ hints.remove(hint);
+
+ return hint;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerSequence.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,65 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Sequence of entries of a ledger. Used to return a sequence of entries
+ * upon an asynchornous read call.
+ *
+ */
+
+
+public class LedgerSequence
+implements Enumeration<LedgerEntry> {
+ Logger LOG = Logger.getLogger(LedgerSequence.class);
+
+ int index = 0;
+ List<LedgerEntry> seq;
+
+ LedgerSequence(LedgerEntry[] seq){
+ this.seq = Arrays.asList(seq);
+ LOG.debug("Sequence provided: " + this.seq.size());
+ }
+
+ public boolean hasMoreElements(){
+ if(index < seq.size())
+ return true;
+ else
+ return false;
+ }
+
+ public LedgerEntry nextElement() throws NoSuchElementException{
+ LOG.debug("Next element of sequence: " + seq.size() + ", " + index);
+ return seq.get(index++);
+ }
+
+ public int size(){
+ return seq.size();
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerStream.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,104 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Ordered stream of the entries of a ledger. Entries may be passed to
+ * addEntry in any order; readEntry returns them in entry-id order,
+ * starting from the id given to the constructor.
+ *
+ * This feature is under construction.
+ *
+ */
+
+public class LedgerStream {
+    Logger LOG = Logger.getLogger(LedgerStream.class);
+
+    /** Entries received ahead of the next expected id. Guarded by toDeliver. */
+    private ArrayList<LedgerEntry> pending;
+    /** Entries ready to be read, in id order. Also used as the monitor. */
+    private ArrayList<LedgerEntry> toDeliver;
+    /** Id of the next entry expected by addEntry. Guarded by toDeliver. */
+    private long index;
+
+
+    /**
+     * Constructor takes the first entry id expected.
+     *
+     * @param first id of the first entry this stream will deliver
+     */
+    public LedgerStream(long first){
+        pending = new ArrayList<LedgerEntry>();
+        toDeliver = new ArrayList<LedgerEntry>();
+        index = first;
+    }
+
+    /**
+     * Removes and returns the next entry, blocking until one is available.
+     *
+     * @return the next entry in id order, or null if the calling thread is
+     *         interrupted while waiting (the interrupt status is restored)
+     */
+
+    public LedgerEntry readEntry(){
+        synchronized(toDeliver){
+            // Loop because wait() may return spuriously, or after another
+            // reader already consumed the entry that triggered the wakeup.
+            while(toDeliver.isEmpty()){
+                try{
+                    toDeliver.wait();
+                } catch(InterruptedException e){
+                    LOG.info("Received an interrupted exception", e);
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
+            // Consume the entry so the next call moves on to the next one.
+            return toDeliver.remove(0);
+        }
+    }
+
+    /**
+     * Invoked upon reception of a new ledger entry. If it is the next
+     * expected entry, it and any buffered successors become readable.
+     * If it is ahead of the stream, it is buffered. If it was already
+     * delivered, it is dropped.
+     *
+     * @param le a new ledger entry to deliver.
+     */
+
+    public void addEntry(LedgerEntry le){
+        synchronized(toDeliver){
+            long id = le.getEntryId();
+            if(id < index){
+                // Already delivered, e.g. a duplicate response. Buffering it
+                // would keep it in pending forever.
+                LOG.debug("Dropping already delivered entry: " + id);
+                return;
+            }
+            if(id > index){
+                pending.add(le);
+                return;
+            }
+
+            toDeliver.add(le);
+            index++;
+            drainPending();
+            toDeliver.notifyAll();
+        }
+    }
+
+    /**
+     * Moves buffered entries that are now in sequence from pending to
+     * toDeliver, and discards buffered duplicates of delivered ids.
+     * The caller must hold the toDeliver monitor.
+     */
+    private void drainPending(){
+        boolean progressed = true;
+        while(progressed){
+            progressed = false;
+            int i = 0;
+            while(i < pending.size()){
+                long id = pending.get(i).getEntryId();
+                if(id == index){
+                    toDeliver.add(pending.remove(i));
+                    index++;
+                    progressed = true;
+                } else if(id < index){
+                    pending.remove(i);
+                } else {
+                    i++;
+                }
+            }
+        }
+    }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,265 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.io.ByteArrayOutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.ClientCBWorker;
+import org.apache.bookkeeper.client.ErrorCodes;
+import org.apache.bookkeeper.client.QuorumOpMonitor;
+import org.apache.bookkeeper.client.QuorumOpMonitor.PendingOp;
+import org.apache.bookkeeper.client.QuorumOpMonitor.PendingReadOp;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements the quorum protocol.It basically handles requests coming
+ * from BookKeeper and forward to the appropriate BookieHandle objects.
+ */
+
+public class QuorumEngine {
+ Logger LOG = Logger.getLogger(QuorumEngine.class);
+
+ //ArrayBlockingQueue<Operation> incomingQueue;
+ QuorumOpMonitor opMonitor;
+ ClientCBWorker cbWorker;
+ LedgerHandle lh;
+ int qRef = 0;
+ boolean stop = false;
+
+ /**
+ * Requests generated by BookKeeper.java upon client calls.
+ * There are three types of requests: READ, ADD, STOP.
+ */
+
+ public static class Operation {
+ public static final int READ = 0;
+ public static final int ADD = 1;
+ public static final int STOP = 2;
+
+
+ int type;
+ LedgerHandle ledger;
+ int rc = 0;
+ boolean ready = false;
+
+ public static class AddOp extends Operation {
+ AddCallback cb;
+ Object ctx;
+ byte[] data;
+ long entry;
+
+ public AddOp(LedgerHandle ledger, byte[] data, AddCallback cb, Object ctx){
+ type = Operation.ADD;
+
+ this.data = data;
+ this.entry = ledger.incLast();
+ this.cb = cb;
+ this.ctx = ctx;
+
+ this.ledger = ledger;
+ }
+
+ }
+
+
+ public static class ReadOp extends Operation {
+ ReadCallback cb;
+ Object ctx;
+ long firstEntry;
+ long lastEntry;
+ LedgerEntry[] seq;
+ AtomicInteger counter;
+ HashMap<Long, AtomicInteger> nacks;
+ //boolean complete;
+
+ public ReadOp(LedgerHandle ledger, long firstEntry, long lastEntry, ReadCallback cb, Object ctx){
+ type = READ;
+
+ this.firstEntry = firstEntry;
+ this.lastEntry = lastEntry;
+ this.cb = cb;
+ this.ctx = ctx;
+ this.seq = new LedgerEntry[(int) (lastEntry - firstEntry + 1)];
+ this.counter = new AtomicInteger(0);
+ this.nacks = new HashMap<Long, AtomicInteger>();
+ //this.complete = false;
+
+ this.ledger = ledger;
+ }
+ }
+
+ public static class StopOp extends Operation {
+ public StopOp(){
+ type = STOP;
+ }
+ }
+
+
+
+
+ void setErrorCode(int rc){
+ this.rc = rc;
+ }
+
+ int getErrorCode(){
+ return this.rc;
+ }
+
+ synchronized boolean isReady(){
+ return ready;
+ }
+
+ synchronized void setReady(){
+ ready = true;
+ this.notify();
+ }
+
+ LedgerHandle getLedger(){
+ return ledger;
+ }
+ }
+
+
+ public static class SubOp{
+ int bIndex;
+ Operation op;
+
+ public static class SubAddOp extends SubOp{
+ PendingOp pOp;
+ WriteCallback wcb;
+
+ SubAddOp(Operation op,
+ PendingOp pOp,
+ int bIndex,
+ WriteCallback wcb){
+ this.op = op;
+ this.pOp = pOp;
+ this.bIndex = bIndex;
+ this.wcb = wcb;
+ }
+ }
+
+ public static class SubReadOp extends SubOp{
+ PendingReadOp pOp;
+ ReadEntryCallback rcb;
+
+ SubReadOp(Operation op,
+ PendingReadOp pOp,
+ int bIndex,
+ ReadEntryCallback rcb){
+ this.op = op;
+ this.pOp = pOp;
+ this.bIndex = bIndex;
+ this.rcb = rcb;
+ }
+ }
+ }
+
+ public QuorumEngine(LedgerHandle lh){
+ this.lh = lh;
+ this.opMonitor = QuorumOpMonitor.getInstance(lh);
+ LOG.debug("Creating cbWorker");
+ this.cbWorker = ClientCBWorker.getInstance();
+ LOG.debug("Created cbWorker");
+ }
+
+ void sendOp(Operation r)
+ throws InterruptedException {
+ switch(r.type){
+ case Operation.READ:
+ Operation.ReadOp rOp = (Operation.ReadOp) r;
+ LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
+ cbWorker.addOperation(r);
+
+ for(long entry = rOp.firstEntry;
+ entry <= rOp.lastEntry;
+ entry++){
+
+ //Send requests to bookies
+ for(BookieHandle bh : lh.getBookies()){
+ try{
+ SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp,
+ new PendingReadOp(lh),
+ lh.getBookies().indexOf(bh),
+ opMonitor);
+ bh.sendRead(sRead, entry);
+
+ } catch(IOException e){
+ LOG.error(e);
+ }
+ }
+ }
+ //r.cb.processResult();
+ break;
+ case Operation.ADD:
+ long counter = 0;
+ //ArrayList<BookieHandle> list = lh.getBookies();
+ int n = lh.getBookies().size();
+
+ LOG.debug("Adding add operation to opMonitor");
+ //opMonitor.addOp(lh, (Operation.AddOp) r);
+ cbWorker.addOperation(r);
+ Operation.AddOp aOp = (Operation.AddOp) r;
+ PendingOp pOp = new PendingOp();
+ while(counter < lh.getQuorumSize() ){
+ int index = (int)((aOp.entry + counter++) % n);
+
+ try{
+ SubOp.SubAddOp sAdd = new
+ SubOp.SubAddOp(aOp,
+ pOp,
+ index,
+ opMonitor);
+ lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
+ } catch (IOException io) {
+ LOG.error(io);
+ try{
+ /*
+ * Before getting a new bookie, try to reconnect
+ */
+ lh.getBookies().get((index) % n).restart();
+ } catch (IOException nio){
+ lh.removeBookie(index);
+ }
+ }
+ }
+ //qRef = (qRef + 1) % n;
+ break;
+ case Operation.STOP:
+ stop = true;
+ cbWorker.shutdown();
+ break;
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,496 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+import org.apache.bookkeeper.client.BookieHandle;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.ErrorCodes;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerHandle.QMode;
+import org.apache.bookkeeper.client.QuorumEngine.Operation;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
+import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
+import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Monitors reponses from bookies to requests of a client. It implements
+ * two interfaces of the proto package that correspond to callbacks from
+ * BookieClient objects.
+ *
+ */
+public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
+ Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
+
+ LedgerHandle lh;
+
+ static final int MAXRETRIES = 2;
+ static HashMap<Long, QuorumOpMonitor> instances =
+ new HashMap<Long, QuorumOpMonitor>();
+
+ public static QuorumOpMonitor getInstance(LedgerHandle lh){
+ if(instances.get(lh.getId()) == null) {
+ instances.put(lh.getId(), new QuorumOpMonitor(lh));
+ }
+
+ return instances.get(lh.getId());
+ }
+
+ /**
+ * Message disgest instance
+ *
+ */
+ MessageDigest digest = null;
+
+ /**
+ * Get digest instance if there is none.
+ *
+ */
+ MessageDigest getDigestInstance(String alg)
+ throws NoSuchAlgorithmException {
+ if(digest == null){
+ digest = MessageDigest.getInstance(alg);
+ }
+
+ return digest;
+ }
+
+ public static class PendingOp{
+ //Operation op = null;
+ HashSet<Integer> bookieIdSent;
+ HashSet<Integer> bookieIdRecv;
+ int retries = 0;
+
+ PendingOp(){
+ this.bookieIdSent = new HashSet<Integer>();
+ this.bookieIdRecv = new HashSet<Integer>();
+ }
+
+ //PendingOp(Operation op){
+ // this.op = op;
+ // bookieIdSent = new HashSet<Integer>();
+ // bookieIdRecv = new HashSet<Integer>();
+ //}
+
+ //void setOp(Operation op){
+ // this.op = op;
+ //}
+
+ //Operation getOp(){
+ // return this.op;
+ //}
+
+ };
+
+ /**
+ * Objects of this type are used to keep track of the status of
+ * a given write request.
+ *
+ *
+ */
+ //public static class PendingAddOp extends PendingOp{
+ // AddOp op;
+
+ // PendingAddOp(LedgerHandle lh, AddOp op){
+ // this.op = op;
+ // }
+ //}
+
+ /**
+ * Objects of this type are used to keep track of the status of
+ * a given read request.
+ *
+ */
+
+ public static class PendingReadOp extends PendingOp{
+ /*
+ * Values for ongoing reads
+ */
+ ConcurrentHashMap<Long, ArrayList<ByteBuffer>> proposedValues;
+
+ /*
+ * Bookies from which received a response
+ */
+ //ConcurrentHashMap<Long, HashSet<Integer>> received;
+
+
+ PendingReadOp(LedgerHandle lh){
+ this.proposedValues =
+ new ConcurrentHashMap<Long, ArrayList<ByteBuffer>>();
+ //this.received =
+ // new ConcurrentHashMap<Long, HashSet<Integer>>();
+ }
+
+ }
+
+ QuorumOpMonitor(LedgerHandle lh){
+ this.lh = lh;
+ }
+
+
+
+
+ /**
+ * Callback method for write operations. There is one callback for
+ * each write to a server.
+ *
+ */
+
+ public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){
+ //PendingAddOp pOp;
+ String logmsg;
+
+ //synchronized(pendingAdds){
+ //pOp = pendingAdds.get(entryId);
+ //}
+ SubAddOp sAdd = (SubAddOp) ctx;
+ PendingOp pOp = sAdd.pOp;
+ Integer sId = sAdd.bIndex;
+
+ if(pOp == null){
+ LOG.error("No such an entry ID: " + entryId + "(" + ledgerId + ")");
+ return;
+ }
+
+ ArrayList<BookieHandle> list = lh.getBookies();
+ int n = list.size();
+
+ if(rc == 0){
+ // Everything went ok with this op
+ synchronized(pOp){
+ //pOp.bookieIdSent.add(sId);
+ pOp.bookieIdRecv.add(sId);
+ if(pOp.bookieIdRecv.size() == lh.getQuorumSize()){
+ //pendingAdds.remove(entryId);
+ //sAdd.op.cb.addComplete(sAdd.op.getErrorCode(),
+ // ledgerId, entryId, sAdd.op.ctx);
+ sAdd.op.setReady();
+ }
+ }
+ } else {
+ LOG.error("Error sending write request: " + rc + " : " + ledgerId);
+ HashSet<Integer> ids;
+
+ synchronized(pOp){
+ pOp.bookieIdSent.add(sId);
+ ids = pOp.bookieIdSent;
+ //Check if we tried all possible bookies already
+ if(ids.size() == lh.getBookies().size()){
+ if(pOp.retries++ >= MAXRETRIES){
+ //Call back with error code
+ //sAdd.op.cb.addComplete(ErrorCodes.ENUMRETRIES,
+ // ledgerId, entryId, sAdd.op.ctx);
+ sAdd.op.setErrorCode(ErrorCodes.ENUMRETRIES);
+ sAdd.op.setReady();
+ return;
+ }
+
+ ids.clear();
+ }
+ // Select another bookie that we haven't contacted yet
+ for(int i = 0; i < lh.getBookies().size(); i++){
+ if(!ids.contains(Integer.valueOf(i))){
+ // and send it to new bookie
+ try{
+ list.get(i).sendAdd(new SubAddOp(sAdd.op,
+ pOp,
+ i,
+ this), ((AddOp) sAdd.op).entry);
+ pOp.bookieIdRecv.add(sId.intValue());
+
+ break;
+ } catch(IOException e){
+ LOG.error(e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Callback method for read operations. There is one callback for
+ * each entry of a read request.
+ *
+ * TODO: We might want to change the way a client application specify
+ * the quorum size. It is really loose now, and it allows an application
+ * to set any quorum size the client wants.
+ */
+
+ public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
+ /*
+ * Collect responses, and reply when there are sufficient
+ * answers.
+ */
+ LOG.debug("New response: " + rc);
+ if(rc == 0){
+ SubReadOp sRead = (SubReadOp) ctx;
+ ReadOp rOp = (ReadOp) sRead.op;
+ PendingReadOp pOp = sRead.pOp;
+ if(pOp != null){
+ HashSet<Integer> received = pOp.bookieIdRecv;
+ //if(!received.containsKey(entryId)){
+ // received.put(entryId, new HashSet<Integer>());
+ //}
+ boolean result = received.add(sRead.bIndex);
+ int counter = -1;
+ if(result){
+
+ if(!pOp.proposedValues.containsKey(entryId)){
+ pOp.proposedValues.put(entryId, new ArrayList<ByteBuffer>());
+ }
+ ArrayList<ByteBuffer> list = pOp.proposedValues.get(entryId);
+ list.add(bb);
+
+ switch(lh.getQMode()){
+ case VERIFIABLE:
+ if(list.size() >= 1){
+ try{
+ ByteBuffer voted = voteVerifiable(list);
+ if(voted != null){
+ LOG.debug("Voted: " + new String(voted.array()));
+
+ MessageDigest md = getDigestInstance(lh.getDigestAlg());
+ int dlength = md.getDigestLength();
+ if(voted.capacity() - dlength > 0){
+ byte[] data = new byte[voted.capacity() - dlength - 24];
+ LOG.info("Digest length: " + dlength + ", " + data.length);
+ voted.position(24);
+ voted.get(data, 0, data.length);
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+ } else {
+ LOG.error("Short message: " + voted.capacity());
+ }
+ }
+ } catch(NoSuchAlgorithmException e){
+ LOG.error("Problem with message digest: " + e);
+ } catch(BKException bke) {
+ LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
+ countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+ }
+ }
+ break;
+ case GENERIC:
+ if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
+ ByteBuffer voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
+ if(voted != null){
+ LOG.debug("Voted: " + voted.array());
+ byte[] data = new byte[voted.capacity() - 24];
+ voted.position(24);
+ voted.get(data, 0, data.length);
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+ }
+ }
+ break;
+ case FREEFORM:
+ if(list.size() == lh.getQuorumSize()){
+ ByteBuffer voted = voteFree(list);
+ if(voted != null){
+ LOG.debug("Voted: " + voted.array());
+ byte[] data = new byte[voted.capacity() - 24];
+ voted.position(24);
+ voted.get(data, 0, data.length);
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+ }
+ }
+ }
+ }
+
+ if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
+ !sRead.op.isReady()){
+
+ sRead.op.setReady();
+ //sRead.op.cb.readComplete(0, ledgerId, new LedgerSequence(sRead.op.seq), sRead.op.ctx);
+ //sRead.op.complete = true;
+ }
+
+ LOG.debug("Counter: " + rOp.counter);
+ }
+ } else {
+ /*
+ * Have to count the number of negative responses
+ */
+ countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+
+ }
+ }
+
+
+ /**
+ * Counts negative responses
+ *
+ * @param rOp read operation
+ * @param sRead specific read sub-operation
+ */
+
+ synchronized void countNacks(ReadOp rOp, SubReadOp sRead, long ledgerId, long entryId){
+
+ if(!rOp.nacks.containsKey(entryId)){
+ rOp.nacks.put(entryId, new AtomicInteger(0));
+ }
+
+ if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
+ int counter = -1;
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
+
+ if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
+ !sRead.op.isReady()){
+
+ sRead.op.setReady();
+ }
+ }
+ }
+
+ /**
+ * Verify if the set of votes in the list can produce a correct answer
+ * for verifiable data.
+ *
+ * @param list
+ * @return
+ */
+
+ private ByteBuffer voteVerifiable(ArrayList<ByteBuffer> list)
+ throws NoSuchAlgorithmException, BKException{
+ /*
+ * Check if checksum matches
+ */
+ ByteBuffer bb = list.get(0);
+ list.remove(0);
+
+ MessageDigest md = getDigestInstance(lh.getDigestAlg());
+ int dlength = md.getDigestLength();
+
+ /*
+ * TODO: The if check below is legitimate, but in reality it should never happen,
+ * bt it showed up a few times in experiments. Have to check why it is happening.
+ */
+ if(bb.capacity() <= dlength){
+ LOG.warn("Something wrong with this entry, length smaller than digest length");
+ return null;
+ }
+
+ byte[] data = new byte[bb.capacity() - dlength];
+ bb.get(data, 0, bb.capacity() - dlength);
+
+ byte[] sig = new byte[dlength];
+ bb.position(bb.capacity() - dlength);
+ bb.get(sig, 0, dlength);
+
+ bb.rewind();
+
+ //LOG.warn("Data: " + data.toString() + ", Signature: " + sig.toString());
+ md.update(lh.getPasswdHash());
+ md.update(data);
+ if(MessageDigest.isEqual(md.digest(), sig)){
+ return bb;
+ } else {
+ throw BKException.create(Code.DigestMatchException);
+ }
+ }
+
+ /**
+ * Verify if the set of votes in the list can produce a correct answer
+ * for generic data.
+ *
+ * @param list
+ * @return
+ */
+
+ private ByteBuffer voteGeneric(ArrayList<ByteBuffer> list, int threshold){
+ HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
+ for(ByteBuffer bb : list){
+ if(!map.containsKey(bb)){
+ map.put(bb, Integer.valueOf(0));
+ }
+
+ map.put(bb, map.get(bb) + 1);
+
+ if(map.get(bb) >= threshold)
+ return bb;
+ }
+
+ return null;
+ }
+
+ /**
+ * Verify if the set of votes in the list can produce a correct answer
+ * for generic data.
+ *
+ * @param list
+ * @return
+ */
+
+ private ByteBuffer voteFree(ArrayList<ByteBuffer> list){
+ HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
+ for(ByteBuffer bb : list){
+ if(!map.containsKey(bb)){
+ map.put(bb, Integer.valueOf(0));
+ }
+ map.put(bb, map.get(bb) + 1);
+
+ if(map.get(bb) == list.size())
+ return bb;
+ }
+
+ return null;
+ }
+
+ /**
+ * Add new entry to the list of received.
+ *
+ * @param le ledger entry to add to list
+ * @param op read operation metadata
+ */
+
+ private int addNewEntry(LedgerEntry le, ReadOp op){
+ long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
+ if(op.seq[(int) index] == null){
+ op.seq[(int) index] = le;
+
+ if(le.getEntry() != null)
+ LOG.debug("Adding entry: " + le.getEntryId() + ", " + le.getEntry().length);
+ else
+ LOG.debug("Entry is null: " + le.getEntryId());
+
+ return op.counter.incrementAndGet();
+ }
+
+ return -1;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ReadCallback.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,38 @@
+package org.apache.bookkeeper.client;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * Callback through which the result of an asynchronous ledger read is
+ * handed back to the client.
+ */
+
+public interface ReadCallback {
+    /**
+     * Receives the result of a read request.
+     *
+     * @param rc       return code of the read
+     * @param ledgerId identifier of the ledger that was read
+     * @param seq      the entries that were read
+     * @param ctx      context object (presumably the one supplied with the
+     *                 read request)
+     */
+    public void readComplete(int rc, long ledgerId, LedgerSequence seq, Object ctx);
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,342 @@
+package org.apache.bookkeeper.proto;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+
+import org.apache.bookkeeper.proto.ReadEntryCallback;
+import org.apache.bookkeeper.proto.WriteCallback;
+import org.apache.log4j.Logger;
+
+
+
+/**
+ * Implements the client-side part of the BookKeeper protocol.
+ *
+ */
+public class BookieClient extends Thread {
+ Logger LOG = Logger.getLogger(BookieClient.class);
+ SocketChannel sock;
+ int myCounter = 0;
+
+ public BookieClient(InetSocketAddress addr, int recvTimeout)
+ throws IOException, ConnectException {
+ sock = SocketChannel.open(addr);
+ setDaemon(true);
+ //sock.configureBlocking(false);
+ sock.socket().setSoTimeout(recvTimeout);
+ sock.socket().setTcpNoDelay(true);
+ start();
+ }
+
+ public BookieClient(String host, int port, int recvTimeout)
+ throws IOException, ConnectException {
+ this(new InetSocketAddress(host, port), recvTimeout);
+ }
+
+ private static class Completion<T> {
+ Completion(T cb, Object ctx) {
+ this.cb = cb;
+ this.ctx = ctx;
+ }
+
+ T cb;
+
+ Object ctx;
+ }
+
+ private static class CompletionKey {
+ long ledgerId;
+
+ long entryId;
+
+ CompletionKey(long ledgerId, long entryId) {
+ this.ledgerId = ledgerId;
+ this.entryId = entryId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof CompletionKey) || obj == null) {
+ return false;
+ }
+ CompletionKey that = (CompletionKey) obj;
+ return this.ledgerId == that.ledgerId && this.entryId == that.entryId;
+ }
+
+ @Override
+ public int hashCode() {
+ return ((int) ledgerId << 16) ^ ((int) entryId);
+ }
+
+ }
+
+ ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
+ ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions = new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
+
+ Object writeLock = new Object();
+ Object readLock = new Object();
+
+ /*
+ * Use this semaphore to control the number of completion key in both addCompletions
+ * and readCompletions. This is more of a problem for readCompletions because one
+ * readEntries opertion is expanded into individual operations to read entries.
+ */
+ Semaphore completionSemaphore = new Semaphore(1000);
+
+
+ /**
+ * Send addEntry operation to bookie.
+ *
+ * @param ledgerId ledger identifier
+ * @param entryId entry identifier
+ * @param cb object implementing callback method
+ * @param ctx control object
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void addEntry(long ledgerId, long entryId,
+ ByteBuffer entry, WriteCallback cb, Object ctx)
+ throws IOException, InterruptedException {
+
+ //LOG.info("Data length: " + entry.capacity());
+ completionSemaphore.acquire();
+ addCompletions.put(new CompletionKey(ledgerId, entryId),
+ new Completion<WriteCallback>(cb, ctx));
+ //entry = entry.duplicate();
+ entry.position(0);
+
+ ByteBuffer tmpEntry = ByteBuffer.allocate(entry.capacity() + 8 + 8 + 8);
+
+ tmpEntry.position(4);
+ tmpEntry.putInt(BookieProtocol.ADDENTRY);
+ tmpEntry.putLong(ledgerId);
+ tmpEntry.putLong(entryId);
+ tmpEntry.put(entry);
+ tmpEntry.position(0);
+
+ //ByteBuffer len = ByteBuffer.allocate(4);
+ // 4 bytes for the message type
+ tmpEntry.putInt(tmpEntry.remaining() - 4);
+ tmpEntry.position(0);
+ synchronized(writeLock) {
+ //sock.write(len);
+ //len.clear();
+ //len.putInt(BookieProtocol.ADDENTRY);
+ //len.flip();
+ //sock.write(len);
+ sock.write(tmpEntry);
+ }
+ //LOG.debug("addEntry:finished");
+ }
+
+ /**
+ * Send readEntry operation to bookie.
+ *
+ * @param ledgerId ledger identifier
+ * @param entryId entry identifier
+ * @param cb object implementing callback method
+ * @param ctx control object
+ * @throws IOException
+ */
+ public void readEntry(long ledgerId, long entryId,
+ ReadEntryCallback cb, Object ctx)
+ throws IOException, InterruptedException {
+
+ completionSemaphore.acquire();
+ readCompletions.put(new CompletionKey(ledgerId, entryId),
+ new Completion<ReadEntryCallback>(cb, ctx));
+ ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8);
+ tmpEntry.putLong(ledgerId);
+ tmpEntry.putLong(entryId);
+ tmpEntry.position(0);
+
+ ByteBuffer len = ByteBuffer.allocate(4);
+ len.putInt(tmpEntry.remaining() + 4);
+ len.flip();
+ //LOG.debug("readEntry: Writing to socket");
+ synchronized(readLock) {
+ sock.write(len);
+ len.clear();
+ len.putInt(BookieProtocol.READENTRY);
+ len.flip();
+ sock.write(len);
+ sock.write(tmpEntry);
+ }
+ //LOG.error("Size of readCompletions: " + readCompletions.size());
+ }
+
+ private void readFully(ByteBuffer bb) throws IOException {
+ while(bb.remaining() > 0) {
+ sock.read(bb);
+ }
+ }
+
+ public void run() {
+ int len = -1;
+ ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+ int type = -1, rc = -1;
+ try {
+ while(sock.isConnected()) {
+ lenBuffer.clear();
+ readFully(lenBuffer);
+ lenBuffer.flip();
+ len = lenBuffer.getInt();
+ ByteBuffer bb = ByteBuffer.allocate(len);
+ readFully(bb);
+ bb.flip();
+ type = bb.getInt();
+ rc = bb.getInt();
+
+ switch(type) {
+ case BookieProtocol.ADDENTRY:
+ {
+ long ledgerId = bb.getLong();
+ long entryId = bb.getLong();
+ Completion<WriteCallback> ac = addCompletions.remove(new CompletionKey(ledgerId, entryId));
+ completionSemaphore.release();
+
+ if (ac != null) {
+ ac.cb.writeComplete(rc, ledgerId, entryId, ac.ctx);
+ } else {
+ LOG.error("Callback object null: " + ledgerId + " : " + entryId);
+ }
+ break;
+ }
+ case BookieProtocol.READENTRY:
+ {
+ //ByteBuffer entryData = bb.slice();
+ long ledgerId = bb.getLong();
+ long entryId = bb.getLong();
+
+ bb.position(24);
+ byte[] data = new byte[bb.capacity() - 24];
+ bb.get(data);
+ ByteBuffer entryData = ByteBuffer.wrap(data);
+
+ LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
+
+
+ CompletionKey key = new CompletionKey(ledgerId, entryId);
+ Completion<ReadEntryCallback> c;
+
+ if(readCompletions.containsKey(key)){
+ c = readCompletions.remove(key);
+ //LOG.error("Found key");
+ }
+ else{
+ /*
+ * This is a special case. When recovering a ledger, a client submits
+ * a read request with id -1, and receives a response with a different
+ * entry id.
+ */
+ c = readCompletions.remove(new CompletionKey(ledgerId, -1));
+ }
+ completionSemaphore.release();
+
+ if (c != null) {
+ c.cb.readEntryComplete(rc,
+ ledgerId,
+ entryId,
+ entryData,
+ c.ctx);
+ }
+ break;
+ }
+ default:
+ System.err.println("Got error " + rc + " for type " + type);
+ }
+ }
+ } catch(Exception e) {
+ LOG.error("Len = " + len + ", Type = " + type + ", rc = " + rc);
+ e.printStackTrace();
+ }
+ }
+
+ private static class Counter {
+ int i;
+ int total;
+ synchronized void inc() {
+ i++;
+ total++;
+ }
+ synchronized void dec() {
+ i--;
+ notifyAll();
+ }
+ synchronized void wait(int limit) throws InterruptedException {
+ while(i > limit) {
+ wait();
+ }
+ }
+ synchronized int total() {
+ return total;
+ }
+ }
+ /**
+ * @param args
+ * @throws IOException
+ * @throws NumberFormatException
+ * @throws InterruptedException
+ */
+ public static void main(String[] args) throws NumberFormatException, IOException, InterruptedException {
+ if (args.length != 3) {
+ System.err.println("USAGE: BookieClient bookieHost port ledger#");
+ return;
+ }
+ WriteCallback cb = new WriteCallback() {
+
+ public void writeComplete(int rc, long ledger, long entry, Object ctx) {
+ Counter counter = (Counter)ctx;
+ counter.dec();
+ if (rc != 0) {
+ System.out.println("rc = " + rc + " for " + entry + "@" + ledger);
+ }
+ }
+ };
+ Counter counter = new Counter();
+ byte hello[] = "hello".getBytes();
+ long ledger = Long.parseLong(args[2]);
+ BookieClient bc = new BookieClient(args[0], Integer.parseInt(args[1]), 5000);
+ for(int i = 0; i < 100000; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(100);
+ entry.putLong(ledger);
+ entry.putLong(i);
+ entry.putInt(0);
+ entry.put(hello);
+ entry.flip();
+ counter.inc();
+ bc.addEntry(ledger, i, entry, cb, counter);
+ }
+ counter.wait(0);
+ System.out.println("Total = " + counter.total());
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=739388&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java Fri Jan 30 19:30:27 2009
@@ -0,0 +1,70 @@
+package org.apache.bookkeeper.proto;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**
+ * Request types and error codes of the Bookie wire protocol.
+ * The packets of the Bookie protocol all have a 4-byte integer
+ * indicating the type of request or response at the very beginning
+ * of the packet, followed by a payload.
+ */
+public interface BookieProtocol {
+    /**
+     * The Add entry request payload will be a ledger entry exactly
+     * as it should be logged. The response payload will be a 4-byte
+     * integer that has the error code followed by the 8-byte
+     * ledger number and 8-byte entry number of the entry written.
+     */
+    int ADDENTRY = 1;
+    /**
+     * The Read entry request payload will be the ledger number and
+     * entry number to read. (The ledger number is an 8-byte integer
+     * and the entry number is an 8-byte integer.) The
+     * response payload will be a 4-byte integer representing an
+     * error code and a ledger entry if the error code is EOK, otherwise
+     * it will be the 8-byte ledger number and the 8-byte entry number
+     * requested. (Note that the first sixteen bytes of the entry happen
+     * to be the ledger number and entry number as well.)
+     */
+    int READENTRY = 2;
+
+    /**
+     * The error code that indicates success.
+     */
+    int EOK = 0;
+    /**
+     * The error code that indicates that the ledger does not exist.
+     */
+    int ENOLEDGER = 1;
+    /**
+     * The error code that indicates that the requested entry does not exist.
+     */
+    int ENOENTRY = 2;
+    /**
+     * The error code that indicates an invalid request type.
+     */
+    int EBADREQ = 100;
+    /**
+     * General error occurred at the server.
+     * <p>
+     * Must differ from {@link #EOK}. It was previously 0, which made a server
+     * I/O failure indistinguishable from success on the wire.
+     */
+    int EIO = 101;
+}
|