activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1170201 [3/3] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/filter/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/broker/ activemq-core/src/test/java/o...
Date Tue, 13 Sep 2011 15:01:38 GMT
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.util.IOHelper;
+import org.apache.kahadb.journal.Journal;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiKahaDBTransactionStore implements TransactionStore {
+    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
+    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
+    final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
+    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+    private Journal journal;
+    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+
+    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
+        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
+    }
+
+    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
+        return new ProxyMessageStore(messageStore) {
+            @Override
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
+            }
+
+            @Override
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
+            }
+
+            @Override
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
+            }
+
+            @Override
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
+            }
+        };
+    }
+
+    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
+        return new ProxyTopicMessageStore(messageStore) {
+            @Override
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
+            }
+
+            @Override
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
+            }
+
+            @Override
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
+            }
+
+            @Override
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
+            }
+
+            @Override
+            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                    MessageId messageId, MessageAck ack) throws IOException {
+                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
+                        subscriptionName, messageId, ack);
+            }
+        };
+    }
+
+    public void deleteAllMessages() {
+        IOHelper.deleteChildren(getDirectory());
+    }
+
+    public int getJournalMaxFileLength() {
+        return journalMaxFileLength;
+    }
+
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
+    }
+
+    public int getJournalMaxWriteBatchSize() {
+        return journalWriteBatchSize;
+    }
+
+    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
+        this.journalWriteBatchSize = journalWriteBatchSize;
+    }
+
+    public class Tx {
+        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
+        private int prepareLocationId = 0;
+
+        public void trackStore(TransactionStore store) {
+            stores.add(store);
+        }
+
+        public Set<TransactionStore> getStores() {
+            return stores;
+        }
+
+        public void trackPrepareLocation(Location location) {
+            this.prepareLocationId = location.getDataFileId();
+        }
+
+        public int getPreparedLocationId() {
+            return prepareLocationId;
+        }
+    }
+
+    public Tx getTx(TransactionId txid) {
+        Tx tx = inflightTransactions.get(txid);
+        if (tx == null) {
+            tx = new Tx();
+            inflightTransactions.put(txid, tx);
+        }
+        return tx;
+    }
+
+    public Tx removeTx(TransactionId txid) {
+        return inflightTransactions.remove(txid);
+    }
+
+    public void prepare(TransactionId txid) throws IOException {
+        Tx tx = getTx(txid);
+        for (TransactionStore store : tx.getStores()) {
+            store.prepare(txid);
+        }
+    }
+
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
+            throws IOException {
+
+        if (preCommit != null) {
+            preCommit.run();
+        }
+
+        Tx tx = getTx(txid);
+        if (wasPrepared) {
+            for (TransactionStore store : tx.getStores()) {
+                store.commit(txid, true, null, null);
+            }
+        } else {
+            // can only do 1pc on a single store
+            if (tx.getStores().size() == 1) {
+                for (TransactionStore store : tx.getStores()) {
+                    store.commit(txid, false, null, null);
+                }
+            } else {
+                // need to do local 2pc
+                for (TransactionStore store : tx.getStores()) {
+                    store.prepare(txid);
+                }
+                persistOutcome(tx, txid);
+                for (TransactionStore store : tx.getStores()) {
+                    store.commit(txid, true, null, null);
+                }
+                persistCompletion(txid);
+            }
+        }
+        removeTx(txid);
+        if (postCommit != null) {
+            postCommit.run();
+        }
+    }
+
+    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
+        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+    }
+
+    public void persistCompletion(TransactionId txid) throws IOException {
+        store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
+    }
+
+    private Location store(JournalCommand<?> data) throws IOException {
+        int size = data.serializedSizeFramed();
+        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+        os.writeByte(data.type().getNumber());
+        data.writeFramed(os);
+        Location location = journal.write(os.toByteSequence(), true);
+        journal.setLastAppendLocation(location);
+        return location;
+    }
+
+    public void rollback(TransactionId txid) throws IOException {
+        Tx tx = removeTx(txid);
+        if (tx != null) {
+            for (TransactionStore store : tx.getStores()) {
+                store.rollback(txid);
+            }
+        }
+    }
+
+    public void start() throws Exception {
+        journal = new Journal() {
+            @Override
+            protected void cleanup() {
+                super.cleanup();
+                txStoreCleanup();
+            }
+        };
+        journal.setDirectory(getDirectory());
+        journal.setMaxFileLength(journalMaxFileLength);
+        journal.setWriteBatchSize(journalWriteBatchSize);
+        IOHelper.mkdirs(journal.getDirectory());
+        journal.start();
+        recoverPendingLocalTransactions();
+        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+    }
+
+    private void txStoreCleanup() {
+        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
+        for (Tx tx : inflightTransactions.values()) {
+            knownDataFileIds.remove(tx.getPreparedLocationId());
+        }
+        try {
+            journal.removeDataFiles(knownDataFileIds);
+        } catch (Exception e) {
+            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
+        }
+    }
+
+    private File getDirectory() {
+        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
+    }
+
+    public void stop() throws Exception {
+        journal.close();
+        journal = null;
+    }
+
+    private void recoverPendingLocalTransactions() throws IOException {
+        Location location = journal.getNextLocation(null);
+        while (location != null) {
+            process(load(location));
+            location = journal.getNextLocation(location);
+        }
+        recoveredPendingCommit.addAll(inflightTransactions.keySet());
+        LOG.info("pending local transactions: " + recoveredPendingCommit);
+    }
+
+    public JournalCommand<?> load(Location location) throws IOException {
+        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
+        byte readByte = is.readByte();
+        KahaEntryType type = KahaEntryType.valueOf(readByte);
+        if (type == null) {
+            throw new IOException("Could not load journal record. Invalid location: " + location);
+        }
+        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
+        message.mergeFramed(is);
+        return message;
+    }
+
+    public void process(JournalCommand<?> command) throws IOException {
+        switch (command.type()) {
+            case KAHA_PREPARE_COMMAND:
+                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
+                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+                break;
+            case KAHA_COMMIT_COMMAND:
+                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
+                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
+                break;
+            case KAHA_TRACE_COMMAND:
+                break;
+            default:
+                throw new IOException("Unexpected command in transaction journal: " + command);
+        }
+    }
+
+
+    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
+
+        for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
+            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
+                @Override
+                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
+                    try {
+                        getTx(xid).trackStore(adapter.createTransactionStore());
+                    } catch (IOException e) {
+                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
+                    }
+                    listener.recover(xid, addedMessages, acks);
+                }
+            });
+        }
+
+        try {
+            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
+            // force completion of local xa
+            for (TransactionId txid : broker.getPreparedTransactions(null)) {
+                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
+                    try {
+                        if (recoveredPendingCommit.contains(txid)) {
+                            LOG.info("delivering pending commit outcome for tid: " + txid);
+                            broker.commitTransaction(null, txid, false);
+
+                        } else {
+                            LOG.info("delivering rollback outcome to store for tid: " + txid);
+                            broker.forgetTransaction(null, txid);
+                        }
+                        persistCompletion(txid);
+                    } catch (Exception ex) {
+                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("failed to resolve pending local transactions", e);
+        }
+    }
+
+    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+        if (message.getTransactionId() != null) {
+            getTx(message.getTransactionId()).trackStore(transactionStore);
+        }
+        destination.addMessage(context, message);
+    }
+
+    Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+        if (message.getTransactionId() != null) {
+            getTx(message.getTransactionId()).trackStore(transactionStore);
+            destination.addMessage(context, message);
+            return AbstractMessageStore.FUTURE;
+        } else {
+            return destination.asyncAddQueueMessage(context, message);
+        }
+    }
+
+    Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        if (message.getTransactionId() != null) {
+            getTx(message.getTransactionId()).trackStore(transactionStore);
+            destination.addMessage(context, message);
+            return AbstractMessageStore.FUTURE;
+        } else {
+            return destination.asyncAddTopicMessage(context, message);
+        }
+    }
+
+    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+        if (ack.getTransactionId() != null) {
+            getTx(ack.getTransactionId()).trackStore(transactionStore);
+        }
+        destination.removeMessage(context, ack);
+    }
+
+    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+        if (ack.getTransactionId() != null) {
+            getTx(ack.getTransactionId()).trackStore(transactionStore);
+        }
+        destination.removeAsyncMessage(context, ack);
+    }
+
+    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
+                           final String clientId, final String subscriptionName,
+                           final MessageId messageId, final MessageAck ack) throws IOException {
+        if (ack.getTransactionId() != null) {
+            getTx(ack.getTransactionId()).trackStore(transactionStore);
+        }
+        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
+
+public class TransactionIdConversion {
+
+    static KahaTransactionInfo convertToLocal(TransactionId tx) {
+        KahaTransactionInfo rc = new KahaTransactionInfo();
+        LocalTransactionId t = (LocalTransactionId) tx;
+        KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
+        kahaTxId.setConnectionId(t.getConnectionId().getValue());
+        kahaTxId.setTransacitonId(t.getValue());
+        rc.setLocalTransacitonId(kahaTxId);
+        return rc;
+    }
+
+    static KahaTransactionInfo convert(TransactionId txid) {
+        if (txid == null) {
+            return null;
+        }
+        KahaTransactionInfo rc;
+
+        if (txid.isLocalTransaction()) {
+            rc = convertToLocal(txid);
+        } else {
+            rc = new KahaTransactionInfo();
+            XATransactionId t = (XATransactionId) txid;
+            KahaXATransactionId kahaTxId = new KahaXATransactionId();
+            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
+            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
+            kahaTxId.setFormatId(t.getFormatId());
+            rc.setXaTransacitonId(kahaTxId);
+        }
+        return rc;
+    }
+
+    static TransactionId convert(KahaTransactionInfo transactionInfo) {
+        if (transactionInfo.hasLocalTransacitonId()) {
+            KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
+            LocalTransactionId rc = new LocalTransactionId();
+            rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
+            rc.setValue(tx.getTransacitonId());
+            return rc;
+        } else {
+            KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
+            XATransactionId rc = new XATransactionId();
+            rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
+            rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
+            rc.setFormatId(tx.getFormatId());
+            return rc;
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdConversion.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,8 @@
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+
+public interface TransactionIdTransformer {
+    KahaTransactionInfo transform(TransactionId txid);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TransactionIdTransformer.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Tue Sep 13 15:01:37 2011
@@ -37,6 +37,8 @@ import org.apache.activemq.command.Trans
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.util.JMXSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Used to simulate the recovery that occurs when a broker shuts down.
@@ -44,7 +46,7 @@ import org.apache.activemq.util.JMXSuppo
  * 
  */
 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
-
+    protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
     public void testPreparedJmxView() throws Exception {
 
         ActiveMQDestination destination = createDestination();
@@ -202,7 +204,7 @@ public class XARecoveryBrokerTest extend
         }
 
         // We should get the committed transactions.
-        for (int i = 0; i < 4; i++) {
+        for (int i = 0; i < expectedMessageCount(4, destination); i++) {
             Message m = receiveMessage(connection);
             assertNotNull(m);
         }
@@ -249,7 +251,7 @@ public class XARecoveryBrokerTest extend
         ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
         connection.send(consumerInfo);
 
-        for (int i = 0; i < 4; i++) {
+        for (int i = 0; i < expectedMessageCount(4, destination); i++) {
             Message m = receiveMessage(connection);
             assertNotNull(m);
         }
@@ -276,22 +278,26 @@ public class XARecoveryBrokerTest extend
             connection.send(message);
         }
 
-        // Setup the consumer and receive the message.
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
         Message m = null;
-        for (int i = 0; i < 4; i++) {
-            m = receiveMessage(connection);
-            assertNotNull(m);
-        }
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                m = receiveMessage(connection);
+                assertNotNull(m);
+            }
 
-        MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
-        ack.setTransactionId(txid);
-        connection.send(ack);
+            MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
 
         // Commit
         connection.request(createCommitTransaction1Phase(connectionInfo, txid));
@@ -334,23 +340,27 @@ public class XARecoveryBrokerTest extend
             connection.send(message);
         }
 
-        // Setup the consumer and receive the message.
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
         Message m = null;
-        for (int i = 0; i < 4; i++) {
-            m = receiveMessage(connection);
-            assertNotNull(m);
-        }
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                m = receiveMessage(connection);
+                assertNotNull(m);
+            }
 
-        // one ack with last received, mimic a beforeEnd synchronization
-        MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
-        ack.setTransactionId(txid);
-        connection.send(ack);
+            // one ack with last received, mimic a beforeEnd synchronization
+            MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
 
         connection.request(createPrepareTransaction(connectionInfo, txid));
 
@@ -404,23 +414,27 @@ public class XARecoveryBrokerTest extend
             connection.send(message);
         }
 
-        // Setup the consumer and receive the message.
-        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
-        connection.send(consumerInfo);
-
         // Begin the transaction.
         XATransactionId txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
         Message message = null;
-        for (int i = 0; i < 4; i++) {
-            message = receiveMessage(connection);
-            assertNotNull(message);
-        }
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                message = receiveMessage(connection);
+                assertNotNull(message);
+            }
 
-        // one ack with last received, mimic a beforeEnd synchronization
-        MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
-        ack.setTransactionId(txid);
-        connection.send(ack);
+            // one ack with last received, mimic a beforeEnd synchronization
+            MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
 
         connection.request(createPrepareTransaction(connectionInfo, txid));
 
@@ -454,13 +468,20 @@ public class XARecoveryBrokerTest extend
         // Begin new transaction for redelivery
         txid = createXATransaction(sessionInfo);
         connection.send(createBeginTransaction(connectionInfo, txid));
-        for (int i = 0; i < 4; i++) {
-            message = receiveMessage(connection);
-            assertNotNull(message);
+
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                message = receiveMessage(connection);
+                assertNotNull(message);
+            }
+            MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
         }
