pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #882: Use thread local to allocate temp byte[] instead of allocator
Date Thu, 01 Jan 1970 00:00:00 GMT
merlimat closed pull request #882: Use thread local to allocate temp byte[] instead of allocator
URL: https://github.com/apache/incubator-pulsar/pull/882
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 79783c9a1..0c7466899 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -71,10 +71,6 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.RecyclableDuplicateByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.buffer.UnpooledHeapByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
 
 public class Commands {
 
@@ -824,32 +820,6 @@ private static int getCurrentProtocolVersion() {
         return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber();
     }
 
-    public static final class RecyclableHeapByteBuf extends UnpooledHeapByteBuf {
-        private static final Recycler<RecyclableHeapByteBuf> RECYCLER = new Recycler<RecyclableHeapByteBuf>()
{
-            @Override
-            protected RecyclableHeapByteBuf newObject(Handle handle) {
-                return new RecyclableHeapByteBuf(handle);
-            }
-        };
-
-        private final Handle handle;
-
-        private RecyclableHeapByteBuf(Handle handle) {
-            super(UnpooledByteBufAllocator.DEFAULT, 4096, PulsarDecoder.MaxMessageSize);
-            this.handle = handle;
-        }
-
-        public static RecyclableHeapByteBuf get() {
-            return RECYCLER.get();
-
-        }
-
-        public void recycle() {
-            clear();
-            RECYCLER.recycle(this, handle);
-        }
-    }
-
     public static enum ChecksumType {
         Crc32c,
         None;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
index b4d8ddb45..647e4bca1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java
@@ -55,8 +55,6 @@
 import java.io.IOException;
 import java.nio.ByteOrder;
 
-import org.apache.pulsar.common.api.Commands.RecyclableHeapByteBuf;
-
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ExtensionRegistryLite;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -65,6 +63,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import io.netty.util.concurrent.FastThreadLocal;
 
 public class ByteBufCodedInputStream {
     public static interface ByteBufMessageBuilder {
@@ -145,20 +144,22 @@ public void readMessage(final ByteBufMessageBuilder builder, final ExtensionRegi
         buf.writerIndex(writerIdx);
     }
 
+    private static final FastThreadLocal<byte[]> localByteArray = new FastThreadLocal<>();
+
     /** Read a {@code bytes} field value from the stream. */
     public ByteString readBytes() throws IOException {
         final int size = readRawVarint32();
         if (size == 0) {
             return ByteString.EMPTY;
         } else {
-            RecyclableHeapByteBuf heapBuf = RecyclableHeapByteBuf.get();
-            if (size > heapBuf.writableBytes()) {
-                heapBuf.capacity(size);
+            byte[] localBuf = localByteArray.get();
+            if (localBuf == null || localBuf.length < size) {
+                localBuf = new byte[Math.max(size, 1024)];
+                localByteArray.set(localBuf);
             }
 
-            heapBuf.writeBytes(buf, size);
-            ByteString res = ByteString.copyFrom(heapBuf.array(), heapBuf.arrayOffset(),
heapBuf.readableBytes());
-            heapBuf.recycle();
+            buf.readBytes(localBuf, 0, size);
+            ByteString res = ByteString.copyFrom(localBuf, 0, size);
             return res;
         }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
index de44aa6d4..9aa84a1e0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java
@@ -55,14 +55,13 @@
 import java.io.IOException;
 import java.nio.ByteOrder;
 
-import org.apache.pulsar.common.api.Commands.RecyclableHeapByteBuf;
-
 import com.google.protobuf.ByteString;
 import com.google.protobuf.WireFormat;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import io.netty.util.concurrent.FastThreadLocal;
 
 public class ByteBufCodedOutputStream {
     public static interface ByteBufGeneratedMessage {
@@ -183,17 +182,19 @@ public void writeBytesNoTag(final ByteString value) throws IOException
{
         writeRawBytes(value);
     }
 
+
+    private static final FastThreadLocal<byte[]> localByteArray = new FastThreadLocal<>();
+
     /** Write a byte string. */
     public void writeRawBytes(final ByteString value) throws IOException {
-        RecyclableHeapByteBuf heapBuf = RecyclableHeapByteBuf.get();
-        if (value.size() > heapBuf.writableBytes()) {
-            heapBuf.capacity(value.size());
+        byte[] localBuf = localByteArray.get();
+        if (localBuf == null || localBuf.length < value.size()) {
+            localBuf = new byte[Math.max(value.size(), 1024)];
+            localByteArray.set(localBuf);
         }
 
-        value.copyTo(heapBuf.array(), 0);
-        heapBuf.writerIndex(value.size());
-        buf.writeBytes(heapBuf);
-        heapBuf.recycle();
+        value.copyTo(localBuf, 0);
+        buf.writeBytes(localBuf, 0, value.size());
     }
 
     public void writeEnum(final int fieldNumber, final int value) throws IOException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message