activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r392434 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/broker/store/
Date Fri, 07 Apr 2006 22:02:09 GMT
Author: rajdavies
Date: Fri Apr  7 15:02:07 2006
New Revision: 392434

URL: http://svn.apache.org/viewcvs?rev=392434&view=rev
Log:
fix for http://issues.apache.org/activemq/browse/AMQ-676

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
Fri Apr  7 15:02:07 2006
@@ -38,6 +38,10 @@
         this.messageContainer=container;
         this.destination=destination;
     }
+    
+    public Object getId(){
+        return messageContainer.getId();
+    }
 
     public void addMessage(ConnectionContext context,Message message) throws IOException{
         messageContainer.put(message.getMessageId().toString(),message);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
Fri Apr  7 15:02:07 2006
@@ -18,7 +18,6 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-
 import org.apache.activeio.command.WireFormat;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -34,8 +33,6 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.memory.MemoryTransactionStore;
-
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 /**
  * @org.apache.xbean.XBean
@@ -43,20 +40,21 @@
  * @version $Revision: 1.4 $
  */
 public class KahaPersistentAdaptor implements PersistenceAdapter{
-    MemoryTransactionStore transactionStore;
+    static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
+    KahaTransactionStore transactionStore;
     ConcurrentHashMap topics=new ConcurrentHashMap();
     ConcurrentHashMap queues=new ConcurrentHashMap();
+    ConcurrentHashMap messageStores=new ConcurrentHashMap();
     private boolean useExternalMessageReferences;
-    private WireFormat wireFormat = new OpenWireFormat();
+    private OpenWireFormat wireFormat=new OpenWireFormat();
     Store store;
 
     public KahaPersistentAdaptor(File dir) throws IOException{
-        if (!dir.exists()){
+        if(!dir.exists()){
             dir.mkdirs();
         }
-        String name = dir.getAbsolutePath() + File.separator + "kaha.db";
+        String name=dir.getAbsolutePath()+File.separator+"kaha.db";
         store=StoreFactory.open(name,"rw");
-        
     }
 
     public Set getDestinations(){
@@ -74,6 +72,7 @@
         MessageStore rc=(MessageStore) queues.get(destination);
         if(rc==null){
             rc=new KahaMessageStore(getMapContainer(destination),destination);
+            messageStores.put(destination, rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);
             }
@@ -92,17 +91,28 @@
             ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
             ackContainer.load();
             rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
+            messageStores.put(destination, rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);
             }
             topics.put(destination,rc);
+            
         }
         return rc;
     }
 
+    protected MessageStore retrieveMessageStore(Object id){
+        MessageStore result =  (MessageStore) messageStores.get(id);
+        return result;
+    }
+
     public TransactionStore createTransactionStore() throws IOException{
         if(transactionStore==null){
-            transactionStore=new MemoryTransactionStore();
+            MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME);
+            container.setKeyMarshaller(new CommandMarshaller(wireFormat));
+            container.setValueMarshaller(new TransactionMarshaller(wireFormat));
+            container.load();
+            transactionStore=new KahaTransactionStore(this,container);
         }
         return transactionStore;
     }
@@ -155,8 +165,8 @@
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     * @param usageManager
+     *            The UsageManager that is controlling the broker's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager) {
-    }
+    public void setUsageManager(UsageManager usageManager){}
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Fri Apr  7 15:02:07 2006
@@ -16,7 +16,6 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -55,7 +54,6 @@
     public synchronized void addMessage(ConnectionContext context,Message message) throws
IOException{
         int subscriberCount=subscriberAcks.size();
         if(subscriberCount>0){
-            super.addMessage(context,message);
             String id=message.getMessageId().toString();
             ackContainer.put(id,new AtomicInteger(subscriberCount));
             for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
@@ -63,6 +61,7 @@
                 ListContainer container=store.getListContainer(key);
                 container.add(id);
             }
+            super.addMessage(context,message);
         }
     }
 
@@ -79,7 +78,7 @@
                     ackContainer.put(id,count);
                 }else{
                     // no more references to message messageContainer so remove it
-                    container.remove(id);
+                    super.removeMessage(messageId);
                 }
             }
         }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java?rev=392434&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
