activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5153 https://issues.apache.org/jira/browse/AMQ-6254
Date Tue, 19 Apr 2016 13:54:22 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 6541bef52 -> b027e6555


https://issues.apache.org/jira/browse/AMQ-5153
https://issues.apache.org/jira/browse/AMQ-6254

Store the original subscribed destination along with the target
destination in the subscription info object to ensure that wildcard
subscriptions remain linked.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b027e655
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b027e655
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b027e655

Branch: refs/heads/master
Commit: b027e655538226b56f1fe277a426e0df41501fc6
Parents: 6541bef
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Apr 19 09:53:58 2016 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Apr 19 09:53:58 2016 -0400

----------------------------------------------------------------------
 .../src/main/proto/records.proto                |   3 +-
 .../org/apache/activemq/leveldb/DBManager.scala |  10 +-
 .../org/apache/activemq/bugs/AMQ6254Test.java   | 311 +++++++++++++++++++
 3 files changed, 321 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b027e655/activemq-leveldb-store/src/main/proto/records.proto
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/proto/records.proto b/activemq-leveldb-store/src/main/proto/records.proto
index d7ec58d..629bdce 100644
--- a/activemq-leveldb-store/src/main/proto/records.proto
+++ b/activemq-leveldb-store/src/main/proto/records.proto
@@ -12,7 +12,7 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-// 
+//
 package org.apache.activemq.leveldb.record;
 
 option java_multiple_files = true;