-        ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE);
-        ack.setTransactionId(txid);
-        connection.send(ack);
 
         // Commit
         connection.request(createCommitTransaction1Phase(connectionInfo, txid));
@@ -470,6 +491,14 @@ public class XARecoveryBrokerTest extend
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
     }
 
+    private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
+        return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
+    }
+
+    private int expectedMessageCount(int i, ActiveMQDestination destination) {
+        return i * (destination.isComposite() ? destination.getCompositeDestinations().length : 1);
+    }
+
     public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.LinkedList;
+import java.util.List;
+import javax.jms.JMSException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataArrayResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.JMXSupport;
+
+public class mKahaDBXARecoveryBrokerTest extends XARecoveryBrokerTest {
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+
+        MultiKahaDBPersistenceAdapter mKahaDB = new MultiKahaDBPersistenceAdapter();
+        List adapters = new LinkedList<FilteredKahaDBPersistenceAdapter>();
+        FilteredKahaDBPersistenceAdapter defaultEntry = new FilteredKahaDBPersistenceAdapter();
+        defaultEntry.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        adapters.add(defaultEntry);
+
+        FilteredKahaDBPersistenceAdapter special = new FilteredKahaDBPersistenceAdapter();
+        special.setDestination(new ActiveMQQueue("special"));
+        special.setPersistenceAdapter(new KahaDBPersistenceAdapter());
+        adapters.add(special);
+
+        mKahaDB.setFilteredPersistenceAdapters(adapters);
+        broker.setPersistenceAdapter(mKahaDB);
+    }
+
+    public static Test suite() {
+        return suite(mKahaDBXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue("test,special");
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/mKahaDBXARecoveryBrokerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java Tue Sep 13 15:01:37 2011
@@ -27,6 +27,8 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.junit.After;
 import org.junit.Test;
+
+
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
@@ -56,7 +58,7 @@ public class AMQ2736Test {
 
         // test hack, close the journal to ensure no further journal updates when broker stops
         // mimic kill -9 in terms of no normal shutdown sequence
-        store.getJournalManager().close();
+        store.getJournal().close();
         try {
             store.close();
         } catch (Exception expectedLotsAsJournalBorked) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java Tue Sep 13 15:01:37 2011
@@ -16,7 +16,11 @@
  */
 package org.apache.activemq.bugs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.concurrent.CountDownLatch;
 
 import javax.jms.BytesMessage;
@@ -28,6 +32,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.broker.BrokerService;
@@ -37,8 +42,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 public class AMQ2982Test {
 
@@ -62,7 +65,7 @@ public class AMQ2982Test {
             // ensure save memory publishing, use the right lock
             indexLock.readLock().lock();
             try {
-                return getJournalManager().getFileMap().size();
+                return getJournal().getFileMap().size();
             } finally {
                 indexLock.readLock().unlock();
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java Tue Sep 13 15:01:37 2011
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.bugs;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,6 +34,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
@@ -37,7 +42,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 public class AMQ2983Test {
 
@@ -63,7 +67,7 @@ public class AMQ2983Test {
             // ensure save memory publishing, use the right lock
             indexLock.readLock().lock();
             try {
-                return getJournalManager().getFileMap().size();
+                return getJournal().getFileMap().size();
             } finally {
                 indexLock.readLock().unlock();
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java Tue Sep 13 15:01:37 2011
@@ -31,9 +31,9 @@ public class SimpleDurableTopicTest exte
     protected long initialConsumerDelay = 0;
     @Override
     protected void setUp() throws Exception {
-        numberOfDestinations=10;
+        numberOfDestinations=1;
         numberOfConsumers = 1;
-        numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "1"));
+        numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20);
         sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10);
         playloadSize = 1024;
         super.setUp();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java?rev=1170201&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java Tue Sep 13 15:01:37 2011
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StorePerDestinationTest  {
+    static final Logger LOG = LoggerFactory.getLogger(StorePerDestinationTest.class);
+    final static int maxFileLength = 1024*100;
+    final static int numToSend = 10000;
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
+    BrokerService brokerService;
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    private KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(5000);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    @Before
+    public void prepareCleanBrokerWithMultiStore() throws Exception {
+           prepareBrokerWithMultiStore(true);
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        FilteredKahaDBPersistenceAdapter theRest = new FilteredKahaDBPersistenceAdapter();
+        theRest.setPersistenceAdapter(createStore(deleteAllMessages));
+        // default destination when not set is a match for all
+        adapters.add(theRest);
+
+        // separate store for FastQ
+        FilteredKahaDBPersistenceAdapter fastQStore = new FilteredKahaDBPersistenceAdapter();
+        fastQStore.setPersistenceAdapter(createStore(deleteAllMessages));
+        fastQStore.setDestination(new ActiveMQQueue("FastQ"));
+        adapters.add(fastQStore);
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        brokerService  = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    @Test
+    public void testTransactedSendReceive() throws Exception {
+        brokerService.start();
+        sendMessages(true, "SlowQ", 1, 0);
+        assertEquals("got one", 1, receiveMessages(true, "SlowQ", 1));
+    }
+
+    @Test
+    public void testTransactedSendReceiveAcrossStores() throws Exception {
+        brokerService.start();
+        sendMessages(true, "SlowQ,FastQ", 1, 0);
+        assertEquals("got one", 2, receiveMessages(true, "SlowQ,FastQ", 2));
+    }
+
+    @Test
+    public void testCommitRecovery() throws Exception {
+        doTestRecovery(true);
+    }
+
+     @Test
+    public void testRollbackRecovery() throws Exception {
+        doTestRecovery(false);
+    }
+
+    public void doTestRecovery(final boolean haveOutcome) throws Exception {
+        final MultiKahaDBPersistenceAdapter persistenceAdapter =
+                (MultiKahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+        MultiKahaDBTransactionStore transactionStore =
+                new MultiKahaDBTransactionStore(persistenceAdapter) {
+                    @Override
+                    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
+                        if (haveOutcome) {
+                            super.persistOutcome(tx, txid);
+                        }
+                        try {
+                            // IOExceptions will stop the broker
+                            persistenceAdapter.stop();
+                        } catch (Exception e) {
+                            LOG.error("ex on stop ", e);
+                            exceptions.add(e);
+                        }
+                    }
+                };
+        persistenceAdapter.setTransactionStore(transactionStore);
+        brokerService.start();
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    // commit will block
+                    sendMessages(true, "SlowQ,FastQ", 1, 0);
+                } catch(Exception expected) {
+                    LOG.info("expected", expected);
+                }
+            }
+        });
+
+        brokerService.waitUntilStopped();
+        // interrupt the send thread
+        executorService.shutdownNow();
+
+        // verify auto recovery
+        prepareBrokerWithMultiStore(false);
+        brokerService.start();
+
+        assertEquals("expect to get the recovered message", haveOutcome ? 2 : 0, receiveMessages(false, "SlowQ,FastQ", 2));
+        assertEquals("all transactions are complete", 0, brokerService.getBroker().getPreparedTransactions(null).length);
+    }
+
+    @Test
+    public void testSlowFastDestinationsStoreUsage() throws Exception {
+        brokerService.start();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    sendMessages(false, "SlowQ", 50, 500);
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    sendMessages(false, "FastQ", numToSend, 0);
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    assertEquals("Got all sent", numToSend, receiveMessages(false, "FastQ", numToSend));
+                } catch (Exception e) {
+                    exceptions.add(e);
+                }
+            }
+        });
+
+        executorService.shutdown();
+        assertTrue("consumers executor finished on time", executorService.awaitTermination(60, TimeUnit.SECONDS));
+        final SystemUsage usage = brokerService.getSystemUsage();
+        assertTrue("Store is not hogged", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                long storeUsage = usage.getStoreUsage().getUsage();
+                LOG.info("Store Usage: " + storeUsage);
+                return storeUsage < 5 * maxFileLength;
+            }
+        }));
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    private void sendMessages(boolean transacted, String destName, int count, long sleep) throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(new ActiveMQQueue(destName));
+            for (int i = 0; i < count; i++) {
+                if (sleep > 0) {
+                    TimeUnit.MILLISECONDS.sleep(sleep);
+                }
+                producer.send(session.createTextMessage(createContent(i)));
+            }
+            if (transacted) {
+                session.commit();
+            }
+        } finally {
+            connection.close();
+        }
+    }
+
+    private int receiveMessages(boolean transacted, String destName, int max) throws JMSException {
+        int rc = 0;
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = cf.createConnection();
+        try {
+            connection.start();
+            Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue(destName));
+            while (rc < max && messageConsumer.receive(4000) != null) {
+                rc++;
+
+                if (transacted && rc % 200 == 0) {
+                    session.commit();
+                }
+            }
+            if (transacted) {
+                session.commit();
+            }
+            return rc;
+        } finally {
+            connection.close();
+        }
+    }
+
+    private String createContent(int i) {
+        StringBuilder sb = new StringBuilder(i + ":");
+        while (sb.length() < 1024) {
+            sb.append("*");
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Sep 13 15:01:37 2011
@@ -17,7 +17,6 @@
 package org.apache.activemq.usecases;
 
 import java.util.Vector;
-
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -73,7 +72,7 @@ public class DurableSubscriptionOfflineT
     public static Test suite() {
         return suite(DurableSubscriptionOfflineTest.class);
     }
-
+    
     protected void setUp() throws Exception {
         exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
@@ -89,9 +88,9 @@ public class DurableSubscriptionOfflineT
     private void createBroker() throws Exception {
         createBroker(true);
     }
-
+    
     private void createBroker(boolean deleteAllMessages) throws Exception {
-        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) + ")");
+        broker = BrokerFactory.createBroker("broker:(vm://" + getName(true) +")");
         broker.setBrokerName(getName(true));
         broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
         broker.getManagementContext().setCreateConnector(false);
@@ -105,14 +104,14 @@ public class DurableSubscriptionOfflineT
             policyMap.setDefaultEntry(policy);
             broker.setDestinationPolicy(policyMap);
         }
-
+        
         setDefaultPersistenceAdapter(broker);
         if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
             // ensure it kicks in during tests
-            ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).setCleanupPeriod(2 * 1000);
+            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setCleanupPeriod(2*1000);
         } else if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
             // have lots of journal files
