Author: breed
Date: Fri Jul 9 20:53:36 2010
New Revision: 962693
URL: http://svn.apache.org/viewvc?rev=962693&view=rev
Log:
ZOOKEEPER-719. Add throttling to BookKeeper client
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul 9 20:53:36 2010
@@ -64,6 +64,8 @@ BUGFIXES:
ZOOKEEPER-796. zkServer.sh should support an external PIDFILE variable
(Alex Newman via phunt)
+ ZOOKEEPER-719. Add throttling to BookKeeper client (fpj via breed)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
Fri Jul 9 20:53:36 2010
@@ -70,6 +70,8 @@ public abstract class BKException extend
return new BKWriteException();
case Code.NoSuchEntryException:
return new BKNoSuchEntryException();
+ case Code.IncorrectParameterException:
+ return new BKIncorrectParameterException();
default:
return new BKIllegalOpException();
}
@@ -94,7 +96,8 @@ public abstract class BKException extend
int LedgerClosedException = -11;
int WriteException = -12;
int NoSuchEntryException = -13;
-
+ int IncorrectParameterException = -14;
+
int IllegalOpException = -100;
}
@@ -136,6 +139,8 @@ public abstract class BKException extend
return "Write failed on bookie";
case Code.NoSuchEntryException:
return "No such entry";
+ case Code.IncorrectParameterException:
+ return "Incorrect parameter input";
default:
return "Invalid operation";
}
@@ -224,4 +229,10 @@ public abstract class BKException extend
super(Code.LedgerClosedException);
}
}
+
+ public static class BKIncorrectParameterException extends BKException {
+ public BKIncorrectParameterException() {
+ super(Code.IncorrectParameterException);
+ }
+ }
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
Fri Jul 9 20:53:36 2010
@@ -145,6 +145,10 @@ class LedgerCreateOp implements StringCa
LOG.error("Security exception while creating ledger: " + ledgerId, e);
cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
return;
+ } catch (NumberFormatException e) {
+ LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"),
e);
+ cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+ return;
}
lh.writeLedgerConfig(this, null);
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
Fri Jul 9 20:53:36 2010
@@ -27,6 +27,8 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Queue;
+import java.util.concurrent.Semaphore;
+
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -61,10 +63,14 @@ public class LedgerHandle implements Rea
final DigestManager macManager;
final DistributionSchedule distributionSchedule;
+ final Semaphore opCounterSem;
+ private Integer throttling = 5000;
+
final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
- DigestType digestType, byte[] password) throws GeneralSecurityException {
+ DigestType digestType, byte[] password)
+ throws GeneralSecurityException, NumberFormatException {
this.bk = bk;
this.metadata = metadata;
if (metadata.isClosed()) {
@@ -72,14 +78,21 @@ public class LedgerHandle implements Rea
} else {
lastAddConfirmed = lastAddPushed = -1;
}
-
+
this.ledgerId = ledgerId;
+
+ String throttleValue = System.getProperty("throttle");
+ if(throttleValue != null){
+ this.throttling = new Integer(throttleValue);
+ }
+ this.opCounterSem = new Semaphore(throttling);
+
macManager = DigestManager.instantiate(ledgerId, password, digestType);
this.ledgerKey = MacDigestManager.genDigest("ledger", password);
distributionSchedule = new RoundRobinDistributionSchedule(
metadata.quorumSize, metadata.ensembleSize);
}
-
+
/**
* Get the id of the current ledger
*
@@ -219,7 +232,7 @@ public class LedgerHandle implements Rea
* control object
*/
public void asyncReadEntries(long firstEntry, long lastEntry,
- ReadCallback cb, Object ctx) {
+ ReadCallback cb, Object ctx) throws InterruptedException {
// Little sanity check
if (firstEntry < 0 || lastEntry > lastAddConfirmed
|| firstEntry > lastEntry) {
@@ -228,7 +241,7 @@ public class LedgerHandle implements Rea
}
new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-
+ opCounterSem.acquire();
}
/**
@@ -260,8 +273,8 @@ public class LedgerHandle implements Rea
* some control object
*/
public void asyncAddEntry(final byte[] data, final AddCallback cb,
- final Object ctx) {
- bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+ final Object ctx) throws InterruptedException {
+ bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
if (metadata.isClosed()) {
@@ -279,7 +292,8 @@ public class LedgerHandle implements Rea
op.initiate(toSend);
}
- });
+ });
+ opCounterSem.acquire();
}
// close the ledger and send fails to all the adds in the pipeline
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
Fri Jul 9 20:53:36 2010
@@ -114,6 +114,10 @@ class LedgerOpenOp implements DataCallba
LOG.error("Security exception while opening ledger: " + ledgerId, e);
cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
return;
+ } catch (NumberFormatException e) {
+ LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"),
e);
+ cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+ return;
}
if (metadata.close != LedgerMetadata.NOTCLOSED) {
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
Fri Jul 9 20:53:36 2010
@@ -117,9 +117,13 @@ class LedgerRecoveryOp implements ReadEn
* Try to read past the last confirmed.
*/
private void doRecoveryRead() {
- lh.lastAddConfirmed++;
- lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
-
+ try{
+ lh.lastAddConfirmed++;
+ lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to read entry.", e);
+ Thread.currentThread().interrupt();
+ }
}
@Override
@@ -127,7 +131,12 @@ class LedgerRecoveryOp implements ReadEn
// get back to prev value
lh.lastAddConfirmed--;
if (rc == BKException.Code.OK) {
- lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+ try{
+ lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while adding entry.", e);
+ Thread.currentThread().interrupt();
+ }
return;
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
Fri Jul 9 20:53:36 2010
@@ -132,6 +132,7 @@ class PendingAddOp implements WriteCallb
void submitCallback(final int rc) {
cb.addComplete(rc, lh, entryId, ctx);
+ lh.opCounterSem.release();
}
}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
Fri Jul 9 20:53:36 2010
@@ -87,7 +87,7 @@ class PendingReadOp implements Enumerati
if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) {
// we are done, the read has failed from all replicas, just fail the
// read
- cb.readComplete(lastErrorCode, lh, null, ctx);
+ submitCallback(lastErrorCode);
return;
}
@@ -126,11 +126,15 @@ class PendingReadOp implements Enumerati
entry.entryDataStream = is;
if (numPendingReads == 0) {
- cb.readComplete(BKException.Code.OK, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+ submitCallback(BKException.Code.OK);
}
}
+ private void submitCallback(int code){
+ cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+ lh.opCounterSem.release();
+ }
public boolean hasMoreElements() {
return !seq.isEmpty();
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Fri Jul 9 20:53:36 2010
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.BKException;
@@ -69,6 +70,7 @@ public class PerChannelBookieClient exte
InetSocketAddress addr;
boolean connected = false;
+ Semaphore opCounterSem = new Semaphore(2000);
AtomicLong totalBytesOutstanding;
ClientSocketChannelFactory channelFactory;
OrderedSafeExecutor executor;
@@ -206,6 +208,7 @@ public class PerChannelBookieClient exte
Object ctx) {
final int entrySize = toSend.readableBytes();
+
// if (totalBytesOutstanding.get() > maxMemory) {
// // TODO: how to throttle, throw an exception, or call the callback?
// // Maybe this should be done at the layer above?
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
Fri Jul 9 20:53:36 2010
@@ -238,6 +238,101 @@ public class BookieReadWriteTest extends
}
@Test
+ public void testReadWriteAsyncSingleClientThrottle() throws IOException {
+ try {
+ // Create a BookKeeper client and a ledger
+ System.setProperty("throttle", "1000");
+ bkc = new BookKeeper("127.0.0.1");
+ lh = bkc.createLedger(digestType, ledgerPassword);
+ // bkc.initMessageDigest("SHA1");
+ ledgerId = lh.getId();
+ LOG.info("Ledger ID: " + lh.getId());
+
+ numEntriesToWrite = 20000;
+ for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.asyncAddEntry(entry.array(), this, sync);
+ }
+
+
+ for (int i = 0; i < 10000; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(maxInt));
+ entry.position(0);
+
+ entries.add(entry.array());
+ entriesSize.add(entry.array().length);
+ lh.asyncAddEntry(entry.array(), this, sync);
+ }
+
+ // wait for all entries to be acknowledged
+ synchronized (sync) {
+ while (sync.counter < numEntriesToWrite) {
+ LOG.debug("Entries counter = " + sync.counter);
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** WRITE COMPLETE ***");
+ // close ledger
+ lh.close();
+
+ // *** WRITING PART COMPLETE // READ PART BEGINS ***
+
+ // open ledger
+ lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+ LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+ assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() ==
(numEntriesToWrite - 1));
+
+ // read entries
+ lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+
+ LOG.debug("*** READ COMPLETE ***");
+
+ // at this point, LedgerSequence ls is filled with the returned
+ // values
+ int i = 0;
+ while (ls.hasMoreElements()) {
+ ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+ Integer origEntry = origbb.getInt();
+ byte[] entry = ls.nextElement().getEntry();
+ ByteBuffer result = ByteBuffer.wrap(entry);
+ LOG.debug("Length of result: " + result.capacity());
+ LOG.debug("Original entry: " + origEntry);
+
+ Integer retrEntry = result.getInt();
+ LOG.debug("Retrieved entry: " + retrEntry);
+ assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+ assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+ i++;
+ }
+ assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
+ lh.close();
+ } catch (KeeperException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to ZooKeeper exception");
+ } catch (BKException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to BookKeeper exception");
+ } catch (InterruptedException e) {
+ LOG.error("Test failed", e);
+ fail("Test failed due to interruption");
+ }
+ }
+
+ @Test
public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
String charset = "utf-8";
|