activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r741769 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/ main/java/org/apache/activemq/store/kahadb/temp/ test/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/store/kahadb/perf/
Date Fri, 06 Feb 2009 22:49:01 GMT
Author: chirino
Date: Fri Feb  6 22:49:01 2009
New Revision: 741769

URL: http://svn.apache.org/viewvc?rev=741769&view=rev
Log:
Added a new TempKahaDBStore, which is an optimized KahaDB-based message store for temporary messages.


Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempMessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/temp/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=741769&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Fri Feb  6 22:49:01 2009
@@ -0,0 +1,564 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaLocation;
+import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+
+public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
+
+    private WireFormat wireFormat = new OpenWireFormat();
+
+    /**
+     * No-op: the body is empty, so the supplied broker name is ignored by this store.
+     *
+     * @param brokerName broker name offered by the caller; unused here
+     */
+    public void setBrokerName(String brokerName) {
+    }
+    /**
+     * No-op: the body is empty, so the supplied usage manager is ignored by this store.
+     *
+     * @param usageManager usage manager offered by the caller; unused here
+     */
+    public void setUsageManager(SystemUsage usageManager) {
+    }
+
+    /**
+     * Builds the {@link TransactionStore} view of this store.
+     * <p>
+     * Commit, prepare and rollback are forwarded to the matching <code>process*</code>
+     * methods; recovery replays every entry of <code>preparedTransactions</code> to the
+     * supplied listener. Start and stop do nothing.
+     *
+     * @return a new transaction store bound to this instance
+     * @throws IOException declared for the interface; the body itself only constructs the object
+     */
+    public TransactionStore createTransactionStore() throws IOException {
+        return new TransactionStore(){
+
+            public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
+                // wasPrepared is not consulted; the commit is forwarded as-is.
+                processCommit(txid);
+            }
+
+            public void prepare(TransactionId txid) throws IOException {
+                processPrepare(txid);
+            }
+
+            public void rollback(TransactionId txid) throws IOException {
+                processRollback(txid);
+            }
+
+            public void recover(TransactionRecoveryListener listener) throws IOException {
+                for (Map.Entry<TransactionId, ArrayList<Operation>> prepared : preparedTransactions.entrySet()) {
+                    // Every prepared transaction is handed to the listener as an XA transaction.
+                    XATransactionId xid = (XATransactionId)prepared.getKey();
+                    ArrayList<Message> adds = new ArrayList<Message>();
+                    ArrayList<MessageAck> removes = new ArrayList<MessageAck>();
+
+                    for (Operation op : prepared.getValue()) {
+                        if( op.getClass() != AddOpperation.class ) {
+                            // Anything that is not an add is treated as a remove.
+                            RemoveOpperation removeOp = (RemoveOpperation)op;
+                            DataInputStream ackInput = new DataInputStream(removeOp.getCommand().getAck().newInput());
+                            removes.add((MessageAck)wireFormat.unmarshal( ackInput ));
+                            continue;
+                        }
+                        AddOpperation addOp = (AddOpperation)op;
+                        DataInputStream messageInput = new DataInputStream(addOp.getCommand().getMessage().newInput());
+                        adds.add((Message)wireFormat.unmarshal( messageInput ));
+                    }
+
+                    Message[] addedMessages = adds.toArray(new Message[adds.size()]);
+                    MessageAck[] acks = removes.toArray(new MessageAck[removes.size()]);
+                    listener.recover(xid, addedMessages, acks);
+                }
+            }
+
+            public void start() throws Exception {
+            }
+
+            public void stop() throws Exception {
+            }
+        };
+    }
+
+    /**
+     * Message store for a single destination, backed by the enclosing store's indexes.
+     * <p>
+     * Mutations are expressed as Kaha commands and handed to the enclosing store's
+     * <code>process*</code> methods; reads run inside a page file transaction while
+     * holding <code>indexMutex</code>.
+     */
+    public class KahaDBMessageStore extends AbstractMessageStore {
+        /** Converted form of the destination, attached to every command this store issues. */
+        protected KahaDestination dest;
+
+        /**
+         * @param destination destination served by this store
+         */
+        public KahaDBMessageStore(ActiveMQDestination destination) {
+            super(destination);
+            dest = convert( destination );
+        }
+
+        /**
+         * @return the destination this store was created for
+         */
+        public ActiveMQDestination getDestination() {
+            return this.destination;
+        }
+
+        /**
+         * Records a message add, keyed by the string form of the message id.
+         *
+         * @param context not used by this method
+         * @param message message to store; its transaction id is passed along to the add
+         * @throws IOException if marshalling or processing the add fails
+         */
+        public void addMessage(ConnectionContext context, Message message) throws IOException {
+            KahaAddMessageCommand add = new KahaAddMessageCommand();
+            add.setDestination(dest);
+            String messageKey = message.getMessageId().toString();
+            add.setMessageId(messageKey);
+            processAdd(add, message.getTransactionId(), wireFormat.marshal(message));
+        }
+
+        /**
+         * Records the removal of the message named by the ack's last message id.
+         *
+         * @param context not used by this method
+         * @param ack acknowledgement naming the message; its transaction id is passed along
+         * @throws IOException if processing the remove fails
+         */
+        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+            KahaRemoveMessageCommand remove = new KahaRemoveMessageCommand();
+            remove.setDestination(dest);
+            String messageKey = ack.getLastMessageId().toString();
+            remove.setMessageId(messageKey);
+            processRemove(remove, ack.getTransactionId());
+        }
+
+        /**
+         * Issues a remove-destination command for this store's destination.
+         *
+         * @param context not used by this method
+         * @throws IOException if processing the command fails
+         */
+        public void removeAllMessages(ConnectionContext context) throws IOException {
+            KahaRemoveDestinationCommand removeAll = new KahaRemoveDestinationCommand();
+            removeAll.setDestination(dest);
+            process(removeAll);
+        }
+
+        /**
+         * Looks a message up by id.
+         *
+         * @param identity id of the wanted message
+         * @return the unmarshalled message, or <code>null</code> when the id is not indexed
+         * @throws IOException if the index read or the unmarshalling fails
+         */
+        public Message getMessage(MessageId identity) throws IOException {
+            final String messageKey = identity.toString();
+
+            // The page file does not support concurrent reads yet, so index access is
+            // serialized externally on indexMutex.
+            ByteSequence payload;
+            synchronized(indexMutex) {
+                payload = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
+                    public ByteSequence execute(Transaction tx) throws IOException {
+                        StoredDestination stored = getStoredDestination(dest, tx);
+                        Long position = stored.messageIdIndex.get(tx, messageKey);
+                        return position == null ? null : stored.orderIndex.get(tx, position).data;
+                    }
+                });
+            }
+            return payload == null ? null : (Message)wireFormat.unmarshal( payload );
+        }
+
+        /**
+         * Counts the messages held for this destination.
+         *
+         * @return number of entries in the message id index
+         * @throws IOException if the index read fails
+         */
+        public int getMessageCount() throws IOException {
+            synchronized(indexMutex) {
+                Integer result = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        // The count is obtained by walking every entry of the message id index.
+                        StoredDestination stored = getStoredDestination(dest, tx);
+                        Iterator<Entry<String, Long>> ids = stored.messageIdIndex.iterator(tx);
+                        int count = 0;
+                        while (ids.hasNext()) {
+                            ids.next();
+                            count++;
+                        }
+                        return count;
+                    }
+                });
+                return result;
+            }
+        }
+
+        /**
+         * Replays every stored message, in order-index order, to the listener.
+         *
+         * @param listener receiver of the recovered messages
+         * @throws Exception if the index read, unmarshalling or the listener fails
+         */
+        public void recover(final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination stored = getStoredDestination(dest, tx);
+                        Iterator<Entry<Long, MessageRecord>> records = stored.orderIndex.iterator(tx);
+                        while (records.hasNext()) {
+                            MessageRecord record = records.next().getValue();
+                            listener.recoverMessage( (Message) wireFormat.unmarshal(record.data) );
+                        }
+                    }
+                });
+            }
+        }
+
+        /** Order-index key at which the next batch handed out by recoverNextMessages starts. */
+        long cursorPos=0;
+
+        /**
+         * Replays the next batch of messages, starting at the batch cursor, and moves the
+         * cursor just past the last message delivered.
+         *
+         * @param maxReturned batch size; the loop stops once this many messages were delivered
+         * @param listener receiver of the recovered messages
+         * @throws Exception if the index read, unmarshalling or the listener fails
+         */
+        public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination stored = getStoredDestination(dest, tx);
+                        Iterator<Entry<Long, MessageRecord>> records = stored.orderIndex.iterator(tx, cursorPos);
+                        Entry<Long, MessageRecord> last = null;
+                        int delivered = 0;
+                        while (records.hasNext()) {
+                            last = records.next();
+                            listener.recoverMessage( (Message) wireFormat.unmarshal(last.getValue().data ) );
+                            delivered++;
+                            if( delivered >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        // Only move the cursor when at least one message was handed out.
+                        if( last != null ) {
+                            cursorPos = last.getKey() + 1;
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Rewinds the batch cursor to the start of the order index.
+         */
+        public void resetBatching() {
+            cursorPos = 0L;
+        }
+
+        /**
+         * Positions the batch cursor just after the given message; leaves it untouched when
+         * the id is not indexed.
+         *
+         * @param identity id of the message to continue after
+         * @throws IOException if the index read fails
+         */
+        @Override
+        public void setBatch(MessageId identity) throws IOException {
+            final String messageKey = identity.toString();
+
+            // The page file does not support concurrent reads yet, so index access is
+            // serialized externally on indexMutex.
+            Long position;
+            synchronized(indexMutex) {
+                position = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
+                    public Long execute(Transaction tx) throws IOException {
+                        return getStoredDestination(dest, tx).messageIdIndex.get(tx, messageKey);
+                    }
+                });
+            }
+            if( position == null ) {
+                return;
+            }
+            cursorPos = position + 1;
+        }
+
+        /** No-op: the supplied memory usage is ignored. */
+        public void setMemoryUsage(MemoryUsage memoeyUSage) {
+        }
+
+        /** No-op. */
+        public void start() throws Exception {
+        }
+
+        /** No-op. */
+        public void stop() throws Exception {
+        }
+
+    }
+        
+    /**
+     * Topic variant of {@link KahaDBMessageStore} that adds subscription bookkeeping.
+     * <p>
+     * Subscriptions are created and deleted through {@link KahaSubscriptionCommand}s and
+     * are identified by the key built by <code>subscriptionKey(clientId, subscriptionName)</code>.
+     * All index reads run inside a page file transaction while holding <code>indexMutex</code>.
+     */
+    class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
+        /**
+         * @param destination topic served by this store
+         */
+        public KahaDBTopicMessageStore(ActiveMQTopic destination) {
+            super(destination);
+        }
+
+        /**
+         * Records the removal of a message on behalf of one subscription.
+         * <p>
+         * The remove is always processed with a <code>null</code> transaction id: this method
+         * receives no transaction information, so it cannot take part in a transaction, and
+         * the original ack is not available to hand back to XA recovery.
+         *
+         * @param context not used by this method
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @param messageId id of the acknowledged message
+         * @throws IOException if processing the remove fails
+         */
+        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+            KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            command.setMessageId(messageId.toString());
+            processRemove(command, null);
+        }
+
+        /**
+         * Records a subscription, storing the marshalled {@link SubscriptionInfo} in the command.
+         *
+         * @param subscriptionInfo subscription to record; supplies client id and name for the key
+         * @param retroactive copied onto the command's retroactive flag
+         * @throws IOException if marshalling or processing the command fails
+         */
+        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+            String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey);
+            command.setRetroactive(retroactive);
+            ByteSequence packet = wireFormat.marshal(subscriptionInfo);
+            command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
+            process(command);
+        }
+
+        /**
+         * Issues a subscription command that carries only the destination and the key.
+         * NOTE(review): presumably the absence of subscription info is what marks the command
+         * as a delete when it is processed - confirm against the command handling code.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @throws IOException if processing the command fails
+         */
+        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+            KahaSubscriptionCommand command = new KahaSubscriptionCommand();
+            command.setDestination(dest);
+            command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
+            process(command);
+        }
+
+        /**
+         * Lists every subscription recorded for this destination.
+         *
+         * @return the unmarshalled subscription infos; empty when there are none
+         * @throws IOException if the index read or unmarshalling fails
+         */
+        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+
+            final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
+                            Entry<String, KahaSubscriptionCommand> entry = iterator.next();
+                            SubscriptionInfo info = (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(entry.getValue().getSubscriptionInfo().newInput()) );
+                            subscriptions.add(info);
+                        }
+                    }
+                });
+            }
+
+            return subscriptions.toArray(new SubscriptionInfo[subscriptions.size()]);
+        }
+
+        /**
+         * Finds one subscription by client id and name.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @return the unmarshalled subscription info, or <code>null</code> when the key is unknown
+         * @throws IOException if the index read or unmarshalling fails
+         */
+        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+                    public SubscriptionInfo execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
+                        if( command ==null ) {
+                            return null;
+                        }
+                        return (SubscriptionInfo)wireFormat.unmarshal( new DataInputStream(command.getSubscriptionInfo().newInput()) );
+                    }
+                });
+            }
+        }
+
+        /**
+         * Counts the order-index entries that follow the position stored in
+         * <code>subscriptionAcks</code> for the subscription.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @return the number of such entries, or 0 when no position is stored for the key
+         * @throws IOException if the index read fails
+         */
+        public int getMessageCount(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+                    public Integer execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        if ( lastAck==null ) {
+                            // The subscription might not exist.
+                            return 0;
+                        }
+                        long startPos = lastAck + 1;
+
+                        int counter = 0;
+                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
+                            iterator.next();
+                            counter++;
+                        }
+                        return counter;
+                    }
+                });
+            }
+        }
+
+        /**
+         * Replays to the listener every message that follows the position stored in
+         * <code>subscriptionAcks</code> for the subscription. Nothing is replayed when no
+         * position is stored for the key (the subscription might not exist), which matches
+         * how the per-subscription message count treats that case.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @param listener receiver of the recovered messages
+         * @throws Exception if the index read, unmarshalling or the listener fails
+         */
+        public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
+                        if( lastAck == null ) {
+                            // The subscription might not exist; unboxing null would throw.
+                            return;
+                        }
+                        long startPos = lastAck + 1;
+
+                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
+                            Entry<Long, MessageRecord> entry = iterator.next();
+                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Replays the next batch of messages for a subscription.
+         * <p>
+         * The batch starts at the subscription's batch cursor when one is set, otherwise just
+         * after the position stored in <code>subscriptionAcks</code>. Afterwards the batch
+         * cursor is moved just past the last message delivered, so the following call
+         * continues where this one stopped. Nothing happens when neither a cursor nor a
+         * stored position exists for the key.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @param maxReturned batch size; the loop stops once this many messages were delivered
+         * @param listener receiver of the recovered messages
+         * @throws Exception if the index read, unmarshalling or the listener fails
+         */
+        public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<Exception>(){
+                    public void execute(Transaction tx) throws Exception {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        Long startPos = sd.subscriptionCursors.get(subscriptionKey);
+                        if( startPos == null ) {
+                            Long lastAck = sd.subscriptionAcks.get(tx, subscriptionKey);
+                            if( lastAck == null ) {
+                                // The subscription might not exist; unboxing null would throw.
+                                return;
+                            }
+                            startPos = lastAck + 1;
+                        }
+
+                        Entry<Long, MessageRecord> entry=null;
+                        int counter = 0;
+                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, startPos); iterator.hasNext();) {
+                            entry = iterator.next();
+                            listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
+                            counter++;
+                            if( counter >= maxReturned ) {
+                                break;
+                            }
+                        }
+                        if( entry!=null ) {
+                            // Advance past the last delivered message, not merely by one
+                            // from the start, otherwise the next batch repeats messages.
+                            sd.subscriptionCursors.put(subscriptionKey, entry.getKey()+1);
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Drops the subscription's batch cursor so the next batch starts from the stored
+         * <code>subscriptionAcks</code> position again.
+         *
+         * @param clientId client id part of the subscription key
+         * @param subscriptionName subscription name part of the subscription key
+         * @throws RuntimeException wrapping any IOException raised by the page file transaction
+         */
+        public void resetBatching(String clientId, String subscriptionName) {
+            try {
+                final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+                synchronized(indexMutex) {
+                    pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                        public void execute(Transaction tx) throws IOException {
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            sd.subscriptionCursors.remove(subscriptionKey);
+                        }
+                    });
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Builds the key under which a subscription is tracked: client id, a colon, then the
+     * subscription name.
+     *
+     * @param clientId client id of the subscriber
+     * @param subscriptionName name of the subscription
+     * @return <code>clientId + ":" + subscriptionName</code>
+     */
+    String subscriptionKey(String clientId, String subscriptionName){
+        StringBuilder key = new StringBuilder();
+        key.append(clientId);
+        key.append(":");
+        key.append(subscriptionName);
+        return key.toString();
+    }
+    
+    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        return new KahaDBMessageStore(destination);
+    }
+
+    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+        return new KahaDBTopicMessageStore(destination);
+    }
+
+    /**
+     * Cleanup method to remove any state associated with the given destination.
+     * This method does not stop the message store (it might not be cached).
+     * <p>
+     * In this class the body is empty, so calling it has no effect.
+     *
+     * @param destination Destination to forget
+     */
+    public void removeQueueMessageStore(ActiveMQQueue destination) {
+    }
+
+    /**
+     * Cleanup method to remove any state associated with the given destination.
+     * This method does not stop the message store (it might not be cached).
+     * <p>
+     * In this class the body is empty, so calling it has no effect.
+     *
+     * @param destination Destination to forget
+     */
+    public void removeTopicMessageStore(ActiveMQTopic destination) {
+    }
+
+    /**
+     * No-op: the body is empty, so no messages are deleted by this call.
+     *
+     * @throws IOException declared for the interface; never thrown by this body
+     */
+    public void deleteAllMessages() throws IOException {
+    }
+    
+    
+    /**
+     * Lists the destinations recorded in the <code>destinations</code> index.
+     * <p>
+     * The index is read inside a page file transaction while holding <code>indexMutex</code>;
+     * each key is turned back into an {@link ActiveMQDestination}.
+     *
+     * @return a new set holding one destination per index key
+     * @throws RuntimeException wrapping any IOException raised while reading the index
+     */
+    public Set<ActiveMQDestination> getDestinations() {
+        final HashSet<ActiveMQDestination> found = new HashSet<ActiveMQDestination>();
+        try {
+            synchronized(indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        Iterator<Entry<String, StoredDestination>> keys = destinations.iterator(tx);
+                        while (keys.hasNext()) {
+                            String key = keys.next().getKey();
+                            found.add(convert(key));
+                        }
+                    }
+                });
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return found;
+    }
+    
+    /**
+     * Always reports 0; no stored state is consulted.
+     *
+     * @return 0
+     * @throws IOException declared for the interface; never thrown by this body
+     */
+    public long getLastMessageBrokerSequenceId() throws IOException {
+        return 0;
+    }
+    
+    /**
+     * Reports the disk size of the page file.
+     *
+     * @return the page file's disk size, or 0 while the <code>started</code> flag is not set
+     * @throws RuntimeException wrapping any IOException raised while reading the size
+     */
+    public long size() {
+        if ( started.get() ) {
+            try {
+                return pageFile.getDiskSize();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Unsupported: always fails.
+     *
+     * @param context not used
+     * @throws IOException always, with the message "Not yet implemented."
+     */
+    public void beginTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    /**
+     * Unsupported: always fails.
+     *
+     * @param context not used
+     * @throws IOException always, with the message "Not yet implemented."
+     */
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    /**
+     * Unsupported: always fails.
+     *
+     * @param context not used
+     * @throws IOException always, with the message "Not yet implemented."
+     */
+    public void rollbackTransaction(ConnectionContext context) throws IOException {
+        throw new IOException("Not yet implemented.");
+    }
+    
+    /**
+     * No-op: the body is empty, so no checkpoint work is performed.
+     *
+     * @param sync not used
+     * @throws IOException declared for the interface; never thrown by this body
+     */
+    public void checkpoint(boolean sync) throws IOException {
+    }    
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal conversion methods.
+    ///////////////////////////////////////////////////////////////////
+    
+
+    
+    /**
+     * Copies a journal {@link Location} into a new {@link KahaLocation}: the data file id
+     * becomes the log id and the offset is carried over unchanged.
+     *
+     * @param location journal location to copy
+     * @return a new {@link KahaLocation} holding the same file id and offset
+     */
+    KahaLocation convert(Location location) {
+        KahaLocation converted = new KahaLocation();
+        converted.setLogId(location.getDataFileId());
+        converted.setOffset(location.getOffset());
+        return converted;
+    }
+    
+    KahaDestination convert(ActiveMQDestination dest) {
+        KahaDestination rc = new KahaDestination();
+        rc.setName(dest.getPhysicalName());
+        switch( dest.getDestinationType() ) {
+        case ActiveMQDestination.QUEUE_TYPE:
+            rc.setType(DestinationType.QUEUE);
+            return rc;
+        case ActiveMQDestination.TOPIC_TYPE:
+            rc.setType(DestinationType.TOPIC);
+            return rc;
+        case ActiveMQDestination.TEMP_QUEUE_TYPE:
+            rc.setType(DestinationType.TEMP_QUEUE);
+            return rc;
+        case ActiveMQDestination.TEMP_TOPIC_TYPE:
+            rc.setType(DestinationType.TEMP_TOPIC);
+            return rc;
+        default:
+            return null;
+        }
+    }
+
+    /**
+     * Parses a destination key of the form <code>&lt;type number&gt;:&lt;name&gt;</code> back into a
+     * broker destination. Only the first colon separates the two parts, so the name itself
+     * may contain colons.
+     *
+     * @param dest destination key to parse
+     * @return a queue, topic, temp queue or temp topic carrying the parsed name
+     * @throws IllegalArgumentException if the key has no colon, the type part is not a number
+     *         (surfaced as a NumberFormatException), or the number is not a known type
+     */
+    ActiveMQDestination convert(String dest) {
+        int p = dest.indexOf(":");
+        if( p<0 ) {
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+        int type = Integer.parseInt(dest.substring(0, p));
+        String name = dest.substring(p+1);
+
+        DestinationType destinationType = KahaDestination.DestinationType.valueOf(type);
+        if( destinationType == null ) {
+            // NOTE(review): valueOf(int) presumably yields null for an unknown number;
+            // switching on null would throw NullPointerException instead of the intended error.
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+
+        switch( destinationType ) {
+        case QUEUE:
+            return new ActiveMQQueue(name);
+        case TOPIC:
+            return new ActiveMQTopic(name);
+        case TEMP_QUEUE:
+            return new ActiveMQTempQueue(name);
+        case TEMP_TOPIC:
+            return new ActiveMQTempTopic(name);
+        default:
+            throw new IllegalArgumentException("Not in the valid destination format");
+        }
+    }
+        
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempMessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempMessageDatabase.java?rev=741769&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempMessageDatabase.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempMessageDatabase.java Fri Feb  6 22:49:01 2009
@@ -0,0 +1,703 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+import org.apache.kahadb.util.StringMarshaller;
+
+public class TempMessageDatabase {
+
+    private static final Log LOG = LogFactory.getLog(TempMessageDatabase.class);
+
+    // State constants; neither is referenced elsewhere in this class.
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+
+    // Root index mapping a destination key (see key()) to its per-destination indexes.
+    protected BTreeIndex<String, StoredDestination> destinations;
+    // Created lazily by getPageFile().
+    protected PageFile pageFile;
+
+    // Directory handed to the PageFile when it is created.
+    protected File directory;
+    
+    // Both settings are applied to the PageFile in createPageFile().
+    boolean enableIndexWriteAsync = true;
+    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
+    
+    // Guard start()/stop() and open()/close() so each transition runs once.
+    protected AtomicBoolean started = new AtomicBoolean();
+    protected AtomicBoolean opened = new AtomicBoolean();
+
+    /**
+     * Creates an unconfigured database; nothing is loaded until
+     * {@link #start()}, {@link #open()} or {@link #load()} is called.
+     */
+    public TempMessageDatabase() {
+    }
+
+    /**
+     * Starts the database. Only the call that flips {@code started} from
+     * false to true runs {@link #load()}; every other call is a no-op.
+     *
+     * @throws Exception if {@link #load()} fails
+     */
+    public void start() throws Exception {
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+        load();
+    }
+
+    /**
+     * Stops the database. Only the call that flips {@code started} from
+     * true to false runs {@link #unload()}; every other call is a no-op.
+     *
+     * @throws Exception if {@link #unload()} fails
+     */
+    public void stop() throws Exception {
+        if (!started.compareAndSet(true, false)) {
+            return;
+        }
+        unload();
+    }
+
+	/**
+	 * Loads the page file, allocates a fresh page for the root
+	 * {@code destinations} index inside a page-file transaction, flushes, and
+	 * empties the in-memory destination cache. Runs under {@code indexMutex}.
+	 *
+	 * @throws IOException if the page file cannot be loaded or written
+	 */
+	private void loadPageFile() throws IOException {
+        synchronized (indexMutex) {
+            final PageFile pf = getPageFile();
+            pf.load();
+
+            Transaction.Closure<IOException> createRootIndex = new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    // The root index always lives on a newly allocated page.
+                    destinations = new BTreeIndex<String, StoredDestination>(pf, tx.allocate().getPageId());
+                    destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
+                    destinations.setValueMarshaller(new StoredDestinationMarshaller());
+                    destinations.load(tx);
+                }
+            };
+            pf.tx().execute(createRootIndex);
+
+            pf.flush();
+            storedDestinations.clear();
+        }
+	}
+	
+	/**
+	 * Opens the database. Only the call that flips {@code opened} from false
+	 * to true loads the page file; every other call is a no-op.
+	 *
+	 * @throws IOException if the page file cannot be loaded
+	 */
+	public void open() throws IOException {
+        if (!opened.compareAndSet(false, true)) {
+            return;
+        }
+        loadPageFile();
+	}
+	
+    /**
+     * Opens the database if necessary, then unloads and deletes the page file
+     * and loads it again, all under {@code indexMutex}.
+     * NOTE(review): the delete/reload looks intended to discard content left
+     * by an earlier run, since this store holds temporary data - confirm.
+     *
+     * @throws IOException if any page file operation fails
+     */
+    public void load() throws IOException {
+        synchronized (indexMutex) {
+            // open() ensures the pageFile field has been created and loaded.
+	    	open();
+            pageFile.unload();
+            pageFile.delete();
+            // Recreates the root destinations index and clears the cache.
+            loadPageFile();
+        }
+    }
+
+    
+	/**
+	 * Closes the database. Only the call that flips {@code opened} from true
+	 * to false unloads the page file (under {@code indexMutex}); every other
+	 * call is a no-op.
+	 *
+	 * @throws IOException if the page file cannot be unloaded
+	 * @throws InterruptedException declared for callers; not raised by the
+	 *             statements visible in this method
+	 */
+	public void close() throws IOException, InterruptedException {
+        if (!opened.compareAndSet(true, false)) {
+            return;
+        }
+        synchronized (indexMutex) {
+            pageFile.unload();
+        }
+	}
+	
+    /**
+     * Closes the database if its page file is currently loaded.
+     * Safe to call on an instance that was never opened: the page file is
+     * created lazily, so the field may still be null at this point.
+     *
+     * @throws IOException if the page file cannot be unloaded
+     * @throws InterruptedException propagated from {@link #close()}
+     */
+    public void unload() throws IOException, InterruptedException {
+        synchronized (indexMutex) {
+            // pageFile stays null until getPageFile() has run, so guard the
+            // dereference instead of failing with a NullPointerException.
+            if( pageFile != null && pageFile.isLoaded() ) {
+                close();
+            }
+        }
+    }
+
+    /**
+     * Handles an add-message command. Without a transaction id the index is
+     * updated immediately in a page-file transaction; with one, the add is
+     * queued on that transaction's in-flight operation list.
+     *
+     * @param command the add command
+     * @param txid the messaging transaction id, or null for none
+     * @param data the message bytes to store
+     * @throws IOException if the index update fails
+     */
+    public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
+        synchronized (indexMutex) {
+            if (txid == null) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        upadateIndex(tx, command, data);
+                    }
+                });
+                return;
+            }
+            // Deferred until processCommit() replays the operation list.
+            getInflightTx(txid).add(new AddOpperation(command, data));
+        }
+    }
+
+    /**
+     * Handles a remove-message command. Without a transaction id the index is
+     * updated immediately in a page-file transaction; with one, the removal is
+     * queued on that transaction's in-flight operation list.
+     *
+     * @param command the remove command
+     * @param txid the messaging transaction id, or null for none
+     * @throws IOException if the index update fails
+     */
+    public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
+        synchronized (indexMutex) {
+            if (txid == null) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        updateIndex(tx, command);
+                    }
+                });
+                return;
+            }
+            // Deferred until processCommit() replays the operation list.
+            getInflightTx(txid).add(new RemoveOpperation(command));
+        }
+    }
+
+    /**
+     * Applies a remove-destination command to the index inside a page-file
+     * transaction, under {@code indexMutex}.
+     *
+     * @param command the remove-destination command
+     * @throws IOException if the index update fails
+     */
+    public void process(final KahaRemoveDestinationCommand command) throws IOException {
+        Transaction.Closure<IOException> removal = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                updateIndex(tx, command);
+            }
+        };
+        synchronized (indexMutex) {
+            pageFile.tx().execute(removal);
+        }
+    }
+
+    /**
+     * Applies a subscription command (create or delete) to the index inside
+     * a page-file transaction, under {@code indexMutex}.
+     *
+     * @param command the subscription command
+     * @throws IOException if the index update fails
+     */
+    public void process(final KahaSubscriptionCommand command) throws IOException {
+        Transaction.Closure<IOException> subscriptionUpdate = new Transaction.Closure<IOException>() {
+            public void execute(Transaction tx) throws IOException {
+                updateIndex(tx, command);
+            }
+        };
+        synchronized (indexMutex) {
+            pageFile.tx().execute(subscriptionUpdate);
+        }
+    }
+
+    /**
+     * Commits a messaging transaction: removes its queued operations from the
+     * in-flight map (or, failing that, the prepared map) and replays them in
+     * order within a single page-file transaction. Unknown ids are ignored.
+     *
+     * @param key the transaction id to commit
+     * @throws IOException if replaying an operation fails
+     */
+    public void processCommit(TransactionId key) throws IOException {
+        synchronized (indexMutex) {
+            ArrayList<Operation> pending = inflightTransactions.remove(key);
+            if (pending == null) {
+                pending = preparedTransactions.remove(key);
+                if (pending == null) {
+                    // Nothing was recorded for this transaction.
+                    return;
+                }
+            }
+
+            final ArrayList<Operation> operations = pending;
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    for (Operation operation : operations) {
+                        operation.execute(tx);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Moves a transaction's queued operations from the in-flight map to the
+     * prepared map. Does nothing when the id has no in-flight operations.
+     *
+     * @param key the transaction id to prepare
+     */
+    public void processPrepare(TransactionId key) {
+        synchronized (indexMutex) {
+            ArrayList<Operation> pending = inflightTransactions.remove(key);
+            if (pending == null) {
+                return;
+            }
+            preparedTransactions.put(key, pending);
+        }
+    }
+
+    /**
+     * Discards the queued operations of a transaction, looking first in the
+     * in-flight map and, if nothing was there, in the prepared map.
+     *
+     * @param key the transaction id to roll back
+     */
+    public void processRollback(TransactionId key) {
+        synchronized (indexMutex) {
+            if (inflightTransactions.remove(key) == null) {
+                preparedTransactions.remove(key);
+            }
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // These methods do the actual index updates.
+    // /////////////////////////////////////////////////////////////////
+
+    protected final Object indexMutex = new Object();
+	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+
+    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+
+        // Skip adding the message to the index if this is a topic and there are
+        // no subscriptions.
+        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
+            return;
+        }
+
+        // Add the message.
+        long id = sd.nextMessageId++;
+        Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
+        if( previous == null ) {
+            sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
+        } else {
+            // restore the previous value.. Looks like this was a redo of a previously
+            // added message.  We don't want to assing it a new id as the other indexes would 
+            // be wrong..
+            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
+        }
+    }
+
+    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        if (!command.hasSubscriptionKey()) {
+            
+            // In the queue case we just remove the message from the index..
+            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
+            if (sequenceId != null) {
+                sd.orderIndex.remove(tx, sequenceId);
+            }
+        } else {
+            // In the topic case we need remove the message once it's been acked
+            // by all the subs
+            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
+
+            // Make sure it's a valid message id...
+            if (sequence != null) {
+                String subscriptionKey = command.getSubscriptionKey();
+                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
+
+                // The following method handles deleting un-referenced messages.
+                removeAckByteSequence(tx, sd, subscriptionKey, prev);
+
+                // Add it to the new location set.
+                addAckByteSequence(sd, sequence, subscriptionKey);
+            }
+
+        }
+    }
+
+    /**
+     * Removes a destination: clears and unloads each of its indexes, frees
+     * their root pages, and drops the destination from both the in-memory
+     * cache and the root destinations index.
+     *
+     * @param tx the page-file transaction to write through
+     * @param command the command naming the destination to remove
+     * @throws IOException if an index operation fails
+     */
+    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
+        // Note: getStoredDestination() creates the destination if it does not
+        // exist yet, so an unknown destination is allocated and then freed.
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+        sd.orderIndex.clear(tx);
+        sd.orderIndex.unload(tx);
+        tx.free(sd.orderIndex.getPageId());
+        
+        sd.messageIdIndex.clear(tx);
+        sd.messageIdIndex.unload(tx);
+        tx.free(sd.messageIdIndex.getPageId());
+
+        // The subscription indexes only exist for topics.
+        if (sd.subscriptions != null) {
+            sd.subscriptions.clear(tx);
+            sd.subscriptions.unload(tx);
+            tx.free(sd.subscriptions.getPageId());
+
+            sd.subscriptionAcks.clear(tx);
+            sd.subscriptionAcks.unload(tx);
+            tx.free(sd.subscriptionAcks.getPageId());
+        }
+
+        String key = key(command.getDestination());
+        storedDestinations.remove(key);
+        destinations.remove(tx, key);
+    }
+
+    private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
+        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
+
+        // If set then we are creating it.. otherwise we are destroying the sub
+        if (command.hasSubscriptionInfo()) {
+            String subscriptionKey = command.getSubscriptionKey();
+            sd.subscriptions.put(tx, subscriptionKey, command);
+            long ackByteSequence=-1;
+            if (!command.getRetroactive()) {
+                ackByteSequence = sd.nextMessageId-1;
+            }
+
+            sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
+            addAckByteSequence(sd, ackByteSequence, subscriptionKey);
+        } else {
+            // delete the sub...
+            String subscriptionKey = command.getSubscriptionKey();
+            sd.subscriptions.remove(tx, subscriptionKey);
+            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
+            if( prev!=null ) {
+                removeAckByteSequence(tx, sd, subscriptionKey, prev);
+            }
+        }
+
+    }
+    
+    /**
+     * Returns the live (not copied) set of journal file ids being replicated.
+     * Nothing in this class adds to or removes from the set.
+     *
+     * @return the mutable backing set
+     */
+    public HashSet<Integer> getJournalFilesBeingReplicated() {
+		return journalFilesBeingReplicated;
+	}
+
+    // /////////////////////////////////////////////////////////////////
+    // StoredDestination related implementation methods.
+    // /////////////////////////////////////////////////////////////////
+
+
+	private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
+
+    /**
+     * Plain holder for per-subscription state. It is not referenced by the
+     * rest of this class.
+     */
+    class StoredSubscription {
+        SubscriptionInfo subscriptionInfo;
+        String lastAckId;
+        ByteSequence lastAckByteSequence;
+        ByteSequence cursor;
+    }
+    
+    static class MessageRecord {
+        final String messageId;
+        final ByteSequence data;
+        
+        public MessageRecord(String messageId, ByteSequence location) {
+            this.messageId=messageId;
+            this.data=location;
+        }
+        
+        @Override
+        public String toString() {
+            return "["+messageId+","+data+"]";
+        }
+    }
+    
+    static protected class MessageKeysMarshaller implements Marshaller<MessageRecord> {
+        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
+        
+        public Class<MessageRecord> getType() {
+            return MessageRecord.class;
+        }
+
+        public MessageRecord readPayload(DataInput dataIn) throws IOException {
+            return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
+        }
+
+        public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
+            dataOut.writeUTF(object.messageId);
+            ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
+        }
+    }
+    
+    /**
+     * Per-destination state: the persistent indexes plus the in-memory
+     * bookkeeping rebuilt when the destination is loaded.
+     */
+    static class StoredDestination {
+        // Next sequence number to hand out when a message is added.
+        long nextMessageId;
+        // sequence -> stored message record
+        BTreeIndex<Long, MessageRecord> orderIndex;
+        // message id -> sequence
+        BTreeIndex<String, Long> messageIdIndex;
+
+        // These bits are only set for Topics
+        // subscription key -> the command that created the subscription
+        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
+        // subscription key -> last acked sequence
+        BTreeIndex<String, Long> subscriptionAcks;
+        HashMap<String, Long> subscriptionCursors;
+        // acked sequence -> subscription keys sitting at that position
+        TreeMap<Long, HashSet<String>> ackPositions;
+    }
+
+    protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
+        public Class<StoredDestination> getType() {
+            return StoredDestination.class;
+        }
+
+        public StoredDestination readPayload(DataInput dataIn) throws IOException {
+            StoredDestination value = new StoredDestination();
+            value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
+            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
+
+            if (dataIn.readBoolean()) {
+                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
+                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
+            }
+            return value;
+        }
+
+        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(value.orderIndex.getPageId());
+            dataOut.writeLong(value.messageIdIndex.getPageId());
+            if (value.subscriptions != null) {
+                dataOut.writeBoolean(true);
+                dataOut.writeLong(value.subscriptions.getPageId());
+                dataOut.writeLong(value.subscriptionAcks.getPageId());
+            } else {
+                dataOut.writeBoolean(false);
+            }
+        }
+    }
+
+    /**
+     * Marshals a {@link ByteSequence} as a 4-byte length followed by that
+     * many raw bytes.
+     */
+    static class ByteSequenceMarshaller implements Marshaller<ByteSequence> {
+        final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
+
+        public Class<ByteSequence> getType() {
+            return ByteSequence.class;
+        }
+
+        /** Reads the length prefix, then exactly that many bytes. */
+        public ByteSequence readPayload(DataInput dataIn) throws IOException {
+            int length = dataIn.readInt();
+            byte[] payload = new byte[length];
+            dataIn.readFully(payload);
+            return new ByteSequence(payload);
+        }
+
+        /** Writes only the sequence's own window (offset/length) of the backing array. */
+        public void writePayload(ByteSequence sequence, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(sequence.getLength());
+            byte[] backing = sequence.getData();
+            int offset = sequence.getOffset();
+            int length = sequence.getLength();
+            dataOut.write(backing, offset, length);
+        }
+    }
+
+    /**
+     * Marshals a {@link KahaSubscriptionCommand} using the command's own
+     * framed read/write methods.
+     * Both directions cast the stream argument, so a {@code DataInput} that is
+     * not also an {@code InputStream} (or a {@code DataOutput} that is not
+     * also an {@code OutputStream}) fails with a {@code ClassCastException}.
+     */
+    static class KahaSubscriptionCommandMarshaller implements Marshaller<KahaSubscriptionCommand> {
+        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
+
+        public Class<KahaSubscriptionCommand> getType() {
+            return KahaSubscriptionCommand.class;
+        }
+
+        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
+            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
+            // Unchecked cast: relies on the index passing a stream-backed DataInput.
+            rc.mergeFramed((InputStream)dataIn);
+            return rc;
+        }
+
+        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
+            // Unchecked cast: relies on the index passing a stream-backed DataOutput.
+            object.writeFramed((OutputStream)dataOut);
+        }
+    }
+
+    /**
+     * Returns the state for a destination, loading (or creating) it on first
+     * use and caching it in {@code storedDestinations} afterwards.
+     *
+     * @param destination the destination to look up
+     * @param tx the page-file transaction used if the destination must be loaded
+     * @return the cached or freshly loaded destination state
+     * @throws IOException if loading the destination fails
+     */
+    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
+        String key = key(destination);
+        StoredDestination cached = storedDestinations.get(key);
+        if (cached != null) {
+            return cached;
+        }
+
+        KahaDestination.DestinationType type = destination.getType();
+        boolean topic = type == KahaDestination.DestinationType.TOPIC
+                || type == KahaDestination.DestinationType.TEMP_TOPIC;
+        StoredDestination loaded = loadStoredDestination(tx, key, topic);
+
+        // Entries are never evicted here; unloading idle destinations would
+        // be a way to reduce memory usage.
+        storedDestinations.put(key, loaded);
+        return loaded;
+    }
+
+    /**
+     * Loads the indexes of a destination from the root index, allocating
+     * brand new ones when the key is not present, and rebuilds the in-memory
+     * ack bookkeeping for topics.
+     *
+     * @param tx the page-file transaction to read and write through
+     * @param key the destination key as produced by {@code key(KahaDestination)}
+     * @param topic whether the destination is a topic or temp topic, which
+     *            adds the subscription indexes
+     * @return the destination state with all of its indexes loaded
+     * @throws IOException if an index operation fails
+     */
+    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
+        // Try to load the existing indexes..
+        StoredDestination rc = destinations.get(tx, key);
+        if (rc == null) {
+            // Brand new destination.. allocate indexes for it.
+            rc = new StoredDestination();
+            rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
+            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
+
+            if (topic) {
+                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
+                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
+            }
+            // Persist the page ids so the destination can be found again.
+            destinations.put(tx, key, rc);
+        }
+
+        // Configure the marshalers and load.
+        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+        rc.orderIndex.load(tx);
+
+        // Figure out the next key using the last entry in the destination.
+        // With an empty order index nextMessageId keeps its default of 0.
+        Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
+        if( lastEntry!=null ) {
+            rc.nextMessageId = lastEntry.getKey()+1;
+        }
+
+        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        rc.messageIdIndex.load(tx);
+        
+        // If it was a topic...
+        if (topic) {
+
+            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
+            rc.subscriptions.load(tx);
+
+            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
+            rc.subscriptionAcks.load(tx);
+
+            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
+            rc.subscriptionCursors = new HashMap<String, Long>();
+
+            // Rebuild the position -> subscriptions map from the persisted acks.
+            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+                Entry<String, Long> entry = iterator.next();
+                addAckByteSequence(rc, entry.getValue(), entry.getKey());
+            }
+
+        }
+        return rc;
+    }
+
+    /**
+     * Records that a subscription's last ack sits at the given sequence.
+     *
+     * @param sd the destination whose ack positions are updated
+     * @param messageSequence the acked sequence number
+     * @param subscriptionKey the subscription to register at that position
+     */
+    private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
+        HashSet<String> subscribers = sd.ackPositions.get(messageSequence);
+        if (subscribers != null) {
+            subscribers.add(subscriptionKey);
+            return;
+        }
+        // First subscription at this position.
+        subscribers = new HashSet<String>();
+        subscribers.add(subscriptionKey);
+        sd.ackPositions.put(messageSequence, subscribers);
+    }
+
+    /**
+     * Removes a subscription from the ack position it previously occupied.
+     * When that empties the lowest position in the map, every message with a
+     * sequence at or below it is deleted from both indexes.
+     *
+     * @param tx the page-file transaction to write through
+     * @param sd the destination being updated
+     * @param subscriptionKey the subscription leaving the position
+     * @param sequenceId the position being left; null means nothing to do
+     * @throws IOException if an index operation fails
+     */
+    private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
+        if (sequenceId == null) {
+            return;
+        }
+        HashSet<String> subscribers = sd.ackPositions.get(sequenceId);
+        if (subscribers == null) {
+            return;
+        }
+
+        subscribers.remove(subscriptionKey);
+        if (!subscribers.isEmpty()) {
+            // Other subscriptions still sit at this position.
+            return;
+        }
+
+        // Capture the lowest position's set before dropping the emptied entry.
+        HashSet<String> lowest = sd.ackPositions.values().iterator().next();
+        sd.ackPositions.remove(sequenceId);
+        if (subscribers != lowest) {
+            // A lower position is still occupied, so nothing can be deleted.
+            return;
+        }
+
+        // Collect first and delete afterwards: removing entries while
+        // iterating the BTree would break the iteration.
+        ArrayList<Entry<Long, MessageRecord>> doomed = new ArrayList<Entry<Long, MessageRecord>>();
+        Iterator<Entry<Long, MessageRecord>> cursor = sd.orderIndex.iterator(tx);
+        while (cursor.hasNext()) {
+            Entry<Long, MessageRecord> candidate = cursor.next();
+            if (candidate.getKey().compareTo(sequenceId) <= 0) {
+                doomed.add(candidate);
+            }
+        }
+
+        for (Entry<Long, MessageRecord> victim : doomed) {
+            sd.messageIdIndex.remove(tx, victim.getValue().messageId);
+            sd.orderIndex.remove(tx, victim.getKey());
+        }
+    }
+
+    /**
+     * Builds the index key for a destination: its type number, a colon, and
+     * its name.
+     *
+     * @param destination the destination to encode
+     * @return the key in {@code "<type number>:<name>"} form
+     */
+    private String key(KahaDestination destination) {
+        StringBuilder encoded = new StringBuilder();
+        encoded.append(destination.getType().getNumber());
+        encoded.append(":");
+        encoded.append(destination.getName());
+        return encoded.toString();
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Transaction related implementation methods.
+    // /////////////////////////////////////////////////////////////////
+    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
+ 
+    /**
+     * Returns the operation list of an in-flight transaction, registering an
+     * empty list the first time the id is seen.
+     *
+     * @param key the transaction id
+     * @return the mutable list of queued operations for that transaction
+     */
+    private ArrayList<Operation> getInflightTx(TransactionId key) {
+        ArrayList<Operation> operations = inflightTransactions.get(key);
+        if (operations != null) {
+            return operations;
+        }
+        operations = new ArrayList<Operation>();
+        inflightTransactions.put(key, operations);
+        return operations;
+    }
+
+    /**
+     * A deferred index update queued on a messaging transaction and replayed
+     * by processCommit().
+     */
+    abstract class Operation {
+        /** Applies this operation through the given page-file transaction. */
+        abstract public void execute(Transaction tx) throws IOException;
+    }
+
+    /** Deferred add of a message; replays the add-message index update. */
+    class AddOpperation extends Operation {
+        final KahaAddMessageCommand command;
+        private final ByteSequence data;
+
+        /**
+         * @param command the add command to replay on commit
+         * @param data the message bytes to store
+         */
+        public AddOpperation(KahaAddMessageCommand command, ByteSequence data) {
+            this.command = command;
+            this.data = data;
+        }
+
+        public void execute(Transaction tx) throws IOException {
+            upadateIndex(tx, this.command, this.data);
+        }
+
+        public KahaAddMessageCommand getCommand() {
+            return this.command;
+        }
+    }
+
+    /** Deferred removal of a message; replays the remove-message index update. */
+    class RemoveOpperation extends Operation {
+        final KahaRemoveMessageCommand command;
+
+        /**
+         * @param command the remove command to replay on commit
+         */
+        public RemoveOpperation(KahaRemoveMessageCommand command) {
+            this.command = command;
+        }
+
+        public void execute(Transaction tx) throws IOException {
+            updateIndex(tx, this.command);
+        }
+
+        public KahaRemoveMessageCommand getCommand() {
+            return this.command;
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Initialization related implementation methods.
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * Builds the page file named "temp-db" in the configured directory, with
+     * the configured write thread and batch size settings and with disk
+     * syncs and the recovery file switched off.
+     *
+     * @return a configured page file that has not been loaded yet
+     */
+    private PageFile createPageFile() {
+        PageFile tempDb = new PageFile(directory, "temp-db");
+        tempDb.setEnableWriteThread(isEnableIndexWriteAsync());
+        tempDb.setWriteBatchSize(getIndexWriteBatchSize());
+        tempDb.setEnableDiskSyncs(false);
+        tempDb.setEnableRecoveryFile(false);
+        return tempDb;
+    }
+
+    /** Returns the directory the page file is created in. */
+    public File getDirectory() {
+        return directory;
+    }
+
+    /**
+     * Sets the directory the page file is created in. The value is read when
+     * the page file is first created, so later changes do not affect a page
+     * file that already exists.
+     */
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+    
+    /** Sets the write batch size applied to the page file when it is created. */
+    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
+        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
+    }
+
+    /** Returns the write batch size applied to the page file when it is created. */
+    public int getIndexWriteBatchSize() {
+        return setIndexWriteBatchSize;
+    }
+    
+    /** Sets whether the page file is created with its write thread enabled. */
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        this.enableIndexWriteAsync = enableIndexWriteAsync;
+    }
+    
+    // Package-private, unlike the matching public setter.
+    boolean isEnableIndexWriteAsync() {
+        return enableIndexWriteAsync;
+    }
+        
+    /**
+     * Returns the page file, creating it on first access.
+     *
+     * @return the single page file instance backing this database
+     */
+    public PageFile getPageFile() {
+        if (pageFile != null) {
+            return pageFile;
+        }
+        pageFile = createPageFile();
+        return pageFile;
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java?rev=741769&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java Fri Feb  6 22:49:01 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTest;
+
+/**
+ * Runs the {@link BrokerTest} suite against brokers whose persistence
+ * adapters store their data under {@code target/activemq-data/kahadb}.
+ * 
+ * @version $Revision: 712224 $
+ */
+public class TempKahaDBStoreBrokerTest extends BrokerTest {
+
+    /**
+     * Creates the broker used by the tests, with all stored messages deleted.
+     * NOTE(review): this builds a KahaDBStore, not a TempKahaDBStore, although
+     * the class name and createRestartedBroker() suggest the temp store is
+     * what should be exercised - confirm whether this is intentional.
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    /**
+     * Creates a broker backed by a TempKahaDBStore on the same directory,
+     * without deleting existing messages first.
+     */
+    protected BrokerService createRestartedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        TempKahaDBStore kaha = new TempKahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+    
+    
+    /** Builds the JUnit 3 suite for this class. */
+    public static Test suite() {
+        return suite(TempKahaDBStoreBrokerTest.class);
+    }
+    
+    /** Runs the suite with the text-mode JUnit runner. */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java?rev=741769&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java Fri Feb  6 22:49:01 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.perf;
+
+import java.io.File;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.perf.SimpleQueueTest;
+import org.apache.activemq.store.kahadb.TempKahaDBStore;
+
+/**
+ * Runs the {@link SimpleQueueTest} performance test with a
+ * {@link TempKahaDBStore} as the broker's persistence adapter.
+ *
+ * @version $Revision: 712224 $
+ */
+public class TempKahaStoreQueueTest extends SimpleQueueTest {
+
+    /**
+     * Points the broker at a TempKahaDBStore in a dedicated data directory,
+     * asks it to delete all messages on startup, and adds the connector.
+     *
+     * @param answer the broker to configure
+     * @param uri the connector URI to listen on
+     * @throws Exception if the connector cannot be added
+     */
+    protected void configureBroker(BrokerService answer,String uri) throws Exception {
+        File dataDir = new File("target/test-amq-data/perfTest/temp-amqdb");
+        dataDir.mkdirs();
+        answer.setDeleteAllMessagesOnStartup(true);
+
+        TempKahaDBStore store = new TempKahaDBStore();
+        store.setDirectory(dataDir);
+
+        answer.setDataDirectoryFile(dataDir);
+        answer.setPersistenceAdapter(store);
+        answer.addConnector(uri);
+    }
+
+}
+



Mime
View raw message