activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbert...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1056 Performance improvements on AMQP
Date Wed, 22 Mar 2017 14:55:18 GMT
ARTEMIS-1056 Performance improvements on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0bfb39bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0bfb39bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0bfb39bf

Branch: refs/heads/master
Commit: 0bfb39bfb577200499f6ee2a80a00b65c2ef8d02
Parents: ac7cafb
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Tue Mar 21 11:43:06 2017 -0400
Committer: Justin Bertram <jbertram@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500

----------------------------------------------------------------------
 .../impl/netty/ActiveMQChannelHandler.java      |   6 +-
 .../netty/PartialPooledByteBufAllocator.java    |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  11 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 159 ++++++++++---------
 .../amqp/proton/handler/ProtonHandler.java      |   2 +-
 .../hornetq/HornetQProtocolManager.java         |   2 +-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 7 files changed, 104 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index 93be281..cc4407c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
{
       ByteBuf buffer = (ByteBuf) msg;
 
-      handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      try {
+         handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      } finally {
+         buffer.release();
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
index 5e67952..3a192e8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
@@ -53,17 +53,17 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator
{
 
    @Override
    public ByteBuf ioBuffer() {
-      return UNPOOLED.heapBuffer();
+      return POOLED.directBuffer();
    }
 
    @Override
    public ByteBuf ioBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
+      return POOLED.directBuffer(initialCapacity);
    }
 
    @Override
    public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+      return POOLED.directBuffer(initialCapacity, maxCapacity);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 7de9577..d02eace 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -63,7 +63,7 @@ public class AMQPMessage extends RefCountMessage {
    final long messageFormat;
    ByteBuf data;
    boolean bufferValid;
-   boolean durable;
+   Boolean durable;
    long messageID;
    String address;
    MessageImpl protonMessage;
@@ -491,10 +491,15 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public boolean isDurable() {
+      if (durable != null) {
+         return durable;
+      }
+
       if (getHeader() != null && getHeader().getDurable() != null) {
-         return getHeader().getDurable().booleanValue();
-      } else {
+         durable =  getHeader().getDurable().booleanValue();
          return durable;
+      } else {
+         return durable != null ? durable : false;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index d24464c..780ca4d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -481,89 +481,100 @@ public class ProtonServerSenderContext extends ProtonInitializable
implements Pr
          return;
       }
 
-      Message message = ((MessageReference) delivery.getContext()).getMessage();
+      try {
+         Message message = ((MessageReference) delivery.getContext()).getMessage();
 
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+         boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-      DeliveryState remoteState = delivery.getRemoteState();
-
-      boolean settleImmediate = true;
-      if (remoteState != null) {
-         // If we are transactional then we need ack if the msg has been accepted
-         if (remoteState instanceof TransactionalState) {
-
-            TransactionalState txState = (TransactionalState) remoteState;
-            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(),
false);
-
-            if (txState.getOutcome() != null) {
-               settleImmediate = false;
-               Outcome outcome = txState.getOutcome();
-               if (outcome instanceof Accepted) {
-                  if (!delivery.remotelySettled()) {
-                     TransactionalState txAccepted = new TransactionalState();
-                     txAccepted.setOutcome(Accepted.getInstance());
-                     txAccepted.setTxnId(txState.getTxnId());
-                     delivery.disposition(txAccepted);
-                  }
-                  // we have to individual ack as we can't guarantee we will get the delivery
-                  // updates (including acks) in order
-                  // from dealer, a perf hit but a must
-                  try {
-                     sessionSPI.ack(tx, brokerConsumer, message);
-                     tx.addDelivery(delivery, this);
-                  } catch (Exception e) {
-                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
e.getMessage());
+         DeliveryState remoteState;
+
+         synchronized (connection.getLock()) {
+            remoteState = delivery.getRemoteState();
+         }
+
+         boolean settleImmediate = true;
+         if (remoteState != null) {
+            // If we are transactional then we need ack if the msg has been accepted
+            if (remoteState instanceof TransactionalState) {
+
+               TransactionalState txState = (TransactionalState) remoteState;
+               ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(),
false);
+
+               if (txState.getOutcome() != null) {
+                  settleImmediate = false;
+                  Outcome outcome = txState.getOutcome();
+                  if (outcome instanceof Accepted) {
+                     if (!delivery.remotelySettled()) {
+                        TransactionalState txAccepted = new TransactionalState();
+                        txAccepted.setOutcome(Accepted.getInstance());
+                        txAccepted.setTxnId(txState.getTxnId());
+                        synchronized (connection.getLock()) {
+                           delivery.disposition(txAccepted);
+                        }
+                     }
+                     // we have to individual ack as we can't guarantee we will get the delivery
+                     // updates (including acks) in order
+                     // from dealer, a perf hit but a must
+                     try {
+                        sessionSPI.ack(tx, brokerConsumer, message);
+                        tx.addDelivery(delivery, this);
+                     } catch (Exception e) {
+                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
e.getMessage());
+                     }
                   }
                }
-            }
-         } else if (remoteState instanceof Accepted) {
-            //this can happen in the twice ack mode, that is the receiver accepts and settles
separately
-            //acking again would show an exception but would have no negative effect but
best to handle anyway.
-            if (delivery.isSettled()) {
-               return;
-            }
-            // we have to individual ack as we can't guarantee we will get the delivery updates
-            // (including acks) in order
-            // from dealer, a perf hit but a must
-            try {
-               sessionSPI.ack(null, brokerConsumer, message);
-            } catch (Exception e) {
-               log.warn(e.toString(), e);
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
e.getMessage());
-            }
-         } else if (remoteState instanceof Released) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, false);
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
-            }
-         } else if (remoteState instanceof Rejected) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, true);
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
-            }
-         } else if (remoteState instanceof Modified) {
-            try {
-               Modified modification = (Modified) remoteState;
-               if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
-                  sessionSPI.cancel(brokerConsumer, message, true);
-               } else {
+            } else if (remoteState instanceof Accepted) {
+               //this can happen in the twice ack mode, that is the receiver accepts and
settles separately
+               //acking again would show an exception but would have no negative effect but
best to handle anyway.
+               if (delivery.isSettled()) {
+                  return;
+               }
+               // we have to individual ack as we can't guarantee we will get the delivery
updates
+               // (including acks) in order
+               // from dealer, a perf hit but a must
+               try {
+                  sessionSPI.ack(null, brokerConsumer, message);
+               } catch (Exception e) {
+                  log.warn(e.toString(), e);
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(),
e.getMessage());
+               }
+            } else if (remoteState instanceof Released) {
+               try {
                   sessionSPI.cancel(brokerConsumer, message, false);
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
+               }
+            } else if (remoteState instanceof Rejected) {
+               try {
+                  sessionSPI.cancel(brokerConsumer, message, true);
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
+               }
+            } else if (remoteState instanceof Modified) {
+               try {
+                  Modified modification = (Modified) remoteState;
+                  if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
+                     sessionSPI.cancel(brokerConsumer, message, true);
+                  } else {
+                     sessionSPI.cancel(brokerConsumer, message, false);
+                  }
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
                }
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(),
e.getMessage());
             }
