hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r962693 - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/ src/contrib/bookkeeper/test/org/apache/bookkeeper/test/
Date Fri, 09 Jul 2010 20:53:37 GMT
Author: breed
Date: Fri Jul  9 20:53:36 2010
New Revision: 962693

URL: http://svn.apache.org/viewvc?rev=962693&view=rev
Log:
ZOOKEEPER-719. Add throttling to BookKeeper client

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul  9 20:53:36 2010
@@ -64,6 +64,8 @@ BUGFIXES: 
   ZOOKEEPER-796. zkServer.sh should support an external PIDFILE variable
   (Alex Newman via phunt)
 
+  ZOOKEEPER-719. Add throttling to BookKeeper client (fpj via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BKException.java
Fri Jul  9 20:53:36 2010
@@ -70,6 +70,8 @@ public abstract class BKException extend
             return new BKWriteException();
         case Code.NoSuchEntryException:
             return new BKNoSuchEntryException();
+        case Code.IncorrectParameterException:
+            return new BKIncorrectParameterException();
         default:
             return new BKIllegalOpException();
         }
@@ -94,7 +96,8 @@ public abstract class BKException extend
         int LedgerClosedException = -11;
         int WriteException = -12;
         int NoSuchEntryException = -13;
-
+        int IncorrectParameterException = -14;
+        
         int IllegalOpException = -100;
     }
 
@@ -136,6 +139,8 @@ public abstract class BKException extend
             return "Write failed on bookie";
         case Code.NoSuchEntryException:
             return "No such entry";
+        case Code.IncorrectParameterException:
+            return "Incorrect parameter input";
         default:
             return "Invalid operation";
         }
@@ -224,4 +229,10 @@ public abstract class BKException extend
             super(Code.LedgerClosedException);
         }
     }
+    
+    public static class BKIncorrectParameterException extends BKException {
+        public BKIncorrectParameterException() {
+            super(Code.IncorrectParameterException);
+        }
+    }
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerCreateOp.java
Fri Jul  9 20:53:36 2010
@@ -145,6 +145,10 @@ class LedgerCreateOp implements StringCa
             LOG.error("Security exception while creating ledger: " + ledgerId, e);
             cb.createComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             return;
+        } catch (NumberFormatException e) {
+            LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"),
e);
+            cb.createComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            return;
         }
 
         lh.writeLedgerConfig(this, null);

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
Fri Jul  9 20:53:36 2010
@@ -27,6 +27,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Queue;
+import java.util.concurrent.Semaphore;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -61,10 +63,14 @@ public class LedgerHandle implements Rea
   final DigestManager macManager;
   final DistributionSchedule distributionSchedule;
 
+  final Semaphore opCounterSem;
+  private Integer throttling = 5000;
+  
   final Queue<PendingAddOp> pendingAddOps = new ArrayDeque<PendingAddOp>();
 
   LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
-      DigestType digestType, byte[] password) throws GeneralSecurityException {
+      DigestType digestType, byte[] password)
+      throws GeneralSecurityException, NumberFormatException {
     this.bk = bk;
     this.metadata = metadata;
     if (metadata.isClosed()) {
@@ -72,14 +78,21 @@ public class LedgerHandle implements Rea
     } else {
       lastAddConfirmed = lastAddPushed = -1;
     }
-
+    
     this.ledgerId = ledgerId;
+    
+    String throttleValue = System.getProperty("throttle");
+    if(throttleValue != null){
+        this.throttling = new Integer(throttleValue); 
+    }
+    this.opCounterSem = new Semaphore(throttling);
+    
     macManager = DigestManager.instantiate(ledgerId, password, digestType);
     this.ledgerKey = MacDigestManager.genDigest("ledger", password);
     distributionSchedule = new RoundRobinDistributionSchedule(
         metadata.quorumSize, metadata.ensembleSize);
   }
