activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r961067 [4/5] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/...
Date Wed, 07 Jul 2010 03:39:04 GMT
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Wed Jul  7 03:39:03 2010
@@ -17,120 +17,137 @@
 package org.apache.activemq.apollo.broker;
 
 import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
+import org.fusesource.hawtdispatch.internal.util.RunnableCountDownLatch;
 
 public class Queue implements DeliveryTarget {
-
     private Destination destination;
-    private final IQueue<Long, MessageDelivery> queue;
-    private VirtualHost virtualHost;
-
-    Queue(IQueue<Long, MessageDelivery> queue) {
-        this.queue = queue;
-    }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-     */
-    public void deliver(MessageDelivery message, ISourceController<?> source) {
-        queue.add(message, source);
-    }
-
-    public final void addSubscription(final Subscription<MessageDelivery> sub) {
-        queue.addSubscription(sub);
-    }
+// TODO:
+//    private Destination destination;
+//    private VirtualHost virtualHost;
+//
+//    Queue() {
+//        this.queue = queue;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see
+//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+//     */
+//    public void deliver(MessageDelivery message, ISourceController<?> source) {
+//        queue.add(message, source);
+//    }
+//
+//    public final void addSubscription(final Subscription<MessageDelivery> sub) {
+//        queue.addSubscription(sub);
+//    }
+//
+//    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+//        return queue.removeSubscription(sub);
+//    }
+//
+//    public void start() throws Exception {
+//        queue.start();
+//    }
+//
+//    public void stop() throws Exception {
+//        if (queue != null) {
+//            queue.stop();
+//        }
+//    }
+//
+//    public void shutdown(Runnable onShutdown) throws Exception {
+//        if (queue != null) {
+//            queue.shutdown(onShutdown);
+//        }
+//    }
+//
+//    public boolean hasSelector() {
+//        return false;
+//    }
+//
+//    public boolean matches(MessageDelivery message) {
+//        return true;
+//    }
+//
+//    public VirtualHost getBroker() {
+//        return virtualHost;
+//    }
+//
+//    public void setVirtualHost(VirtualHost virtualHost) {
+//        this.virtualHost = virtualHost;
+//    }
+//
+//    public void setDestination(Destination destination) {
+//        this.destination = destination;
+//    }
+//
+//    public final Destination getDestination() {
+//        return destination;
+//    }
+//
+//    public boolean isDurable() {
+//        return true;
+//    }
+//
+//    public static class QueueSubscription implements BrokerSubscription {
+//        Subscription<MessageDelivery> subscription;
+//        final Queue queue;
+//
+//        public QueueSubscription(Queue queue) {
+//            this.queue = queue;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+//         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+//            this.subscription = subscription;
+//            queue.addSubscription(subscription);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+//         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void disconnect(ConsumerContext context) {
+//            queue.removeSubscription(subscription);
+//        }
+//
+//        /* (non-Javadoc)
+//         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+//         */
+//        public Destination getDestination() {
+//            return queue.getDestination();
+//        }
+//    }
 
-    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
-        return queue.removeSubscription(sub);
-    }
-
-    public void start() throws Exception {
-        queue.start();
-    }
-
-    public void stop() throws Exception {
-        if (queue != null) {
-            queue.stop();
-        }
-    }
-
-    public void shutdown(Runnable onShutdown) throws Exception {
-        if (queue != null) {
-            queue.shutdown(onShutdown);
-        }
+    public void deliver(MessageDelivery message) {
+        //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public boolean hasSelector() {
-        return false;
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public boolean matches(MessageDelivery message) {
-        return true;
-    }
-
-    public VirtualHost getBroker() {
-        return virtualHost;
-    }
-
-    public void setVirtualHost(VirtualHost virtualHost) {
-        this.virtualHost = virtualHost;
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void setDestination(Destination destination) {
-        this.destination = destination;
+    public void shutdown(RunnableCountDownLatch done) {
     }
 
-    public final Destination getDestination() {
+    public Destination getDestination() {
         return destination;
     }
-
-    public boolean isDurable() {
-        return true;
-    }
-
-    public static class QueueSubscription implements BrokerSubscription {
-        Subscription<MessageDelivery> subscription;
-        final Queue queue;
-
-        public QueueSubscription(Queue queue) {
-            this.queue = queue;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
-         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-         */
-        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
-            this.subscription = subscription;
-            queue.addSubscription(subscription);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
-         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-         */
-        public void disconnect(ConsumerContext context) {
-            queue.removeSubscription(subscription);
-        }
-        
-        /* (non-Javadoc)
-         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-         */
-        public Destination getDestination() {
-            return queue.getDestination();
-        }
-    }
-
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Wed Jul  7 03:39:03 2010
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 
-import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -87,43 +86,43 @@ final public class Router {
         }
     }
 
-    public void route(final MessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
-
-        //If the message is part of transaction send it to the transaction manager
-        if(msg.getTransactionId() >= 0)
-        {
-            virtualHost.getTransactionManager().newMessage(msg, controller);
-            return;
-        }
-        
-        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
-        
-        //Set up the delivery for persistence:
-        msg.beginDispatch(database);
-
-        try {
-            // TODO:
-            // Consider doing some caching of this sub list. Most producers
-            // always send to the same destination.
-            if (targets != null) {
-                // The sinks will request persistence via MessageDelivery.persist()
-                // if they require persistence:
-                for (DeliveryTarget target : targets) {
-                    target.deliver(msg, controller);
-                }
-            }
-        } finally {
-            try {
-                msg.finishDispatch(controller);
-            } catch (IOException ioe) {
-                //TODO: Error serializing the message, this should trigger an error
-                //This is a pretty severe error as we've already delivered
-                //the message to the recipients. If we send an error response
-                //back it could result in a duplicate. Does this mean that we
-                //should persist the message prior to sending to the recips?
-                ioe.printStackTrace();
-            }
-        }
+    public void route(final MessageDelivery msg, boolean autoCreate) {
+// TODO:
+//        //If the message is part of transaction send it to the transaction manager
+//        if(msg.getTransactionId() >= 0)
+//        {
+//            virtualHost.getTransactionManager().newMessage(msg, controller);
+//            return;
+//        }
+//
+//        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
+//
+//        //Set up the delivery for persistence:
+//        msg.beginDispatch(database);
+//
+//        try {
+//            // TODO:
+//            // Consider doing some caching of this sub list. Most producers
+//            // always send to the same destination.
+//            if (targets != null) {
+//                // The sinks will request persistence via MessageDelivery.persist()
+//                // if they require persistence:
+//                for (DeliveryTarget target : targets) {
+//                    target.deliver(msg, controller);
+//                }
+//            }
+//        } finally {
+//            try {
+//                msg.finishDispatch(controller);
+//            } catch (IOException ioe) {
+//                //TODO: Error serializing the message, this should trigger an error
+//                //This is a pretty severe error as we've already delivered
+//                //the message to the recipients. If we send an error response
+//                //back it could result in a duplicate. Does this mean that we
+//                //should persist the message prior to sending to the recips?
+//                ioe.printStackTrace();
+//            }
+//        }
     }
 
     private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg, boolean autoCreate) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul  7 03:39:03 2010
@@ -20,140 +20,155 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.ExclusiveQueue;
-import org.apache.activemq.queue.IFlowQueue;
-import org.apache.activemq.queue.QueueDispatchTarget;
-import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.util.IntrospectionSupport;
 
 class TopicSubscription implements BrokerSubscription, DeliveryTarget {
 
-	static final boolean USE_PERSISTENT_QUEUES = true; 
-	
-    protected final BooleanExpression selector;
-    protected final Destination destination;
-    protected Subscription<MessageDelivery> connectedSub;
-    private final VirtualHost host;
-    
-    //TODO: replace this with a base interface for queue which also support non persistent use case.
-	private IFlowQueue<MessageDelivery> queue;
-
-    TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
-        this.host = host;
-        this.selector = selector;
-        this.destination = destination;
-    }
-
-    @Override
-    public String toString() {
-        return IntrospectionSupport.toString(this);
-    }
-    
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-     */
-    public final void deliver(MessageDelivery message, ISourceController<?> source) {
-        if (matches(message)) {
-            queue.add(message, source);
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
-     */
-    public boolean hasSelector() {
-        return selector != null;
+//	static final boolean USE_PERSISTENT_QUEUES = true;
+//
+//    protected final BooleanExpression selector;
+//    protected final Destination destination;
+//    protected Subscription<MessageDelivery> connectedSub;
+//    private final VirtualHost host;
+//
+//    //TODO: replace this with a base interface for queue which also support non persistent use case.
+//	private IFlowQueue<MessageDelivery> queue;
+//
+//    TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
+//        this.host = host;
+//        this.selector = selector;
+//        this.destination = destination;
+//    }
+//
+//    @Override
+//    public String toString() {
+//        return IntrospectionSupport.toString(this);
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see
+//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+//     */
+//    public final void deliver(MessageDelivery message, ISourceController<?> source) {
+//        if (matches(message)) {
+//            queue.add(message, source);
+//        }
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+//     */
+//    public boolean hasSelector() {
+//        return selector != null;
+//    }
+//
+//    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
+//        if (this.connectedSub == null) {
+//        	if( subscription.isPersistent() ) {
+//        		queue = createPersistentQueue(subscription);
+//        	} else {
+//        		queue = createNonPersistentQueue(subscription);
+//        	}
+//    		queue.start();
+//
+//        	this.connectedSub = subscription;
+//        	this.queue.addSubscription(connectedSub);
+//    		this.host.getRouter().bind(destination, this);
+//        } else if (connectedSub != subscription) {
+//            throw new UserAlreadyConnectedException();
+//        }
+//    }
+//
+//    private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
+//		Flow flow = new Flow(subscription.getResourceName(), false);
+//		String name = subscription.getResourceName();
+//		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
+//		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
+//		queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
+//            public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
+//                subscription.add(elem, controller);
+//            }
+//        });
+//		return queue;
+//	}
+//
+//	private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
+//        return queue;
+//	}
+//
+//    @SuppressWarnings("unchecked")
+//	private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
+//    	ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
+//		host.getQueueStore().deleteQueue(pq.getDescriptor());
+//	}
+//
+//	public synchronized void disconnect(final ConsumerContext subscription) {
+//        if (connectedSub != null && connectedSub == subscription) {
+//    		this.host.getRouter().unbind(destination, this);
+//    		this.queue.removeSubscription(connectedSub);
+//    		this.connectedSub = null;
+//
+//    		queue.stop();
+//        	if( USE_PERSISTENT_QUEUES ) {
+//        		destroyPersistentQueue(queue);
+//        	}
+//    		queue=null;
+//        }
+//    }
+//
+//
+//
+//	public boolean matches(MessageDelivery message) {
+//        if (selector == null) {
+//            return true;
+//        }
+//
+//        MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+//        selectorContext.setDestination(destination);
+//        try {
+//            return (selector.matches(selectorContext));
+//        } catch (FilterException e) {
+//            e.printStackTrace();
+//            return false;
+//        }
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+//     */
+//    public Destination getDestination() {
+//        return destination;
+//    }
+
+    public void connect(ConsumerContext subscription) throws Exception {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void disconnect(ConsumerContext subscription) {
+        //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
-        if (this.connectedSub == null) {
-        	if( subscription.isPersistent() ) {
-        		queue = createPersistentQueue(subscription);
-        	} else {
-        		queue = createNonPersistentQueue(subscription);
-        	}
-    		queue.start();
-        	
-        	this.connectedSub = subscription;
-        	this.queue.addSubscription(connectedSub);
-    		this.host.getRouter().bind(destination, this);
-        } else if (connectedSub != subscription) {
-            throw new UserAlreadyConnectedException();
-        }
-    }
-
-    private IFlowQueue<MessageDelivery> createNonPersistentQueue(final ConsumerContext subscription) {
-		Flow flow = new Flow(subscription.getResourceName(), false);
-		String name = subscription.getResourceName();
-		IFlowLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 50);
-		ExclusiveQueue<MessageDelivery> queue = new ExclusiveQueue<MessageDelivery>(flow, name, limiter);
-		queue.setDrain( new QueueDispatchTarget<MessageDelivery>() {
-            public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
-                subscription.add(elem, controller);
-            }
-        });
-		return queue;
-	}
-
-	private IFlowQueue<MessageDelivery> createPersistentQueue(ConsumerContext subscription) {
-        ExclusivePersistentQueue<Long, MessageDelivery> queue = host.getQueueStore().createExclusivePersistentQueue();
-        return queue;
-	}
-
-    @SuppressWarnings("unchecked")
-	private void destroyPersistentQueue(IFlowQueue<MessageDelivery> queue) {
-    	ExclusivePersistentQueue<Long, MessageDelivery> pq = (ExclusivePersistentQueue<Long, MessageDelivery>) queue;
-		host.getQueueStore().deleteQueue(pq.getDescriptor());
-	}
-
-	public synchronized void disconnect(final ConsumerContext subscription) {
-        if (connectedSub != null && connectedSub == subscription) {
-    		this.host.getRouter().unbind(destination, this);
-    		this.queue.removeSubscription(connectedSub);
-    		this.connectedSub = null;
-    		
-    		queue.stop();
-        	if( USE_PERSISTENT_QUEUES ) {
-        		destroyPersistentQueue(queue);
-        	}
-    		queue=null;
-        }
-    }
-
-
-
-	public boolean matches(MessageDelivery message) {
-        if (selector == null) {
-            return true;
-        }
-
-        MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
-        selectorContext.setDestination(destination);
-        try {
-            return (selector.matches(selectorContext));
-        } catch (FilterException e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-     */
     public Destination getDestination() {
-        return destination;
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void deliver(MessageDelivery message) {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean hasSelector() {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public boolean matches(MessageDelivery message) {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Wed Jul  7 03:39:03 2010
@@ -22,10 +22,6 @@ import java.util.HashSet;
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
-import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
 import org.apache.activemq.util.FutureListener;
 import org.apache.activemq.util.ListenableFuture;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -41,526 +37,527 @@ import org.apache.commons.logging.LogFac
  */
 public abstract class Transaction {
 
-    private static final Log LOG = LogFactory.getLog(Transaction.class);
-
-    public static final byte START_STATE = 0; // can go to: 1,2,3
-    public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
-    public static final byte PREPARED_STATE = 2; // can go to: 3, 4
-    public static final byte COMMITED_STATE = 3;
-    public static final byte ROLLBACK_STATE = 4;
-
-    static final byte TYPE_LOCAL = 0;
-    static final byte TYPE_XA = 1;
-
-    private byte state = START_STATE;
-    private final TransactionManager manager;
-    private final long tid;
-    private final IQueue<Long, TxOp> opQueue;
-    protected HashSet<TransactionListener> listeners;
-    
-    private TxProcessor processor;
-
-    Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
-        this.manager = manager;
-        this.opQueue = opQueue;
-        opQueue.start();
-        this.tid = tid;
-    }
-
-    /**
-     * @return the unique identifier used by the {@link TransactionManager} to
-     *         identify this {@link Transaction}
-     * 
-     */
-    public long getTid() {
-        return tid;
-    }
-
-    public AsciiBuffer getBackingQueueName() {
-        return opQueue.getDescriptor().getQueueName();
-    }
-
-    /**
-     * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
-     */
-    public abstract byte getType();
-
-    public void addMessage(MessageDelivery m, ISourceController<?> source) {
-
-        synchronized (this) {
-            switch (state) {
-            case START_STATE:
-            case IN_USE_STATE:
-                opQueue.add(new TxMessage(m, this), source);
-                break;
-            default: {
-                throw new IllegalStateException("Can't add message to finished or prepared transaction");
-            }
-            }
-        }
-    }
-
-    public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
-        synchronized (this) {
-            switch (state) {
-            case START_STATE:
-            case IN_USE_STATE:
-                IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
-                //Queue could be null if it was just deleted:
-                if (target != null) {
-                    long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
-                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
-                }
-                break;
-            default: {
-                throw new IllegalStateException("Can't add message to finished or prepared transaction");
-            }
-            }
-        }
-    }
-
-    public byte getState() {
-        return state;
-    }
-
-    public void setState(byte state, FutureListener<? super Object> listener) {
-        this.state = state;
-        ListenableFuture<?> future = manager.persistTransaction(this);
-        future.setFutureListener(listener);
-    }
-
-    public void prePrepare() throws Exception {
-
-        // Is it ok to call prepare now given the state of the
-        // transaction?
-        switch (state) {
-        case START_STATE:
-        case IN_USE_STATE:
-            break;
-        default:
-            XAException xae = new XAException("Prepare cannot be called now.");
-            xae.errorCode = XAException.XAER_PROTO;
-            throw xae;
-        }
-    }
-
-    protected void fireAfterCommit() throws Exception {
-
-        synchronized (this) {
-            for (TransactionListener listener : listeners) {
-                listener.onCommit(this);
-            }
-        }
-    }
-
-    public void fireAfterRollback() throws Exception {
-        synchronized (this) {
-            for (TransactionListener listener : listeners) {
-                listener.onRollback(this);
-            }
-        }
-    }
-
-    public void fireAfterPrepare() throws Exception {
-        synchronized (this) {
-            for (TransactionListener listener : listeners) {
-                listener.onPrepared(this);
-            }
-        }
-    }
-
-    public String toString() {
-        return super.toString() + "[queue=" + opQueue + "]";
-    }
-
-    public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
-
-    public abstract void rollback(TransactionListener listener) throws XAException, IOException;
-
-    public abstract int prepare(TransactionListener listener) throws XAException, IOException;
-
-    public boolean isPrepared() {
-        return getState() == PREPARED_STATE;
-    }
-
-    public long size() {
-        return opQueue.getEnqueuedCount();
-    }
-
-    public static abstract class TransactionListener {
-        public void onRollback(Transaction t) {
-
-        }
-
-        public void onCommit(Transaction t) {
-
-        }
-
-        public void onPrepared(Transaction t) {
-
-        }
-    }
-
-    interface TxOp {
-        public static final short TYPE_MESSAGE = 0;
-        public static final short TYPE_ACK = 1;
-
-        public short getType();
-
-        public <T> T asType(Class<T> type);
-
-        public void onRollback(ISourceController<?> controller);
-
-        public void onCommit(ISourceController<?> controller);
-
-        public int getLimiterSize();
-
-        public boolean isFromStore();
-
-        public long getStoreTracking();
-
-        public MessageRecord createMessageRecord();
-
-        /**
-         * @return
-         */
-        public boolean isPersistent();
-
-        /**
-         * @return
-         */
-        public Long getExpiration();
-
-        public int getPriority();
-    }
-
-    static class TxMessage implements TxOp {
-        MessageDelivery message;
-        Transaction tx;
-        private boolean fromStore;
-
-        /**
-         * @param m
-         * @param transaction
-         */
-        public TxMessage(MessageDelivery m, Transaction tx) {
-            message = m;
-            this.tx = tx;
-        }
-
-        public <T> T asType(Class<T> type) {
-            if (type == TxMessage.class) {
-                return type.cast(this);
-            } else {
-                return null;
-            }
-        }
-
-        public final short getType() {
-            return TYPE_MESSAGE;
-        }
-
-        public final int getLimiterSize() {
-            return message.getFlowLimiterSize();
-        }
-
-        public final void onCommit(ISourceController<?> controller) {
-            message.clearTransactionId();
-            tx.manager.getVirtualHost().getRouter().route(message, controller, true);
-        }
-
-        public final void onRollback(ISourceController<?> controller) {
-            //Nothing to do here, message just gets dropped:
-            return;
-        }
-
-        public final boolean isFromStore() {
-            return fromStore;
-        }
-
-        public final MessageRecord createMessageRecord() {
-            return message.createMessageRecord();
-        }
-
-        public final long getStoreTracking() {
-            return message.getStoreTracking();
-        }
-
-        public final boolean isPersistent() {
-            return message.isPersistent();
-        }
-
-        public final Long getExpiration() {
-            return message.getExpiration();
-        }
-
-        public final int getPriority() {
-            return message.getPriority();
-        }
-    }
-
-    static class TxAck implements TxOp {
-        public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
-        Transaction tx;
-        IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
-        long queueSequence; //Sequence number of the element on the queue from which to delete.
-        final long storeTracking; //Store tracking of this delete op.
-        private boolean fromStore;
-        private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
-
-        TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
-            this.queue = queue;
-            this.queueSequence = removalKey;
-            this.tx = tx;
-            this.storeTracking = storeTracking;
-        }
-
-        public final short getType() {
-            return TYPE_ACK;
-        }
-
-        public <T> T asType(Class<T> type) {
-            if (type == TxAck.class) {
-                return type.cast(this);
-            } else {
-                return null;
-            }
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
-         */
-        public final void onCommit(ISourceController<?> controller) {
-            queue.remove(queueSequence);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
-         */
-        public final void onRollback(ISourceController<?> controller) {
-            //No-Op for now, it is possible that we'd want to unaquire these
-            //in the queue if the client weren't to keep these
-            //around
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
-         */
-        public final int getLimiterSize() {
-            return MEM_SIZE;
-        }
-
-        public final boolean isFromStore() {
-            return fromStore;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
-         */
-        public final long getStoreTracking() {
-            return storeTracking;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
-         * ()
-         */
-        public final MessageRecord createMessageRecord() {
-            MessageRecord ret = new MessageRecord();
-            ret.setEncoding(TxAck.ENCODING);
-            ret.setKey(storeTracking);
-            ret.setSize(MEM_SIZE);
-            ret.setBuffer(new Buffer(toBytes().getData()));
-            return null;
-        }
-
-        private final Buffer toBytes() {
-            AsciiBuffer queueName = queue.getDescriptor().getQueueName();
-            DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
-            try {
-                baos.writeShort(queueName.length);
-                baos.write(queueName.data, queueName.offset, queueName.length);
-                baos.writeLong(queueSequence);
-            } catch (IOException shouldNotHappen) {
-                throw new RuntimeException(shouldNotHappen);
-            }
-            return baos.toBuffer();
-        }
-
-        private final void fromBytes(byte[] bytes) {
-            DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
-            byte[] queueBytes = new byte[baos.readShort()];
-            baos.readFully(queueBytes);
-            AsciiBuffer queueName = new AsciiBuffer(queueBytes);
-            queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
-            queueSequence = baos.readLong();
-
-        }
-
-        public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
-            TxAck ret = new TxAck(null, -1, record.getKey(), tx);
-            ret.fromBytes(record.getBuffer().getData());
-            return ret;
-        }
-
-        public final boolean isPersistent() {
-            //TODO This could probably be relaxed when the ack is for non persistent
-            //elements
-            return true;
-        }
-
-        public final Long getExpiration() {
-            return -1L;
-        }
-
-        public final int getPriority() {
-            return 0;
-        }
-    }
-
-    /**
-     * @param record
-     * @return
-     */
-    public static TxOp createTxOp(MessageRecord record, Transaction tx) {
-        if (record.getEncoding().equals(TxAck.ENCODING)) {
-            return TxAck.createFromMessageRecord(record, tx);
-        } else {
-            MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
-            return new TxMessage(delivery, tx);
-        }
-    }
-
-    protected void startTransactionProcessor()
-    {
-        synchronized(this)
-        {
-            if(processor == null)
-            {
-                processor = new TxProcessor();
-                opQueue.addSubscription(processor);
-            }
-        }
-    }
-    
-    
-    /**
-     * TxProcessor
-     * <p>
-     * Description: The tx processor processes the transaction queue after
-     * commit or rollback.
-     * </p>
-     * 
-     * @author cmacnaug
-     * @version 1.0
-     */
-    private class TxProcessor implements Subscription<TxOp> {
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
-         * org.apache.activemq.flow.ISourceController,
-         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
-         */
-        public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
-
-            switch (state) {
-            case COMMITED_STATE: {
-                element.onCommit(controller);
-                if (callback != null) {
-                    callback.acknowledge();
-                }
-                break;
-            }
-            case ROLLBACK_STATE: {
-                element.onRollback(controller);
-                if (callback != null) {
-                    callback.acknowledge();
-                }
-                break;
-            }
-            default: {
-                LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
-            }
-            }
-
-            //If we've reached the end of the op queue
-            if (opQueue.getEnqueuedCount() == 0) {
-                opQueue.shutdown(null);
-            }
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#hasSelector()
-         */
-        public boolean hasSelector() {
-            return false;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#isBrowser()
-         */
-        public boolean isBrowser() {
-            return false;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#isExclusive()
-         */
-        public boolean isExclusive() {
-            return true;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
-         * .Object)
-         */
-        public boolean isRemoveOnDispatch(TxOp elem) {
-            return false;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
-         */
-        public boolean matches(TxOp elem) {
-            return true;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
-         * org.apache.activemq.flow.ISourceController,
-         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
-         */
-        public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
-            add(element, controller, callback);
-            return true;
-        }
-
-    }
+// TODO:    
+//    private static final Log LOG = LogFactory.getLog(Transaction.class);
+//
+//    public static final byte START_STATE = 0; // can go to: 1,2,3
+//    public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
+//    public static final byte PREPARED_STATE = 2; // can go to: 3, 4
+//    public static final byte COMMITED_STATE = 3;
+//    public static final byte ROLLBACK_STATE = 4;
+//
+//    static final byte TYPE_LOCAL = 0;
+//    static final byte TYPE_XA = 1;
+//
+//    private byte state = START_STATE;
+//    private final TransactionManager manager;
+//    private final long tid;
+//    private final IQueue<Long, TxOp> opQueue;
+//    protected HashSet<TransactionListener> listeners;
+//
+//    private TxProcessor processor;
+//
+//    Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+//        this.manager = manager;
+//        this.opQueue = opQueue;
+//        opQueue.start();
+//        this.tid = tid;
+//    }
+//
+//    /**
+//     * @return the unique identifier used by the {@link TransactionManager} to
+//     *         identify this {@link Transaction}
+//     *
+//     */
+//    public long getTid() {
+//        return tid;
+//    }
+//
+//    public AsciiBuffer getBackingQueueName() {
+//        return opQueue.getDescriptor().getQueueName();
+//    }
+//
+//    /**
+//     * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
+//     */
+//    public abstract byte getType();
+//
+//    public void addMessage(MessageDelivery m, ISourceController<?> source) {
+//
+//        synchronized (this) {
+//            switch (state) {
+//            case START_STATE:
+//            case IN_USE_STATE:
+//                opQueue.add(new TxMessage(m, this), source);
+//                break;
+//            default: {
+//                throw new IllegalStateException("Can't add message to finished or prepared transaction");
+//            }
+//            }
+//        }
+//    }
+//
+//    public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
+//        synchronized (this) {
+//            switch (state) {
+//            case START_STATE:
+//            case IN_USE_STATE:
+//                IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
+//                //Queue could be null if it was just deleted:
+//                if (target != null) {
+//                    long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
+//                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
+//                }
+//                break;
+//            default: {
+//                throw new IllegalStateException("Can't add message to finished or prepared transaction");
+//            }
+//            }
+//        }
+//    }
+//
+//    public byte getState() {
+//        return state;
+//    }
+//
+//    public void setState(byte state, FutureListener<? super Object> listener) {
+//        this.state = state;
+//        ListenableFuture<?> future = manager.persistTransaction(this);
+//        future.setFutureListener(listener);
+//    }
+//
+//    public void prePrepare() throws Exception {
+//
+//        // Is it ok to call prepare now given the state of the
+//        // transaction?
+//        switch (state) {
+//        case START_STATE:
+//        case IN_USE_STATE:
+//            break;
+//        default:
+//            XAException xae = new XAException("Prepare cannot be called now.");
+//            xae.errorCode = XAException.XAER_PROTO;
+//            throw xae;
+//        }
+//    }
+//
+//    protected void fireAfterCommit() throws Exception {
+//
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onCommit(this);
+//            }
+//        }
+//    }
+//
+//    public void fireAfterRollback() throws Exception {
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onRollback(this);
+//            }
+//        }
+//    }
+//
+//    public void fireAfterPrepare() throws Exception {
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onPrepared(this);
+//            }
+//        }
+//    }
+//
+//    public String toString() {
+//        return super.toString() + "[queue=" + opQueue + "]";
+//    }
+//
+//    public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
+//
+//    public abstract void rollback(TransactionListener listener) throws XAException, IOException;
+//
+//    public abstract int prepare(TransactionListener listener) throws XAException, IOException;
+//
+//    public boolean isPrepared() {
+//        return getState() == PREPARED_STATE;
+//    }
+//
+//    public long size() {
+//        return opQueue.getEnqueuedCount();
+//    }
+//
+//    public static abstract class TransactionListener {
+//        public void onRollback(Transaction t) {
+//
+//        }
+//
+//        public void onCommit(Transaction t) {
+//
+//        }
+//
+//        public void onPrepared(Transaction t) {
+//
+//        }
+//    }
+//
+//    interface TxOp {
+//        public static final short TYPE_MESSAGE = 0;
+//        public static final short TYPE_ACK = 1;
+//
+//        public short getType();
+//
+//        public <T> T asType(Class<T> type);
+//
+//        public void onRollback(ISourceController<?> controller);
+//
+//        public void onCommit(ISourceController<?> controller);
+//
+//        public int getLimiterSize();
+//
+//        public boolean isFromStore();
+//
+//        public long getStoreTracking();
+//
+//        public MessageRecord createMessageRecord();
+//
+//        /**
+//         * @return
+//         */
+//        public boolean isPersistent();
+//
+//        /**
+//         * @return
+//         */
+//        public Long getExpiration();
+//
+//        public int getPriority();
+//    }
+//
+//    static class TxMessage implements TxOp {
+//        MessageDelivery message;
+//        Transaction tx;
+//        private boolean fromStore;
+//
+//        /**
+//         * @param m
+//         * @param transaction
+//         */
+//        public TxMessage(MessageDelivery m, Transaction tx) {
+//            message = m;
+//            this.tx = tx;
+//        }
+//
+//        public <T> T asType(Class<T> type) {
+//            if (type == TxMessage.class) {
+//                return type.cast(this);
+//            } else {
+//                return null;
+//            }
+//        }
+//
+//        public final short getType() {
+//            return TYPE_MESSAGE;
+//        }
+//
+//        public final int getLimiterSize() {
+//            return message.getFlowLimiterSize();
+//        }
+//
+//        public final void onCommit(ISourceController<?> controller) {
+//            message.clearTransactionId();
+//            tx.manager.getVirtualHost().getRouter().route(message, controller, true);
+//        }
+//
+//        public final void onRollback(ISourceController<?> controller) {
+//            //Nothing to do here, message just gets dropped:
+//            return;
+//        }
+//
+//        public final boolean isFromStore() {
+//            return fromStore;
+//        }
+//
+//        public final MessageRecord createMessageRecord() {
+//            return message.createMessageRecord();
+//        }
+//
+//        public final long getStoreTracking() {
+//            return message.getStoreTracking();
+//        }
+//
+//        public final boolean isPersistent() {
+//            return message.isPersistent();
+//        }
+//
+//        public final Long getExpiration() {
+//            return message.getExpiration();
+//        }
+//
+//        public final int getPriority() {
+//            return message.getPriority();
+//        }
+//    }
+//
+//    static class TxAck implements TxOp {
+//        public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
+//        Transaction tx;
+//        IQueue<Long, ?> queue; //Descriptor of the queue on which to delete.
+//        long queueSequence; //Sequence number of the element on the queue from which to delete.
+//        final long storeTracking; //Store tracking of this delete op.
+//        private boolean fromStore;
+//        private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
+//
+//        TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
+//            this.queue = queue;
+//            this.queueSequence = removalKey;
+//            this.tx = tx;
+//            this.storeTracking = storeTracking;
+//        }
+//
+//        public final short getType() {
+//            return TYPE_ACK;
+//        }
+//
+//        public <T> T asType(Class<T> type) {
+//            if (type == TxAck.class) {
+//                return type.cast(this);
+//            } else {
+//                return null;
+//            }
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
+//         */
+//        public final void onCommit(ISourceController<?> controller) {
+//            queue.remove(queueSequence);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+//         */
+//        public final void onRollback(ISourceController<?> controller) {
+//            //No-Op for now, it is possible that we'd want to unacquire these
+//            //in the queue if the client weren't to keep these
+//            //around
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+//         */
+//        public final int getLimiterSize() {
+//            return MEM_SIZE;
+//        }
+//
+//        public final boolean isFromStore() {
+//            return fromStore;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
+//         */
+//        public final long getStoreTracking() {
+//            return storeTracking;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
+//         * ()
+//         */
+//        public final MessageRecord createMessageRecord() {
+//            MessageRecord ret = new MessageRecord();
+//            ret.setEncoding(TxAck.ENCODING);
+//            ret.setKey(storeTracking);
+//            ret.setSize(MEM_SIZE);
+//            ret.setBuffer(new Buffer(toBytes().getData()));
+//            return null;
+//        }
+//
+//        private final Buffer toBytes() {
+//            AsciiBuffer queueName = queue.getDescriptor().getQueueName();
+//            DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
+//            try {
+//                baos.writeShort(queueName.length);
+//                baos.write(queueName.data, queueName.offset, queueName.length);
+//                baos.writeLong(queueSequence);
+//            } catch (IOException shouldNotHappen) {
+//                throw new RuntimeException(shouldNotHappen);
+//            }
+//            return baos.toBuffer();
+//        }
+//
+//        private final void fromBytes(byte[] bytes) {
+//            DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
+//            byte[] queueBytes = new byte[baos.readShort()];
+//            baos.readFully(queueBytes);
+//            AsciiBuffer queueName = new AsciiBuffer(queueBytes);
+//            queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+//            queueSequence = baos.readLong();
+//
+//        }
+//
+//        public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
+//            TxAck ret = new TxAck(null, -1, record.getKey(), tx);
+//            ret.fromBytes(record.getBuffer().getData());
+//            return ret;
+//        }
+//
+//        public final boolean isPersistent() {
+//            //TODO This could probably be relaxed when the ack is for non persistent
+//            //elements
+//            return true;
+//        }
+//
+//        public final Long getExpiration() {
+//            return -1L;
+//        }
+//
+//        public final int getPriority() {
+//            return 0;
+//        }
+//    }
+//
+//    /**
+//     * @param record
+//     * @return
+//     */
+//    public static TxOp createTxOp(MessageRecord record, Transaction tx) {
+//        if (record.getEncoding().equals(TxAck.ENCODING)) {
+//            return TxAck.createFromMessageRecord(record, tx);
+//        } else {
+//            MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
+//            return new TxMessage(delivery, tx);
+//        }
+//    }
+//
+//    protected void startTransactionProcessor()
+//    {
+//        synchronized(this)
+//        {
+//            if(processor == null)
+//            {
+//                processor = new TxProcessor();
+//                opQueue.addSubscription(processor);
+//            }
+//        }
+//    }
+//
+//
+//    /**
+//     * TxProcessor
+//     * <p>
+//     * Description: The tx processor processes the transaction queue after
+//     * commit or rollback.
+//     * </p>
+//     *
+//     * @author cmacnaug
+//     * @version 1.0
+//     */
+//    private class TxProcessor implements Subscription<TxOp> {
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController,
+//         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+//         */
+//        public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+//
+//            switch (state) {
+//            case COMMITED_STATE: {
+//                element.onCommit(controller);
+//                if (callback != null) {
+//                    callback.acknowledge();
+//                }
+//                break;
+//            }
+//            case ROLLBACK_STATE: {
+//                element.onRollback(controller);
+//                if (callback != null) {
+//                    callback.acknowledge();
+//                }
+//                break;
+//            }
+//            default: {
+//                LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
+//            }
+//            }
+//
+//            //If we've reached the end of the op queue
+//            if (opQueue.getEnqueuedCount() == 0) {
+//                opQueue.shutdown(null);
+//            }
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#hasSelector()
+//         */
+//        public boolean hasSelector() {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#isBrowser()
+//         */
+//        public boolean isBrowser() {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#isExclusive()
+//         */
+//        public boolean isExclusive() {
+//            return true;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+//         * .Object)
+//         */
+//        public boolean isRemoveOnDispatch(TxOp elem) {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
+//         */
+//        public boolean matches(TxOp elem) {
+//            return true;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController,
+//         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+//         */
+//        public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+//            add(element, controller, callback);
+//            return true;
+//        }
+//
+//    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul  7 03:39:03 2010
@@ -22,19 +22,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
-import org.apache.activemq.apollo.broker.Transaction.TxOp;
 import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.RestoreListener;
-import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.ListenableFuture;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
@@ -53,362 +43,368 @@ import org.apache.commons.logging.LogFac
  * @version 1.0
  */
 public class TransactionManager {
-    private static final Log LOG = LogFactory.getLog(TransactionManager.class);
-    private static final String TX_QUEUE_PREFIX = "TX-";
-    private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+// TODO:    
+//    private static final Log LOG = LogFactory.getLog(TransactionManager.class);
+//    private static final String TX_QUEUE_PREFIX = "TX-";
+//    private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+//
+//    private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+//    private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+//    private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
+//
+//    private final VirtualHost host;
+//    private BrokerDatabase database;
+//
+//    private final AtomicLong tidGen = new AtomicLong(0);
+//    private final TransactionStore txStore;
+//
+//    private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
+//    private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
+//    // By default we don't page out elements to disk.
+//    private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//    //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+//
+//    private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(TxOp elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return false;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
+//        public Long map(TxOp element) {
+//            return element.getExpiration();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return element.getLimiterSize();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return element.getPriority();
+//        }
+//    };
+//
+//    private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
+//        public Long map(TxOp element) {
+//            return element.getStoreTracking();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return 1;
+//        }
+//    };
+//
+//    TransactionManager(VirtualHost host) {
+//        this.host = host;
+//        txStore = new TransactionStore(host.getDatabase());
+//        database = host.getDatabase();
+//    }
+//
+//    /**
+//     * @return The TM's virtual host
+//     */
+//    public final VirtualHost getVirtualHost() {
+//        return host;
+//    }
+//
+//    /**
+//     * @param msg
+//     * @param controller
+//     */
+//    public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
+//        if (msg.getStoreTracking() == -1) {
+//            msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
+//        }
+//        transactions.get(msg.getTransactionId()).addMessage(msg, controller);
+//    }
+//
+//    /**
+//     * Creates a transaction.
+//     *
+//     * @param xid
+//     * @return
+//     */
+//    public synchronized final Transaction createTransaction(Buffer xid) {
+//        Transaction ret;
+//
+//        long tid = tidGen.incrementAndGet();
+//        IQueue<Long, TxOp> opQueue = createTransactionQueue(tid);
+//
+//        if (xid == null) {
+//            ret = new LocalTransaction(this, tid, opQueue);
+//        } else {
+//            XATransaction xat = new XATransaction(this, tid, xid, opQueue);
+//            ret = xat;
+//            xaTransactions.put(xid, xat);
+//        }
+//
+//        transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
+//        transactions.put(ret.getTid(), ret);
+//
+//        return ret;
+//    }
+//
+//    /**
+//     * @param buffer
+//     * @return
+//     */
+//    public synchronized Transaction getXATransaction(Buffer buffer) {
+//        return xaTransactions.get(buffer);
+//    }
+//
+//    /**
+//     *
+//     * @throws Exception
+//     */
+//    public synchronized void loadTransactions() throws Exception {
+//
+//        tidGen.set(database.allocateStoreTracking());
+//
+//        Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
+//
+//        // Load shared queues
+//        Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//
+//            Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
+//            if (b == null) {
+//                LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
+//                database.deleteQueue(loaded.getDescriptor());
+//            }
+//
+//            IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
+//            Transaction tx = loadTransaction(b, queue);
+//
+//            //TODO if we recover a tx that isn't committed, then we should discard it.
+//            if (tx.getState() < Transaction.COMMITED_STATE) {
+//                LOG.warn("Recovered unfinished transaction: " + tx);
+//            }
+//            transactions.put(tx.getTid(), tx);
+//            if (tx instanceof XATransaction) {
+//                XATransaction xat = XATransaction.class.cast(tx);
+//                xaTransactions.put(xat.getXid(), xat);
+//            }
+//
+//            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//        }
+//
+//        if (!txns.isEmpty()) {
+//            //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
+//            //deleted:
+//            LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
+//        }
+//    }
+//
+//    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
+//        //TODO move the serialization into the transaction itself:
+//        DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
+//        byte type = bais.readByte();
+//        byte state = bais.readByte();
+//        long tid = bais.readLong();
+//
+//        Transaction tx = null;
+//        switch (type) {
+//        case Transaction.TYPE_LOCAL:
+//            tx = new LocalTransaction(this, tid, queue);
+//            break;
+//        case Transaction.TYPE_XA:
+//            int length = bais.readByte() & 0xFF;
+//            Buffer xid = new Buffer(new byte[length]);
+//            bais.readFully(xid.data);
+//            tx = new XATransaction(this, tid, xid, queue);
+//            break;
+//        default:
+//            throw new IOException("Invalid transaction type: " + type);
+//
+//        }
+//        tx.setState(state, null);
+//        return tx;
+//
+//    }
+//
+//    public ListenableFuture<?> persistTransaction(Transaction tx) {
+//
+//        //TODO move the serialization into the transaction itself:
+//        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+//        try {
+//            baos.writeByte(tx.getType());
+//            baos.writeByte(tx.getState());
+//            baos.writeLong(tx.getTid());
+//            if (tx.getType() == Transaction.TYPE_XA) {
+//                Buffer xid = ((XATransaction) tx).getXid();
+//                // An XID's max size is around 140 bytes, so a single byte SHOULD be big enough to frame its length.
+//                baos.writeByte(xid.length & 0xFF);
+//                baos.write(xid.data, xid.offset, xid.length);
+//            }
+//            OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+//            ctx.requestFlush();
+//            return ctx;
+//        } catch (IOException ioe) {
+//            //Shouldn't happen
+//            throw new RuntimeException(ioe);
+//        }
+//    }
+//
+//    private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
+//
+//        IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//        return queue;
+//    }
+//
+//    private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
+//        IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+//        queue.initialize(0, 0, 0, 0);
+//        txStore.addQueue(queue.getDescriptor());
+//        return queue;
+//    }
+//
+//    private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
+//        ExclusivePersistentQueue<Long, TxOp> queue;
+//
+//        SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
+//            @Override
+//            public int getElementSize(TxOp elem) {
+//                return elem.getLimiterSize();
+//            }
+//        };
+//        queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
+//        queue.setStore(txStore);
+//        queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
+//        queue.setExpirationMapper(EXPIRATION_MAPPER);
+//        queue.getDescriptor().setApplicationType(type);
+//        return queue;
+//    }
+//
+//    final QueueStore<Long, Transaction.TxOp> getTxnStore() {
+//        return txStore;
+//    }
+//
+//    private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
+//        private final BrokerDatabase database;
+//
+//        private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
+//            public MessageRecord marshal(TxOp element) {
+//                return element.createMessageRecord();
+//            }
+//
+//            public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
+//                Transaction t = transactionsByQueue.get(queue.getQueueName());
+//                return Transaction.createTxOp(record, t);
+//            }
+//        };
+//
+//        TransactionStore(BrokerDatabase database) {
+//            this.database = database;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
+//         * .queue.QueueDescriptor)
+//         */
+//        public void addQueue(QueueDescriptor queue) {
+//            database.addQueue(queue);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
+//         * .queue.QueueDescriptor)
+//         */
+//        public void deleteQueue(QueueDescriptor queue) {
+//            database.deleteQueue(queue);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
+//         * .activemq.queue.QueueDescriptor, java.lang.Object)
+//         */
+//        public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
+//            database.deleteQueueElement(sqe);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
+//         */
+//        public boolean isFromStore(TxOp elem) {
+//            return elem.isFromStore();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
+//         * .activemq.queue.SaveableQueueElement,
+//         * org.apache.activemq.flow.ISourceController, boolean)
+//         */
+//        public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
+//            database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
+//         * .activemq.queue.QueueDescriptor, boolean, long, long, int,
+//         * org.apache.activemq.queue.RestoreListener)
+//         */
+//        public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
+//            database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
+//        }
+//    }
 
-    private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
-    private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
-    private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
-
-    private final VirtualHost host;
-    private BrokerDatabase database;
-
-    private final AtomicLong tidGen = new AtomicLong(0);
-    private final TransactionStore txStore;
-
-    private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
-    private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
-    // Be default we don't page out elements to disk.
-    private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
-    //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
-
-    private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
-
-        private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
-
-        public boolean isPersistent(TxOp elem) {
-            return elem.isPersistent();
-        }
-
-        public boolean isPageOutPlaceHolders() {
-            return false;
-        }
-
-        public boolean isPagingEnabled() {
-            return PAGING_ENABLED;
-        }
-
-        public int getPagingInMemorySize() {
-            return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
-        }
-
-        public boolean isThrottleSourcesToMemoryLimit() {
-            // Keep the queue in memory.
-            return true;
-        }
-
-        public int getDisconnectedThrottleRate() {
-            // By default don't throttle consumers when disconnected.
-            return 0;
-        }
-
-        public int getRecoveryBias() {
-            return 8;
-        }
-    };
-
-    private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
-        public Long map(TxOp element) {
-            return element.getExpiration();
-        }
-    };
-
-    private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
-        public Integer map(TxOp element) {
-            return element.getLimiterSize();
-        }
-    };
-
-    private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
-        public Integer map(TxOp element) {
-            return element.getPriority();
-        }
-    };
-
-    private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
-        public Long map(TxOp element) {
-            return element.getStoreTracking();
-        }
-    };
-
-    private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
-        public Integer map(TxOp element) {
-            return 1;
-        }
-    };
-
-    TransactionManager(VirtualHost host) {
-        this.host = host;
-        txStore = new TransactionStore(host.getDatabase());
-        database = host.getDatabase();
-    }
-
-    /**
-     * @return The TM's virtual host
-     */
-    public final VirtualHost getVirtualHost() {
-        return host;
-    }
-
-    /**
-     * @param msg
-     * @param controller
-     */
-    public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
-        if (msg.getStoreTracking() == -1) {
-            msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
-        }
-        transactions.get(msg.getTransactionId()).addMessage(msg, controller);
-    }
-
-    /**
-     * Creates a transaction.
-     * 
-     * @param xid
-     * @return
-     */
-    public synchronized final Transaction createTransaction(Buffer xid) {
-        Transaction ret;
-
-        long tid = tidGen.incrementAndGet();
-        IQueue<Long, TxOp> opQueue = createTransactionQueue(tid);
-
-        if (xid == null) {
-            ret = new LocalTransaction(this, tid, opQueue);
-        } else {
-            XATransaction xat = new XATransaction(this, tid, xid, opQueue);
-            ret = xat;
-            xaTransactions.put(xid, xat);
-        }
-
-        transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
-        transactions.put(ret.getTid(), ret);
-
-        return ret;
-    }
-
-    /**
-     * @param buffer
-     * @return
-     */
-    public synchronized Transaction getXATransaction(Buffer buffer) {
-        return xaTransactions.get(buffer);
-    }
-
-    /**
-     * 
-     * @throws Exception
-     */
-    public synchronized void loadTransactions() throws Exception {
-
-        tidGen.set(database.allocateStoreTracking());
-
-        Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
-
-        // Load shared queues
-        Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
-        while (results.hasNext()) {
-            QueueQueryResult loaded = results.next();
-
-            Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
-            if (b == null) {
-                LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
-                database.deleteQueue(loaded.getDescriptor());
-            }
-
-            IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
-            Transaction tx = loadTransaction(b, queue);
-
-            //TODO if we recover a tx that isn't committed then, we should discard it.
-            if (tx.getState() < Transaction.COMMITED_STATE) {
-                LOG.warn("Recovered unfinished transaction: " + tx);
-            }
-            transactions.put(tx.getTid(), tx);
-            if (tx instanceof XATransaction) {
-                XATransaction xat = XATransaction.class.cast(tx);
-                xaTransactions.put(xat.getXid(), xat);
-            }
-
-            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-        }
-
-        if (!txns.isEmpty()) {
-            //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be 
-            //deleted:
-            LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
-        }
-    }
-
-    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
-        //TODO move the serialization into the transaction itself:
-        DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
-        byte type = bais.readByte();
-        byte state = bais.readByte();
-        long tid = bais.readLong();
-
-        Transaction tx = null;
-        switch (type) {
-        case Transaction.TYPE_LOCAL:
-            tx = new LocalTransaction(this, tid, queue);
-            break;
-        case Transaction.TYPE_XA:
-            int length = bais.readByte() & 0xFF;
-            Buffer xid = new Buffer(new byte[length]);
-            bais.readFully(xid.data);
-            tx = new XATransaction(this, tid, xid, queue);
-            break;
-        default:
-            throw new IOException("Invalid transaction type: " + type);
-
-        }
-        tx.setState(state, null);
-        return tx;
-
-    }
-
-    public ListenableFuture<?> persistTransaction(Transaction tx) {
-
-        //TODO move the serialization into the transaction itself:
-        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        try {
-            baos.writeByte(tx.getType());
-            baos.writeByte(tx.getState());
-            baos.writeLong(tx.getTid());
-            if (tx.getType() == Transaction.TYPE_XA) {
-                Buffer xid = ((XATransaction) tx).getXid();
-                // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
-                baos.writeByte(xid.length & 0xFF);
-                baos.write(xid.data, xid.offset, xid.length);
-            }
-            OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
-            ctx.requestFlush();
-            return ctx;
-        } catch (IOException ioe) {
-            //Shouldn't happen
-            throw new RuntimeException(ioe);
-        }
-    }
-
-    private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
-
-        IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
-        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-        return queue;
+    public TransactionManager(VirtualHost virtualHost) {
     }
 
-    private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
-        IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
-        queue.initialize(0, 0, 0, 0);
-        txStore.addQueue(queue.getDescriptor());
-        return queue;
+    public void loadTransactions() {
     }
-
-    private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
-        ExclusivePersistentQueue<Long, TxOp> queue;
-
-        SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
-            @Override
-            public int getElementSize(TxOp elem) {
-                return elem.getLimiterSize();
-            }
-        };
-        queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
-        queue.setStore(txStore);
-        queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
-        queue.setExpirationMapper(EXPIRATION_MAPPER);
-        queue.getDescriptor().setApplicationType(type);
-        return queue;
-    }
-
-    final QueueStore<Long, Transaction.TxOp> getTxnStore() {
-        return txStore;
-    }
-
-    private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
-        private final BrokerDatabase database;
-
-        private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
-            public MessageRecord marshal(TxOp element) {
-                return element.createMessageRecord();
-            }
-
-            public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
-                Transaction t = transactionsByQueue.get(queue.getQueueName());
-                return Transaction.createTxOp(record, t);
-            }
-        };
-
-        TransactionStore(BrokerDatabase database) {
-            this.database = database;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
-         * .queue.QueueDescriptor)
-         */
-        public void addQueue(QueueDescriptor queue) {
-            database.addQueue(queue);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
-         * .queue.QueueDescriptor)
-         */
-        public void deleteQueue(QueueDescriptor queue) {
-            database.deleteQueue(queue);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
-         * .activemq.queue.QueueDescriptor, java.lang.Object)
-         */
-        public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
-            database.deleteQueueElement(sqe);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
-         */
-        public boolean isFromStore(TxOp elem) {
-            return elem.isFromStore();
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
-         * .activemq.queue.SaveableQueueElement,
-         * org.apache.activemq.flow.ISourceController, boolean)
-         */
-        public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
-            database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
-         * .activemq.queue.QueueDescriptor, boolean, long, long, int,
-         * org.apache.activemq.queue.RestoreListener)
-         */
-        public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
-            database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
-        }
-    }
-
 }



Mime
View raw message