-         }
-         // todo add tag caching
-         if (!preSettle) {
-            protonSession.replaceTag(delivery.getTag());
-         }
+            // todo add tag caching
+            if (!preSettle) {
+               protonSession.replaceTag(delivery.getTag());
+            }
 
-         if (settleImmediate) settle(delivery);
+            if (settleImmediate)
+               settle(delivery);
 
-      } else {
-         // todo not sure if we need to do anything here
+         } else {
+            // todo not sure if we need to do anything here
+         }
+      } finally {
+         connection.flush();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 673a688..6b66f62 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -240,7 +240,7 @@ public class ProtonHandler extends ProtonInitializable {
          }
 
          // For returning PooledBytes
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
          ByteBuffer head = transport.head();
          head.position(offset);
          head.limit(offset + size);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
index 2f6ed2f..de7d2ff 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
@@ -49,7 +49,7 @@ class HornetQProtocolManager extends CoreProtocolManager {
          buffer.getByte(5) == 'T' &&
          buffer.getByte(6) == 'Q') {
          //todo add some handshaking
-         buffer.readBytes(7);
+         buffer.skipBytes(7);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index cc81fbe..2cfd451 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -174,7 +174,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor>
{
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
       //if we are not an old client then handshake
       if (isArtemis(buffer)) {
-         buffer.readBytes(7);
+         buffer.skipBytes(7);
       }
    }
 


Mime
View raw message