zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject svn commit: r1239167 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
Date Wed, 01 Feb 2012 15:19:57 GMT
Author: fpj
Date: Wed Feb  1 15:19:57 2012
New Revision: 1239167

URL: http://svn.apache.org/viewvc?rev=1239167&view=rev
Log:
BOOKKEEPER-157. For small packets, increasing number of bookies actually degrades performance.
(ivank via fpj)


Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1239167&r1=1239166&r2=1239167&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Feb  1 15:19:57 2012
@@ -40,6 +40,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-98: collect add/read statistics on bookie server (Sijie Guo via ivank)
 
+	BOOKKEEPER-157:	For small packets, increasing number of bookies actually degrades performance. (ivank via fpj)
+
 
 Release 4.0.0 - 2011-11-30
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java?rev=1239167&r1=1239166&r2=1239167&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java Wed Feb  1 15:19:57 2012
@@ -23,7 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
 class CRC32DigestManager extends DigestManager {
-    CRC32 crc = new CRC32();
+    private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
+        @Override
+        protected CRC32 initialValue() {
+            return new CRC32();
+        }
+    };
 
     public CRC32DigestManager(long ledgerId) {
         super(ledgerId);
@@ -38,13 +43,13 @@ class CRC32DigestManager extends DigestM
     byte[] getValueAndReset() {
         byte[] value = new byte[8];
         ByteBuffer buf = ByteBuffer.wrap(value);
-        buf.putLong(crc.getValue());
-        crc.reset();
+        buf.putLong(crc.get().getValue());
+        crc.get().reset();
         return value;
     }
 
     @Override
     void update(byte[] data, int offset, int length) {
-        crc.update(data, offset, length);
+        crc.get().update(data, offset, length);
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1239167&r1=1239166&r2=1239167&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Wed Feb  1 15:19:57 2012
@@ -244,18 +244,22 @@ public class LedgerHandle {
      * @param rc
      */
     void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
- 
         bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
-
             @Override
             public void safeRun() {
-                metadata.length = length;
-                // Close operation is idempotent, so no need to check if we are
-                // already closed
-
-                metadata.close(lastAddConfirmed);
-                errorOutPendingAdds(rc);
-                lastAddPushed = lastAddConfirmed;
+                synchronized(LedgerHandle.this) {
+                    // synchronized on LedgerHandle.this to ensure that 
+                    // lastAddPushed can not be updated after the metadata 
+                    // is closed. 
+                    metadata.length = length;
+
+                    // Close operation is idempotent, so no need to check if we are
+                    // already closed
+
+                    metadata.close(lastAddConfirmed);
+                    errorOutPendingAdds(rc);
+                    lastAddPushed = lastAddConfirmed;
+                }
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
@@ -438,22 +442,30 @@ public class LedgerHandle {
                            LedgerHandle.this, -1, ctx);
         }
 
+        final long entryId;
+        final long currentLength;
+        synchronized(this) {
+            // synchronized on this to ensure that
+            // the ledger isn't closed between checking and 
+            // updating lastAddPushed
+            if (metadata.isClosed()) {
+                LOG.warn("Attempt to add to closed ledger: " + ledgerId);
+                LedgerHandle.this.opCounterSem.release();
+                cb.addComplete(BKException.Code.LedgerClosedException,
+                               LedgerHandle.this, -1, ctx);
+                return;
+            }
+
+            entryId = ++lastAddPushed;
+            currentLength = addToLength(length);
+            op.setEntryId(entryId);
+            pendingAddOps.add(op);
+        }
+
         try {
-            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
+            bk.mainWorkerPool.submit(new SafeRunnable() {
                 @Override
                 public void safeRun() {
-                    if (metadata.isClosed()) {
-                        LOG.warn("Attempt to add to closed ledger: " + ledgerId);
-                        LedgerHandle.this.opCounterSem.release();
-                        cb.addComplete(BKException.Code.LedgerClosedException,
-                                       LedgerHandle.this, -1, ctx);
-                        return;
-                    }
-
-                    long entryId = ++lastAddPushed;
-                    long currentLength = addToLength(length);
-                    op.setEntryId(entryId);
-                    pendingAddOps.add(op);
                     ChannelBuffer toSend = macManager.computeDigestAndPackageForSending(
                                                entryId, lastAddConfirmed, currentLength, data, offset, length);
                     op.initiate(toSend);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java?rev=1239167&r1=1239166&r2=1239167&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java Wed Feb  1 15:19:57 2012
@@ -25,19 +25,36 @@ import java.security.NoSuchAlgorithmExce
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 class MacDigestManager extends DigestManager {
+    final static Logger LOG = LoggerFactory.getLogger(MacDigestManager.class);
+
     public static String DIGEST_ALGORITHM = "SHA-1";
     public static String KEY_ALGORITHM = "HmacSHA1";
-    Mac mac;
 
-    public MacDigestManager(long ledgerId, byte[] passwd) throws GeneralSecurityException {
-        super(ledgerId);
-        byte[] macKey = genDigest("mac", passwd);
-        SecretKeySpec keySpec = new SecretKeySpec(macKey, KEY_ALGORITHM);
-        mac = Mac.getInstance(KEY_ALGORITHM);
-        mac.init(keySpec);
+    final byte[] passwd;
 
+    private final ThreadLocal<Mac> mac = new ThreadLocal<Mac>() {
+        @Override
+        protected Mac initialValue() {
+            try {
+                byte[] macKey = genDigest("mac", passwd);
+                SecretKeySpec keySpec = new SecretKeySpec(macKey, KEY_ALGORITHM);
+                Mac mac = Mac.getInstance(KEY_ALGORITHM);
+                mac.init(keySpec);
+                return mac;
+            } catch (GeneralSecurityException gse) {
+                LOG.error("Couldn't not get mac instance", gse);
+                return null;
+            }
+        }
+    };
 
+    public MacDigestManager(long ledgerId, byte[] passwd) throws GeneralSecurityException {
+        super(ledgerId);
+        this.passwd = passwd;
     }
 
     static byte[] genDigest(String pad, byte[] passwd) throws NoSuchAlgorithmException {
@@ -55,12 +72,12 @@ class MacDigestManager extends DigestMan
 
     @Override
     byte[] getValueAndReset() {
-        return mac.doFinal();
+        return mac.get().doFinal();
     }
 
     @Override
     void update(byte[] data, int offset, int length) {
-        mac.update(data, offset, length);
+        mac.get().update(data, offset, length);
     }
 
 



Mime
View raw message