-
+  
   /**
    * Get the id of the current ledger
    * 
@@ -219,7 +232,7 @@ public class LedgerHandle implements Rea
    *          control object
    */
   public void asyncReadEntries(long firstEntry, long lastEntry,
-      ReadCallback cb, Object ctx) {
+      ReadCallback cb, Object ctx) throws InterruptedException {
     // Little sanity check
     if (firstEntry < 0 || lastEntry > lastAddConfirmed
         || firstEntry > lastEntry) {
@@ -228,7 +241,7 @@ public class LedgerHandle implements Rea
     }
 
     new PendingReadOp(this, firstEntry, lastEntry, cb, ctx).initiate();
-
+    opCounterSem.acquire();
   }
 
   /**
@@ -260,8 +273,8 @@ public class LedgerHandle implements Rea
    *          some control object
    */
   public void asyncAddEntry(final byte[] data, final AddCallback cb,
-      final Object ctx) {
-    bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+      final Object ctx) throws InterruptedException {
+      bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
       @Override
       public void safeRun() {
         if (metadata.isClosed()) {
@@ -279,7 +292,8 @@ public class LedgerHandle implements Rea
         op.initiate(toSend);
 
       }
-    });
+      });
+      opCounterSem.acquire();
   }
 
   // close the ledger and send fails to all the adds in the pipeline

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerOpenOp.java
Fri Jul  9 20:53:36 2010
@@ -114,6 +114,10 @@ class LedgerOpenOp implements DataCallba
             LOG.error("Security exception while opening ledger: " + ledgerId, e);
             cb.openComplete(BKException.Code.DigestNotInitializedException, null, this.ctx);
             return;
+        } catch (NumberFormatException e) {
+            LOG.error("Incorrectly entered parameter throttle: " + System.getProperty("throttle"),
e);
+            cb.openComplete(BKException.Code.IncorrectParameterException, null, this.ctx);
+            return;
         }
 
         if (metadata.close != LedgerMetadata.NOTCLOSED) {

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerRecoveryOp.java
Fri Jul  9 20:53:36 2010
@@ -117,9 +117,13 @@ class LedgerRecoveryOp implements ReadEn
      * Try to read past the last confirmed.
      */
     private void doRecoveryRead() {
-        lh.lastAddConfirmed++;
-        lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
-
+        try{
+            lh.lastAddConfirmed++;
+            lh.asyncReadEntries(lh.lastAddConfirmed, lh.lastAddConfirmed, this, null);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted while trying to read entry.", e);
+            Thread.currentThread().interrupt();
+        }
     }
 
     @Override
@@ -127,7 +131,12 @@ class LedgerRecoveryOp implements ReadEn
         // get back to prev value
         lh.lastAddConfirmed--;
         if (rc == BKException.Code.OK) {
-            lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+            try{
+                lh.asyncAddEntry(seq.nextElement().getEntry(), this, null);
+            } catch (InterruptedException e) {
+                LOG.error("Interrupted while adding entry.", e);
+                Thread.currentThread().interrupt();
+            }
             return;
         }
 

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingAddOp.java
Fri Jul  9 20:53:36 2010
@@ -132,6 +132,7 @@ class PendingAddOp implements WriteCallb
 
     void submitCallback(final int rc) {
         cb.addComplete(rc, lh, entryId, ctx);
+        lh.opCounterSem.release();
     }
 
 }
\ No newline at end of file

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/PendingReadOp.java
Fri Jul  9 20:53:36 2010
@@ -87,7 +87,7 @@ class PendingReadOp implements Enumerati
         if (entry.nextReplicaIndexToReadFrom >= lh.metadata.quorumSize) {
             // we are done, the read has failed from all replicas, just fail the
             // read
-            cb.readComplete(lastErrorCode, lh, null, ctx);
+            submitCallback(lastErrorCode);
             return;
         }
 
@@ -126,11 +126,15 @@ class PendingReadOp implements Enumerati
         entry.entryDataStream = is;
 
         if (numPendingReads == 0) {
-            cb.readComplete(BKException.Code.OK, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+            submitCallback(BKException.Code.OK);
         }
 
     }
 
+    private void submitCallback(int code){
+        cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+        lh.opCounterSem.release();
+    }
     public boolean hasMoreElements() {
         return !seq.isEmpty();
     }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
Fri Jul  9 20:53:36 2010
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.bookkeeper.client.BKException;
@@ -69,6 +70,7 @@ public class PerChannelBookieClient exte
 
     InetSocketAddress addr;
     boolean connected = false;