-            ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
+            ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
         }
         broker.start();
     }
@@ -124,9 +123,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestConsumeOnlyMatchedMessages() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
     }
 
     public void testConsumeOnlyMatchedMessages() throws Exception {
@@ -171,110 +170,110 @@ public class DurableSubscriptionOfflineT
         assertEquals(sent, listener.count);
     }
 
-    public void testConsumeAllMatchedMessages() throws Exception {
-        // create durable subscription
-        Connection con = createConnection();
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
-
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
+     public void testConsumeAllMatchedMessages() throws Exception {
+         // create durable subscription
+         Connection con = createConnection();
+         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         session.close();
+         con.close();
+
+         // send messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(null);
+
+         int sent = 0;
+         for (int i = 0; i < 10; i++) {
+             sent++;
+             Message message = session.createMessage();
+             message.setStringProperty("filter", "true");
+             producer.send(topic, message);
+         }
+
+         Thread.sleep(1 * 1000);
+
+         session.close();
+         con.close();
+
+         // consume messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         Listener listener = new Listener();
+         consumer.setMessageListener(listener);
+
+         Thread.sleep(3 * 1000);
 
-        Thread.sleep(1 * 1000);
-
-        session.close();
-        con.close();
-
-        // consume messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        assertEquals(sent, listener.count);
-    }
+         session.close();
+         con.close();
 
+         assertEquals(sent, listener.count);
+     }
 
