activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject git commit: Make the mKahaDB store agnostic to the nested persistence adapter type.
Date Fri, 27 Sep 2013 08:26:32 GMT
Updated Branches:
  refs/heads/trunk 74dafd7f2 -> 21fe8cac7


Make the mKahaDB store agnostic to the nested persistence adapter type.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/21fe8cac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/21fe8cac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/21fe8cac

Branch: refs/heads/trunk
Commit: 21fe8cac7d05247c381fc351596c3a56b89a03cd
Parents: 74dafd7
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Fri Sep 27 04:25:29 2013 -0400
Committer: Hiram Chirino <hiram@hiramchirino.com>
Committed: Fri Sep 27 04:26:17 2013 -0400

----------------------------------------------------------------------
 .../store/TransactionIdTransformer.java         |  23 +++
 .../store/TransactionIdTransformerAware.java    |  23 +++
 .../FilteredKahaDBPersistenceAdapter.java       |   9 +-
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  13 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  18 +--
 .../store/kahadb/KahaDBTransactionStore.java    |   2 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   | 160 ++++++++++---------
 .../kahadb/MultiKahaDBTransactionStore.java     |  14 +-
 .../store/kahadb/TransactionIdTransformer.java  |  24 ---
 9 files changed, 151 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformer.java
b/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformer.java
new file mode 100644
index 0000000..b66d1e3
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import org.apache.activemq.command.TransactionId;
+
+public interface TransactionIdTransformer {
+    TransactionId transform(TransactionId txid);
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformerAware.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformerAware.java
b/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformerAware.java
new file mode 100644
index 0000000..3953196
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/TransactionIdTransformerAware.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+/**
+ */
+public interface TransactionIdTransformerAware {
+    void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer);
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
index 0f833d0..10d0023 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
@@ -18,29 +18,30 @@ package org.apache.activemq.store.kahadb;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.store.PersistenceAdapter;
 
 /**
  * @org.apache.xbean.XBean element="filteredKahaDB"
  *
  */
 public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
-    private KahaDBPersistenceAdapter persistenceAdapter;
+    private PersistenceAdapter persistenceAdapter;
     private boolean perDestination;
 
     public FilteredKahaDBPersistenceAdapter() {
         super();
     }
 
-    public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, KahaDBPersistenceAdapter
adapter) {
+    public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, PersistenceAdapter
adapter) {
         setDestination(destination);
         persistenceAdapter  = adapter;
     }
 