Fri Apr  7 15:02:07 2006
@@ -0,0 +1,100 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Stores a messages/acknowledgements for a transaction
+ * 
+ * @version $Revision: 1.4 $
+ */
+class KahaTransaction{
+    private static final Log log=LogFactory.getLog(KahaTransaction.class);
+    protected List list=new ArrayList();
+
+    
+     void add(KahaMessageStore store,BaseCommand command){
+        TxCommand tx=new TxCommand();
+        tx.setCommand(command);
+        tx.setMessageStoreKey(store.getId());
+        list.add(tx);
+    }
+
+    Message[] getMessages(){
+        List result=new ArrayList();
+        for(int i=0;i<list.size();i++){
+            TxCommand command=(TxCommand) list.get(i);
+            if(command.isAdd()){
+                result.add(command.getCommand());
+            }
+        }
+        Message[] messages=new Message[result.size()];
+        return (Message[]) result.toArray(messages);
+    }
+
+    MessageAck[] getAcks(){
+        List result=new ArrayList();
+        for(int i=0;i<list.size();i++){
+            TxCommand command=(TxCommand) list.get(i);
+            if(command.isRemove()){
+                result.add(command.getCommand());
+            }
+        }
+        MessageAck[] acks=new MessageAck[result.size()];
+        return (MessageAck[]) result.toArray(acks);
+    }
+
+    void prepare(){}
+
+    void rollback(){
+        list.clear();
+    }
+
+    /**
+     * @throws IOException
+     */
+    void commit(KahaTransactionStore transactionStore) throws IOException{
+        for(int i=0;i<list.size();i++){
+            TxCommand command=(TxCommand) list.get(i);
+            MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
+            if(command.isAdd()){
+                ms.addMessage(null,(Message) command.getCommand());
+            }
+        }
+        for(int i=0;i<list.size();i++){
+            TxCommand command=(TxCommand) list.get(i);
+            MessageStore ms=transactionStore.getStoreById(command.getMessageStoreKey());
+            if(command.isRemove()){
+                ms.removeMessage(null,(MessageAck) command.getCommand());
+            }
+        }
+    }
+    
+    List getList(){
+        return new ArrayList(list);
+    }
+    
+    void setList(List list){
+        this.list = list;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransaction.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=392434&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Fri Apr  7 15:02:07 2006
@@ -0,0 +1,176 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+/**
+ * Provides a TransactionStore implementation that can create transaction aware MessageStore
objects from non
+ * transaction aware MessageStore objects.
+ * 
+ * @version $Revision: 1.4 $
+ */
+public class KahaTransactionStore implements TransactionStore{
+    private Map transactions=new ConcurrentHashMap();
+    private Map prepared;
+    private KahaPersistentAdaptor adaptor;
+
+    KahaTransactionStore(KahaPersistentAdaptor adaptor,Map preparedMap){
+        this.adaptor=adaptor;
+        this.prepared=preparedMap;
+    }
+
+    public MessageStore proxy(MessageStore messageStore){
+        return new ProxyMessageStore(messageStore){
+            public void addMessage(ConnectionContext context,final Message send) throws IOException{
+                KahaTransactionStore.this.addMessage(getDelegate(),send);
+            }
+
+            public void removeMessage(ConnectionContext context,final MessageAck ack) throws
IOException{
+                KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+            }
+        };
+    }
+
+    public TopicMessageStore proxy(TopicMessageStore messageStore){
+        return new ProxyTopicMessageStore(messageStore){
+            public void addMessage(ConnectionContext context,final Message send) throws IOException{
+                KahaTransactionStore.this.addMessage(getDelegate(),send);
+            }
+
+            public void removeMessage(ConnectionContext context,final MessageAck ack) throws
IOException{
+                KahaTransactionStore.this.removeMessage(getDelegate(),ack);
+            }
+        };
+    }
+
+    /**
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void prepare(TransactionId txid){
+        KahaTransaction tx=getTx(txid);
+        if(tx!=null){
+            tx.prepare();
+            prepared.put(txid,tx);
+        }
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+        KahaTransaction tx=getTx(txid);
+        if(tx!=null){
+            tx.commit(this);
+            removeTx(txid);
+        }
+    }
+
+    /**
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid){
+        KahaTransaction tx=getTx(txid);
+        if(tx!=null){
+            tx.rollback();
+            removeTx(txid);
+        }
+    }
+
+    public void start() throws Exception{}
+
+    public void stop() throws Exception{}
+
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+        for(Iterator i=prepared.entrySet().iterator();i.hasNext();){
+            Map.Entry entry=(Entry) i.next();
+            XATransactionId xid=(XATransactionId) entry.getKey();
+            KahaTransaction kt=(KahaTransaction) entry.getValue();
+            listener.recover(xid,kt.getMessages(),kt.getAcks());
+        }
+    }
+
+    /**
+     * @param message
+     * @throws IOException
+     */
+    void addMessage(final MessageStore destination,final Message message) throws IOException{
+        if(message.isInTransaction()){
+            KahaTransaction tx=getOrCreateTx(message.getTransactionId());
+            tx.add((KahaMessageStore) destination,message);
+        }else{
+            destination.addMessage(null,message);
+        }
+    }
+
+    /**
+     * @param ack
+     * @throws IOException
+     */
+    private void removeMessage(final MessageStore destination,final MessageAck ack) throws
IOException{
+        if(ack.isInTransaction()){
+            KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
+            tx.add((KahaMessageStore) destination,ack);
+        }else{
+            destination.removeMessage(null,ack);
+        }
+    }
+
+    protected synchronized KahaTransaction getTx(TransactionId key){
+        KahaTransaction result=(KahaTransaction) transactions.get(key);
+        if(result==null){
+            result=(KahaTransaction) prepared.get(key);
+        }
+        return result;
+    }
+
+    protected synchronized KahaTransaction getOrCreateTx(TransactionId key){
+        KahaTransaction result=(KahaTransaction) transactions.get(key);
+        if(result==null){
+            result=new KahaTransaction();
+            transactions.put(key,result);
+        }
+        return result;
+    }
+
+    protected synchronized void removeTx(TransactionId key){
+        transactions.remove(key);
+        prepared.remove(key);
+    }
+
+    public void delete(){
+        transactions.clear();
+        prepared.clear();
+    }
+
+    protected MessageStore getStoreById(Object id){
+        return adaptor.retrieveMessageStore(id);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?rev=392434&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
Fri Apr  7 15:02:07 2006
@@ -0,0 +1,87 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activeio.command.WireFormat;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.kaha.Marshaller;
+
+/**
+ * Marshall a Transaction
+ * @version $Revision: 1.10 $
+ */
+public class TransactionMarshaller implements Marshaller{
+    
+    private WireFormat wireFormat;
+    public TransactionMarshaller(WireFormat wireFormat){
+        this.wireFormat = wireFormat;
+      
+    }
+    
+    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+        KahaTransaction kt = (KahaTransaction) object;
+        List list = kt.getList();
+        dataOut.writeInt(list.size());
+        for (int i = 0; i < list.size(); i++){
+            TxCommand tx = (TxCommand) list.get(i);
+            Object key = tx.getMessageStoreKey();
+            Packet packet = wireFormat.marshal(key);
+            byte[] data = packet.sliceAsBytes();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+            Object command = tx.getCommand();
+            packet = wireFormat.marshal(command);
+            data = packet.sliceAsBytes();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+            
+        }
+       }
+
+   
+    public Object readPayload(DataInputStream dataIn) throws IOException{
+        KahaTransaction result = new KahaTransaction();
+        List list = new ArrayList();
+        result.setList(list);
+        int number=dataIn.readInt();
+        for (int i = 0; i < number; i++){
+            TxCommand command = new TxCommand();
+            int size = dataIn.readInt();
+            byte[] data=new byte[size];
+            dataIn.readFully(data);
+            Object key =  wireFormat.unmarshal(new ByteArrayPacket(data));
+            command.setMessageStoreKey(key);
+            size = dataIn.readInt();
+            data=new byte[size];
+            dataIn.readFully(data);
+            BaseCommand bc =  (BaseCommand) wireFormat.unmarshal(new ByteArrayPacket(data));
+            command.setCommand(bc);
+            list.add(command);
+        }
+        return result;
+       
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java?rev=392434&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
Fri Apr  7 15:02:07 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import org.apache.activemq.command.BaseCommand;
+import org.apache.activemq.command.CommandTypes;
+
+
+/**
+ * Base class for  messages/acknowledgements for a transaction
+ * 
+ * @version $Revision: 1.4 $
+ */
+class TxCommand {
+        protected Object messageStoreKey;
+        protected BaseCommand command;
+
+        /**
+         * @return Returns the messageStoreKey.
+         */
+        public Object getMessageStoreKey(){
+            return messageStoreKey;
+        }
+
+        /**
+         * @param messageStoreKey The messageStoreKey to set.
+         */
+        public void setMessageStoreKey(Object messageStoreKey){
+            this.messageStoreKey=messageStoreKey;
+        }
+
+        /**
+         * @return Returns the command.
+         */
+        public BaseCommand getCommand(){
+            return command;
+        }
+
+        /**
+         * @param command The command to set.
+         */
+        public void setCommand(BaseCommand command){
+            this.command=command;
+        }
+        
+        /**
+         * @return true if a Message command
+         */
+        public boolean isAdd(){
+            return command != null && command.getDataStructureType() != CommandTypes.MESSAGE_ACK;
+        }
+        
+        /**
+         * @return true if a MessageAck command
+         */
+        public boolean isRemove(){
+            return command != null && command.getDataStructureType() == CommandTypes.MESSAGE_ACK;
+        }
+
+  
+    
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TxCommand.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java?rev=392434&r1=392433&r2=392434&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java
Fri Apr  7 15:02:07 2006
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.broker.store;
 
+import java.io.File;
 import junit.framework.Test;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.XARecoveryBrokerTest;
+import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor;
 import org.apache.activemq.xbean.BrokerFactoryBean;
 import org.springframework.core.io.ClassPathResource;
 
@@ -39,17 +41,18 @@
     }
 
     protected BrokerService createBroker() throws Exception {
-        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
-        brokerFactory.afterPropertiesSet();
-        BrokerService broker =  brokerFactory.getBroker();
+        BrokerService broker = createRestartedBroker();
         broker.setDeleteAllMessagesOnStartup(true);
         return broker;
     }
     
     protected BrokerService createRestartedBroker() throws Exception {
-        BrokerFactoryBean brokerFactory=new BrokerFactoryBean(new ClassPathResource("org/apache/activemq/broker/store/kahabroker.xml"));
-        brokerFactory.afterPropertiesSet();
-        return brokerFactory.getBroker();
+        BrokerService broker = new BrokerService();
+       
+        KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/storetest"));
+        broker.setPersistenceAdapter(adaptor);
+        broker.addConnector("tcp://localhost:0");
+        return broker;
     }
     
 }



Mime
View raw message