+    
     public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
     }
 
-    public void testVerifyAllConsumedAreAcked() throws Exception {
-        // create durable subscription
-        Connection con = createConnection();
-        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        session.close();
-        con.close();
+     public void testVerifyAllConsumedAreAcked() throws Exception {
+         // create durable subscription
+         Connection con = createConnection();
+         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         session.close();
+         con.close();
+
+         // send messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(null);
+
+         int sent = 0;
+         for (int i = 0; i < 10; i++) {
+             sent++;
+             Message message = session.createMessage();
+             message.setStringProperty("filter", "true");
+             producer.send(topic, message);
+         }
+
+         Thread.sleep(1 * 1000);
+
+         session.close();
+         con.close();
+
+         // consume messages
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         Listener listener = new Listener();
+         consumer.setMessageListener(listener);
+
+         Thread.sleep(3 * 1000);
+
+         session.close();
+         con.close();
+
+         LOG.info("Consumed: " + listener.count);
+         assertEquals(sent, listener.count);
+
+         // consume messages again, should not get any
+         con = createConnection();
+         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+         listener = new Listener();
+         consumer.setMessageListener(listener);
 
-        // send messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session.createProducer(null);
-
-        int sent = 0;
-        for (int i = 0; i < 10; i++) {
-            sent++;
-            Message message = session.createMessage();
-            message.setStringProperty("filter", "true");
-            producer.send(topic, message);
-        }
-
-        Thread.sleep(1 * 1000);
-
-        session.close();
-        con.close();
-
-        // consume messages
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        Listener listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
-
-        session.close();
-        con.close();
-
-        LOG.info("Consumed: " + listener.count);
-        assertEquals(sent, listener.count);
-
-        // consume messages again, should not get any
-        con = createConnection();
-        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
-        listener = new Listener();
-        consumer.setMessageListener(listener);
-
-        Thread.sleep(3 * 1000);
+         Thread.sleep(3 * 1000);
 
-        session.close();
-        con.close();
+         session.close();
+         con.close();
 
-        assertEquals(0, listener.count);
-    }
+         assertEquals(0, listener.count);
+     }
 
     public void testTwoOfflineSubscriptionCanConsume() throws Exception {
         // create durable subscription 1
@@ -445,9 +444,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
         this.addCombinationValues("usePrioritySupport",
-                new Object[]{Boolean.TRUE, Boolean.FALSE});
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
     }
 
     public void testOfflineSubscriptionCanConsumeAfterOnlineSubs() throws Exception {
@@ -596,15 +595,14 @@ public class DurableSubscriptionOfflineT
         con.close();
 
         assertEquals("offline consumer got all", sent, listener.count);
-    }
+    }    
 
     public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     private static String filter = "$a='A1' AND (($b=true AND $c=true) OR ($d='D1' OR $d='D2'))";