-    public KahaDBPersistenceAdapter getPersistenceAdapter() {
+    public PersistenceAdapter getPersistenceAdapter() {
         return persistenceAdapter;
     }
 
-    public void setPersistenceAdapter(KahaDBPersistenceAdapter persistenceAdapter) {
+    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
         this.persistenceAdapter = persistenceAdapter;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 7f77d3e..9bfbd83 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -39,12 +39,7 @@ import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.JournaledStore;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -58,7 +53,7 @@ import org.apache.activemq.util.ServiceStopper;
  * @org.apache.xbean.XBean element="kahaDB"
  *
  */
-public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
JournaledStore {
+public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
JournaledStore, TransactionIdTransformerAware {
     private final KahaDBStore letter = new KahaDBStore();
 
     /**
@@ -655,4 +650,8 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         return "KahaDBPersistenceAdapter[" + path + "]";
     }
 
+    @Override
+    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer)
{
+        getStore().setTransactionIdTransformer(transactionIdTransformer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index f8e2c5f..0c269b4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -56,12 +56,7 @@ import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -69,7 +64,6 @@ import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.usage.MemoryUsage;
@@ -114,8 +108,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
         this.transactionStore = new KahaDBTransactionStore(this);
         this.transactionIdTransformer = new TransactionIdTransformer() {
             @Override
-            public KahaTransactionInfo transform(TransactionId txid) {
-                return TransactionIdConversion.convert(txid);
+            public TransactionId transform(TransactionId txid) {
+                return txid;
             }
         };
     }
@@ -462,7 +456,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
             KahaAddMessageCommand command = new KahaAddMessageCommand();
             command.setDestination(dest);
             command.setMessageId(message.getMessageId().toProducerKey());
-            command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
+            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
             command.setPriority(message.getPriority());
             command.setPrioritySupported(isPrioritizedMessages());
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
@@ -476,7 +470,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
             KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
             command.setDestination(dest);
             command.setMessageId(ack.getLastMessageId().toProducerKey());
-            command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
+            command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
             command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
@@ -760,7 +754,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toProducerKey());
-            command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId())
: null);
+            command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId()))
: null);
             if (ack != null && ack.isUnmatchedAck()) {
                 command.setAck(UNMATCHED);
             } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 97a4bb5..7bcb99a 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -525,6 +525,6 @@ public class KahaDBTransactionStore implements TransactionStore {
 
 
     private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
-        return theStore.getTransactionIdTransformer().transform(txid);
+        return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid));
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4cda2a1..4d00c5b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -16,47 +16,27 @@
  */
 package org.apache.activemq.store.kahadb;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.LockableServiceSupport;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.*;
 import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
 import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
+import org.apache.activemq.store.*;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+
 /**
  * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
  * distribution of destinations across multiple kahaDB persistence adapters
@@ -77,7 +57,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
     final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
 
     BrokerService brokerService;
-    List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
+    List<PersistenceAdapter> adapters = new LinkedList<PersistenceAdapter>();
     private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator
+ "mKahaDB");
 
     MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
@@ -85,25 +65,31 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
     // all local store transactions are XA, 2pc if more than one adapter involved
     TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
         @Override
-        public KahaTransactionInfo transform(TransactionId txid) {
+        public TransactionId transform(TransactionId txid) {
             if (txid == null) {
                 return null;
             }
-            KahaTransactionInfo rc = new KahaTransactionInfo();
-            KahaXATransactionId kahaTxId = new KahaXATransactionId();
             if (txid.isLocalTransaction()) {
-                LocalTransactionId t = (LocalTransactionId) txid;
-                kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
-                kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
-                kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
+                final LocalTransactionId t = (LocalTransactionId) txid;
+                return new XATransactionId(new Xid() {
+                    @Override
+                    public int getFormatId() {
+                        return LOCAL_FORMAT_ID_MAGIC;
+                    }
+
+                    @Override
+                    public byte[] getGlobalTransactionId() {
+                        return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
+                    }
+
+                    @Override
+                    public byte[] getBranchQualifier() {
+                        return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
+                    }
+                });
             } else {
-                XATransactionId t = (XATransactionId) txid;
-                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
-                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
-                kahaTxId.setFormatId(t.getFormatId());
+                return txid;
             }
-            rc.setXaTransactionId(kahaTxId);
-            return rc;
         }
     };
 
@@ -116,7 +102,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
     public void setFilteredPersistenceAdapters(List entries) {
         for (Object entry : entries) {
             FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter)
entry;
-            KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
+            PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
             if (filteredAdapter.getDestination() == null) {
                 filteredAdapter.setDestination(matchAll);
             }
@@ -172,7 +158,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
     }
 
-    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination)
{
+    private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination)
throws IOException {
         Object result = destinationMap.chooseValue(destination);
         if (result == null) {
             throw new RuntimeException("No matching persistence adapter configured for destination:
" + destination + ", options:" + adapters);
@@ -188,7 +174,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         return filteredAdapter.getPersistenceAdapter();
     }
 
-    private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination)
{
+    private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination)
{
         try {
             kahaDBPersistenceAdapter.start();
         } catch (Exception e) {
@@ -198,7 +184,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         }
     }
 
-    private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination)
{
+    private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination)
{
         try {
             kahaDBPersistenceAdapter.stop();
         } catch (Exception e) {
@@ -257,25 +243,35 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
 
     @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
-        PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
-        if (adapter instanceof KahaDBPersistenceAdapter) {
+        PersistenceAdapter adapter = null;
+        try {
+            adapter = getMatchingPersistenceAdapter(destination);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        if (adapter instanceof PersistenceAdapter) {
             adapter.removeQueueMessageStore(destination);
-            removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+            removeMessageStore((PersistenceAdapter)adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
 
     @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
-        PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
-        if (adapter instanceof KahaDBPersistenceAdapter) {
+        PersistenceAdapter adapter = null;
+        try {
+            adapter = getMatchingPersistenceAdapter(destination);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        if (adapter instanceof PersistenceAdapter) {
             adapter.removeTopicMessageStore(destination);
-            removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+            removeMessageStore((PersistenceAdapter)adapter, destination);
             destinationMap.removeAll(destination);
         }
     }
 
-    private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination
destination) {
+    private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination)
{
         if (adapter.getDestinations().isEmpty()) {
             stopAdapter(adapter, destination.toString());
             File adapterDir = adapter.getDirectory();
@@ -335,7 +331,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         }
     }
 
-    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template)
{
+    private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template)
throws IOException {
         FileFilter destinationNames = new FileFilter() {
             @Override
             public boolean accept(File file) {
@@ -350,8 +346,8 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         }
     }
 
-    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
File candidate) {
-        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
+    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
File candidate) throws IOException {
+        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
         startAdapter(adapter, candidate.getName());
         Set<ActiveMQDestination> destinations = adapter.getDestinations();
         if (destinations.size() != 0) {
@@ -361,19 +357,19 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         }
     }
 
-    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, ActiveMQDestination destination) {
-        KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
nameFromDestinationFilter(destination));
+    private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, ActiveMQDestination destination) throws IOException {
+        PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
nameFromDestinationFilter(destination));
         return registerAdapter(adapter, destination);
     }
 
-    private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template,
String destinationName) {
-        KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
+    private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName)
throws IOException {
+        PersistenceAdapter adapter = kahaDBFromTemplate(template);
         configureAdapter(adapter);
         configureDirectory(adapter, destinationName);
         return adapter;
     }
 
-    private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
+    private void configureDirectory(PersistenceAdapter adapter, String fileName) {
         File directory = null;
         if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
             // not set so inherit from mkahadb
@@ -387,28 +383,36 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
         adapter.setDirectory(directory);
     }
 
-    private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter,
ActiveMQDestination destination) {
+    private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter,
ActiveMQDestination destination) {
         adapters.add(adapter);
         FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination,
adapter);
         destinationMap.put(destination, result);
         return result;
     }
 
-    private void configureAdapter(KahaDBPersistenceAdapter adapter) {
+    private void configureAdapter(PersistenceAdapter adapter) {
         // need a per store factory that will put the store in the branch qualifier to disiambiguate
xid mbeans
-        adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+        ((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
         if (isUseLock()) {
-            adapter.setUseLock(false);
+            if( adapter instanceof Lockable ) {
+                ((Lockable)adapter).setUseLock(false);
+            }
+        }
+        if( adapter instanceof BrokerServiceAware ) {
+            ((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
         }
-        adapter.setBrokerService(getBrokerService());
     }
 
-    private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template)
{
-        Map<String, Object> configuration = new HashMap<String, Object>();
-        IntrospectionSupport.getProperties(template, configuration, null);
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        IntrospectionSupport.setProperties(adapter, configuration);
-        return adapter;
+    private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException
{
+        try {
+            Map<String, Object> configuration = new HashMap<String, Object>();
+            IntrospectionSupport.getProperties(template, configuration, null);
+            PersistenceAdapter adapter = template.getClass().newInstance();
+            IntrospectionSupport.setProperties(adapter, configuration);
+            return adapter;
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
     }
 
     @Override
@@ -434,8 +438,10 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport
implem
 
     @Override
     public void setBrokerService(BrokerService brokerService) {
-        for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
-            persistenceAdapter.setBrokerService(brokerService);
+        for (PersistenceAdapter persistenceAdapter : adapters) {
+            if( persistenceAdapter instanceof BrokerServiceAware ) {
+                ((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
+            }
         }
         this.brokerService = brokerService;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 0e9d10d..a7d09f1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -32,13 +32,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.ProxyMessageStore;
-import org.apache.activemq.store.ProxyTopicMessageStore;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionRecoveryListener;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaEntryType;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
@@ -237,11 +231,11 @@ public class MultiKahaDBTransactionStore implements TransactionStore
{
     }
 
     public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
-        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
     }
 
     public void persistCompletion(TransactionId txid) throws IOException {
-        store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
+        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
     }
 
     private Location store(JournalCommand<?> data) throws IOException {
@@ -343,7 +337,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
 
     public synchronized void recover(final TransactionRecoveryListener listener) throws IOException
{
 
-        for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters)
{
+        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
             adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
                 @Override
                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[]
acks) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/21fe8cac/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
deleted file mode 100644
index d9ca828..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
-
-public interface TransactionIdTransformer {
-    KahaTransactionInfo transform(TransactionId txid);
-}


Mime
View raw message