activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK
Date Fri, 05 Aug 2016 14:35:31 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master d871dfe62 -> 410cd91f6


ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f721866
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f721866
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f721866

Branch: refs/heads/master
Commit: 2f721866ab982d56c488ed124cc191cf5f627e42
Parents: 06fb4a1
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Wed Jul 27 13:36:08 2016 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Aug 5 15:29:01 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 30 +++++++---
 .../plug/context/ProtonTransactionHandler.java  | 29 ++++++----
 .../artemis/core/paging/PagingStore.java        |  2 +
 .../core/paging/impl/PagingStoreImpl.java       | 12 ++++
 .../core/settings/impl/AddressSettings.java     | 35 +++++++++++-
 .../resources/schema/artemis-configuration.xsd  | 10 +++-
 .../core/settings/AddressSettingsTest.java      |  5 ++
 docs/user-manual/en/flow-control.md             | 38 ++++++-------
 .../transport/amqp/client/AmqpSession.java      |  2 +-
 .../amqp/client/AmqpTransactionContext.java     |  2 +-
 .../tests/integration/proton/ProtonTest.java    | 60 +++++++++++++++++++-
 .../storage/PersistMultiThreadTest.java         |  5 ++
 12 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index b00474d..a00af71 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -56,6 +56,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
 import org.proton.plug.SASLResult;
 import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.proton.plug.sasl.PlainSASLResult;
 
 public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback
{
@@ -351,18 +352,33 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback,
Se
       recoverContext();
 
       PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
-      if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK)
{
-         ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address
is full: " + message.getAddress());
-         Rejected rejected = new Rejected();
-         rejected.setError(ec);
-         delivery.disposition(rejected);
-         connection.flush();
+      if (store.isRejectingMessages()) {
+         // We drop pre-settled messages (and abort any associated Tx)
+         if (delivery.remotelySettled()) {
+            if (serverSession.getCurrentTransaction() != null) {
+               String amqpAddress = delivery.getLink().getTarget().getAddress();
+               ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address
is full: " + amqpAddress);
+               serverSession.getCurrentTransaction().markAsRollbackOnly(e);
+            }
+         }
+         else {
+            rejectMessage(delivery);
+         }
       }
       else {
          serverSend(message, delivery, receiver);
       }
    }
 
+   private void rejectMessage(Delivery delivery) {
+      String address = delivery.getLink().getTarget().getAddress();
+      ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address
is full: " + address);
+      Rejected rejected = new Rejected();
+      rejected.setError(ec);
+      delivery.disposition(rejected);
+      connection.flush();
+   }
+
    private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver
receiver) throws Exception {
       try {
          serverSession.send(message, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index dbf6f38..c8fb994 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -91,40 +91,49 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler
{
                try {
                   sessionSPI.commitCurrentTX();
                }
+               catch (ActiveMQAMQPException amqpE) {
+                  throw amqpE;
+               }
                catch (Exception e) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
             }
-            delivery.settle();
          }
 
       }
+      catch (ActiveMQAMQPException amqpE) {
+         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+      }
       catch (Exception e) {
          log.warn(e.getMessage(), e);
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-         condition.setCondition(Symbol.valueOf("failed"));
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
-         delivery.disposition(rejected);
+         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
       }
       finally {
+         delivery.settle();
          buffer.release();
       }
    }
 
+   private Rejected createRejected(Symbol amqpError, String message) {
+      Rejected rejected = new Rejected();
+      ErrorCondition condition = new ErrorCondition();
+      condition.setCondition(amqpError);
+      condition.setDescription(message);
+      rejected.setError(condition);
+      return rejected;
+   }
+
    @Override
    public void onFlow(int credits, boolean drain) {
-
    }
 
    @Override
    public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
