activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/3] activemq-artemis git commit: ARTEMIS-823 = broker doesn't settle rejected messages
Date Mon, 21 Nov 2016 16:25:13 GMT
ARTEMIS-823 = broker doesn't settle rejected messages

Currently we don't settle rejected messages at the broker, we should always settle when rejected

https://issues.apache.org/jira/browse/ARTEMIS-823


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/329c533d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/329c533d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/329c533d

Branch: refs/heads/master
Commit: 329c533d21ac9eeb858ae89cfa616c6f63138bf5
Parents: f509c08
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Wed Oct 26 09:06:10 2016 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Nov 21 11:24:51 2016 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  1 +
 .../proton/ProtonServerReceiverContext.java     |  1 +
 .../amqp/client/AmqpAbstractResource.java       |  5 +
 .../transport/amqp/client/AmqpSender.java       | 11 +++
 .../transport/amqp/client/AmqpValidator.java    |  6 ++
 .../integration/amqp/AmqpSecurityTest.java      | 98 ++++++++++++++++++++
 6 files changed, 122 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index acbb2e9..1fc8511 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -352,6 +352,7 @@ public class AMQPSessionCallback implements SessionCallback {
       Rejected rejected = new Rejected();
       rejected.setError(ec);
       delivery.disposition(rejected);
+      delivery.settle();
       connection.flush();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 41caea9..0cc293a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -158,6 +158,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          condition.setDescription(e.getMessage());
          rejected.setError(condition);
          delivery.disposition(rejected);
+         delivery.settle();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
index 50aa770..0ab4596 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp.client;
 import java.io.IOException;
 
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.slf4j.Logger;
@@ -303,6 +304,10 @@ public abstract class AmqpAbstractResource<E extends Endpoint>
implements AmqpRe
    protected void doDetachedInspection() {
    }
 
+   protected void doDeliveryUpdate(Delivery delivery) {
+
+   }
+
    //----- Private implementation utility methods ---------------------------//
 
    private boolean isAwaitingOpen() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 3b134c9..9b2a70d 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -440,6 +440,8 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
          tagGenerator.returnTag(delivery.getTag());
          delivery.settle();
          toRemove.add(delivery);
+
+         doDeliveryUpdate(delivery);
       }
 
       pending.removeAll(toRemove);
@@ -449,4 +451,13 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
    public String toString() {
       return getClass().getSimpleName() + "{ address = " + address + "}";
    }
+
+   @Override
+   protected void doDeliveryUpdate(Delivery delivery) {
+      try {
+         getStateInspector().inspectDeliveryUpdate(delivery);
+      } catch (Throwable error) {
+         getStateInspector().markAsInvalid(error.getMessage());
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
index 5f46cb6..eca7676 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpValidator.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.client;
 
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
@@ -70,6 +71,10 @@ public class AmqpValidator {
 
    }
 
+   public void inspectDeliveryUpdate(Delivery delivery) {
+
+   }
+
    public boolean isValid() {
       return valid;
    }
@@ -98,4 +103,5 @@ public class AmqpValidator {
          throw new AssertionError(errorMessage);
       }
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/329c533d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
new file mode 100644
index 0000000..2c15c35
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.engine.Delivery;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class AmqpSecurityTest extends AmqpClientTestSupport {
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+      ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
+      securityManager.getConfiguration().addUser("foo", "bar");
+      securityManager.getConfiguration().addRole("foo", "none");
+      HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
+      HashSet<Role> value = new HashSet<>();
+      value.add(new Role("none", false, true, true, true, true, true, true, true));
+      securityRepository.addMatch(getTestName(), value);
+
+      serverManager = new JMSServerManagerImpl(server);
+      Configuration serverConfig = server.getConfiguration();
+      serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new
SimpleString("jms.queue.ActiveMQ.DLQ")));
+      serverConfig.setSecurityEnabled(true);
+      serverManager.start();
+      server.start();
+      return server;
+   }
+
+   @Test(timeout = 60000)
+   public void testSendAndRejected() throws Exception {
+      AmqpConnection connection = null;
+      AmqpClient client = createAmqpClient("foo", "bar");
+      CountDownLatch latch = new CountDownLatch(1);
+      client.setValidator(new AmqpValidator() {
+         @Override
+         public void inspectDeliveryUpdate(Delivery delivery) {
+            super.inspectDeliveryUpdate(delivery);
+            if (!delivery.remotelySettled()) {
+               markAsInvalid("delivery is not remotely settled");
+            }
+            latch.countDown();
+         }
+      });
+      connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      AmqpMessage message = new AmqpMessage();
+
+      message.setMessageId("msg" + 1);
+      message.setMessageAnnotation("serialNo", 1);
+      message.setText("Test-Message");
+
+      try {
+         sender.send(message);
+      } catch (IOException e) {
+         //
+      }
+      assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+      connection.getStateInspector().assertValid();
+      connection.close();
+   }
+}


Mime
View raw message