activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-4729 - add lockableSupport to mKahaDb to isolate it from dynamically created filtered kahadb locks, nested locks are disabled when top level lock is in place. default to a sharedfile locker
Date Thu, 19 Sep 2013 21:08:07 GMT
Updated Branches:
  refs/heads/trunk c7d33ad5e -> 0f90695db


https://issues.apache.org/jira/browse/AMQ-4729 - add lockableSupport to mKahaDb to isolate
it from dynamically created filtered kahadb locks, nested locks are disabled when top level
lock is in place. default to a sharedfile locker


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0f90695d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0f90695d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0f90695d

Branch: refs/heads/trunk
Commit: 0f90695db7b8edd08348f5772500e3605a8825f6
Parents: c7d33ad
Author: gtully <gary.tully@gmail.com>
Authored: Thu Sep 19 21:57:47 2013 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu Sep 19 21:58:43 2013 +0100

----------------------------------------------------------------------
 .../activemq/broker/LockableServiceSupport.java |  4 +
 .../kahadb/MultiKahaDBPersistenceAdapter.java   | 46 +++++++---
 .../broker/ft/mKahaDbQueueMasterSlaveTest.java  | 89 ++++++++++++++++++++
 3 files changed, 129 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0f90695d/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
index b1bffd0..78480e6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
@@ -54,6 +54,10 @@ public abstract class LockableServiceSupport extends ServiceSupport implements L
         this.useLock = useLock;
     }
 
+    public boolean isUseLock() {
+        return this.useLock;
+    }
+
     @Override
     public void setLocker(Locker locker) throws IOException {
         this.locker = locker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f90695d/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 40dcebd..4cda2a1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -30,6 +30,8 @@ import java.util.Set;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -39,9 +41,11 @@ import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.SharedFileLocker;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
@@ -49,6 +53,7 @@ import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,12 +63,19 @@ import org.slf4j.LoggerFactory;
  *
  * @org.apache.xbean.XBean element="mKahaDB"
  */
-public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
+public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
     static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
 
     final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
     final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
 
+    final class DelegateDestinationMap extends DestinationMap {
+        public void setEntries(List<DestinationMapEntry>  entries) {
+            super.setEntries(entries);
+        }
+    };
+    final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
+
     BrokerService brokerService;
     List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
     private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
@@ -120,7 +132,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
             configureAdapter(adapter);
             adapters.add(adapter);
         }
-        super.setEntries(entries);
+        destinationMap.setEntries(entries);
     }
 
     private String nameFromDestinationFilter(ActiveMQDestination destination) {
@@ -161,7 +173,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
     }
 
     private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
-        Object result = this.chooseValue(destination);
+        Object result = destinationMap.chooseValue(destination);
         if (result == null) {
             throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
         }
@@ -249,7 +261,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
         if (adapter instanceof KahaDBPersistenceAdapter) {
             adapter.removeQueueMessageStore(destination);
             removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
-            removeAll(destination);
+            destinationMap.removeAll(destination);
         }
     }
 
@@ -259,7 +271,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
         if (adapter instanceof KahaDBPersistenceAdapter) {
             adapter.removeTopicMessageStore(destination);
             removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
-            removeAll(destination);
+            destinationMap.removeAll(destination);
         }
     }
 
@@ -310,8 +322,8 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
     }
 
     @Override
-    public void start() throws Exception {
-        Object result = this.chooseValue(matchAll);
+    public void doStart() throws Exception {
+        Object result = destinationMap.chooseValue(matchAll);
         if (result != null) {
             FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
             if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
@@ -378,13 +390,16 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
     private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
         adapters.add(adapter);
         FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
-        put(destination, result);
+        destinationMap.put(destination, result);
         return result;
     }
 
     private void configureAdapter(KahaDBPersistenceAdapter adapter) {
         // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
         adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+        if (isUseLock()) {
+            adapter.setUseLock(false);
+        }
         adapter.setBrokerService(getBrokerService());
     }
 
@@ -397,9 +412,9 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
     }
 
     @Override
-    public void stop() throws Exception {
+    protected void doStop(ServiceStopper stopper) throws Exception {
         for (PersistenceAdapter persistenceAdapter : adapters) {
-            persistenceAdapter.stop();
+            stopper.stop(persistenceAdapter);
         }
     }
 
@@ -414,6 +429,10 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
     }
 
     @Override
+    public void init() throws Exception {
+    }
+
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
             persistenceAdapter.setBrokerService(brokerService);
@@ -464,4 +483,11 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
         String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
         return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
     }
+
+    @Override
+    public Locker createDefaultLocker() throws IOException {
+        SharedFileLocker locker = new SharedFileLocker();
+        locker.configure(this);
+        return locker;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0f90695d/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
new file mode 100644
index 0000000..ad9cca1
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/mKahaDbQueueMasterSlaveTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class mKahaDbQueueMasterSlaveTest extends QueueMasterSlaveTestSupport {
+    protected String MASTER_URL = "tcp://localhost:62001";
+    protected String SLAVE_URL  = "tcp://localhost:62002";
+
+    protected void createMaster() throws Exception {
+        master = new BrokerService();
+        master.setBrokerName("master");
+        master.addConnector(MASTER_URL);
+        master.setUseJmx(false);
+        master.setPersistent(true);
+        master.setDeleteAllMessagesOnStartup(true);
+
+        MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+        List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+        defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        defaultEntry.setPerDestination(true);
+        adapters.add(defaultEntry);
+
+        mKahaDB.setFilteredPersistenceAdapters(adapters);
+        master.setPersistenceAdapter(mKahaDB);
+
+        master.start();
+    }
+
+    protected void createSlave() throws Exception {
+        // use a separate thread as the slave will block waiting for
+        // the exclusive db lock
+        Thread t = new Thread() {
+            public void run() {
+                try {
+                    BrokerService broker = new BrokerService();
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
+                    // no need for broker.setMasterConnectorURI(masterConnectorURI)
+                    // as the db lock provides the slave/master initialisation
+                    broker.setUseJmx(false);
+                    broker.setPersistent(true);
+
+                    MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+                    List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+                    FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+                    defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+                    defaultEntry.setPerDestination(true);
+                    adapters.add(defaultEntry);
+
+                    mKahaDB.setFilteredPersistenceAdapters(adapters);
+                    broker.setPersistenceAdapter(mKahaDB);
+                    broker.start();
+                    slave.set(broker);
+                    slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
+                } catch (Exception e) {
+                    fail("failed to start slave broker, reason:" + e);
+                }
+            }
+        };
+        t.start();
+    }
+}


Mime
View raw message