-      //noop
+      // no op
    }
 
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      //noop
+      // no op
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 566b91a..79fb115 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -128,6 +128,8 @@ public interface PagingStore extends ActiveMQComponent {
 
    boolean isFull();
 
+   boolean isRejectingMessages();
+
    /**
     * Write lock the PagingStore.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f57f1b8..7e6cda8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -123,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
 
    private volatile AtomicBoolean blocking = new AtomicBoolean(false);
 
+   private long rejectThreshold;
+
    public PagingStoreImpl(final SimpleString address,
                           final ScheduledExecutorService scheduledExecutor,
                           final long syncTimeout,
@@ -187,6 +189,8 @@ public class PagingStoreImpl implements PagingStore {
 
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
+      rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
       if (cursorProvider != null) {
          cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
       }
@@ -1073,6 +1077,14 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
+   public boolean isRejectingMessages() {
+      if (addressFullMessagePolicy != AddressFullMessagePolicy.BLOCK) {
+         return false;
+      }
+      return rejectThreshold != AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD &&
getAddressSize() > rejectThreshold;
+   }
+
+   @Override
    public Collection<Integer> getCurrentIds() throws Exception {
       List<Integer> ids = new ArrayList<>();
       if (fileFactory != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 642574b..f5f00f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -76,6 +76,9 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
 
    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
 
+   // Default address drop threshold, applied to address settings with BLOCK policy.  -1
means no threshold enabled.
+   public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -124,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
 
    private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
 
+   private Long maxSizeBytesRejectThreshold = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -154,6 +159,7 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
       this.managementBrowsePageSize = other.managementBrowsePageSize;
       this.queuePrefetch = other.queuePrefetch;
+      this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
    }
 
    public AddressSettings() {
@@ -377,6 +383,15 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       return this;
    }
 
+   public long getMaxSizeBytesRejectThreshold() {
+      return (maxSizeBytesRejectThreshold == null) ? AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD
: maxSizeBytesRejectThreshold;
+   }
+
+   public AddressSettings setMaxSizeBytesRejectThreshold(long maxSizeBytesRejectThreshold)
{
+      this.maxSizeBytesRejectThreshold = maxSizeBytesRejectThreshold;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -456,6 +471,9 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       if (queuePrefetch == null) {
          queuePrefetch = merged.queuePrefetch;
       }
+      if (maxSizeBytesRejectThreshold == null) {
+         maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold;
+      }
    }
 
    @Override
@@ -521,6 +539,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
 
       managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
+
+      maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
    }
 
    @Override
@@ -549,7 +569,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
          BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
          BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
          BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
-         BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
+         BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
+         BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
    }
 
    @Override
@@ -601,6 +622,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
 
       BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
+
+      BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
    }
 
    /* (non-Javadoc)
@@ -635,6 +658,7 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
       result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
       result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
+      result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : queuePrefetch.hashCode());
       return result;
    }
 
@@ -802,6 +826,13 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
       }
       else if (!queuePrefetch.equals(other.queuePrefetch))
          return false;
+
+      if (maxSizeBytesRejectThreshold == null) {
+         if (other.maxSizeBytesRejectThreshold != null)
+            return false;
+      }
+      else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold))
+         return false;
       return true;
    }
 
@@ -825,6 +856,8 @@ public class AddressSettings implements Mergeable<AddressSettings>,
Serializable
          maxDeliveryAttempts +
          ", maxSizeBytes=" +
          maxSizeBytes +
+         ", maxSizeBytesRejectThreshold=" +
+         maxSizeBytesRejectThreshold +
          ", messageCounterHistoryDayLimit=" +
          messageCounterHistoryDayLimit +
          ", pageSizeBytes=" +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5ac86a0..815ef7c 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2220,7 +2220,15 @@
             <xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1"
minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
-                     the maximum size (in bytes) to use in paging for an address (-1 means
no limits)
+                     the maximum size (in bytes) for an address (-1 means no limits).  This
is used in PAGING, BLOCK and FAIL policies.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1"
maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     used with the address full BLOCK policy, the maximum size (in bytes)
an address can reach before messages start getting rejected.  Works in combination with max-size-bytes
for AMQP protocol only.  Default = -1 (no limit).
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
index 58f7c99..202f2ba 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
@@ -59,6 +59,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       addressSettingsToMerge.setRedeliveryDelay(1003);
       addressSettingsToMerge.setPageSizeBytes(1004);
+      addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
+
       addressSettings.merge(addressSettingsToMerge);
       Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
       Assert.assertEquals(addressSettings.getExpiryAddress(), exp);
@@ -68,6 +70,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003);
       Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004);
       Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+      Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
    }
 
    @Test
@@ -82,6 +85,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       addressSettingsToMerge.setMaxSizeBytes(1001);
       addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+      addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
       addressSettings.merge(addressSettingsToMerge);
 
       AddressSettings addressSettingsToMerge2 = new AddressSettings();
@@ -100,6 +104,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       Assert.assertEquals(addressSettings.getRedeliveryDelay(), 2003);
       Assert.assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001);
       Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+      Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index c1b4035..8a11966 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -275,25 +275,25 @@ control.
 
 #### Blocking producer window based flow control using AMQP
 
-Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
-flow control.  Artemis CORE protocol and AMQP.  Both protocols implement flow
-control slightly differently and therefore address full BLOCK policy behaves
-slightly different for clients uses each protocol respectively.
-
-As explained earlier in this chapter the CORE protocol uses a producer window size
-flow control system.  Where credits (representing bytes) are allocated to producers,
-if a producer wants to send a message it should wait until it has enough bytes available
-to send it.  AMQP flow control credits are not representative of bytes but instead represent
-the number of messages a producer is permitted to send (regardless of size).
-
-BLOCK for AMQP works mostly in the same way as the producer window size mechanism above.
 Artemis
-will issue 100 credits to a client at a time and refresh them when the clients credits reaches
30.
-The broker will stop issuing credits once an address is full.  However, since AMQP credits
represent
-whole messages and not bytes, it would be possible for an AMQP client to significantly exceed
an
-address upper bound should the broker continue accepting messages until the clients credits
are exhausted.
-For this reason once an address has reached it's upper bound and is blocked (when using AMQP)
Artemis
-will start rejecting messages until the address becomes unblocked.  This should be taken
into consideration when writing
-application code.
+Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support flow control.
Artemis CORE protocol and
+AMQP. Both protocols implement flow control slightly differently and therefore address full
BLOCK policy behaves slightly
+different for clients that use each protocol respectively.
+
+As explained earlier in this chapter the CORE protocol uses a producer window size flow control
system. Where credits
+(representing bytes) are allocated to producers, if a producer wants to send a message it
should wait until it has
+enough byte credits available for it to send. AMQP flow control credits are not representative
of bytes but instead
+represent the number of messages a producer is permitted to send (regardless of the message
size).
+
+BLOCK for AMQP works mostly in the same way as the producer window size mechanism above.
Artemis will issue 100 credits
+to a client at a time and refresh them when the clients credits reaches 30. The broker will
stop issuing credits once an
+address is full. However, since AMQP credits represent whole messages and not bytes, it would
be possible in some
+scenarios for an AMQP client to significantly exceed an address upper bound should the broker
continue accepting
+messages until the clients credits are exhausted. For this reason there is an additional
parameter available on address
+settings that specifies an upper bound on an address size in bytes. Once this upper bound
is reach Artemis will start
+rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default
set to -1 (or no limit).
+This is additional parameter allows a kind of soft and hard limit, in normal circumstances
the broker will utilize the
+max-size-bytes parameter using using flow control to put back pressure on the client, but
will protect the broker by
+rejecting messages once the address size is reached.
 
 ### Rate limited flow control
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 28e38f2..82b6aec 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -412,7 +412,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       return txContext.getTransactionId();
    }
 
-   AmqpTransactionContext getTransactionContext() {
+   public AmqpTransactionContext getTransactionContext() {
       return txContext;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
index dcf23d2..2f3e22a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -213,7 +213,7 @@ public class AmqpTransactionContext {
 
    //----- Internal access to context properties ----------------------------//
 
-   AmqpTransactionCoordinator getCoordinator() {
+   public AmqpTransactionCoordinator getCoordinator() {
       return coordinator;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b170f82..785543d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -95,8 +95,13 @@ public class ProtonTest extends ActiveMQTestBase {
 
    private static final String password = "guest";
 
+
    private static final String brokerName = "my-broker";
 
+   private static final long maxSizeBytes = 1 * 1024 * 1024;
+
+   private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
+
    // this will ensure that all tests in this class are run twice,
    // once with "true" passed to the class' constructor and once with "false"
    @Parameterized.Parameters(name = "{0}")
@@ -310,6 +315,7 @@ public class ProtonTest extends ActiveMQTestBase {
       Assert.assertEquals(q.getMessageCount(), 0);
    }
 
+
    @Test
    public void testRollbackConsumer() throws Throwable {
 
@@ -342,8 +348,11 @@ public class ProtonTest extends ActiveMQTestBase {
    public void testResourceLimitExceptionOnAddressFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
       setAddressFullBlockPolicy();
+      String destinationAddress = address + 1;
+      fillAddress(destinationAddress);
 
-      fillAddress(address + 1);
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
    }
 
    @Test
@@ -367,6 +376,9 @@ public class ProtonTest extends ActiveMQTestBase {
       }
       assertTrue(e instanceof ResourceAllocationException);
       assertTrue(e.getMessage().contains("resource-limit-exceeded"));
+
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
    }
 
    @Test
@@ -393,6 +405,9 @@ public class ProtonTest extends ActiveMQTestBase {
 
          // This should be -1. A single message is buffered in the client, and 0 credit has
been allocated.
          assertTrue(sender.getSender().getCredit() == -1);
+
+         long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+         assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
       }
       finally {
          amqpConnection.close();
@@ -446,7 +461,7 @@ public class ProtonTest extends ActiveMQTestBase {
 
       fillAddress(address + 1);
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-      AmqpConnection amqpConnection = amqpConnection = client.connect();
+      AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(address + 1);
@@ -459,6 +474,43 @@ public class ProtonTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+      setAddressFullBlockPolicy();
+
+      // Create the link attach before filling the address to ensure the link is allocated
credit.
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+
+      AmqpSession session = amqpConnection.createSession();
+      AmqpSender sender = session.createSender(address);
+      sender.setPresettle(true);
+
+      fillAddress(address);
+
+      final AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
+
+      Exception expectedException = null;
+      try {
+         session.begin();
+         sender.send(message);
+         session.commit();
+      }
+      catch (Exception e) {
+         expectedException = e;
+      }
+      finally {
+         amqpConnection.close();
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("resource-limit-exceeded"));
+      assertTrue(expectedException.getMessage().contains("Address is full: " + address));
+   }
+
    /**
     * Fills an address.  Careful when using this method.  Only use when rejected messages
are switched on.
     * @param address
@@ -520,6 +572,7 @@ public class ProtonTest extends ActiveMQTestBase {
 
       timeout.await(5, TimeUnit.SECONDS);
 
+      System.out.println("Messages Sent: " + sentMessages);
       if (errors[0] != null) {
          throw errors[0];
       }
@@ -1313,7 +1366,8 @@ public class ProtonTest extends ActiveMQTestBase {
       // For BLOCK tests
       AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-      addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+      addressSettings.setMaxSizeBytes(maxSizeBytes);
+      addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
       server.getAddressSettingsRepository().addMatch("#", addressSettings);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 33ee0c7..6c42413 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -307,6 +307,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean isRejectingMessages() {
+         return false;
+      }
+
+      @Override
       public void applySetting(AddressSettings addressSettings) {
 
       }


Mime
View raw message