+    Semaphore opCounterSem = new Semaphore(2000);
     AtomicLong totalBytesOutstanding;
     ClientSocketChannelFactory channelFactory;
     OrderedSafeExecutor executor;
@@ -206,6 +208,7 @@ public class PerChannelBookieClient exte
             Object ctx) {
 
         final int entrySize = toSend.readableBytes();
+        
         // if (totalBytesOutstanding.get() > maxMemory) {
         // // TODO: how to throttle, throw an exception, or call the callback?
         // // Maybe this should be done at the layer above?

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=962693&r1=962692&r2=962693&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
(original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieReadWriteTest.java
Fri Jul  9 20:53:36 2010
@@ -238,6 +238,101 @@ public class BookieReadWriteTest extends
     }
 
     @Test
+    public void testReadWriteAsyncSingleClientThrottle() throws IOException {
+        try {
+            // Create a BookKeeper client and a ledger
+            System.setProperty("throttle", "1000");
+            bkc = new BookKeeper("127.0.0.1");
+            lh = bkc.createLedger(digestType, ledgerPassword);
+            // bkc.initMessageDigest("SHA1");
+            ledgerId = lh.getId();
+            LOG.info("Ledger ID: " + lh.getId());
+            
+            numEntriesToWrite = 20000; 
+            for (int i = 0; i < (numEntriesToWrite - 10000); i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+
+            for (int i = 0; i < 10000; i++) {
+                ByteBuffer entry = ByteBuffer.allocate(4);
+                entry.putInt(rng.nextInt(maxInt));
+                entry.position(0);
+
+                entries.add(entry.array());
+                entriesSize.add(entry.array().length);
+                lh.asyncAddEntry(entry.array(), this, sync);
+            }
+            
+            // wait for all entries to be acknowledged
+            synchronized (sync) {
+                while (sync.counter < numEntriesToWrite) {
+                    LOG.debug("Entries counter = " + sync.counter);
+                    sync.wait();
+                }
+            }
+
+            LOG.debug("*** WRITE COMPLETE ***");
+            // close ledger
+            lh.close();
+
+            // *** WRITING PART COMPLETE // READ PART BEGINS ***
+            
+            // open ledger
+            lh = bkc.openLedger(ledgerId, digestType, ledgerPassword);
+            LOG.debug("Number of entries written: " + (lh.getLastAddConfirmed() + 1));
+            assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() ==
(numEntriesToWrite - 1));
+
+            // read entries
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+
+            synchronized (sync) {
+                while (sync.value == false) {
+                    sync.wait();
+                }
+            }
+
+            LOG.debug("*** READ COMPLETE ***");
+
+            // at this point, LedgerSequence ls is filled with the returned
+            // values
+            int i = 0;
+            while (ls.hasMoreElements()) {
+                ByteBuffer origbb = ByteBuffer.wrap(entries.get(i));
+                Integer origEntry = origbb.getInt();
+                byte[] entry = ls.nextElement().getEntry();
+                ByteBuffer result = ByteBuffer.wrap(entry);
+                LOG.debug("Length of result: " + result.capacity());
+                LOG.debug("Original entry: " + origEntry);
+
+                Integer retrEntry = result.getInt();
+                LOG.debug("Retrieved entry: " + retrEntry);
+                assertTrue("Checking entry " + i + " for equality", origEntry.equals(retrEntry));
+                assertTrue("Checking entry " + i + " for size", entry.length == entriesSize.get(i).intValue());
+                i++;
+            }
+            assertTrue("Checking number of read entries", i == numEntriesToWrite);
+
+            lh.close();
+        } catch (KeeperException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to ZooKeeper exception");
+        } catch (BKException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to BookKeeper exception");
+        } catch (InterruptedException e) {
+            LOG.error("Test failed", e);
+            fail("Test failed due to interruption");
+        }
+    }
+    
+    @Test
     public void testSyncReadAsyncWriteStringsSingleClient() throws IOException {
         LOG.info("TEST READ WRITE STRINGS MIXED SINGLE CLIENT");
         String charset = "utf-8";



Mime
View raw message