activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [4/9] activemq-artemis git commit: ARTEMIS-636 Implement AMQP AddressFull BLOCK
Date Wed, 20 Jul 2016 09:35:36 GMT
ARTEMIS-636 Implement AMQP AddressFull BLOCK


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4d60ced5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4d60ced5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4d60ced5

Branch: refs/heads/master
Commit: 4d60ced581f28d9ffcd8ab4cef9130bf07715209
Parents: 5dfa1c5
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Mon Jul 18 14:08:41 2016 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Wed Jul 20 10:33:44 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |  64 +++++--
 .../org/proton/plug/AMQPSessionCallback.java    |   2 +
 .../plug/context/AbstractConnectionContext.java |   2 +-
 .../context/AbstractProtonReceiverContext.java  |   5 +-
 .../client/ProtonClientReceiverContext.java     |   5 +
 .../server/ProtonServerReceiverContext.java     |  21 ++-
 .../server/ProtonServerSenderContext.java       |   4 +-
 .../test/minimalserver/MinimalSessionSPI.java   |   7 +-
 docs/user-manual/en/flow-control.md             |  22 +++
 .../tests/integration/proton/ProtonTest.java    | 185 ++++++++++++++++++-
 10 files changed, 283 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index 00f5e3f..ab57fe1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,33 +20,37 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
 import org.apache.activemq.artemis.core.protocol.proton.converter.message.EncodedMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.ProtonJMessage;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.ServerSession;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
@@ -66,7 +70,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
 
    private final Connection transportConnection;
 
-
    private ServerSession serverSession;
 
    private AMQPSessionContext protonSession;
@@ -347,13 +350,28 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
 
       recoverContext();
 
