activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1231979 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/
Date Mon, 16 Jan 2012 12:47:46 GMT
Author: gtully
Date: Mon Jan 16 12:47:45 2012
New Revision: 1231979

URL: http://svn.apache.org/viewvc?rev=1231979&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3639 - Modify MKahaDB To Support Using One Adapter
Per Destination Without Explicitly Listing Every Destination In The Configuration. Add perDestination
boolean attribute to mKahaDb filtered adapter. When true, every destination will get its own
persistence adapter using the configured adapter as a template. So any config applied to
the destination-less (default) adapter will be reused.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.DestinationMapEntry;
 
 /**
@@ -24,6 +25,16 @@ import org.apache.activemq.filter.Destin
  */
 public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
     private KahaDBPersistenceAdapter persistenceAdapter;
+    private boolean perDestination;
+
+    public FilteredKahaDBPersistenceAdapter() {
+        super();
+    }
+
+    public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, KahaDBPersistenceAdapter
adapter) {
+        setDestination(destination);
+        persistenceAdapter  = adapter;
+    }
 
     public KahaDBPersistenceAdapter getPersistenceAdapter() {
         return persistenceAdapter;
@@ -37,4 +48,12 @@ public class FilteredKahaDBPersistenceAd
     public void afterPropertiesSet() throws Exception {
         // ok to have no destination, we default it
     }
+
+    public boolean isPerDestination() {
+        return perDestination;
+    }
+
+    public void setPerDestination(boolean perDestination) {
+        this.perDestination = perDestination;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -241,7 +241,7 @@ public class KahaDBPersistenceAdapter im
     }
     
     public int getFailoverProducersAuditDepth() {
-        return this.getFailoverProducersAuditDepth();
+        return this.letter.getFailoverProducersAuditDepth();
     }
     
     /**
@@ -558,7 +558,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     public boolean isEnableIndexPageCaching() {
-        return isEnableIndexPageCaching();
+        return letter.isEnableIndexPageCaching();
     }
 
     public KahaDBStore getStore() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -17,11 +17,14 @@
 package org.apache.activemq.store.kahadb;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -44,6 +47,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,12 +107,16 @@ public class MultiKahaDBPersistenceAdapt
             if (filteredAdapter.getDestination() == null) {
                 filteredAdapter.setDestination(matchAll);
             }
-            if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
-                adapter.setDirectory(new File(getDirectory(), nameFromDestinationFilter(filteredAdapter.getDestination())));
+
+            if (filteredAdapter.isPerDestination()) {
+                configureDirectory(adapter, null);
+                // per destination adapters will be created on demand or during recovery
+                continue;
+            } else {
+                configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
             }
 
-            // need a per store factory that will put the store in the branch qualifier to
disiambiguate xid mbeans
-            adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+            configureAdapter(adapter);
             adapters.add(adapter);
         }
         super.setEntries(entries);
@@ -147,9 +155,27 @@ public class MultiKahaDBPersistenceAdapt
         if (result == null) {
             throw new RuntimeException("No matching persistence adapter configured for destination:
" + destination + ", options:" + adapters);
         }
+        FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter)
result;
+        if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination())
{
+            result = addAdapter(filteredAdapter, destination);
+            startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(),
destination.getQualifiedName());
+            if (LOG.isTraceEnabled()) {
+                LOG.info("created per destination adapter for: " + destination  + ", " +
result);
+            }
+        }
         return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
     }
 
+    private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination)
{
+        try {
+            kahaDBPersistenceAdapter.start();
+        } catch (Exception e) {
+            RuntimeException detail = new RuntimeException("Failed to start per destination
persistence adapter for destination: " + destination + ", options:" + adapters, e);
+            LOG.error(detail.toString(), e);
+            throw detail;
+        }
+    }
+
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
{
         PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
         return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
@@ -164,6 +190,7 @@ public class MultiKahaDBPersistenceAdapt
             persistenceAdapter.deleteAllMessages();
         }
         transactionStore.deleteAllMessages();
+        IOHelper.deleteChildren(getDirectory());
     }
 
     public Set<ActiveMQDestination> getDestinations() {
@@ -223,11 +250,86 @@ public class MultiKahaDBPersistenceAdapt
     }
 
     public void start() throws Exception {
+        Object result = this.chooseValue(matchAll);
+        if (result != null) {
+            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter)
result;
+            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination())
{
+                findAndRegisterExistingAdapters(filteredAdapter);
+            }
+        }
         for (PersistenceAdapter persistenceAdapter : adapters) {
             persistenceAdapter.start();
         }
     }
 
+    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template)
{
+        FileFilter destinationNames = new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
+            }
+        };
+        File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
+        if (candidates != null) {
+            for (File candidate : candidates) {
+                registerExistingAdapter(template, candidate);
+            }
+        }
+    }
+
+    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
File candidate) {
+        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
+        startAdapter(adapter, candidate.getName());
+        registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
+    }
+
+    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, ActiveMQDestination destination) {
+        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
nameFromDestinationFilter(destination));
+        return registerAdapter(adapter, destination);
+    }
+
+    private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template,
String destinationName) {
+        KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
+        configureAdapter(adapter);
+        configureDirectory(adapter, destinationName);
+        return adapter;
+    }
+
+    private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
+        File directory = null;
+        if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
+            // not set so inherit from mkahadb
+            directory = getDirectory();
+        } else {
+            directory = adapter.getDirectory();
+        }
+        if (fileName != null) {
+            directory = new File(directory, fileName);
+        }
+        adapter.setDirectory(directory);
+    }
+
+    private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter,
ActiveMQDestination destination) {
+        adapters.add(adapter);
+        FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination,
adapter);
+        put(destination, result);
+        return result;
+    }
+
+    private void configureAdapter(KahaDBPersistenceAdapter adapter) {
+        // need a per store factory that will put the store in the branch qualifier to disiambiguate
xid mbeans
+        adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+        adapter.setBrokerService(getBrokerService());
+    }
+
+    private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template)
{
+        Map<String, Object> configuration = new HashMap<String, Object>();
+        IntrospectionSupport.getProperties(template, configuration, null);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        IntrospectionSupport.setProperties(adapter, configuration);
+        return adapter;
+    }
+
     public void stop() throws Exception {
         for (PersistenceAdapter persistenceAdapter : adapters) {
             persistenceAdapter.stop();
@@ -284,7 +386,7 @@ public class MultiKahaDBPersistenceAdapt
         transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
     }
 
-    public int getJournalMaxWriteBatchSize() {
+    public int getJournalWriteBatchSize() {
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java?rev=1231979&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
Mon Jan 16 12:47:45 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.ArrayList;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class AutoStorePerDestinationTest extends StorePerDestinationTest {
+
+    // use perDestinationFlag to get multiple stores from one match all adapter
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        template.setPerDestination(true);
+        adapters.add(template);
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        brokerService  = createBroker(multiKahaDBPersistenceAdapter);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
Mon Jan 16 12:47:45 2012
@@ -62,7 +62,7 @@ public class StorePerDestinationTest  {
         return broker;
     }
 
-    private KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
         kaha.setJournalMaxFileLength(maxFileLength);
         kaha.setCleanupInterval(5000);
@@ -199,7 +199,7 @@ public class StorePerDestinationTest  {
         multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
 
         assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile());
-        assertEquals(someOtherDisk, otherStore.getDirectory());
+        assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
     }
 
     @Test



Mime
View raw message