activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-1073 Adding configuration for Producer's credits on AMQP
Date Mon, 27 Mar 2017 21:03:25 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 2ef0d2601 -> 0db33b78b


ARTEMIS-1073 Adding configuration for Producer's credits on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc25ff0e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc25ff0e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc25ff0e

Branch: refs/heads/master
Commit: dc25ff0e42976b4fee507105cd64f0e9847a4ea4
Parents: 2ef0d26
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Mon Mar 27 15:36:41 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Mar 27 16:27:24 2017 -0400

----------------------------------------------------------------------
 .../artemis/cli/commands/etc/amqp-acceptor.txt  |  2 +-
 .../artemis/cli/commands/etc/broker.xml         |  5 +++-
 .../amqp/broker/ProtonProtocolManager.java      | 24 +++++++++++++++-
 .../client/AMQPClientConnectionFactory.java     |  2 +-
 .../amqp/proton/AMQPConnectionContext.java      | 30 +++++++++++++++++++-
 .../amqp/proton/AMQPSessionContext.java         |  2 +-
 .../proton/ProtonServerReceiverContext.java     | 12 ++++----
 .../transaction/ProtonTransactionHandler.java   | 10 ++++---
 8 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
index 5b20b92..71f44b7 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt
@@ -1,3 +1,3 @@
 
          <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
-         <acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true</acceptor>
+         <acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index 5ca8687..497b10d 100644
--- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -61,10 +61,13 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
       <global-max-size>100Mb</global-max-size>
 
       <acceptors>
+
          <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that
supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: Once a producer's remaining credits drop to this low mark, the server
sends it the number of credits specified by amqpCredits -->
 
          <!-- Acceptor for every supported protocol -->
-         <acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true</acceptor>
+         <acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
 ${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor}
       </acceptors>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index 2828cc1..03314b2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -56,6 +56,10 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>,
Noti
 
    private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
 
+   private int amqpCredits = 100;
+
+   private int amqpLowCredits = 30;
+
    /*
    * used when you want to treat senders as a subscription on an address rather than consuming
from the actual queue for
    * the address. This can be changed on the acceptor.
@@ -105,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>,
Noti
       }
 
       String id = server.getConfiguration().getName();
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback,
id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(),
server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback,
id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(),
server.getScheduledPool());
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
@@ -137,6 +141,24 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>,
Noti
 
    }
 
+   public int getAmqpCredits() {
+      return amqpCredits;
+   }
+
+   public ProtonProtocolManager setAmqpCredits(int amqpCredits) {
+      this.amqpCredits = amqpCredits;
+      return this;
+   }
+
+   public int getAmqpLowCredits() {
+      return amqpLowCredits;
+   }
+
+   public ProtonProtocolManager setAmqpLowCredits(int amqpLowCredits) {
+      this.amqpLowCredits = amqpLowCredits;
+      return this;
+   }
+
    @Override
    public boolean isProtocol(byte[] array) {
       return array.length >= 4 && array[0] == (byte) 'A' && array[1] ==
(byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index b8851bb..510fdad 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory {
 
       Executor executor = server.getExecutorFactory().getExecutor();
 
-      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
executor, server.getScheduledPool());
+      AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback,
containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX,
executor, server.getScheduledPool());
       eventHandler.ifPresent(amqpConnection::addEventHandler);
 
       ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager,
amqpConnection, connection, executor);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 1c38942..7994be4 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
 import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
@@ -74,13 +75,18 @@ public class AMQPConnectionContext extends ProtonInitializable {
 
    protected LocalListener listener = new LocalListener();
 
-   public AMQPConnectionContext(AMQPConnectionCallback connectionSP,
+   private final ProtonProtocolManager protocolManager;
+
+   public AMQPConnectionContext(ProtonProtocolManager protocolManager,
+                                AMQPConnectionCallback connectionSP,
                                 String containerId,
                                 int idleTimeout,
                                 int maxFrameSize,
                                 int channelMax,
                                 Executor dispatchExecutor,
                                 ScheduledExecutorService scheduledPool) {
+
+      this.protocolManager = protocolManager;
       this.connectionCallback = connectionSP;
       this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
 
@@ -240,6 +246,28 @@ public class AMQPConnectionContext extends ProtonInitializable {
       handler.addEventHandler(eventHandler);
    }
 
+   public ProtonProtocolManager getProtocolManager() {
+      return protocolManager;
+   }
+
+   public int getAmqpLowCredits() {
+      if (protocolManager != null) {
+         return protocolManager.getAmqpLowCredits();
+      } else {
+         // this is for tests only...
+         return 30;
+      }
+   }
+
+   public int getAmqpCredits() {
+      if (protocolManager != null) {
+         return protocolManager.getAmqpCredits();
+      } else {
+         // this is for tests only...
+         return 100;
+      }
+   }
+
    // This listener will perform a bunch of things here
    class LocalListener implements EventHandler {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 64b2531..c2c1f2d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -149,7 +149,7 @@ public class AMQPSessionContext extends ProtonInitializable {
       receiver.setContext(transactionHandler);
       synchronized (connection.getLock()) {
          receiver.open();
-         receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
+         receiver.flow(connection.getAmqpCredits());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 596e93a..76ad1ac 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -53,10 +53,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
     The maximum number of credits we will allocate to clients.
     This number is also used by the broker when refreshing client credits.
     */
-   private static int maxCreditAllocation = 100;
+   private final int amqpCredits;
 
    // Used by the broker to decide when to refresh a client's credit.  This is not used when
the client requests credit.
-   private static int minCreditRefresh = 30;
+   private final int minCreditRefresh;
    private TerminusExpiryPolicy expiryPolicy;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
@@ -67,11 +67,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       this.protonSession = protonSession;
       this.receiver = receiver;
       this.sessionSPI = sessionSPI;
+      this.amqpCredits = connection.getAmqpCredits();
+      this.minCreditRefresh = connection.getAmqpLowCredits();
    }
 
    @Override
    public void onFlow(int credits, boolean drain) {
-      flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
+      flow(Math.min(credits, amqpCredits), amqpCredits);
    }
 
    @Override
@@ -119,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             }
          }
       }
-      flow(maxCreditAllocation, minCreditRefresh);
+      flow(amqpCredits, minCreditRefresh);
    }
 
    private RoutingType getRoutingType(Symbol[] symbols) {
@@ -173,7 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(),
data);
 
          synchronized (connection.getLock()) {
-            flow(maxCreditAllocation, minCreditRefresh);
+            flow(amqpCredits, minCreditRefresh);
          }
       } catch (Exception e) {
          log.warn(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 12498b0..a3dae25 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -42,8 +42,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
 
-   public static final int DEFAULT_COORDINATOR_CREDIT = 100;
-   public static final int CREDIT_LOW_WATERMARK = 30;
+   private final int amqpCredit;
+   private final int amqpLowMark;
 
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
@@ -51,6 +51,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext
connection) {
       this.sessionSPI = sessionSPI;
       this.connection = connection;
+      this.amqpCredit = connection.getAmqpCredits();
+      this.amqpLowMark = connection.getAmqpLowCredits();
    }
 
    @Override
@@ -68,8 +70,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
          synchronized (connection.getLock()) {
             // Replenish coordinator receiver credit on exhaustion so sender can continue
             // transaction declare and discharge operations.
-            if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
-               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+            if (receiver.getCredit() < amqpLowMark) {
+               receiver.flow(amqpCredit);
             }
 
             buffer = new byte[delivery.available()];


Mime
View raw message