-
     public void testMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -752,9 +750,9 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
-
+    
     public void testOfflineSubscriptionWithSelectorAfterRestart() throws Exception {
         // create offline subs 1
         Connection con = createConnection("offCli1");
@@ -795,7 +793,7 @@ public class DurableSubscriptionOfflineT
         Thread.sleep(3 * 1000);
         broker.stop();
         createBroker(false /*deleteAllMessages*/);
-
+ 
         // send more messages
         con = createConnection();
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -842,7 +840,7 @@ public class DurableSubscriptionOfflineT
 
     public void initCombosForTestOfflineAfterRestart() throws Exception {
         this.addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
     }
 
     public void testOfflineSubscriptionAfterRestart() throws Exception {
@@ -978,7 +976,7 @@ public class DurableSubscriptionOfflineT
 
         int filtered = 0;
         for (int i = 0; i < 10; i++) {
-            boolean filter = (i % 2 == 0); //(int) (Math.random() * 2) >= 1;
+            boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
             if (filter)
                 filtered++;
 
@@ -1076,7 +1074,7 @@ public class DurableSubscriptionOfflineT
         sent = 0;
         for (int i = 0; i < 2; i++) {
             Message message = session.createMessage();
-            message.setStringProperty("filter", i == 1 ? "true" : "false");
+            message.setStringProperty("filter", i==1 ? "true" : "false");
             producer.send(topic, message);
             sent++;
         }
@@ -1084,7 +1082,7 @@ public class DurableSubscriptionOfflineT
         Thread.sleep(1 * 1000);
         session.close();
         con.close();
-
+ 
         LOG.info("cli1 again, should get 1 new ones");
         con = createConnection("cli1");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1206,7 +1204,7 @@ public class DurableSubscriptionOfflineT
         MessageProducer producer = session.createProducer(null);
 
         final int toSend = 500;
-        final String payload = new byte[40 * 1024].toString();
+        final String payload = new byte[40*1024].toString();
         int sent = 0;
         for (int i = sent; i < toSend; i++) {
             Message message = session.createTextMessage(payload);
@@ -1233,7 +1231,7 @@ public class DurableSubscriptionOfflineT
         consumer.setMessageListener(listener);
         assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
-                LOG.info("Want: " + toSend + ", current: " + listener.count);
+                LOG.info("Want: " + toSend  + ", current: " + listener.count);
                 return listener.count == toSend;
             }
         }));
