Author: mahadev
Date: Wed Apr 1 21:50:03 2009
New Revision: 761077
URL: http://svn.apache.org/viewvc?rev=761077&view=rev
Log:
ZOOKEEPER-288. Cleanup and fixes to BookKeeper (flavio via mahadev)
Added:
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Apr 1 21:50:03 2009
@@ -59,6 +59,8 @@
ZOOKEEPER-349. to automate patch testing. (giridharan kesavan via mahadev)
+ ZOOKEEPER-288. Cleanup and fixes to BookKeeper (flavio via mahadev)
+
NEW FEATURES:
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/Bookie.java Wed Apr 1 21:50:03 2009
@@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
+import java.util.WeakHashMap;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AddCallback;
import org.apache.log4j.Logger;
@@ -53,6 +55,8 @@
final File ledgerDirectories[];
+ WeakHashMap<Long, ByteBuffer> masterKeys = new WeakHashMap<Long, ByteBuffer>();
+
public static class NoLedgerException extends IOException {
private static final long serialVersionUID = 1L;
private long ledgerId;
@@ -93,6 +97,20 @@
}
}
+ private LedgerDescriptor getHandle(long ledgerId, boolean readonly, byte[] masterKey) throws IOException {
+ LedgerDescriptor handle = null;
+ synchronized (ledgers) {
+ handle = ledgers.get(ledgerId);
+ if (handle == null) {
+ handle = createHandle(ledgerId, readonly);
+ ledgers.put(ledgerId, handle);
+ masterKeys.put(ledgerId, ByteBuffer.wrap(masterKey));
+ }
+ handle.incRef();
+ }
+ return handle;
+ }
+
private LedgerDescriptor getHandle(long ledgerId, boolean readonly) throws IOException {
LedgerDescriptor handle = null;
synchronized (ledgers) {
@@ -100,11 +118,12 @@
if (handle == null) {
handle = createHandle(ledgerId, readonly);
ledgers.put(ledgerId, handle);
- }
+ }
handle.incRef();
}
return handle;
}
+
private LedgerDescriptor createHandle(long ledgerId, boolean readOnly) throws IOException {
RandomAccessFile ledgerFile = null;
@@ -266,10 +285,15 @@
}
}
- public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx)
- throws IOException {
+ public void addEntry(ByteBuffer entry, AddCallback cb, Object ctx, byte[] masterKey)
+ throws IOException, BookieException {
+
long ledgerId = entry.getLong();
- LedgerDescriptor handle = getHandle(ledgerId, false);
+ LedgerDescriptor handle = getHandle(ledgerId, false, masterKey);
+
+ if(!masterKeys.get(ledgerId).equals(ByteBuffer.wrap(masterKey))){
+ throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+ }
try {
entry.rewind();
long entryId = handle.addEntry(entry);
@@ -323,7 +347,7 @@
* @throws InterruptedException
*/
public static void main(String[] args) throws IOException,
- InterruptedException {
+ InterruptedException, BookieException {
Bookie b = new Bookie(new File("/tmp"), new File[] { new File("/tmp") });
CounterCallback cb = new CounterCallback();
long start = System.currentTimeMillis();
@@ -334,7 +358,7 @@
buff.limit(1024);
buff.position(0);
cb.incCount();
- b.addEntry(buff, cb, null);
+ b.addEntry(buff, cb, null, new byte[0]);
}
cb.waitZero();
long end = System.currentTimeMillis();
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java?rev=761077&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/BookieException.java Wed Apr 1 21:50:03 2009
@@ -0,0 +1,81 @@
+package org.apache.bookkeeper.bookie;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+ import java.lang.Exception;
+
+ @SuppressWarnings("serial")
+public abstract class BookieException extends Exception {
+
+ private int code;
+ public BookieException(int code){
+ this.code = code;
+ }
+
+ public static BookieException create(int code){
+ switch(code){
+ case Code.UnauthorizedAccessException:
+ return new BookieUnauthorizedAccessException();
+ default:
+ return new BookieIllegalOpException();
+ }
+ }
+
+ public interface Code {
+ int OK = 0;
+ int UnauthorizedAccessException = -1;
+
+ int IllegalOpException = -100;
+ }
+
+ public void setCode(int code){
+ this.code = code;
+ }
+
+ public int getCode(){
+ return this.code;
+ }
+
+ public String getMessage(int code){
+ switch(code){
+ case Code.OK:
+ return "No problem";
+ case Code.UnauthorizedAccessException:
+ return "Error while reading ledger";
+ default:
+ return "Invalid operation";
+ }
+ }
+
+ public static class BookieUnauthorizedAccessException extends BookieException {
+ public BookieUnauthorizedAccessException(){
+ super(Code.UnauthorizedAccessException);
+ }
+ }
+
+ public static class BookieIllegalOpException extends BookieException {
+ public BookieIllegalOpException(){
+ super(Code.UnauthorizedAccessException);
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java Wed Apr 1 21:50:03 2009
@@ -76,7 +76,7 @@
*/
offsetBuffer.rewind();
offsetBuffer.putLong(ledger.position());
- LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
+ //LOG.debug("Offset: " + ledger.position() + ", " + entry.position() + ", " + calcEntryOffset(entryId) + ", " + entryId);
offsetBuffer.flip();
/*
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Wed Apr 1 21:50:03 2009
@@ -278,9 +278,12 @@
String bookie = list.remove(index);
LOG.info("Bookie: " + bookie);
InetSocketAddress tAddr = parseAddr(bookie);
- lh.addBookie(tAddr);
+ int bindex = lh.addBookie(tAddr);
+ ByteBuffer bindexBuf = ByteBuffer.allocate(4);
+ bindexBuf.putInt(bindex);
+
String pBookie = "/" + bookie;
- zk.create(prefix + getZKStringId(lId) + ensemble + pBookie, new byte[0],
+ zk.create(prefix + getZKStringId(lId) + ensemble + pBookie, bindexBuf.array(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (IOException e) {
LOG.error(e);
@@ -382,12 +385,18 @@
List<String> list =
zk.getChildren(prefix + getZKStringId(lId) + ensemble, false);
-
- for(String s : list){
- try{
- lh.addBookie(parseAddr(s));
- } catch (IOException e){
- LOG.error(e);
+
+ for(int i = 0 ; i < list.size() ; i++){
+ for(String s : list){
+ byte[] bindex = zk.getData(prefix + getZKStringId(lId) + ensemble + "/" + s, false, stat);
+ ByteBuffer bindexBuf = ByteBuffer.wrap(bindex);
+ if(bindexBuf.getInt() == i){
+ try{
+ lh.addBookie(parseAddr(s));
+ } catch (IOException e){
+ LOG.error(e);
+ }
+ }
}
}
@@ -451,31 +460,13 @@
*/
public void asyncAddEntry(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
throws InterruptedException {
- LOG.debug("Adding entry asynchronously: " + data);
- //lh.incLast();
+
if(lh != null){
AddOp r = new AddOp(lh, data, cb, ctx);
engines.get(lh.getId()).sendOp(r);
}
- //qeMap.get(lh.getId()).put(r);
}
- /**
- * Add entry asynchronously to an open ledger.
- */
- //public void asyncAddEntryVerifiable(LedgerHandle lh, byte[] data, AddCallback cb, Object ctx)
- //throws InterruptedException, IOException, BKException, NoSuchAlgorithmException {
- // if(md == null)
- // throw BKException.create(Code.DigestNotInitializedException);
- //
- // LOG.info("Data size: " + data.length);
- // AddOp r = new AddOp(lh, data, cb, ctx);
- // //r.addDigest();
- // LOG.info("Data length: " + r.data.length);
- // engines.get(lh.getId()).sendOp(r);
- // //qeMap.get(lh.getId()).put(r);
- //}
-
/**
* Read a sequence of entries synchronously.
@@ -496,7 +487,7 @@
Operation r = new ReadOp(lh, firstEntry, lastEntry, this, counter);
engines.get(lh.getId()).sendOp(r);
- //qeMap.get(lh.getId()).put(r);
+
LOG.debug("Going to wait for read entries: " + counter.i);
counter.block(0);
LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookieHandle.java Wed Apr 1 21:50:03 2009
@@ -27,7 +27,10 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
import java.security.MessageDigest;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
import org.apache.bookkeeper.client.LedgerHandle.QMode;
import org.apache.bookkeeper.client.QuorumEngine.Operation;
@@ -84,9 +87,11 @@
this.addr = addr;
this.incomingQueue = new ArrayBlockingQueue<ToSend>(2000);
+ //genSecurePadding();
start();
}
+
/**
* Restart BookieClient if can't talk to bookie
*
@@ -112,11 +117,6 @@
} catch(InterruptedException e){
e.printStackTrace();
}
- //client.addEntry(self.getId(),
- // r.entry,
- // ByteBuffer.wrap(r.data),
- // cb,
- // ctx);
}
/**
@@ -124,6 +124,7 @@
*
*/
MessageDigest digest = null;
+ Mac mac = null;
/**
* Get digest instance if there is none.
@@ -138,40 +139,14 @@
return digest;
}
- /**
- * Computes the digest for a given ByteBuffer.
- *
- */
-
- public ByteBuffer addDigest(long entryId, ByteBuffer data)
- throws NoSuchAlgorithmException, IOException {
- if(digest == null)
- getDigestInstance(self.getDigestAlg());
-
- ByteBuffer bb = ByteBuffer.allocate(8 + 8);
- bb.putLong(self.getId());
- bb.putLong(entryId);
-
- byte[] msgDigest;
-
- // synchronized(LedgerHandle.digest){
- digest.update(self.getPasswdHash());
- digest.update(bb.array());
- digest.update(data.array());
-
- //baos.write(data);
- //baos.write(Operation.digest.digest());
- msgDigest = digest.digest();
- //}
- ByteBuffer extendedData = ByteBuffer.allocate(data.capacity() + msgDigest.length);
- data.rewind();
- extendedData.put(data);
- extendedData.put(msgDigest);
-
- //LOG.debug("Data length (" + self.getId() + ", " + entryId + "): " + data.capacity());
- //LOG.debug("Digest: " + new String(msgDigest));
+ Mac getMac(String alg)
+ throws NoSuchAlgorithmException, InvalidKeyException {
+ if(mac == null){
+ mac = Mac.getInstance(alg);
+ mac.init(new SecretKeySpec(self.getMacKey(), "HmacSHA1"));
+ }
- return extendedData;
+ return mac;
}
/**
@@ -207,22 +182,38 @@
* TODO: Really add the confirmed add to the op
*/
long confirmed = self.getAddConfirmed();
- //LOG.info("Confirmed: " + confirmed);
- ByteBuffer extendedData = ByteBuffer.allocate(op.data.length + 8);
- extendedData.putLong(confirmed);
- extendedData.put(op.data);
- extendedData.rewind();
-
+ ByteBuffer extendedData;
+
if(self.getQMode() == QMode.VERIFIABLE){
- extendedData = addDigest(ts.entry, extendedData);
+ extendedData = ByteBuffer.allocate(op.data.length + 28 + 16);
+ extendedData.putLong(self.getId());
+ extendedData.putLong(ts.entry);
+ extendedData.putLong(confirmed);
+ extendedData.put(op.data);
+
+
+ extendedData.rewind();
+ byte[] toProcess = new byte[op.data.length + 24];
+ extendedData.get(toProcess, 0, op.data.length + 24);
+ //extendedData.limit(extendedData.capacity() - 20);
+ extendedData.position(extendedData.capacity() - 20);
+ if(mac == null)
+ getMac("HmacSHA1");
+ extendedData.put(mac.doFinal(toProcess));
+ extendedData.position(16);
+ } else {
+ extendedData = ByteBuffer.allocate(op.data.length + 8);
+ extendedData.putLong(confirmed);
+ extendedData.put(op.data);
+ extendedData.flip();
}
- //LOG.debug("Extended data: " + extendedData.capacity());
- client.addEntry(self.getId(),
- ts.entry,
- extendedData,
- aOp.wcb,
- ts.ctx);
+ client.addEntry(self.getId(),
+ self.getLedgerKey(),
+ ts.entry,
+ extendedData,
+ aOp.wcb,
+ ts.ctx);
break;
case Operation.READ:
client.readEntry(self.getId(),
@@ -238,6 +229,8 @@
LOG.error(e);
} catch (NoSuchAlgorithmException e){
LOG.error(e);
+ } catch (InvalidKeyException e) {
+ LOG.error(e);
}
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ClientCBWorker.java Wed Apr 1 21:50:03 2009
@@ -54,28 +54,48 @@
return instance;
}
+ /**
+ * Constructor initiates queue of pending operations and
+ * runs thread.
+ *
+ */
ClientCBWorker(){
pendingOps = new ArrayBlockingQueue<Operation>(4000);
start();
LOG.debug("Have started cbWorker");
}
+
+ /**
+ * Adds operation to queue of pending.
+ *
+ * @param op operation to add to queue
+ */
+
void addOperation(Operation op)
throws InterruptedException {
pendingOps.put(op);
- LOG.debug("Added operation to queue of pending");
}
+ /**
+ * Gets thread out of its main loop.
+ *
+ */
synchronized void shutdown(){
stop = true;
instance = null;
LOG.debug("Shutting down");
}
+
+ /**
+ * Main thread loop.
+ *
+ */
+
public void run(){
try{
while(!stop){
- LOG.debug("Going to sleep on queue");
Operation op = pendingOps.poll(1000, TimeUnit.MILLISECONDS);
if(op != null){
synchronized(op){
@@ -83,7 +103,6 @@
op.wait();
}
}
- LOG.debug("Request ready");
switch(op.type){
case Operation.ADD:
@@ -96,7 +115,7 @@
break;
case Operation.READ:
ReadOp rOp = (ReadOp) op;
- LOG.debug("Got one message from the queue: " + rOp.firstEntry);
+ //LOG.debug("Got one message from the queue: " + rOp.firstEntry);
rOp.cb.readComplete(rOp.getErrorCode(),
rOp.getLedger().getId(),
new LedgerSequence(rOp.seq),
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerEntry.java Wed Apr 1 21:50:03 2009
@@ -31,7 +31,6 @@
*
*/
-
public class LedgerEntry {
Logger LOG = Logger.getLogger(LedgerEntry.class);
@@ -43,10 +42,6 @@
this.lId = lId;
this.eId = eId;
this.entry = entry;
- if(entry != null)
- LOG.debug("Entry: " + entry.length + " , " + new String(entry));
- else
- LOG.debug("Entry is null");
}
public long getLedgerId(){
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Apr 1 21:50:03 2009
@@ -62,7 +62,8 @@
private int threshold;
private String digestAlg = "SHA1";
- private byte[] passwdHash;
+ private byte[] macKey;
+ private byte[] ledgerKey;
private byte[] passwd;
LedgerHandle(BookKeeper bk,
@@ -74,7 +75,8 @@
this.last = last;
this.bookies = new ArrayList<BookieHandle>();
this.passwd = passwd;
- genPasswdHash(passwd);
+ genLedgerKey(passwd);
+ genMacKey(passwd);
this.qSize = (bookies.size() + 1)/2;
}
@@ -93,7 +95,8 @@
this.qSize = qSize;
this.qMode = mode;
this.passwd = passwd;
- genPasswdHash(passwd);
+ genLedgerKey(passwd);
+ genMacKey(passwd);
}
@@ -109,7 +112,8 @@
this.qSize = qSize;
this.passwd = passwd;
- genPasswdHash(passwd);
+ genLedgerKey(passwd);
+ genMacKey(passwd);
}
private void setBookies(ArrayList<InetSocketAddress> bookies)
@@ -135,17 +139,20 @@
}
+
/**
* Create bookie handle and add it to the list
*
* @param addr socket address
*/
- void addBookie(InetSocketAddress addr)
+ int addBookie(InetSocketAddress addr)
throws IOException {
BookieHandle bh = new BookieHandle(this, addr);
this.bookies.add(bh);
if(bookies.size() > qSize) setThreshold();
+
+ return (this.bookies.size() - 1);
}
private void setThreshold(){
@@ -311,23 +318,50 @@
}
/**
- * Generates and stores password hash.
+ * Generates and stores Ledger key.
*
* @param passwd
*/
- private void genPasswdHash(byte[] passwd){
+ private void genLedgerKey(byte[] passwd){
try{
- MessageDigest digest = MessageDigest.getInstance("MD5");
+ MessageDigest digest = MessageDigest.getInstance("SHA");
+ String pad = "ledger";
+
+ byte[] toProcess = new byte[passwd.length + pad.length()];
+ System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
+ System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
- digest.update(passwd);
- this.passwdHash = digest.digest();
+ digest.update(toProcess);
+ this.ledgerKey = digest.digest();
} catch(NoSuchAlgorithmException e){
this.passwd = passwd;
LOG.error("Storing password as plain text because secure hash implementation does not exist");
}
}
+ /**
+ * Generates and stores Mac key.
+ *
+ * @param passwd
+ */
+
+ private void genMacKey(byte[] passwd){
+ try{
+ MessageDigest digest = MessageDigest.getInstance("SHA");
+ String pad = "mac";
+
+ byte[] toProcess = new byte[passwd.length + pad.length()];
+ System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
+ System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
+
+ digest.update(toProcess);
+ this.macKey = digest.digest();
+ } catch(NoSuchAlgorithmException e){
+ this.passwd = passwd;
+ LOG.error("Storing password as plain text because secure hash implementation does not exist");
+ }
+ }
/**
* Returns password in plain text
@@ -338,12 +372,21 @@
/**
- * Returns password hash
+ * Returns MAC key
*
* @return byte[]
*/
- byte[] getPasswdHash(){
- return passwdHash;
+ byte[] getMacKey(){
+ return macKey;
}
+ /**
+ * Returns Ledger key
+ *
+ * @return byte[]
+ */
+ byte[] getLedgerKey(){
+ return ledgerKey;
+ }
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryMonitor.java Wed Apr 1 21:50:03 2009
@@ -64,6 +64,15 @@
private int minimum;
+ /**
+ * Constructor simply initiates data structures
+ *
+ * @param self Instance of BookKeeper
+ * @param lId Ledger identifier
+ * @param qSize Quorum size
+ * @param bookies List of bookie addresses
+ * @param qMode Quorum mode
+ */
LedgerRecoveryMonitor(BookKeeper self,
long lId,
int qSize,
@@ -88,6 +97,13 @@
}
+
+ /**
+ * Determines the last entry written to a ledger not closed properly
+ * due to a client crash
+ *
+ * @param passwd
+ */
boolean recover(byte[] passwd) throws
IOException, InterruptedException, BKException, KeeperException {
/*
@@ -139,9 +155,11 @@
//if(ls == null) throw BKException.create(Code.ReadException);
LOG.debug("Received entry for: " + lh.getLast());
- if(ls.nextElement().getEntry() != null){
+ byte[] le = ls.nextElement().getEntry();
+ if(le != null){
if(notLegitimate) notLegitimate = false;
- lh.incLast();
+ self.addEntry(lh, le);
+ //lh.incLast();
hasMore = true;
}
}
@@ -165,7 +183,16 @@
}
-
+ /**
+ * Read callback implementation
+ *
+ * @param rc return code
+ * @param ledgerId Ledger identifier
+ * @param entryId Entry identifier
+ * @param bb Data
+ * @param ctx Control object
+ *
+ */
public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
if(rc == 0){
bb.rewind();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumEngine.java Wed Apr 1 21:50:03 2009
@@ -49,7 +49,6 @@
public class QuorumEngine {
Logger LOG = Logger.getLogger(QuorumEngine.class);
- //ArrayBlockingQueue<Operation> incomingQueue;
QuorumOpMonitor opMonitor;
ClientCBWorker cbWorker;
LedgerHandle lh;
@@ -57,8 +56,9 @@
boolean stop = false;
/**
- * Requests generated by BookKeeper.java upon client calls.
- * There are three types of requests: READ, ADD, STOP.
+ * Operation descriptor: Requests generated by BookKeeper.java
+ * upon client calls. There are three types of requests: READ,
+ * ADD, STOP.
*/
public static class Operation {
@@ -192,74 +192,82 @@
this.cbWorker = ClientCBWorker.getInstance();
LOG.debug("Created cbWorker");
}
-
+
+ /**
+ * Sends requests to BookieHandle instances. Methods in BookKeeper call
+ * this method to submit both add and read requests.
+ *
+ * @param r Operation descriptor
+ */
void sendOp(Operation r)
throws InterruptedException {
- switch(r.type){
- case Operation.READ:
- Operation.ReadOp rOp = (Operation.ReadOp) r;
- LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
- cbWorker.addOperation(r);
-
- for(long entry = rOp.firstEntry;
- entry <= rOp.lastEntry;
- entry++){
-
- //Send requests to bookies
- for(BookieHandle bh : lh.getBookies()){
- try{
- SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp,
- new PendingReadOp(lh),
- lh.getBookies().indexOf(bh),
- opMonitor);
- bh.sendRead(sRead, entry);
-
- } catch(IOException e){
- LOG.error(e);
- }
- }
+
+ int n = lh.getBookies().size();
+ switch(r.type){
+ case Operation.READ:
+ Operation.ReadOp rOp = (Operation.ReadOp) r;
+
+ LOG.debug("Adding read operation to opMonitor: " + rOp.firstEntry + ", " + rOp.lastEntry);
+ cbWorker.addOperation(r);
+
+ for(long entry = rOp.firstEntry;
+ entry <= rOp.lastEntry;
+ entry++){
+ long counter = 0;
+ PendingReadOp pROp = new PendingReadOp(lh);
+
+ //Send requests to bookies
+ while(counter < lh.getQuorumSize()){
+ int index = (int)((entry + counter++) % n);
+ try{
+ SubOp.SubReadOp sRead = new SubOp.SubReadOp(rOp,
+ pROp,
+ index,
+ opMonitor);
+ lh.getBookies().get((index) % n).sendRead(sRead, entry);
+
+ } catch(IOException e){
+ LOG.error(e);
}
- //r.cb.processResult();
- break;
- case Operation.ADD:
- long counter = 0;
- //ArrayList<BookieHandle> list = lh.getBookies();
- int n = lh.getBookies().size();
-
- LOG.debug("Adding add operation to opMonitor");
- //opMonitor.addOp(lh, (Operation.AddOp) r);
- cbWorker.addOperation(r);
- Operation.AddOp aOp = (Operation.AddOp) r;
- PendingOp pOp = new PendingOp();
- while(counter < lh.getQuorumSize() ){
- int index = (int)((aOp.entry + counter++) % n);
-
- try{
- SubOp.SubAddOp sAdd = new
- SubOp.SubAddOp(aOp,
- pOp,
- index,
- opMonitor);
- lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
- } catch (IOException io) {
- LOG.error(io);
- try{
- /*
- * Before getting a new bookie, try to reconnect
- */
- lh.getBookies().get((index) % n).restart();
- } catch (IOException nio){
- lh.removeBookie(index);
- }
- }
+ }
+ }
+
+ break;
+ case Operation.ADD:
+ long counter = 0;
+
+ cbWorker.addOperation(r);
+ Operation.AddOp aOp = (Operation.AddOp) r;
+ PendingOp pOp = new PendingOp();
+ while(counter < lh.getQuorumSize() ){
+ int index = (int)((aOp.entry + counter++) % n);
+
+ try{
+ SubOp.SubAddOp sAdd = new
+ SubOp.SubAddOp(aOp,
+ pOp,
+ index,
+ opMonitor);
+ lh.getBookies().get((index) % n).sendAdd(sAdd, aOp.entry);
+ } catch (IOException io) {
+ LOG.error(io);
+ try{
+ /*
+ * Before getting a new bookie, try to reconnect
+ */
+ lh.getBookies().get((index) % n).restart();
+ } catch (IOException nio){
+ lh.removeBookie(index);
}
- //qRef = (qRef + 1) % n;
- break;
+ }
+ }
+ //qRef = (qRef + 1) % n;
+ break;
case Operation.STOP:
stop = true;
cbWorker.shutdown();
break;
- }
+ }
}
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/QuorumOpMonitor.java Wed Apr 1 21:50:03 2009
@@ -27,10 +27,14 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
import org.apache.bookkeeper.client.BookieHandle;
@@ -44,6 +48,7 @@
import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
+import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.ReadEntryCallback;
import org.apache.bookkeeper.proto.WriteCallback;
import org.apache.log4j.Logger;
@@ -77,6 +82,7 @@
*
*/
MessageDigest digest = null;
+ int dLength;
/**
* Get digest instance if there is none.
@@ -101,36 +107,9 @@
this.bookieIdSent = new HashSet<Integer>();
this.bookieIdRecv = new HashSet<Integer>();
}
-
- //PendingOp(Operation op){
- // this.op = op;
- // bookieIdSent = new HashSet<Integer>();
- // bookieIdRecv = new HashSet<Integer>();
- //}
-
- //void setOp(Operation op){
- // this.op = op;
- //}
-
- //Operation getOp(){
- // return this.op;
- //}
};
- /**
- * Objects of this type are used to keep track of the status of
- * a given write request.
- *
- *
- */
- //public static class PendingAddOp extends PendingOp{
- // AddOp op;
-
- // PendingAddOp(LedgerHandle lh, AddOp op){
- // this.op = op;
- // }
- //}
/**
* Objects of this type are used to keep track of the status of
@@ -142,30 +121,27 @@
/*
* Values for ongoing reads
*/
- ConcurrentHashMap<Long, ArrayList<ByteBuffer>> proposedValues;
-
- /*
- * Bookies from which received a response
- */
- //ConcurrentHashMap<Long, HashSet<Integer>> received;
-
-
+
+ ArrayList<ByteBuffer> proposedValues;
+
PendingReadOp(LedgerHandle lh){
- this.proposedValues =
- new ConcurrentHashMap<Long, ArrayList<ByteBuffer>>();
- //this.received =
- // new ConcurrentHashMap<Long, HashSet<Integer>>();
+ this.proposedValues =
+ new ArrayList<ByteBuffer>();
}
}
QuorumOpMonitor(LedgerHandle lh){
this.lh = lh;
+ try{
+ this.dLength = getDigestInstance(lh.getDigestAlg()).getDigestLength();
+ } catch(NoSuchAlgorithmException e){
+ LOG.error("Problem with message digest: " + e);
+ this.dLength = 0;
+ }
}
-
-
-
+
/**
* Callback method for write operations. There is one callback for
* each write to a server.
@@ -258,89 +234,96 @@
* Collect responses, and reply when there are sufficient
* answers.
*/
- LOG.debug("New response: " + rc);
+
if(rc == 0){
SubReadOp sRead = (SubReadOp) ctx;
ReadOp rOp = (ReadOp) sRead.op;
PendingReadOp pOp = sRead.pOp;
if(pOp != null){
HashSet<Integer> received = pOp.bookieIdRecv;
- //if(!received.containsKey(entryId)){
- // received.put(entryId, new HashSet<Integer>());
- //}
+
boolean result = received.add(sRead.bIndex);
int counter = -1;
if(result){
-
- if(!pOp.proposedValues.containsKey(entryId)){
- pOp.proposedValues.put(entryId, new ArrayList<ByteBuffer>());
- }
- ArrayList<ByteBuffer> list = pOp.proposedValues.get(entryId);
- list.add(bb);
-
+
+ ByteBuffer voted = null;
+ ArrayList<ByteBuffer> list;
switch(lh.getQMode()){
- case VERIFIABLE:
- if(list.size() >= 1){
- try{
- ByteBuffer voted = voteVerifiable(list);
- if(voted != null){
- LOG.debug("Voted: " + new String(voted.array()));
-
- MessageDigest md = getDigestInstance(lh.getDigestAlg());
- int dlength = md.getDigestLength();
- if(voted.capacity() - dlength > 0){
- byte[] data = new byte[voted.capacity() - dlength - 24];
- LOG.info("Digest length: " + dlength + ", " + data.length);
- voted.position(24);
- voted.get(data, 0, data.length);
- counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
- } else {
- LOG.error("Short message: " + voted.capacity());
- }
- }
- } catch(NoSuchAlgorithmException e){
- LOG.error("Problem with message digest: " + e);
- } catch(BKException bke) {
- LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
- countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
- }
+ case VERIFIABLE:
+ if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null)
+ try{
+ voted = voteVerifiable(bb);
+ } catch(NoSuchAlgorithmException e){
+ LOG.error("Problem with message digest: " + e);
+ } catch(BKException bke) {
+ LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
+ countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
+ } catch(InvalidKeyException e){
+ LOG.error(e);
}
- break;
- case GENERIC:
- if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
- ByteBuffer voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
- if(voted != null){
- LOG.debug("Voted: " + voted.array());
- byte[] data = new byte[voted.capacity() - 24];
- voted.position(24);
+
+ if(voted != null) {
+ if(voted.capacity() - dLength > 0){
+ byte[] data = new byte[voted.capacity() - dLength - 24];
+ voted.position(24);
voted.get(data, 0, data.length);
counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
- }
+ }
}
- break;
- case FREEFORM:
- if(list.size() == lh.getQuorumSize()){
- ByteBuffer voted = voteFree(list);
- if(voted != null){
- LOG.debug("Voted: " + voted.array());
- byte[] data = new byte[voted.capacity() - 24];
- voted.position(24);
- voted.get(data, 0, data.length);
- counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+
+ break;
+ case GENERIC:
+ list = pOp.proposedValues;
+ LOG.debug("List length before: " + list.size());
+
+ synchronized(list){
+ if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null){
+ list.add(bb);
+ bb.position(24);
+ if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
+ voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
}
- }
+ }
+ }
+
+
+ if(voted != null){
+ LOG.debug("Voted: " + voted.array());
+ byte[] data = new byte[voted.capacity() - 24];
+ voted.position(24);
+ voted.get(data, 0, data.length);
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
+ }
+
+
+ break;
+ case FREEFORM:
+ list = pOp.proposedValues;
+ LOG.debug("List length before: " + list.size());
+ synchronized(list){
+ if(list.size() == lh.getQuorumSize()){
+ voted = voteFree(list);
+ }
+ }
+
+ if(voted != null){
+ LOG.debug("Voted: " + voted.array());
+ byte[] data = new byte[voted.capacity() - 24];
+ voted.position(24);
+ voted.get(data, 0, data.length);
+ counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
+ }
+ }
+
+ if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
+ !sRead.op.isReady()){
+
+ sRead.op.setReady();
}
- }
-
- if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
- !sRead.op.isReady()){
-
- sRead.op.setReady();
- //sRead.op.cb.readComplete(0, ledgerId, new LedgerSequence(sRead.op.seq), sRead.op.ctx);
- //sRead.op.complete = true;
- }
- LOG.debug("Counter: " + rOp.counter);
+ long diff = rOp.lastEntry - rOp.firstEntry;
+ //LOG.debug("Counter: " + rOp.counter + ", " + diff);
+ }
}
} else {
/*
@@ -385,43 +368,40 @@
* @return
*/
- private ByteBuffer voteVerifiable(ArrayList<ByteBuffer> list)
- throws NoSuchAlgorithmException, BKException{
+
+ private ByteBuffer voteVerifiable(ByteBuffer bb)
+ throws NoSuchAlgorithmException, InvalidKeyException, BKException{
/*
* Check if checksum matches
*/
- ByteBuffer bb = list.get(0);
- list.remove(0);
- MessageDigest md = getDigestInstance(lh.getDigestAlg());
- int dlength = md.getDigestLength();
+ Mac mac = ((BookieClient) Thread.currentThread()).getMac("HmacSHA1", lh.getMacKey());
+ int dlength = mac.getMacLength();
- /*
- * TODO: The if check below is legitimate, but in reality it should never happen,
- * bt it showed up a few times in experiments. Have to check why it is happening.
- */
if(bb.capacity() <= dlength){
- LOG.warn("Something wrong with this entry, length smaller than digest length");
- return null;
+ LOG.warn("Something wrong with this entry, length smaller than digest length");
+ return null;
}
byte[] data = new byte[bb.capacity() - dlength];
bb.get(data, 0, bb.capacity() - dlength);
+
byte[] sig = new byte[dlength];
bb.position(bb.capacity() - dlength);
bb.get(sig, 0, dlength);
-
+
bb.rewind();
- //LOG.warn("Data: " + data.toString() + ", Signature: " + sig.toString());
- md.update(lh.getPasswdHash());
- md.update(data);
- if(MessageDigest.isEqual(md.digest(), sig)){
+ byte[] msgDigest = mac.doFinal(data);
+ if(Arrays.equals(mac.doFinal(data), sig)){
+
return bb;
} else {
+ LOG.error("Entry id: " + new String(msgDigest) + new String(sig));
throw BKException.create(Code.DigestMatchException);
}
+
}
/**
@@ -434,15 +414,16 @@
private ByteBuffer voteGeneric(ArrayList<ByteBuffer> list, int threshold){
HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
- for(ByteBuffer bb : list){
+ for(ByteBuffer bb : list){
if(!map.containsKey(bb)){
- map.put(bb, Integer.valueOf(0));
- }
+ map.put(bb, new Integer(0));
+ } else LOG.debug("Not equal");
- map.put(bb, map.get(bb) + 1);
+ if(bb != null)
+ map.put(bb, map.get(bb) + 1);
if(map.get(bb) >= threshold)
- return bb;
+ return bb;
}
return null;
@@ -459,6 +440,7 @@
private ByteBuffer voteFree(ArrayList<ByteBuffer> list){
HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
for(ByteBuffer bb : list){
+ bb.position(24);
if(!map.containsKey(bb)){
map.put(bb, Integer.valueOf(0));
}
@@ -483,11 +465,6 @@
if(op.seq[(int) index] == null){
op.seq[(int) index] = le;
- if(le.getEntry() != null)
- LOG.debug("Adding entry: " + le.getEntryId() + ", " + le.getEntry().length);
- else
- LOG.debug("Entry is null: " + le.getEntryId());
-
return op.counter.incrementAndGet();
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieClient.java Wed Apr 1 21:50:03 2009
@@ -30,7 +30,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
-
+import java.util.Arrays;
+import java.security.NoSuchAlgorithmException;
+import java.security.InvalidKeyException;
+import java.security.MessageDigest;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
import org.apache.bookkeeper.proto.ReadEntryCallback;
import org.apache.bookkeeper.proto.WriteCallback;
@@ -51,7 +56,7 @@
throws IOException, ConnectException {
sock = SocketChannel.open(addr);
setDaemon(true);
- //sock.configureBlocking(false);
+
sock.socket().setSoTimeout(recvTimeout);
sock.socket().setTcpNoDelay(true);
start();
@@ -102,8 +107,8 @@
ConcurrentHashMap<CompletionKey, Completion<WriteCallback>> addCompletions = new ConcurrentHashMap<CompletionKey, Completion<WriteCallback>>();
ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>> readCompletions = new ConcurrentHashMap<CompletionKey, Completion<ReadEntryCallback>>();
- Object writeLock = new Object();
- Object readLock = new Object();
+ //Object writeLock = new Object();
+ //Object readLock = new Object();
/*
* Use this semaphore to control the number of completion key in both addCompletions
@@ -114,6 +119,41 @@
/**
+ * Message disgest instance
+ *
+ */
+ MessageDigest digest = null;
+
+ /**
+ * Get digest instance if there is none.
+ *
+ */
+ public MessageDigest getDigestInstance(String alg)
+ throws NoSuchAlgorithmException {
+ if(digest == null){
+ digest = MessageDigest.getInstance(alg);
+ }
+
+ return digest;
+ }
+
+ /**
+ * Mac instance
+ *
+ */
+ Mac mac = null;
+
+ public Mac getMac(String alg, byte[] key)
+ throws NoSuchAlgorithmException, InvalidKeyException {
+ if(mac == null){
+ mac = Mac.getInstance(alg);
+ mac.init(new SecretKeySpec(key, "HmacSHA1"));
+ }
+
+ return mac;
+ }
+
+ /**
* Send addEntry operation to bookie.
*
* @param ledgerId ledger identifier
@@ -123,7 +163,7 @@
* @throws IOException
* @throws InterruptedException
*/
- public void addEntry(long ledgerId, long entryId,
+ synchronized public void addEntry(long ledgerId, byte[] masterKey, long entryId,
ByteBuffer entry, WriteCallback cb, Object ctx)
throws IOException, InterruptedException {
@@ -132,12 +172,14 @@
addCompletions.put(new CompletionKey(ledgerId, entryId),
new Completion<WriteCallback>(cb, ctx));
//entry = entry.duplicate();
- entry.position(0);
+ //entry.position(0);
- ByteBuffer tmpEntry = ByteBuffer.allocate(entry.capacity() + 8 + 8 + 8);
+ ByteBuffer tmpEntry = ByteBuffer.allocate(entry.remaining() + 44);
tmpEntry.position(4);
tmpEntry.putInt(BookieProtocol.ADDENTRY);
+ tmpEntry.put(masterKey);
+ //LOG.debug("Master key: " + new String(masterKey));
tmpEntry.putLong(ledgerId);
tmpEntry.putLong(entryId);
tmpEntry.put(entry);
@@ -147,14 +189,12 @@
// 4 bytes for the message type
tmpEntry.putInt(tmpEntry.remaining() - 4);
tmpEntry.position(0);
- synchronized(writeLock) {
- //sock.write(len);
- //len.clear();
- //len.putInt(BookieProtocol.ADDENTRY);
- //len.flip();
- //sock.write(len);
- sock.write(tmpEntry);
- }
+ //sock.write(len);
+ //len.clear();
+ //len.putInt(BookieProtocol.ADDENTRY);
+ //len.flip();
+ //sock.write(len);
+ sock.write(tmpEntry);
//LOG.debug("addEntry:finished");
}
@@ -167,30 +207,30 @@
* @param ctx control object
* @throws IOException
*/
- public void readEntry(long ledgerId, long entryId,
+ synchronized public void readEntry(long ledgerId, long entryId,
ReadEntryCallback cb, Object ctx)
throws IOException, InterruptedException {
completionSemaphore.acquire();
readCompletions.put(new CompletionKey(ledgerId, entryId),
new Completion<ReadEntryCallback>(cb, ctx));
- ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8);
+ ByteBuffer tmpEntry = ByteBuffer.allocate(8 + 8 + 8);
+ tmpEntry.putInt(20);
+ tmpEntry.putInt(BookieProtocol.READENTRY);
tmpEntry.putLong(ledgerId);
tmpEntry.putLong(entryId);
tmpEntry.position(0);
- ByteBuffer len = ByteBuffer.allocate(4);
- len.putInt(tmpEntry.remaining() + 4);
- len.flip();
+ //ByteBuffer len = ByteBuffer.allocate(4);
+ //len.putInt(tmpEntry.remaining() + 4);
+ //len.flip();
//LOG.debug("readEntry: Writing to socket");
- synchronized(readLock) {
- sock.write(len);
- len.clear();
- len.putInt(BookieProtocol.READENTRY);
- len.flip();
- sock.write(len);
- sock.write(tmpEntry);
- }
+ //sock.write(len);
+ //len.clear();
+ //len.putInt(BookieProtocol.READENTRY);
+ //len.flip();
+ //sock.write(len);
+ sock.write(tmpEntry);
//LOG.error("Size of readCompletions: " + readCompletions.size());
}
@@ -241,8 +281,8 @@
byte[] data = new byte[bb.capacity() - 24];
bb.get(data);
ByteBuffer entryData = ByteBuffer.wrap(data);
-
- LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
+ //ByteBuffer entryData = bb;
+ //LOG.info("Received entry: " + ledgerId + ", " + entryId + ", " + rc + ", " + entryData.array().length + ", " + bb.array().length + ", " + bb.remaining());
CompletionKey key = new CompletionKey(ledgerId, entryId);
@@ -334,7 +374,7 @@
entry.put(hello);
entry.flip();
counter.inc();
- bc.addEntry(ledger, i, entry, cb, counter);
+ bc.addEntry(ledger, new byte[0], i, entry, cb, counter);
}
counter.wait(0);
System.out.println("Total = " + counter.total());
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieProtocol.java Wed Apr 1 21:50:03 2009
@@ -66,5 +66,11 @@
/**
* General error occurred at the server
*/
- public static final int EIO = 0;
+ public static final int EIO = 101;
+
+ /**
+ * Unauthorized access to ledger
+ */
+ public static final int EUA = 102;
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/BookieServer.java Wed Apr 1 21:50:03 2009
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AddCallback;
import org.apache.bookkeeper.proto.NIOServerFactory.Cnxn;
import org.apache.log4j.Logger;
@@ -90,10 +91,14 @@
switch(type) {
case BookieProtocol.ADDENTRY:
try {
- bookie.addEntry(packet.slice(), this, src);
+ byte[] masterKey = new byte[20];
+ packet.get(masterKey, 0, 20);
+ //LOG.debug("Master key: " + new String(masterKey));
+ bookie.addEntry(packet.slice(), this, src, masterKey);
} catch(IOException e) {
if (LOG.isTraceEnabled()) {
ByteBuffer bb = packet.duplicate();
+
long ledgerId = bb.getLong();
long entryId = bb.getLong();
LOG.trace("Error reading " + entryId + "@" + ledgerId, e);
@@ -103,6 +108,17 @@
eio.putInt(BookieProtocol.EIO);
eio.flip();
src.sendResponse(new ByteBuffer[] {eio});
+ } catch(BookieException e){
+ ByteBuffer bb = packet.duplicate();
+ long ledgerId = bb.getLong();
+
+ LOG.error("Unauthorized access to ledger " + ledgerId);
+
+ ByteBuffer eio = ByteBuffer.allocate(8);
+ eio.putInt(type);
+ eio.putInt(BookieProtocol.EUA);
+ eio.flip();
+ src.sendResponse(new ByteBuffer[] {eio});
}
break;
case BookieProtocol.READENTRY:
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieClientTest.java Wed Apr 1 21:50:03 2009
@@ -23,6 +23,7 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.junit.Test;
import org.apache.bookkeeper.bookie.Bookie;
@@ -102,21 +103,24 @@
@Test
public void testWriteGaps() throws Exception {
final Object notifyObject = new Object();
+ byte[] passwd = new byte[20];
+ Arrays.fill(passwd, (byte) 'a');
+
BookieClient bc = new BookieClient("127.0.0.1", port, 50000);
ByteBuffer bb;
bb = createByteBuffer(1);
- bc.addEntry(1, 1, bb, wrcb, null);
+ bc.addEntry(1, passwd, 1, bb, wrcb, null);
bb = createByteBuffer(2);
- bc.addEntry(1, 2, bb, wrcb, null);
+ bc.addEntry(1, passwd, 2, bb, wrcb, null);
bb = createByteBuffer(3);
- bc.addEntry(1, 3, bb, wrcb, null);
+ bc.addEntry(1, passwd, 3, bb, wrcb, null);
bb = createByteBuffer(5);
- bc.addEntry(1, 5, bb, wrcb, null);
+ bc.addEntry(1, passwd, 5, bb, wrcb, null);
bb = createByteBuffer(7);
- bc.addEntry(1, 7, bb, wrcb, null);
+ bc.addEntry(1, passwd, 7, bb, wrcb, null);
synchronized(notifyObject) {
bb = createByteBuffer(11);
- bc.addEntry(1, 11, bb, wrcb, notifyObject);
+ bc.addEntry(1, passwd, 11, bb, wrcb, notifyObject);
notifyObject.wait();
}
ResultStruct arc = new ResultStruct();
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java?rev=761077&r1=761076&r2=761077&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/LoopbackClient.java Wed Apr 1 21:50:03 2009
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.io.IOException;
import java.lang.InterruptedException;
+import java.util.Arrays;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.WriteCallback;
@@ -73,7 +74,11 @@
void write(long ledgerId, long entry, byte[] data, WriteCallback cb, Object ctx)
throws IOException, InterruptedException {
LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry);
+ byte[] passwd = new byte[20];
+ Arrays.fill(passwd, (byte) 'a');
+
client.addEntry(ledgerId,
+ passwd,
entry,
ByteBuffer.wrap(data),
cb,
|