+      PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
+      if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK)
{
+         ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address
is full: " + message.getAddress());
+         Rejected rejected = new Rejected();
+         rejected.setError(ec);
+         delivery.disposition(rejected);
+         connection.flush();
+      }
+      else {
+         serverSend(message, delivery, receiver);
+      }
+   }
+
+   private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver
receiver) throws Exception {
       try {
          serverSession.send(message, false);
-
+         // FIXME Potential race here...
          manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback()
{
             @Override
             public void done() {
                synchronized (connection.getLock()) {
+                  delivery.disposition(Accepted.getInstance());
                   delivery.settle();
                   connection.flush();
                }
@@ -379,6 +397,24 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
    }
 
    @Override
+   public void offerProducerCredit(final String address, final int credits, final int threshold,
final Receiver receiver) {
+      try {
+         final PagingStore store = manager.getServer().getPagingManager().getPageStore(new
SimpleString(address));
+         store.checkMemory(new Runnable() {
+            @Override
+            public void run() {
+               if (receiver.getRemoteCredit() < threshold) {
+                  receiver.flow(credits);
+               }
+            }
+         });
+      }
+      catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
    public void deleteQueue(String address) throws Exception {
       manager.getServer().destroyQueue(new SimpleString(address));
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
index bb53791..637b538 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -44,6 +44,8 @@ public interface AMQPSessionCallback {
 
    void createDurableQueue(String address, String queueName) throws Exception;
 
+   void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);
+
    void deleteQueue(String address) throws Exception;
 
    boolean queueQuery(String queueName) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index d6269e8..fa949d3 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -39,8 +39,8 @@ import org.proton.plug.handler.ProtonHandler;
 import org.proton.plug.handler.impl.DefaultEventHandler;
 import org.proton.plug.util.ByteUtil;
 
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
 import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
+import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
 import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
 
 public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext
{

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 4343b01..5a43029 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -57,14 +57,13 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
       close(false);
    }
 
-   public void flow(int credits) {
+   public void flow(int credits, int threshold) {
       synchronized (connection.getLock()) {
-         receiver.flow(credits);
+         sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
       }
       connection.flush();
    }
 
-
    public void drain(int credits) {
       synchronized (connection.getLock()) {
          receiver.drain(credits);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
index 884af60..c06ae58 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
@@ -84,4 +84,9 @@ public class ProtonClientReceiverContext extends AbstractProtonReceiverContext
i
       return queues.poll(time, unit);
    }
 
+   @Override
+   public void flow(int credits) {
+      flow(credits, Integer.MAX_VALUE);
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index aa04cef..7d39bb7 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -19,7 +19,6 @@ package org.proton.plug.context.server;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Delivery;
@@ -39,7 +38,14 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
{
 
    private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
 
-   private final int numberOfCredits = 100;
+   /*
+    The maximum number of credits we will allocate to clients.
+    This number is also used by the broker when refresh client credits.
+     */
+   private static int maxCreditAllocation = 100;
+
+   // Used by the broker to decide when to refresh clients credit.  This is not used when
client requests credit.
+   private static int minCreditRefresh = 30;
 
    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
                                       AbstractConnectionContext connection,
@@ -50,6 +56,7 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
{
 
    @Override
    public void onFlow(int credits, boolean drain) {
+      flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
    }
 
    @Override
@@ -86,10 +93,10 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
{
             catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage());
             }
+
          }
       }
-
-      flow(numberOfCredits);
+      flow(maxCreditAllocation, minCreditRefresh);
    }
 
    /*
@@ -117,12 +124,8 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
{
                receiver.advance();
 
                sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(),
buffer);
-               delivery.disposition(Accepted.getInstance());
-               delivery.settle();
 
-               if (receiver.getRemoteCredit() < numberOfCredits / 2) {
-                  flow(numberOfCredits);
-               }
+               flow(maxCreditAllocation, minCreditRefresh);
             }
          }
          finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 0804084..5fd24d9 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -40,11 +41,10 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.context.AbstractConnectionContext;
 import org.proton.plug.context.AbstractProtonContextSender;
 import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.context.ProtonPlugSender;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
 import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-import org.proton.plug.context.ProtonPlugSender;
-import org.apache.qpid.proton.amqp.messaging.Source;
 
 import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
 import static org.proton.plug.AmqpSupport.findFilter;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
index ebc85f1..b917aa6 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalSessionSPI.java
@@ -27,9 +27,9 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.ProtonJMessage;
 import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
+import org.proton.plug.SASLResult;
 import org.proton.plug.context.ProtonPlugSender;
 import org.proton.plug.context.server.ProtonServerSessionContext;
-import org.proton.plug.SASLResult;
 import org.proton.plug.util.ProtonServerMessage;
 
 public class MinimalSessionSPI implements AMQPSessionCallback {
@@ -76,6 +76,11 @@ public class MinimalSessionSPI implements AMQPSessionCallback {
    }
 
    @Override
+   public void offerProducerCredit(String address, int credits, int threshold, Receiver receiver)
{
+
+   }
+
+   @Override
    public void createTemporaryQueue(String address, String queueName) throws Exception {
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index 054bcce..c1b4035 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -273,6 +273,28 @@ control.
 > a misbehaving client to ignore the flow control credits issued by the broker
 > and continue sending with out sufficient credit.
 
+#### Blocking producer window based flow control using AMQP
+
+Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
+flow control.  Artemis CORE protocol and AMQP.  Both protocols implement flow
+control slightly differently and therefore address full BLOCK policy behaves
+slightly different for clients uses each protocol respectively.
+
+As explained earlier in this chapter the CORE protocol uses a producer window size
+flow control system.  Where credits (representing bytes) are allocated to producers,
+if a producer wants to send a message it should wait until it has enough bytes available
+to send it.  AMQP flow control credits are not representative of bytes but instead represent
+the number of messages a producer is permitted to send (regardless of size).
+
+BLOCK for AMQP works mostly in the same way as the producer window size mechanism above.
 Artemis
+will issue 100 credits to a client at a time and refresh them when the clients credits reaches
30.
+The broker will stop issuing credits once an address is full.  However, since AMQP credits
represent
+whole messages and not bytes, it would be possible for an AMQP client to significantly exceed
an
+address upper bound should the broker continue accepting messages until the clients credits
are exhausted.
+For this reason once an address has reached it's upper bound and is blocked (when using AMQP)
Artemis
+will start rejecting messages until the address becomes unblocked.  This should be taken
into consideration when writing
+application code.
+
 ### Rate limited flow control
 
 Apache ActiveMQ Artemis also allows the rate a producer can emit message to be limited,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4d60ced5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 4d41ff5..8874271 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -29,11 +29,15 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.QueueBrowser;
+import javax.jms.ResourceAllocationException;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
+import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -48,9 +52,17 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Properties;
@@ -66,12 +78,21 @@ import org.proton.plug.AMQPClientConnectionContext;
 import org.proton.plug.AMQPClientReceiverContext;
 import org.proton.plug.AMQPClientSenderContext;
 import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.context.server.ProtonServerReceiverContext;
 import org.proton.plug.test.Constants;
 import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
 
 @RunWith(Parameterized.class)
 public class ProtonTest extends ActiveMQTestBase {
 
+   private static final String amqpConnectionUri = "amqp://localhost:5672";
+
+   private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
+
+   private static final String userName = "guest";
+
+   private static final String password = "guest";
+
    // this will ensure that all tests in this class are run twice,
    // once with "true" passed to the class' constructor and once with "false"
    @Parameterized.Parameters(name = "{0}")
@@ -106,6 +127,7 @@ public class ProtonTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
       disableCheckThread();
+
       server = this.createServer(true, true);
       HashMap<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, "5672");
@@ -113,6 +135,12 @@ public class ProtonTest extends ActiveMQTestBase {
       TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY,
params);
 
       server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+      server.getConfiguration().getAddressesSettings().put("#", addressSettings);
+
       server.start();
       server.createQueue(new SimpleString(coreAddress), new SimpleString(coreAddress), null,
true, false);
       server.createQueue(new SimpleString(coreAddress + "1"), new SimpleString(coreAddress
+ "1"), null, true, false);
@@ -167,7 +195,7 @@ public class ProtonTest extends ActiveMQTestBase {
       maxCreditAllocation.setInt(null, 1);
 
       String destinationAddress = address + 1;
-      AmqpClient client = new AmqpClient(new URI("tcp://localhost:5672"), userName, password);
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
       AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
@@ -197,9 +225,158 @@ public class ProtonTest extends ActiveMQTestBase {
 
       message = (TextMessage) cons.receive(5000);
       Assert.assertNotNull(message);
+   }
+
+   @Test
+   public void testResourceLimitExceptionOnAddressFull() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      fillAddress(address + 1);
+   }
+
+   @Test
+   public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      String destinationAddress = address + 1;
+      fillAddress(destinationAddress);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Exception e = null;
+      try {
+         Destination d = session.createQueue(destinationAddress);
+         MessageProducer p = session.createProducer(d);
+         p.send(session.createBytesMessage());
+      }
+      catch (ResourceAllocationException rae) {
+         e = rae;
+      }
+      assertTrue(e instanceof ResourceAllocationException);
+      assertTrue(e.getMessage().contains("resource-limit-exceeded"));
+   }
+
+   @Test
+   public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+
+      // Only allow 1 credit to be submitted at a time.
+      Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
+      maxCreditAllocation.setAccessible(true);
+      int originalMaxCreditAllocation = maxCreditAllocation.getInt(null);
+      maxCreditAllocation.setInt(null, 1);
+
+      String destinationAddress = address + 1;
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(destinationAddress);
+         sender.setSendTimeout(1000);
+         sendUntilFull(sender);
+         assertTrue(sender.getSender().getCredit() <= 0);
+      }
+      finally {
+         amqpConnection.close();
+         maxCreditAllocation.setInt(null, originalMaxCreditAllocation);
+      }
+   }
+
+   @Test
+   public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
 
+      String destinationAddress = address + 1;
+      int messagesSent = fillAddress(destinationAddress);
+
+      AmqpConnection amqpConnection = null;
+      try {
+         amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(destinationAddress);
+
+         // Wait for a potential flow frame.
+         Thread.sleep(500);
+         assertEquals(0, sender.getSender().getCredit());
+
+         // Empty Address except for 1 message used later.
+         AmqpReceiver receiver = session.createReceiver(destinationAddress);
+         receiver.flow(100);
+
+         AmqpMessage m;
+         for (int i = 0; i < messagesSent - 1; i++) {
+            m = receiver.receive();
+            m.accept();
+         }
+
+         // Wait for address to unblock and flow frame to arrive
+         Thread.sleep(500);
+         assertTrue(sender.getSender().getCredit() > 0);
+         assertNotNull(receiver.receive());
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   @Test
+   public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception
{
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+
+      fillAddress(address + 1);
+      AmqpConnection amqpConnection = null;
+      try {
+         amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(address + 1);
+         // Wait for a potential flow frame.
+         Thread.sleep(1000);
+         assertEquals(0, sender.getSender().getCredit());
+      }
+      finally {
+         amqpConnection.close();
+      }
    }
 
+   /**
+    * Fills an address.  Careful when using this method.  Only use when rejected messages
are switched on.
+    * @param address
+    * @return
+    * @throws Exception
+    */
+   private int fillAddress(String address) throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+      try {
+         AmqpSession session = amqpConnection.createSession();
+         AmqpSender sender = session.createSender(address);
+         return sendUntilFull(sender);
+      }
+      finally {
+         amqpConnection.close();
+      }
+   }
+
+   private int sendUntilFull(AmqpSender sender) throws IOException {
+      AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+
+      int sentMessages = 0;
+      int maxMessages = 50;
+
+      Exception e = null;
+      try {
+         for (int i = 0; i < maxMessages; i++) {
+            message.setBytes(payload);
+            sender.send(message);
+            sentMessages++;
+         }
+      }
+      catch (IOException ioe) {
+         e = ioe;
+      }
+
+      assertNotNull(e);
+      assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded"));
+      return sentMessages;
+   }
 
    @Test
    public void testReplyTo() throws Throwable {
@@ -918,7 +1095,7 @@ public class ProtonTest extends ActiveMQTestBase {
    private javax.jms.Connection createConnection() throws JMSException {
       Connection connection;
       if (protocol == 3) {
-         factory = new JmsConnectionFactory("amqp://localhost:5672");
+         factory = new JmsConnectionFactory(amqpConnectionUri);
          connection = factory.createConnection();
          connection.setExceptionListener(new ExceptionListener() {
             @Override
@@ -929,7 +1106,7 @@ public class ProtonTest extends ActiveMQTestBase {
          connection.start();
       }
       else if (protocol == 0) {
-         factory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
+         factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
          connection = factory.createConnection();
          connection.setExceptionListener(new ExceptionListener() {
             @Override
@@ -950,7 +1127,7 @@ public class ProtonTest extends ActiveMQTestBase {
             factory = new ActiveMQConnectionFactory();
          }
 
-         connection = factory.createConnection("guest", "guest");
+         connection = factory.createConnection(userName, password);
          connection.setExceptionListener(new ExceptionListener() {
             @Override
             public void onException(JMSException exception) {


Mime
View raw message