@@ -1243,7 +1241,7 @@ public class DurableSubscriptionOfflineT
         destroyBroker();
         createBroker(false);
         KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
-        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournalManager().getFileMap().size());
+        assertEquals("only one journal file left after restart", 1, pa.getStore().getJournal().getFileMap().size());
     }
 
     public static class Listener implements MessageListener {
@@ -1252,23 +1250,20 @@ public class DurableSubscriptionOfflineT
 
         Listener() {
         }
-
         Listener(String id) {
             this.id = id;
         }
-
         public void onMessage(Message message) {
             count++;
             if (id != null) {
                 try {
                     LOG.info(id + ", " + message.getJMSMessageID());
-                } catch (Exception ignored) {
-                }
+                } catch (Exception ignored) {}
             }
         }
     }
 
-    public class FilterCheckListener extends Listener {
+    public class FilterCheckListener extends Listener  {
 
         public void onMessage(Message message) {
             count++;
@@ -1278,11 +1273,13 @@ public class DurableSubscriptionOfflineT
                 if (b != null) {
                     boolean c = message.getBooleanProperty("$c");
                     assertTrue("", c);
-                } else {
+                }
+                else {
                     String d = message.getStringProperty("$d");
                     assertTrue("", "D1".equals(d) || "D2".equals(d));
                 }
-            } catch (JMSException e) {
+            }
+            catch (JMSException e) {
                 exceptions.add(e);
             }
         }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1170201&r1=1170200&r2=1170201&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Tue Sep 13 15:01:37 2011
@@ -382,7 +382,7 @@ public class Journal {
         started = false;
     }
 
-    synchronized void cleanup() {
+    protected synchronized void cleanup() {
         if (accessorPool != null) {
             accessorPool.disposeUnused();
         }



Mime
View raw message