activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r557389 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: amq/AMQTransactionStore.java amq/AMQTx.java amq/AMQTxOperation.java kahadaptor/AMQTxMarshaller.java
Date Wed, 18 Jul 2007 20:32:46 GMT
Author: rajdavies
Date: Wed Jul 18 13:32:45 2007
New Revision: 557389

URL: http://svn.apache.org/viewvc?view=rev&rev=557389
Log:
Split out Transaction class from AMQTransactionStore

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
  (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=diff&rev=557389&r1=557388&r2=557389
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
Wed Jul 18 13:32:45 2007
@@ -19,13 +19,10 @@
 package org.apache.activemq.store.amq;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
@@ -39,92 +36,15 @@
 
 /**
  */
-public class AMQTransactionStore implements TransactionStore {
+public class AMQTransactionStore implements TransactionStore{
 
     private final AMQPersistenceAdapter peristenceAdapter;
-    Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId,
Tx>();
-    Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId,
Tx>();
+    Map<TransactionId,AMQTx> inflightTransactions=new LinkedHashMap<TransactionId,AMQTx>();
+    Map<TransactionId,AMQTx> preparedTransactions=new LinkedHashMap<TransactionId,AMQTx>();
     private boolean doingRecover;
 
-    
-    public static class TxOperation {
-        
-        static final byte ADD_OPERATION_TYPE       = 0;
-        static final byte REMOVE_OPERATION_TYPE    = 1;
-        static final byte ACK_OPERATION_TYPE       = 3;
-        
-        public byte operationType;
-        public AMQMessageStore store;
-        public Object data;
-        public Location location;
-        
-        public TxOperation(byte operationType, AMQMessageStore store, Object data, Location
location) {
-            this.operationType=operationType;
-            this.store=store;
-            this.data=data;
-            this.location=location;
-        }
-        
-    }
-    /**
-     * Operations
-     * @version $Revision: 1.6 $
-     */
-    public static class Tx {
-
-        private final Location location;
-        private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
-
-        public Tx(Location location) {
-            this.location=location;
-        }
-
-        public void add(AMQMessageStore store, Message msg, Location location) {
-            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
-        }
-
-        public void add(AMQMessageStore store, MessageAck ack) {
-            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack,
null));
-        }
-
-        public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
-            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
-        }
-        
-        public Message[] getMessages() {
-            ArrayList<Object> list = new ArrayList<Object>();
-            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();)
{
-                TxOperation op = iter.next();
-                if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
-                    list.add(op.data);
-                }
-            }
-            Message rc[] = new Message[list.size()];
-            list.toArray(rc);
-            return rc;
-        }
-
-        public MessageAck[] getAcks() {
-            ArrayList<Object> list = new ArrayList<Object>();
-            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();)
{
-                TxOperation op = iter.next();
-                if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
-                    list.add(op.data);
-                }
-            }
-            MessageAck rc[] = new MessageAck[list.size()];
-            list.toArray(rc);
-            return rc;
-        }
-
-        public ArrayList<TxOperation> getOperations() {
-            return operations;
-        }
-
-    }
-
-    public AMQTransactionStore(AMQPersistenceAdapter adapter) {
-        this.peristenceAdapter = adapter;
+    public AMQTransactionStore(AMQPersistenceAdapter adapter){
+        this.peristenceAdapter=adapter;
     }
 
     /**
@@ -132,7 +52,7 @@
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
     public void prepare(TransactionId txid) throws IOException{
-        Tx tx=null;
+        AMQTx tx=null;
         synchronized(inflightTransactions){
             tx=inflightTransactions.remove(txid);
         }
@@ -143,13 +63,13 @@
             preparedTransactions.put(txid,tx);
         }
     }
-    
+
     /**
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
     public void replayPrepare(TransactionId txid) throws IOException{
-        Tx tx=null;
+        AMQTx tx=null;
         synchronized(inflightTransactions){
             tx=inflightTransactions.remove(txid);
         }
@@ -160,13 +80,13 @@
         }
     }
 
-    public Tx getTx(TransactionId txid,Location location){
-        Tx tx=null;
+    public AMQTx getTx(TransactionId txid,Location location){
+        AMQTx tx=null;
         synchronized(inflightTransactions){
             tx=inflightTransactions.get(txid);
         }
         if(tx==null){
-            tx=new Tx(location);
+            tx=new AMQTx(location);
             inflightTransactions.put(txid,tx);
         }
         return tx;
@@ -177,7 +97,7 @@
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
     public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
-        Tx tx;
+        AMQTx tx;
         if(wasPrepared){
             synchronized(preparedTransactions){
                 tx=preparedTransactions.remove(txid);
@@ -201,7 +121,7 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
+    public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
         if(wasPrepared){
             synchronized(preparedTransactions){
                 return preparedTransactions.remove(txid);
@@ -218,7 +138,7 @@
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
     public void rollback(TransactionId txid) throws IOException{
-        Tx tx=null;
+        AMQTx tx=null;
         synchronized(inflightTransactions){
             tx=inflightTransactions.remove(txid);
         }
@@ -251,13 +171,13 @@
             }
         }
     }
-    
-    public void start() throws Exception {
+
+    public void start() throws Exception{
     }
 
-    public void stop() throws Exception {
+    public void stop() throws Exception{
     }
-    
+
     synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
         // All the in-flight transactions get rolled back..
         synchronized(inflightTransactions){
@@ -265,13 +185,13 @@
         }
         this.doingRecover=true;
         try{
-            Map<TransactionId, Tx> txs=null;
+            Map<TransactionId,AMQTx> txs=null;
             synchronized(preparedTransactions){
-                txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
+                txs=new LinkedHashMap<TransactionId,AMQTx>(preparedTransactions);
             }
             for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
                 Object txid=iter.next();
-                Tx tx=txs.get(txid);
+                AMQTx tx=txs.get(txid);
                 listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
             }
         }finally{
@@ -283,26 +203,24 @@
      * @param message
      * @throws IOException
      */
-    void addMessage(AMQMessageStore store, Message message, Location location) throws IOException
{
-        Tx tx = getTx(message.getTransactionId(), location);
-        tx.add(store, message, location);
+    void addMessage(AMQMessageStore store,Message message,Location location) throws IOException{
+        AMQTx tx=getTx(message.getTransactionId(),location);
+        tx.add(store,message,location);
     }
 
     /**
      * @param ack
      * @throws IOException
      */
-    public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws
IOException {
-        Tx tx = getTx(ack.getTransactionId(), location);
-        tx.add(store, ack);
-    }
-    
-    
-    public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location)
{
-        Tx tx = getTx(ack.getTransactionId(), location);
-        tx.add(store, ack);
+    public void removeMessage(AMQMessageStore store,MessageAck ack,Location location) throws
IOException{
+        AMQTx tx=getTx(ack.getTransactionId(),location);
+        tx.add(store,ack);
     }
 
+    public void acknowledge(AMQTopicMessageStore store,JournalTopicAck ack,Location location){
+        AMQTx tx=getTx(ack.getTransactionId(),location);
+        tx.add(store,ack);
+    }
 
     public Location checkpoint() throws IOException{
         // Nothing really to checkpoint.. since, we don't
@@ -312,18 +230,18 @@
         // roll over active tx records.
         Location rc=null;
         synchronized(inflightTransactions){
-            for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
-                Tx tx=iter.next();
-                Location location=tx.location;
+            for(Iterator<AMQTx> iter=inflightTransactions.values().iterator();iter.hasNext();){
+                AMQTx tx=iter.next();
+                Location location=tx.getLocation();
                 if(rc==null||rc.compareTo(location)<0){
                     rc=location;
                 }
             }
         }
         synchronized(preparedTransactions){
-            for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
-                Tx tx=iter.next();
-                Location location=tx.location;
+            for(Iterator<AMQTx> iter=preparedTransactions.values().iterator();iter.hasNext();){
+                AMQTx tx=iter.next();
+                Location location=tx.getLocation();
                 if(rc==null||rc.compareTo(location)<0){
                     rc=location;
                 }
@@ -332,9 +250,24 @@
         }
     }
 
-    public boolean isDoingRecover() {
+    public boolean isDoingRecover(){
         return doingRecover;
     }
 
+    /**
+     * @return the preparedTransactions
+     */
+    public Map<TransactionId,AMQTx> getPreparedTransactions(){
+        return this.preparedTransactions;
+    }
 
+    /**
+     * @param preparedTransactions the preparedTransactions to set
+     */
+    public void setPreparedTransactions(Map<TransactionId,AMQTx> preparedTransactions){
+        if(preparedTransactions!=null){
+            this.preparedTransactions.clear();
+            this.preparedTransactions.putAll(preparedTransactions);
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java?view=diff&rev=557389&r1=557388&r2=557389
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java Wed
Jul 18 13:32:45 2007
@@ -15,10 +15,84 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.store.amq;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+
+
+/**
+ */
 /**
- * @version $Revision: 1.1 $
+ * Operations
+ * @version $Revision: 1.6 $
  */
-public interface AMQTx {
+public class AMQTx{
+
+    private final Location location;
+    private ArrayList<AMQTxOperation> operations=new ArrayList<AMQTxOperation>();
+
+    public AMQTx(Location location){
+        this.location=location;
+    }
+
+    public void add(AMQMessageStore store,Message msg,Location location){
+        operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location));
+    }
+
+    public void add(AMQMessageStore store,MessageAck ack){
+        operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null));
+    }
+
+    public void add(AMQTopicMessageStore store,JournalTopicAck ack){
+        operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null));
+    }
+
+    public Message[] getMessages(){
+        ArrayList<Object> list=new ArrayList<Object>();
+        for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
+            AMQTxOperation op=iter.next();
+            if(op.getOperationType()==AMQTxOperation.ADD_OPERATION_TYPE){
+                list.add(op.getData());
+            }
+        }
+        Message rc[]=new Message[list.size()];
+        list.toArray(rc);
+        return rc;
+    }
+
+    public MessageAck[] getAcks(){
+        ArrayList<Object> list=new ArrayList<Object>();
+        for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
+            AMQTxOperation op=iter.next();
+            if(op.getOperationType()==AMQTxOperation.REMOVE_OPERATION_TYPE){
+                list.add(op.getData());
+            }
+        }
+        MessageAck rc[]=new MessageAck[list.size()];
+        list.toArray(rc);
+        return rc;
+    }
+
+    /**
+     * @return the location
+     */
+    public Location getLocation(){
+        return this.location;
+    }
+
+    public ArrayList<AMQTxOperation> getOperations(){
+        return operations;
+    }
+
+    public void setOperations(ArrayList<AMQTxOperation> operations){
+        this.operations=operations;
+    }
 }
