activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-585 support send on dynamic sender link
Date Tue, 21 Jun 2016 20:53:37 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 372528d84 -> 5c4fab3fc


ARTEMIS-585 support send on dynamic sender link

The sender abstraction must be able to update its sender address in the
case of dynamic senders whose target address is not set until the code
initializes the link and creates a destination for it.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/75f18d1e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/75f18d1e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/75f18d1e

Branch: refs/heads/master
Commit: 75f18d1e10fc3b6c881bf3a44045e80ca8812288
Parents: 372528d
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jun 20 11:34:49 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Jun 21 09:33:33 2016 -0400

----------------------------------------------------------------------
 .../proton/plug/AMQPClientSenderContext.java    |  3 ++
 .../proton/plug/AMQPClientSessionContext.java   |  2 +
 .../context/AbstractProtonReceiverContext.java  |  8 +--
 .../context/client/ProtonClientContext.java     |  5 +-
 .../client/ProtonClientSessionContext.java      | 51 ++++++++++++++++++++
 .../server/ProtonServerReceiverContext.java     |  8 +--
 .../tests/integration/proton/ProtonTest.java    | 41 +++++++++++++++-
 7 files changed, 105 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
index 8c40b9d..44d1056 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
@@ -21,4 +21,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
 public interface AMQPClientSenderContext {
 
    void send(ProtonJMessage message);
+
+   String getAddress();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
index b518474..44cec7c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
@@ -22,6 +22,8 @@ public interface AMQPClientSessionContext extends AMQPSessionContext {
 
    AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException;
 
+   AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException;
+
    AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException;
 
    AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index ffc08d3..4343b01 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -32,7 +32,7 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
 
    protected final Receiver receiver;
 
-   protected final String address;
+   protected String address;
 
    protected final AMQPSessionCallback sessionSPI;
 
@@ -43,12 +43,6 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
       this.connection = connection;
       this.protonSession = protonSession;
       this.receiver = receiver;
-      if (receiver.getRemoteTarget() != null) {
-         this.address = receiver.getRemoteTarget().getAddress();
-      }
-      else {
-         this.address = null;
-      }
       this.sessionSPI = sessionSPI;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
index e03c99d..f442b9e 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
@@ -67,7 +67,10 @@ public class ProtonClientContext extends AbstractProtonContextSender implements
          Thread.currentThread().interrupt();
          return false;
       }
-
    }
 
+   @Override
+   public String getAddress() {
+      return sender.getRemoteTarget().getAddress();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
index b3e96bb..9079dc3 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
@@ -16,8 +16,18 @@
  */
 package org.proton.plug.context.client;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
@@ -26,6 +36,7 @@ import org.proton.plug.AMQPClientReceiverContext;
 import org.proton.plug.AMQPClientSenderContext;
 import org.proton.plug.AMQPClientSessionContext;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.AmqpSupport;
 import org.proton.plug.context.AbstractConnectionContext;
 import org.proton.plug.context.AbstractProtonSessionContext;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
@@ -63,6 +74,46 @@ public class ProtonClientSessionContext extends AbstractProtonSessionContext
imp
    }
 
    @Override
+   public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException
{
+      FutureRunnable futureRunnable = new FutureRunnable(1);
+
+      ProtonClientContext amqpSender;
+      synchronized (connection.getLock()) {
+         final String senderName = "Dynamic-" + UUID.randomUUID().toString();
+
+         Sender sender = session.sender(senderName);
+         sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+
+         Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
+         Source source = new Source();
+         source.setAddress(senderName);
+         source.setOutcomes(outcomes);
+
+         Target target = new Target();
+         target.setDynamic(true);
+         target.setDurable(TerminusDurability.NONE);
+         target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+         // Set the dynamic node lifetime-policy
+         Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+         dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
+         target.setDynamicNodeProperties(dynamicNodeProperties);
+
+         amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
+         amqpSender.afterInit(futureRunnable);
+         sender.setSource(source);
+         sender.setTarget(target);
+         sender.setContext(amqpSender);
+         sender.open();
+      }
+
+      connection.flush();
+
+      waitWithTimeout(futureRunnable);
+      return amqpSender;
+   }
+
+   @Override
    public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException
{
       return createReceiver(address, address);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
index e3cbb3b..aa04cef 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -61,20 +61,20 @@ public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
{
          if (target.getDynamic()) {
             //if dynamic we have to create the node (queue) and set the address on the target,
the node is temporary and
             // will be deleted on closing of the session
-            String queue = sessionSPI.tempQueueName();
+            address = sessionSPI.tempQueueName();
 
             try {
-               sessionSPI.createTemporaryQueue(queue);
+               sessionSPI.createTemporaryQueue(address);
             }
             catch (Exception e) {
                throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
             }
-            target.setAddress(queue);
+            target.setAddress(address);
          }
          else {
             //if not dynamic then we use the targets address as the address to forward the
messages to, however there has to
             //be a queue bound to it so we nee to check this.
-            String address = target.getAddress();
+            address = target.getAddress();
             if (address == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/75f18d1e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 9534681..d803e9e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
@@ -51,7 +52,10 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -60,6 +64,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.proton.plug.AMQPClientConnectionContext;
 import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPClientSenderContext;
 import org.proton.plug.AMQPClientSessionContext;
 import org.proton.plug.test.Constants;
 import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
@@ -196,7 +201,7 @@ public class ProtonTest extends ActiveMQTestBase {
    }
 
    @Test
-      public void testReplyToNonJMS() throws Throwable {
+   public void testReplyToNonJMS() throws Throwable {
 
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       TemporaryQueue queue = session.createTemporaryQueue();
@@ -351,6 +356,40 @@ public class ProtonTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testDynamicSenderLink() throws Exception {
+
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP
protocol
+
+      SimpleAMQPConnector connector = new SimpleAMQPConnector();
+      connector.start();
+      AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+      clientConnection.clientOpen(null);
+
+      AMQPClientSessionContext csession = clientConnection.createClientSession();
+      AMQPClientSenderContext sender = csession.createDynamicSender(false);
+
+      String address = sender.getAddress();
+
+      AMQPClientReceiverContext receiver = csession.createReceiver(address);
+      receiver.flow(1);
+
+      // Send one on the dynamic address
+      MessageImpl message = (MessageImpl) org.apache.qpid.proton.message.Message.Factory.create();
+
+      Properties props = new Properties();
+      Map<Object, Object> map = new HashMap<>();
+
+      map.put("some-property", 1);
+      AmqpValue value = new AmqpValue(map);
+      message.setBody(value);
+      message.setProperties(props);
+      sender.send(message);
+
+      ProtonJMessage protonJMessage = receiver.receiveMessage(500, TimeUnit.MILLISECONDS);
+      Assert.assertNotNull(protonJMessage);
+   }
 
    @Test
    public void testConnection() throws Exception {


Mime
View raw message