activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-979 OpenWire "no-Local" consumer not working
Date Mon, 20 Feb 2017 13:18:32 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master b9f00f73b -> cd47873f2


ARTEMIS-979 OpenWire "no-Local" consumer not working

When creating a 'no-local' openwire consumer, it doesn't work,
meaning it can still receive messages from the same connection.
The fix is similar to what Artemis client does, which is adding
a 'filter' to the consumer/subscription.
The difference is that with OpenWire we have to do it on the
broker side.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e7a4d42a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e7a4d42a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e7a4d42a

Branch: refs/heads/master
Commit: e7a4d42a6453973cf97525d355d91dc0c93d7937
Parents: b9f00f7
Author: Howard Gao <howard.gao@gmail.com>
Authored: Sun Feb 19 23:21:20 2017 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Feb 20 08:17:43 2017 -0500

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  10 ++
 .../core/protocol/openwire/amq/AMQConsumer.java |  17 +++
 .../core/protocol/openwire/amq/AMQSession.java  |   6 +
 .../openwire/SimpleOpenWireTest.java            | 137 +++++++++++++++++++
 4 files changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 13b8a39..da32bda 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -156,6 +156,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
 
    private ConnectionState state;
 
+   private volatile boolean noLocal;
+
    /**
     * Openwire doesn't sen transactions associated with any sessions.
     * It will however send beingTX / endTX as it would be doing it with XA Transactions.
@@ -836,6 +838,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements
Se
       disableTtl.set(false);
    }
 
+   public boolean isNoLocal() {
+      return noLocal;
+   }
+
+   public void setNoLocal(boolean noLocal) {
+      this.noLocal = noLocal;
+   }
+
    class SlowConsumerDetection implements SlowConsumerDetectionListener {
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 77a1a4a..917d808 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
@@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -79,6 +81,18 @@ public class AMQConsumer {
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId)
throws Exception {
 
       SimpleString selector = info.getSelector() == null ? null : new SimpleString(info.getSelector());
+      if (info.isNoLocal()) {
+         if (!AdvisorySupport.isAdvisoryTopic(openwireDestination)) {
+            //tell the connection to add the property
+            this.session.getConnection().setNoLocal(true);
+         }
+         String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'"
+ this.getId().getConnectionId() + "'";
+         if (selector == null) {
+            selector = new SimpleString(noLocalSelector);
+         } else {
+            selector = new SimpleString(info.getSelector() + " AND " + noLocalSelector);
+         }
+      }
 
       String physicalName = session.convertWildcard(openwireDestination.getPhysicalName());
 
@@ -201,6 +215,9 @@ public class AMQConsumer {
             return 0;
          }
 
+         if (session.getConnection().isNoLocal()) {
+            message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
+         }
          dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, this);
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(dispatch.getMessage().getMessageId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 7cdd070..e002fd0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.IDGenerator;
@@ -297,6 +298,11 @@ public class AMQSession implements SessionCallback {
 
       ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
 
+      if (connection.isNoLocal() || sessInfo.getSessionId().getValue() == -1) {
+         //Internal session is used to send advisory messages, which are always noLocal
+         originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(),
this.connection.getState().getInfo().getConnectionId().getValue());
+      }
+
       /* ActiveMQ failover transport will attempt to reconnect after connection failure.
 Any sent messages that did
       * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last
sequence id received to
       * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this
we check to see if the

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a4d42a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index b406fdd..96d8ac4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -31,6 +31,8 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
 import javax.transaction.xa.XAResource;
@@ -356,6 +358,141 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testTopicNoLocal() throws Exception {
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating queue: " + topicName);
+      Destination dest = new ActiveMQTopic(topicName);
+
+      MessageConsumer nolocalConsumer = session.createConsumer(dest, null, true);
+      MessageConsumer consumer = session.createConsumer(dest, null, false);
+      MessageConsumer selectorConsumer  = session.createConsumer(dest,"TESTKEY = 'test'",
false);
+
+      MessageProducer producer = session.createProducer(dest);
+
+      final String body1 = "MfromAMQ-1";
+      final String body2 = "MfromAMQ-2";
+      TextMessage msg = session.createTextMessage(body1);
+      producer.send(msg);
+
+      msg = session.createTextMessage(body2);
+      msg.setStringProperty("TESTKEY", "test");
+      producer.send(msg);
+
+      //receive nolocal
+      TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000);
+      assertNull("nolocal consumer got: " + receivedMsg, receivedMsg);
+
+      //receive normal consumer
+      receivedMsg = (TextMessage) consumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body1, receivedMsg.getText());
+
+      receivedMsg = (TextMessage) consumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body2, receivedMsg.getText());
+
+      assertNull(consumer.receiveNoWait());
+
+      //selector should only receive one
+      receivedMsg = (TextMessage) selectorConsumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body2, receivedMsg.getText());
+      assertEquals("test", receivedMsg.getStringProperty("TESTKEY"));
+
+      assertNull(selectorConsumer.receiveNoWait());
+
+      //send from another connection
+      Connection anotherConn = this.factory.createConnection();
+      try {
+         anotherConn.start();
+
+         Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer anotherProducer = anotherSession.createProducer(dest);
+         TextMessage anotherMsg = anotherSession.createTextMessage(body1);
+         anotherProducer.send(anotherMsg);
+
+         assertNotNull(consumer.receive(1000));
+         assertNull(selectorConsumer.receive(1000));
+         assertNotNull(nolocalConsumer.receive(1000));
+      } finally {
+         anotherConn.close();
+      }
+
+      session.close();
+   }
+
+   @Test
+   public void testTopicNoLocalDurable() throws Exception {
+      connection.setClientID("forNoLocal-1");
+      connection.start();
+      TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      System.out.println("creating queue: " + topicName);
+      Topic dest = new ActiveMQTopic(topicName);
+
+      MessageConsumer nolocalConsumer = session.createDurableSubscriber(dest, "nolocal-subscriber1",
"", true);
+      MessageConsumer consumer = session.createDurableSubscriber(dest, "normal-subscriber",
null, false);
+      MessageConsumer selectorConsumer = session.createDurableSubscriber(dest, "selector-subscriber",
"TESTKEY = 'test'", false);
+
+      MessageProducer producer = session.createProducer(dest);
+
+      final String body1 = "MfromAMQ-1";
+      final String body2 = "MfromAMQ-2";
+      TextMessage msg = session.createTextMessage(body1);
+      producer.send(msg);
+
+      msg = session.createTextMessage(body2);
+      msg.setStringProperty("TESTKEY", "test");
+      producer.send(msg);
+
+      //receive nolocal
+      TextMessage receivedMsg = (TextMessage) nolocalConsumer.receive(1000);
+      assertNull("nolocal consumer got: " + receivedMsg, receivedMsg);
+
+      //receive normal consumer
+      receivedMsg = (TextMessage) consumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body1, receivedMsg.getText());
+
+      receivedMsg = (TextMessage) consumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body2, receivedMsg.getText());
+
+      assertNull(consumer.receiveNoWait());
+
+      //selector should only receive one
+      receivedMsg = (TextMessage) selectorConsumer.receive(1000);
+      assertNotNull(receivedMsg);
+      assertEquals(body2, receivedMsg.getText());
+      assertEquals("test", receivedMsg.getStringProperty("TESTKEY"));
+
+      assertNull(selectorConsumer.receiveNoWait());
+
+      //send from another connection
+      Connection anotherConn = this.factory.createConnection();
+      try {
+         anotherConn.start();
+
+         Session anotherSession = anotherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer anotherProducer = anotherSession.createProducer(dest);
+         TextMessage anotherMsg = anotherSession.createTextMessage(body1);
+         anotherProducer.send(anotherMsg);
+
+         assertNotNull(consumer.receive(1000));
+         assertNull(selectorConsumer.receive(1000));
+         assertNotNull(nolocalConsumer.receive(1000));
+      } finally {
+         anotherConn.close();
+      }
+
+      session.close();
+   }
+
+   @Test
    public void testSimpleTempTopic() throws Exception {
       connection.start();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


Mime
View raw message