+
+   
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java?view=auto&rev=557389
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
Wed Jul 18 13:32:45 2007
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.amq;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+
+
+/**
+ */
+public class AMQTxOperation {
+
+    public static final byte ADD_OPERATION_TYPE=0;
+    public static final byte REMOVE_OPERATION_TYPE=1;
+    public static final byte ACK_OPERATION_TYPE=3;
+    private byte operationType;
+    private ActiveMQDestination destination;
+    private Object data;
+    private Location location;
+
+    public AMQTxOperation() {
+    }
+    
+    public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location
location){
+        this.operationType=operationType;
+        this.destination=destination;
+        this.data=data;
+        this.location=location;
+        
+    }
+
+    /**
+     * @return the data
+     */
+    public Object getData(){
+        return this.data;
+    }
+
+    /**
+     * @param data the data to set
+     */
+    public void setData(Object data){
+        this.data=data;
+    }
+
+    /**
+     * @return the location
+     */
+    public Location getLocation(){
+        return this.location;
+    }
+
+    /**
+     * @param location the location to set
+     */
+    public void setLocation(Location location){
+        this.location=location;
+    }
+
+    /**
+     * @return the operationType
+     */
+    public byte getOperationType(){
+        return this.operationType;
+    }
+
+    /**
+     * @param operationType the operationType to set
+     */
+    public void setOperationType(byte operationType){
+        this.operationType=operationType;
+    }
+
+   
+    public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws
IOException{
+        boolean result=false;
+        AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination);
+        if(operationType==ADD_OPERATION_TYPE){
+            result=store.replayAddMessage(context,(Message)data,location);
+        }else if(operationType==REMOVE_OPERATION_TYPE){
+            result=store.replayRemoveMessage(context,(MessageAck)data);
+        }else{
+            JournalTopicAck ack=(JournalTopicAck)data;
+            result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(),
+                    ack.getMessageId());
+        }
+        return result;
+    }
+    
+    public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException {
+        location.writeExternal(dos);
+        ByteSequence packet = wireFormat.marshal(getData());
+        dos.writeInt(packet.length);
+        dos.write(packet.data, packet.offset, packet.length);
+        packet = wireFormat.marshal(destination);
+        dos.writeInt(packet.length);
+        dos.write(packet.data, packet.offset, packet.length);
+    }
+
+    public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException {
+        this.location=new Location();
+        this.location.readExternal(dis);
+        int size=dis.readInt();
+        byte[] data=new byte[size];
+        dis.readFully(data);
+        setData(wireFormat.unmarshal(new ByteSequence(data)));
+        size=dis.readInt();
+        data=new byte[size];
+        dis.readFully(data);
+        this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java?view=auto&rev=557389
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
Wed Jul 18 13:32:45 2007
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.amq.AMQTx;
+import org.apache.activemq.store.amq.AMQTxOperation;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Marshall an AMQTx
+ * @version $Revision: 1.10 $
+ */
+public class AMQTxMarshaller implements Marshaller<AMQTx>{
+
+    private WireFormat wireFormat;
+
+    public AMQTxMarshaller(WireFormat wireFormat){
+        this.wireFormat=wireFormat;
+    }
+
+    public AMQTx readPayload(DataInput dataIn) throws IOException{
+        Location location=new Location();
+        location.readExternal(dataIn);
+        AMQTx result=new AMQTx(location);
+        int size=dataIn.readInt();
+        for(int i=0;i<size;i++){
+            AMQTxOperation op=new AMQTxOperation();
+            op.readExternal(wireFormat,dataIn);
+            result.getOperations().add(op);
+        }
+        return result;
+    }
+
+    public void writePayload(AMQTx amqtx,DataOutput dataOut) throws IOException{
+        amqtx.getLocation().writeExternal(dataOut);
+        List<AMQTxOperation> list=amqtx.getOperations();
+        dataOut.writeInt(list.size());
+        for(AMQTxOperation op:list){
+            op.writeExternal(wireFormat,dataOut);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AMQTxMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message