@@ -53,4 +53,5 @@ message SubscriptionRecord {
   optional string subscription_name = 3;
   optional string selector = 4;
   optional string destination_name = 5;
+  optional string subscribed_destination_name = 6;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/b027e655/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index b0051cc..bc53710 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -785,6 +785,9 @@ class DBManager(val parent:LevelDBStore) {
     if( info.getDestination!=null ) {
       record.setDestinationName(info.getDestination.getQualifiedName)
     }
+    if ( info.getSubscribedDestination!=null) {
+      record.setSubscribedDestinationName(info.getSubscribedDestination.getQualifiedName)
+    }
     val collection = new CollectionRecord.Bean()
     collection.setType(SUBSCRIPTION_COLLECTION_TYPE)
     collection.setKey(lastCollectionKey.incrementAndGet())
@@ -854,8 +857,11 @@ class DBManager(val parent:LevelDBStore) {
           if( sr.hasSelector ) {
             info.setSelector(sr.getSelector)
           }
-          if(sr.hasDestinationName) {
-            info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getDestinationName,
ActiveMQDestination.TOPIC_TYPE))
+          if( sr.hasDestinationName ) {
+            info.setDestination(ActiveMQDestination.createDestination(sr.getDestinationName,
ActiveMQDestination.TOPIC_TYPE))
+          }
+          if( sr.hasSubscribedDestinationName ) {
+            info.setSubscribedDestination(ActiveMQDestination.createDestination(sr.getSubscribedDestinationName,
ActiveMQDestination.TOPIC_TYPE))
           }
 
           var sub = DurableSubscription(key, sr.getTopicKey, info)

http://git-wip-us.apache.org/repos/asf/activemq/blob/b027e655/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java
new file mode 100644
index 0000000..9c9a35b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6254Test.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.leveldb.LevelDBStoreFactory;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+import org.apache.activemq.security.TempDestinationAuthorizationEntry;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Regression test for AMQ-5153 / AMQ-6254: a wildcard durable subscription must stay linked
+ * to its matching topics across a broker restart instead of being duplicated.
+ *
+ * <p>The test is parameterized over the persistence adapter (KahaDB or LevelDB) and over
+ * whether the authentication / authorization plugins are installed on the broker.
+ */
+@RunWith(Parameterized.class)
+public class AMQ6254Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6254Test.class);
+
+    private static final String KAHADB = "KahaDB";
+    private static final String LEVELDB = "LevelDB";
+
+    /** Wildcard used both as the subscribed topic and as the durable subscription name. */
+    private static final String WILDCARD = "alphabet.>";
+
+    private BrokerService brokerService;
+
+    private final String topicA = "alphabet.a";
+    private final String topicB = "alphabet.b";
+
+    private final String persistenceAdapterName;
+    private final boolean pluginsEnabled;
+
+    /**
+     * Supplies the parameter matrix: each persistence adapter with and without plugins.
+     *
+     * @return rows of {@code {persistenceAdapterName, pluginsEnabled}}
+     */
+    @Parameters(name="{0} -> plugins = {1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {KAHADB, true },
+                {KAHADB, false },
+                {LEVELDB, true },
+                {LEVELDB, false },
+            });
+    }
+
+    /**
+     * @param persistenceAdapterName either {@link #KAHADB} or {@link #LEVELDB}
+     * @param pluginsEnabled whether the security plugins are installed on the broker
+     */
+    public AMQ6254Test(String persistenceAdapterName, boolean pluginsEnabled) {
+        this.persistenceAdapterName = persistenceAdapterName;
+        this.pluginsEnabled = pluginsEnabled;
+    }
+
+    /**
+     * Creates a wildcard durable subscription, restarts the broker without wiping its store,
+     * re-creates the subscription and checks that topic A still reports exactly one consumer.
+     */
+    @Test(timeout = 60000)
+    public void testReactivateKeepaliveSubscription() throws Exception {
+        // Create wild card durable subscription
+        Connection consumerConnection = createConnection();
+        consumerConnection.setClientID("cliID");
+        consumerConnection.start();
+        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber subscriber = consumerSession.createDurableSubscriber(
+                consumerSession.createTopic(WILDCARD), WILDCARD);
+
+        // Send message on Topic A
+        Connection producerConnection = createConnection();
+        producerConnection.start();
+        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(producerSession.createTopic(topicA));
+        producer.send(producerSession.createTextMessage("Hello A"));
+
+        // Verify that message is received
+        TextMessage message = (TextMessage) subscriber.receive(2000);
+        assertNotNull("Message not received.", message);
+        assertEquals("Hello A", message.getText());
+
+        subscriber.close();
+
+        // The broker keeps durable subs active, so the closed subscriber is still counted
+        assertSingleConsumer(topicA);
+
+        // Release the client connections before the broker they point at goes away
+        producerConnection.close();
+        consumerConnection.close();
+
+        // Restart broker
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        LOG.info("Broker stopped");
+
+        // false: keep the persisted subscription so it is recovered on startup
+        brokerService = createBroker(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+        LOG.info("Broker restarted");
+
+        // Recreate wild card durable subscription
+        consumerConnection = createConnection();
+        consumerConnection.setClientID("cliID");
+        consumerConnection.start();
+        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        subscriber = consumerSession.createDurableSubscriber(
+                consumerSession.createTopic(WILDCARD), WILDCARD);
+
+        // Send another message on Topic A
+        producerConnection = createConnection();
+        producerConnection.start();
+        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = producerSession.createProducer(producerSession.createTopic(topicA));
+        producer.send(producerSession.createTextMessage("Hello Again A"));
+
+        // Verify that the new message is received
+        message = (TextMessage) subscriber.receive(2000);
+        assertNotNull("Message not received.", message);
+        assertEquals("Hello Again A", message.getText());
+
+        // Verify that we still have a single subscription
+        assertSingleConsumer(topicA);
+
+        subscriber.close();
+        producerConnection.close();
+        consumerConnection.close();
+    }
+
+    /**
+     * Waits, via {@link Wait#waitFor(Wait.Condition)}, for the given topic to report exactly
+     * one consumer in its destination statistics and fails the test if that never happens.
+     *
+     * @param topicName name of the topic whose consumer count is checked
+     */
+    private void assertSingleConsumer(final String topicName) throws Exception {
+        assertTrue("Should have only one consumer on " + topicName, Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                Destination destination = getDestination(topicName);
+                return destination.getDestinationStatistics().getConsumers().getCount() == 1;
+            }
+        }));
+    }
+
+    /**
+     * Looks up the broker-side destination for a topic, asserting that exactly one matches.
+     *
+     * @param topicName name of the topic to look up
+     * @return the single matching destination
+     */
+    private Destination getDestination(String topicName) {
+        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
+        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+
+        Set<Destination> destinations = topicRegion.getDestinations(new ActiveMQTopic(topicName));
+        assertEquals(1, destinations.size());
+
+        return destinations.iterator().next();
+    }
+
+    /**
+     * Opens a connection to the broker's first transport connector as user "system".
+     * The caller is responsible for closing the returned connection.
+     */
+    private Connection createConnection() throws Exception {
+        String connectionURI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionURI);
+        return cf.createConnection("system", "manager");
+    }
+
+    /** Starts a broker with a clean store before each test. */
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker(true);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    /** Stops the broker, if one is running, after each test. */
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    /**
+     * Builds (but does not start) a persistent broker that keeps durable subscriptions active,
+     * uses the parameterized persistence adapter and, when enabled, the security plugins.
+     *
+     * @param deleteAllMessages whether the broker should wipe its store on startup
+     * @return the configured broker
+     * @throws IllegalArgumentException if the persistence adapter name is not recognised
+     */
+    protected BrokerService createBroker(boolean deleteAllMessages) throws Exception {
+        BrokerService answer = new BrokerService();
+
+        answer.setKeepDurableSubsActive(true);
+        answer.setUseJmx(false);
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        answer.setAdvisorySupport(false);
+
+        switch (persistenceAdapterName) {
+            case KAHADB:
+                answer.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+                break;
+            case LEVELDB:
+                answer.setPersistenceFactory(new LevelDBStoreFactory());
+                break;
+            default:
+                // Fail loudly rather than silently testing the broker's default store
+                throw new IllegalArgumentException("Unknown persistence adapter: " + persistenceAdapterName);
+        }
+
+        answer.addConnector("tcp://localhost:0");
+
+        if (pluginsEnabled) {
+            List<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
+
+            // Each plugin is built once and the instance that was null-checked is the one
+            // that gets registered. Registration order (authorization, then authentication)
+            // is the order the previous code effectively produced and is kept unchanged.
+            BrokerPlugin authorizationPlugin = configureAuthorization();
+            if (authorizationPlugin != null) {
+                plugins.add(authorizationPlugin);
+            }
+
+            BrokerPlugin authenticationPlugin = configureAuthentication();
+            if (authenticationPlugin != null) {
+                plugins.add(authenticationPlugin);
+            }
+
+            if (!plugins.isEmpty()) {
+                BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
+                answer.setPlugins(plugins.toArray(array));
+            }
+        }
+
+        ActiveMQDestination[] destinations = { new ActiveMQTopic(topicA), new ActiveMQTopic(topicB) };
+        answer.setDestinations(destinations);
+        return answer;
+    }
+
+    /**
+     * Builds the authentication plugin with a single user "system" / "manager" that belongs
+     * to the "users" and "admins" groups.
+     */
+    protected BrokerPlugin configureAuthentication() throws Exception {
+        List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
+        users.add(new AuthenticationUser("system", "manager", "users,admins"));
+        SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
+
+        return authenticationPlugin;
+    }
+
+    /**
+     * Builds the authorization plugin: matching rule sets for queues and topics, one extra
+     * rule for advisory topics, and an admins-only rule for temporary destinations.
+     */
+    protected BrokerPlugin configureAuthorization() throws Exception {
+
+        @SuppressWarnings("rawtypes")
+        List<DestinationMapEntry> authorizationEntries = new ArrayList<DestinationMapEntry>();
+
+        authorizationEntries.add(queueEntry(">", "admins", "admins", "admins"));
+        authorizationEntries.add(queueEntry("USERS.>", "users", "users", "users"));
+        authorizationEntries.add(queueEntry("GUEST.>", "guests", "guests,users", "guests,users"));
+        authorizationEntries.add(topicEntry(">", "admins", "admins", "admins"));
+        authorizationEntries.add(topicEntry("USERS.>", "users", "users", "users"));
+        authorizationEntries.add(topicEntry("GUEST.>", "guests", "guests,users", "guests,users"));
+        authorizationEntries.add(
+                topicEntry("ActiveMQ.Advisory.>", "guests,users", "guests,users", "guests,users"));
+
+        TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry();
+        tempEntry.setRead("admins");
+        tempEntry.setWrite("admins");
+        tempEntry.setAdmin("admins");
+
+        DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries);
+        authorizationMap.setTempDestinationAuthorizationEntry(tempEntry);
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(authorizationMap);
+
+        return authorizationPlugin;
+    }
+
+    /** Creates an authorization entry for the given queue pattern with the given role lists. */
+    private static AuthorizationEntry queueEntry(String queue, String read, String write, String admin)
+            throws Exception {
+        AuthorizationEntry entry = new AuthorizationEntry();
+        entry.setQueue(queue);
+        entry.setRead(read);
+        entry.setWrite(write);
+        entry.setAdmin(admin);
+        return entry;
+    }
+
+    /** Creates an authorization entry for the given topic pattern with the given role lists. */
+    private static AuthorizationEntry topicEntry(String topic, String read, String write, String admin)
+            throws Exception {
+        AuthorizationEntry entry = new AuthorizationEntry();
+        entry.setTopic(topic);
+        entry.setRead(read);
+        entry.setWrite(write);
+        entry.setAdmin(admin);
+        return entry;
+    }
+}


Mime
View raw message