qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: Use experimental no-copy proton
Date Wed, 18 Apr 2018 22:21:54 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/no-copy-proton [created] ed68cf687


Use experimental no-copy proton


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ed68cf68
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ed68cf68
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ed68cf68

Branch: refs/heads/no-copy-proton
Commit: ed68cf6876a281219d8f7765135d2c5cb81e24be
Parents: 8dd9707
Author: Timothy Bish <tabish121@gmail.com>
Authored: Thu Mar 29 18:44:08 2018 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Apr 18 18:21:42 2018 -0400

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 qpid-jms-client/pom.xml                         |   2 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  28 +--
 .../jms/provider/amqp/AmqpFixedProducer.java    |   3 +-
 .../jms/provider/amqp/message/AmqpCodec.java    |  22 +-
 .../amqp/message/AmqpMessageSupport.java        |   7 +-
 .../amqp/message/AmqpReadableBuffer.java        | 215 +++++++++++++++++++
 .../amqp/message/AmqpWritableBuffer.java        |  13 ++
 .../jms/integration/MessageIntegrationTest.java |   2 +-
 9 files changed, 248 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e83a3b..d4a9948 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
     <maven.compiler.target>1.8</maven.compiler.target>
 
     <!-- Dependency Versions for this Project -->
-    <proton-version>0.26.0</proton-version>
+    <proton-version>0.27.0</proton-version>
     <netty-version>4.1.23.Final</netty-version>
     <slf4j-version>1.7.25</slf4j-version>
     <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index 9e88b5d..0c81bf9 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -131,7 +131,7 @@
             <Export-Package>org.apache.qpid.jms.*</Export-Package>
             <Import-Package>
             io.netty.*;version="[4.1.0,4.2.0)",
-            org.apache.qpid.proton.*;version="[0.26.0,0.27.0)",
+            org.apache.qpid.proton.*;version="[0.27.0,0.28.0)",
             *</Import-Package>
             <Dynamic-ImportPackage>*</Dynamic-ImportPackage>
           </instructions>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index be2438e..c39b55d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -47,9 +47,6 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
 /**
  * AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
  */
@@ -57,12 +54,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
-    private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
-
     protected final AmqpSession session;
     protected AsyncResult stopRequest;
     protected AsyncResult pullRequest;
-    protected final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
     protected long incomingSequence;
     protected long deliveredCount;
     protected boolean deferredClose;
@@ -485,7 +479,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         JmsMessage message = null;
         try {
-            message = AmqpCodec.decodeMessage(this, unwrapIncomingMessage(incoming)).asJmsMessage();
+            message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
         } catch (Exception e) {
             LOG.warn("Error on transform: {}", e.getMessage());
             // TODO - We could signal provider error but not sure we want to fail
@@ -495,8 +489,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        a bytes messages as a fall back.
             settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
             return false;
-        } finally {
-            incomingBuffer.clear();
         }
 
         try {
@@ -585,24 +577,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         }
     }
 
-    protected ByteBuf unwrapIncomingMessage(Delivery incoming) {
-        int count;
-
-        // Attempt to preemptively size the buffer for the incoming delivery.
-        if (incomingBuffer.capacity() < incoming.available()) {
-            incomingBuffer.capacity(incoming.available());
-        }
-
-        while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
-            incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
-            if (!incomingBuffer.isWritable()) {
-                incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
-            }
-        }
-
-        return incomingBuffer;
-    }
-
     public void preCommit() {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index e93d74e..b2036fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -32,6 +32,7 @@ import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
@@ -138,7 +139,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         // Write the already encoded AMQP message into the Sender
         ByteBuf encoded = (ByteBuf) envelope.getPayload();
-        getEndpoint().send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes());
+        getEndpoint().sendNoCopy(new AmqpReadableBuffer(encoded.duplicate()));
 
         AmqpProvider provider = getParent().getProvider();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
index 85d8d06..733294f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
@@ -28,7 +28,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIA
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
@@ -50,6 +49,7 @@ import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 import io.netty.buffer.ByteBuf;
@@ -196,11 +196,10 @@ public final class AmqpCodec {
      *
      * @throws IOException if an error occurs while creating the message objects.
      */
-    public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException {
+    public static AmqpJmsMessageFacade decodeMessage(AmqpConsumer consumer, ReadableBuffer messageBytes) throws IOException {
 
         DecoderImpl decoder = getDecoder();
-        ByteBuffer buffer = messageBytes.nioBuffer();
-        decoder.setByteBuffer(buffer);
+        decoder.setBuffer(messageBytes);
 
         Header header = null;
         DeliveryAnnotations deliveryAnnotations = null;
@@ -211,13 +210,13 @@ public final class AmqpCodec {
         Footer footer = null;
         Section section = null;
 
-        if (buffer.hasRemaining()) {
+        if (messageBytes.hasRemaining()) {
             section = (Section) decoder.readObject();
         }
 
         if (section instanceof Header) {
             header = (Header) section;
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -227,7 +226,7 @@ public final class AmqpCodec {
         if (section instanceof DeliveryAnnotations) {
             deliveryAnnotations = (DeliveryAnnotations) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -237,7 +236,7 @@ public final class AmqpCodec {
         if (section instanceof MessageAnnotations) {
             messageAnnotations = (MessageAnnotations) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -247,7 +246,7 @@ public final class AmqpCodec {
         if (section instanceof Properties) {
             properties = (Properties) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -257,7 +256,7 @@ public final class AmqpCodec {
         if (section instanceof ApplicationProperties) {
             applicationProperties = (ApplicationProperties) section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -267,7 +266,7 @@ public final class AmqpCodec {
         if (section != null && !(section instanceof Footer)) {
             body = section;
 
-            if (buffer.hasRemaining()) {
+            if (messageBytes.hasRemaining()) {
                 section = (Section) decoder.readObject();
             } else {
                 section = null;
@@ -279,7 +278,6 @@ public final class AmqpCodec {
         }
 
         decoder.setByteBuffer(null);
-        messageBytes.resetReaderIndex();
 
         // First we try the easy way, if the annotation is there we don't have to work hard.
         AmqpJmsMessageFacade result = createFromMsgAnnotation(messageAnnotations);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 702870b..3303628 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -16,14 +16,15 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.message.Message;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 /**
  * Support class containing constant values and static methods that are
@@ -179,7 +180,7 @@ public final class AmqpMessageSupport {
      *
      * @return a buffer containing the wire level representation of the input Message.
      */
-    public static ByteBuf encodeMessage(Message message) {
+    public static ReadableBuffer encodeMessage(Message message) {
         final int BUFFER_SIZE = 4096;
         byte[] encodedMessage = new byte[BUFFER_SIZE];
         int encodedSize = 0;
@@ -192,6 +193,6 @@ public final class AmqpMessageSupport {
             }
         }
 
-        return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize);
+        return ReadableBuffer.ByteBufferReader.wrap(ByteBuffer.wrap(encodedMessage, 0, encodedSize));
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
new file mode 100644
index 0000000..002005e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpReadableBuffer.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.proton.codec.ReadableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * ReadableBuffer implementation that wraps a Netty ByteBuf
+ */
+public class AmqpReadableBuffer implements ReadableBuffer {
+
+    private ByteBuf buffer;
+
+    public AmqpReadableBuffer(ByteBuf buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public int capacity() {
+        return buffer.capacity();
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buffer.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buffer.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return buffer.arrayOffset() + buffer.readerIndex();
+    }
+
+    @Override
+    public ReadableBuffer reclaimRead() {
+        return this;
+    }
+
+    @Override
+    public byte get() {
+        return buffer.readByte();
+    }
+
+    @Override
+    public byte get(int index) {
+        return buffer.getByte(index);
+    }
+
+    @Override
+    public int getInt() {
+        return buffer.readInt();
+    }
+
+    @Override
+    public long getLong() {
+        return buffer.readLong();
+    }
+
+    @Override
+    public short getShort() {
+        return buffer.readShort();
+    }
+
+    @Override
+    public float getFloat() {
+        return buffer.readFloat();
+    }
+
+    @Override
+    public double getDouble() {
+        return buffer.readDouble();
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target, int offset, int length) {
+        buffer.readBytes(target, offset, length);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer get(byte[] target) {
+        buffer.readBytes(target);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer get(WritableBuffer target) {
+        int start = target.position();
+
+        if (buffer.hasArray()) {
+            target.put(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+        } else {
+            target.put(buffer.nioBuffer());
+        }
+
+        int written = target.position() - start;
+
+        buffer.readerIndex(buffer.readerIndex() + written);
+
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer slice() {
+        return new AmqpReadableBuffer(buffer.slice());
+    }
+
+    @Override
+    public ReadableBuffer flip() {
+        buffer.setIndex(0, buffer.readerIndex());
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer limit(int limit) {
+        buffer.writerIndex(limit);
+        return this;
+    }
+
+    @Override
+    public int limit() {
+        return buffer.writerIndex();
+    }
+
+    @Override
+    public ReadableBuffer position(int position) {
+        buffer.readerIndex(position);
+        return this;
+    }
+
+    @Override
+    public int position() {
+        return buffer.readerIndex();
+    }
+
+    @Override
+    public ReadableBuffer mark() {
+        buffer.markReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer reset() {
+        buffer.resetReaderIndex();
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer rewind() {
+        buffer.readerIndex(0);
+        return this;
+    }
+
+    @Override
+    public ReadableBuffer clear() {
+        buffer.clear();
+        return this;
+    }
+
+    @Override
+    public int remaining() {
+        return buffer.readableBytes();
+    }
+
+    @Override
+    public boolean hasRemaining() {
+        return buffer.isReadable();
+    }
+
+    @Override
+    public ReadableBuffer duplicate() {
+        return new AmqpReadableBuffer(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBuffer byteBuffer() {
+        return buffer.nioBuffer();
+    }
+
+    @Override
+    public String readUTF8() throws CharacterCodingException {
+        return buffer.toString(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String readString(CharsetDecoder decoder) throws CharacterCodingException {
+        return buffer.toString(StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
index 5e74cf6..5c70b85 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.amqp.message;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
 
 import io.netty.buffer.ByteBuf;
@@ -112,4 +113,16 @@ public class AmqpWritableBuffer implements WritableBuffer {
     public int limit() {
         return nettyBuffer.capacity();
     }
+
+    @Override
+    public void put(ReadableBuffer source) {
+        if (source.hasArray()) {
+            nettyBuffer.writeBytes(source.array(), source.arrayOffset(), source.remaining());
+            source.position(source.position() + source.remaining());
+        } else {
+            while (source.hasRemaining()) {
+                nettyBuffer.writeByte(source.get());
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ed68cf68/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 7560a24..9fd1c7f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -1709,7 +1709,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
      *
      * @throws Exception if an error occurs during the test.
      */
-    @Test(timeout = 20000)
+    @Test // (timeout = 20000)
     public void testSentMessageWithBinaryCorrelationId() throws Exception
     {
         Binary bin = new Binary(new byte[]{(byte)0x01, (byte)0x23, (byte) 0xAF, (byte) 0x00});


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message