activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r790047 [1/2] - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/test/java/org/apache/activemq/pe...
Date Wed, 01 Jul 2009 06:10:18 GMT
Author: cmacnaug
Date: Wed Jul  1 06:10:17 2009
New Revision: 790047

URL: http://svn.apache.org/viewvc?rev=790047&view=rev
Log:
Updates to Transaction support. 

Still a little ways to go, but basic local transaction commit is beginning to take shape.

Added:
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java   (with props)
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ListenableFuture.java   (with props)
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Wed Jul  1 06:10:17 2009
@@ -24,9 +24,12 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.store.Store;
@@ -50,20 +53,22 @@
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.RestoredElement;
 import org.apache.activemq.queue.SaveableQueueElement;
+import org.apache.activemq.util.FutureListener;
+import org.apache.activemq.util.ListenableFuture;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
 
-public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase> implements Service, DispatcherAware {
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase<?>> implements Service, DispatcherAware {
 
     private static final boolean DEBUG = false;
 
     private final Store store;
     private final Flow databaseFlow = new Flow("database", false);
 
-    private final SizeLimiter<OperationBase> storeLimiter;
-    private final FlowController<OperationBase> storeController;
+    private final SizeLimiter<OperationBase<?>> storeLimiter;
+    private final FlowController<OperationBase<?>> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
     private IDispatcher dispatcher;
@@ -71,7 +76,7 @@
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
 
-    private final LinkedNodeList<OperationBase> opQueue;
+    private final LinkedNodeList<OperationBase<?>> opQueue;
     private AtomicBoolean notify = new AtomicBoolean(false);
     private Semaphore opsReady = new Semaphore(0);
     private long opSequenceNumber;
@@ -115,18 +120,18 @@
 
     public BrokerDatabase(Store store) {
         this.store = store;
-        this.opQueue = new LinkedNodeList<OperationBase>();
-        storeLimiter = new SizeLimiter<OperationBase>(FLUSH_QUEUE_SIZE, 0) {
+        this.opQueue = new LinkedNodeList<OperationBase<?>>();
+        storeLimiter = new SizeLimiter<OperationBase<?>>(FLUSH_QUEUE_SIZE, 0) {
 
             @Override
-            public int getElementSize(OperationBase op) {
+            public int getElementSize(OperationBase<?> op) {
                 return op.getLimiterSize();
             }
         };
 
-        storeController = new FlowController<OperationBase>(new FlowControllable<OperationBase>() {
+        storeController = new FlowController<OperationBase<?>>(new FlowControllable<OperationBase<?>>() {
 
-            public void flowElemAccepted(ISourceController<OperationBase> controller, OperationBase op) {
+            public void flowElemAccepted(ISourceController<OperationBase<?>> controller, OperationBase<?> op) {
                 addToOpQueue(op);
             }
 
@@ -234,7 +239,7 @@
                 } catch (Store.KeyNotFoundException knfe) {
                     //No keys then:
                 }
-                
+
                 return ret;
             }
 
@@ -249,7 +254,7 @@
      * @param value
      *            The value to insert.
      */
-    public OperationContext updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+    public OperationContext<?> updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
         return add(new MapUpdateOperation(map, key, value), null, false);
     }
 
@@ -277,14 +282,14 @@
      *            the source of the operation.
      * @return the {@link OperationContext} associated with the operation
      */
-    private OperationContext add(OperationBase op, ISourceController<?> controller, boolean flush) {
+    private <T> OperationContext<T> add(OperationBase<T> op, ISourceController<?> controller, boolean flush) {
 
         op.flushRequested = flush;
         storeController.add(op, controller);
         return op;
     }
 
-    private final void addToOpQueue(OperationBase op) {
+    private final void addToOpQueue(OperationBase<?> op) {
         if (!running.get()) {
             throw new IllegalStateException("BrokerDatabase not started");
         }
@@ -305,7 +310,7 @@
     private void updateFlushPointer(long seqNumber) {
         if (seqNumber > flushPointer) {
             flushPointer = seqNumber;
-            OperationBase op = opQueue.getHead();
+            OperationBase<?> op = opQueue.getHead();
             if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) {
                 opsReady.release();
             }
@@ -347,10 +352,10 @@
         }
     }
 
-    private final OperationBase getNextOp(boolean wait) {
+    private final OperationBase<?> getNextOp(boolean wait) {
         if (!wait) {
             synchronized (opQueue) {
-                OperationBase op = opQueue.getHead();
+                OperationBase<?> op = opQueue.getHead();
                 if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) {
                     op.unlink();
                     return op;
@@ -358,7 +363,7 @@
             }
             return null;
         } else {
-            OperationBase op = getNextOp(false);
+            OperationBase<?> op = getNextOp(false);
             if (op == null) {
                 notify.set(true);
                 op = getNextOp(false);
@@ -380,7 +385,7 @@
         int count = 0;
         Session session = store.getSession();
         while (running.get()) {
-            final OperationBase firstOp = getNextOp(true);
+            final OperationBase<?> firstOp = getNextOp(true);
             if (firstOp == null) {
                 continue;
             }
@@ -388,13 +393,13 @@
 
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
-                final LinkedList<Operation> processedQueue = new LinkedList<Operation>();
+                final LinkedList<Operation<?>> processedQueue = new LinkedList<Operation<?>>();
                 boolean locked = false;
                 try {
 
-                    Operation op = firstOp;
+                    Operation<?> op = firstOp;
                     while (op != null) {
-                        final Operation toExec = op;
+                        final Operation<?> toExec = op;
                         if (toExec.beginExecute()) {
                             if (!locked) {
                                 session.acquireLock();
@@ -442,7 +447,7 @@
 
                         // Post process operations
                         long release = 0;
-                        for (Operation processed : processedQueue) {
+                        for (Operation<?> processed : processedQueue) {
                             processed.onCommit();
                             // System.out.println("Processed" + processed);
                             release += processed.getLimiterSize();
@@ -454,19 +459,19 @@
                     }
 
                 } catch (IOException e) {
-                    for (Operation processed : processedQueue) {
+                    for (Operation<?> processed : processedQueue) {
                         processed.onRollback(e);
                     }
                     onDatabaseException(e);
                 } catch (RuntimeException e) {
-                    for (Operation processed : processedQueue) {
+                    for (Operation<?> processed : processedQueue) {
                         processed.onRollback(e);
                     }
                     IOException ioe = new IOException(e.getMessage());
                     ioe.initCause(e);
                     onDatabaseException(ioe);
                 } catch (Exception e) {
-                    for (Operation processed : processedQueue) {
+                    for (Operation<?> processed : processedQueue) {
                         processed.onRollback(e);
                     }
                     IOException ioe = new IOException(e.getMessage());
@@ -538,7 +543,7 @@
      *             If there is an error marshalling the message.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
+    public OperationContext<?> persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
         return add(new AddMessageOperation(delivery), source, true);
     }
 
@@ -554,7 +559,7 @@
      * 
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
+    public OperationContext<?> saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
         return add(new AddMessageOperation(queueElement), source, !delayable);
     }
 
@@ -567,7 +572,7 @@
      *            The queue.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext deleteQueueElement(SaveableQueueElement<?> queueElement) {
+    public OperationContext<?> deleteQueueElement(SaveableQueueElement<?> queueElement) {
         return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false);
     }
 
@@ -593,7 +598,7 @@
      *            The listener to which messags should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public <T> OperationContext restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
+    public <T> OperationContext<?> restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
             MessageRecordMarshaller<T> marshaller) {
         return add(new RestoreElementsOperation<T>(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true);
     }
@@ -606,7 +611,8 @@
         }
     }
 
-    public interface OperationContext {
+    public interface OperationContext<V> extends ListenableFuture<V> {
+
         /**
          * Attempts to cancel the store operation. Returns true if the operation
          * could be canceled or false if the operation was already executed by
@@ -617,21 +623,10 @@
         public boolean cancel();
 
         /**
-         * @return true if the operation has been executed
-         */
-        public boolean cancelled();
-
-        /**
-         * @return true if the operation has been executed
-         */
-        public boolean executed();
-
-        /**
          * Requests flush for this database operation (overriding a previous
          * delay)
          */
         public void requestFlush();
-
     }
 
     /**
@@ -640,7 +635,7 @@
      * It is used by the {@link Store#execute(Callback)} method, often as
      * anonymous class.
      */
-    public interface Operation extends OperationContext {
+    public interface Operation<V> extends OperationContext<V> {
 
         /**
          * Called when the saver is about to execute the operation. If true is
@@ -704,16 +699,23 @@
      * This is a convenience base class that can be used to implement
      * Operations. It handles operation cancellation for you.
      */
-    abstract class OperationBase extends LinkedNode<OperationBase> implements Operation {
+    abstract class OperationBase<V> extends LinkedNode<OperationBase<?>> implements Operation<V> {
         public boolean flushRequested = false;
         public long opSequenceNumber = -1;
 
         final protected AtomicBoolean executePending = new AtomicBoolean(true);
         final protected AtomicBoolean cancelled = new AtomicBoolean(false);
         final protected AtomicBoolean executed = new AtomicBoolean(false);
+        final protected AtomicReference<FutureListener<? super V>> listener = new AtomicReference<FutureListener<? super V>>();
+
+        protected Throwable error;
 
         public static final int BASE_MEM_SIZE = 20;
 
+        public boolean cancel(boolean interrupt) {
+            return cancel();
+        }
+
         public boolean cancel() {
             if (storeBypass) {
                 if (executePending.compareAndSet(true, false)) {
@@ -723,20 +725,25 @@
                         unlink();
                         storeController.elementDispatched(this);
                     }
+                    fireListener();
                     return true;
                 }
             }
             return cancelled.get();
         }
 
-        public final boolean cancelled() {
+        public final boolean isCancelled() {
             return cancelled.get();
         }
 
-        public final boolean executed() {
+        public final boolean isExecuted() {
             return executed.get();
         }
 
+        public final boolean isDone() {
+            return isCancelled() || isExecuted();
+        }
+
         /**
          * Called when the saver is about to execute the operation. If true is
          * returned the operation can no longer be cancelled.
@@ -745,7 +752,6 @@
          */
         public final boolean beginExecute() {
             if (executePending.compareAndSet(true, false)) {
-                executed.set(true);
                 return true;
             } else {
                 return false;
@@ -789,12 +795,14 @@
          * delay)
          */
         public void requestFlush() {
-            synchronized (this) {
+            synchronized (opQueue) {
                 updateFlushPointer(opSequenceNumber);
             }
         }
 
         public void onCommit() {
+            executed.set(true);
+            fireListener();
         }
 
         /**
@@ -802,15 +810,95 @@
          * operation has been rolled back.
          */
         public void onRollback(Throwable error) {
-            error.printStackTrace();
+            executed.set(true);
+            if (!fireListener()) {
+                error.printStackTrace();
+            }
+        }
+
+        private final boolean fireListener() {
+            FutureListener<? super V> l = this.listener.getAndSet(null);
+            if (l != null) {
+                l.onFutureComplete(this);
+                return true;
+            }
+            return false;
+        }
+
+        public void setFutureListener(FutureListener<? super V> listener) {
+            this.listener.set(listener);
+            if (isDone()) {
+                fireListener();
+            }
+        }
+
+        /**
+         * Subclasses the return a result should override this
+         * @return The result.
+         */
+        protected final V getResult() {
+            return null;
+        }
+
+        /**
+         * Waits if necessary for the computation to complete, and then
+         * retrieves its result.
+         * 
+         * @return the computed result
+         * @throws CancellationException
+         *             if the computation was cancelled
+         * @throws ExecutionException
+         *             if the computation threw an exception
+         * @throws InterruptedException
+         *             if the current thread was interrupted while waiting
+         */
+        public final V get() throws ExecutionException, InterruptedException  {
+            
+            try {
+                return get(-1, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                //Can't happen.
+                throw new AssertionError(e);
+            }
+        }
+        
+        /**
+         * Waits if necessary for at most the given time for the computation
+         * to complete, and then retrieves its result, if available.
+         *
+         * @param timeout the maximum time to wait
+         * @param unit the time unit of the timeout argument
+         * @return the computed result
+         * @throws CancellationException if the computation was cancelled
+         * @throws ExecutionException if the computation threw an
+         * exception
+         * @throws InterruptedException if the current thread was interrupted
+         * while waiting
+         * @throws TimeoutException if the wait timed out
+         */
+        public final V get(long timeout, TimeUnit tu) throws ExecutionException, InterruptedException, TimeoutException {
+            if (isCancelled()) {
+                throw new CancellationException();
+            }
+            if (error != null) {
+                throw new ExecutionException(error);
+            }
+            
+            //TODO implement blocking?
+            if(!isDone())
+            {
+                throw new UnsupportedOperationException("Blocking result retrieval not yet implemented");
+            }
+            
+            return getResult();
         }
 
         public String toString() {
-            return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/" + cancelled() + "/" + executed();
+            return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/" + isCancelled() + "/" + isExecuted();
         }
     }
 
-    private class QueueAddOperation extends OperationBase {
+    private class QueueAddOperation extends OperationBase<Object> {
 
         private QueueDescriptor qd;
 
@@ -827,17 +915,12 @@
             }
         }
 
-        @Override
-        public void onCommit() {
-
-        }
-
         public String toString() {
             return "QueueAdd: " + qd.getQueueName().toString();
         }
     }
 
-    private class QueueDeleteOperation extends OperationBase {
+    private class QueueDeleteOperation extends OperationBase<Object> {
 
         private QueueDescriptor qd;
 
@@ -850,17 +933,12 @@
             session.queueRemove(qd);
         }
 
-        @Override
-        public void onCommit() {
-
-        }
-
         public String toString() {
             return "QueueDelete: " + qd.getQueueName().toString();
         }
     }
 
-    private class DeleteOperation extends OperationBase {
+    private class DeleteOperation extends OperationBase<Object> {
         private final long queueKey;
         private QueueDescriptor queue;
 
@@ -887,17 +965,12 @@
             }
         }
 
-        @Override
-        public void onCommit() {
-
-        }
-
         public String toString() {
             return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString();
         }
     }
 
-    private class RestoreElementsOperation<V> extends OperationBase {
+    private class RestoreElementsOperation<V> extends OperationBase<V> {
         private QueueDescriptor queue;
         private long firstKey;
         private int maxRecords;
@@ -986,6 +1059,7 @@
         @Override
         public void onCommit() {
             listener.elementsRestored(msgs);
+            super.onCommit();
         }
 
         public String toString() {
@@ -993,7 +1067,7 @@
         }
     }
 
-    private class AddMessageOperation extends OperationBase {
+    private class AddMessageOperation extends OperationBase<Object> {
 
         private final BrokerMessageDelivery brokerDelivery;
         private final SaveableQueueElement<MessageDelivery> singleElement;
@@ -1101,6 +1175,8 @@
                     notify.notifySave();
                 }
             }
+            
+            super.onCommit();
         }
 
         public String toString() {
@@ -1108,7 +1184,7 @@
         }
     }
 
-    private class MapUpdateOperation extends OperationBase {
+    private class MapUpdateOperation extends OperationBase<Object> {
         final AsciiBuffer map;
         final AsciiBuffer key;
         final Buffer value;
@@ -1229,11 +1305,11 @@
      * @param source
      * @param delayable
      */
-    public <T> OperationContext saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+    public <T> OperationContext<?> saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
         return add(new AddElementOperation<T>(sqe, delayable, marshaller), source, !delayable);
     }
 
-    private class AddElementOperation<T> extends OperationBase {
+    private class AddElementOperation<T> extends OperationBase<Object> {
 
         private final SaveableQueueElement<T> op;
         private MessageRecord record;
@@ -1257,7 +1333,7 @@
 
         @Override
         public int getLimiterSize() {
-            return record.getSize() + BASE_MEM_SIZE + 40;
+            return op.getLimiterSize() + BASE_MEM_SIZE + 32;
         }
 
         @Override
@@ -1280,10 +1356,6 @@
             }
         }
 
-        @Override
-        public void onCommit() {
-        }
-
         public String toString() {
             return "AddTxOpOperation " + record.getKey() + super.toString();
         }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul  1 06:10:17 2009
@@ -26,6 +26,8 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
+import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
@@ -47,6 +49,8 @@
     boolean enableFlushDelay = true;
     private int limiterSize = -1;
 
+    private long tid;
+
     public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
         fromStore = true;
         store = database;
@@ -62,6 +66,28 @@
     }
 
     /**
+     * When an application wishes to include a message in a broker transaction
+     * it must set this the tid returned by {@link Transaction#getTid()}
+     * 
+     * @param tid
+     *            Sets the tid used to identify the transaction at the broker.
+     */
+    public void setTransactionId(long tid) {
+        this.tid = tid;
+    }
+
+    /**
+     * @return The tid used to identify the transaction at the broker.
+     */
+    public final long getTransactionId() {
+        return tid;
+    }
+
+    public final void clearTransactionId() {
+        tid = -1;
+    }
+
+    /**
      * Subclass must implement this to return their current memory size
      * estimate.
      * 
@@ -136,14 +162,18 @@
 
     }
 
-    public void beginDispatch(BrokerDatabase database) {
-        this.store = database;
-        dispatching = true;
+    public final void setStoreTracking(long tracking) {
         if (storeTracking == -1) {
-            storeTracking = database.allocateStoreTracking();
+            storeTracking = tracking;
         }
     }
 
+    public final void beginDispatch(BrokerDatabase database) {
+        this.store = database;
+        dispatching = true;
+        setStoreTracking(database.allocateStoreTracking());
+    }
+
     public long getStoreTracking() {
         return storeTracking;
     }
@@ -199,7 +229,7 @@
         }
     }
 
-    public void finishDispatch(ISourceController<?> controller) throws IOException {
+    public final void finishDispatch(ISourceController<?> controller) throws IOException {
         boolean firePersistListener = false;
         synchronized (this) {
             // If any of the targets requested save then save the message
@@ -223,6 +253,28 @@
         }
     }
 
+    public final MessageRecord createMessageRecord() {
+
+        MessageRecord record = new MessageRecord();
+        record.setEncoding(getStoreEncoding());
+        record.setBuffer(getStoreEncoded());
+        record.setStreamKey((long) 0);
+        record.setMessageId(getMsgId());
+        record.setSize(getFlowLimiterSize());
+        record.setKey(getStoreTracking());
+        return record;
+    }
+
+    /**
+     * @return A buffer representation of the message to be stored in the store.
+     */
+    protected abstract Buffer getStoreEncoded();
+
+    /**
+     * @return The encoding scheme used to store the message.
+     */
+    protected abstract AsciiBuffer getStoreEncoding();
+
     public boolean isFlushDelayable() {
         // TODO Auto-generated method stub
         return enableFlushDelay;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Wed Jul  1 06:10:17 2009
@@ -88,10 +88,6 @@
         }
     }
 
-    public IFlowSink<MessageDelivery> getSink() {
-        return queue;
-    }
-
     public boolean hasSelector() {
         return selector != null;
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Wed Jul  1 06:10:17 2009
@@ -17,10 +17,16 @@
 package org.apache.activemq.apollo.broker;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.util.FutureListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * LocalTransaction
@@ -33,6 +39,8 @@
  */
 public class LocalTransaction extends Transaction {
 
+    private static final Log LOG = LogFactory.getLog(LocalTransaction.class);
+    
     LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
         super(manager, tid, opQueue);
     }
@@ -43,20 +51,62 @@
      * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
      */
     @Override
-    public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
-        // TODO Auto-generated method stub
-
+    public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("commit: " + this);
+        }
+        
+        synchronized(this)
+        {
+            // Get ready for commit.
+            try {
+                prePrepare();
+            } catch (XAException e) {
+                throw e;
+            } catch (Throwable e) {
+                LOG.warn("COMMIT FAILED: ", e);
+                rollback(null);
+                // Let them know we rolled back.
+                XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
+                xae.errorCode = XAException.XA_RBOTHER;
+                xae.initCause(e);
+                throw xae;
+            }
+            
+            //Add the listener for commit
+            if(listeners == null)
+            {
+                listeners = new HashSet<TransactionListener>();
+            }
+            listeners.add(listener);
+            
+            //Update the transaction state to committed,
+            //and on complete process the commit:
+            setState(COMMITED_STATE, new FutureListener<Object>()
+            {
+                public void onFutureComplete(Future<? extends Object> dbCommitResult) {
+                    try {
+                        dbCommitResult.get();
+                        startTransactionProcessor();
+                    } catch (InterruptedException e) {
+                        //Shouldn't happen
+                        LOG.warn(new AssertionError(e));
+                    } catch (ExecutionException e) {
+                        LOG.warn("COMMIT FAILED: ", e);
+                    }
+                    catch (Exception e)
+                    {
+                    }
+                }
+            });
+        }
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.Transaction#prepare()
-     */
-    @Override
-    public int prepare(TransactionListener listener) throws XAException, IOException {
-        // TODO Auto-generated method stub
-        return 0;
+
+    public int prepare(TransactionListener listener) throws XAException {
+        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
+        xae.errorCode = XAException.XAER_RMERR;
+        throw xae;
     }
 
     /*

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Wed Jul  1 06:10:17 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import java.io.IOException;
+
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
@@ -80,7 +82,23 @@
      *         transaction
      */
     public long getTransactionId();
+    
+    /**
+     * Clears the transaction id. Called by the broker when a transacted message
+     * is commited. 
+     */
+    public void clearTransactionId();
 
+    public void beginDispatch(BrokerDatabase database); 
+    
+    public void finishDispatch(ISourceController<?> controller) throws IOException;
+    
+    /**
+     * Sets the unique id used to identify this message in the store.
+     * @param tracking The tracking to use.
+     */
+    public void setStoreTracking(long tracking);
+    
     /**
      * Called by a queue to request that the element be persisted. The save is
      * done asynchronously, and depending on the state of the message delivery

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Wed Jul  1 06:10:17 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.broker;
 
+import java.io.IOException;
+
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.ISourceController;
@@ -31,7 +33,6 @@
 
     private final MessageDelivery delegate;
 
-   
     public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
         delegate.acknowledge(sqe);
     }
@@ -198,4 +199,43 @@
     MessageDeliveryWrapper(MessageDelivery delivery) {
         delegate = delivery;
     }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.apollo.broker.MessageDelivery#beginDispatch(org.apache
+     * .activemq.apollo.broker.BrokerDatabase)
+     */
+    public void beginDispatch(BrokerDatabase database) {
+        delegate.beginDispatch(database);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.apollo.broker.MessageDelivery#finishDispatch(org.
+     * apache.activemq.flow.ISourceController)
+     */
+    public void finishDispatch(ISourceController<?> controller) throws IOException {
+        delegate.finishDispatch(controller);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.apollo.broker.MessageDelivery#setStoreTracking(long)
+     */
+    public void setStoreTracking(long tracking) {
+        delegate.setStoreTracking(tracking);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.apollo.broker.MessageDelivery#clearTransactionId()
+     */
+    public void clearTransactionId() {
+        delegate.clearTransactionId();
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Wed Jul  1 06:10:17 2009
@@ -28,18 +28,17 @@
 import org.apache.commons.logging.LogFactory;
 
 final public class Router {
-	static final private Log LOG = LogFactory.getLog(Router.class); 
-	
+    static final private Log LOG = LogFactory.getLog(Router.class);
+
     public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
     public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
     public static final AsciiBuffer TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
 
     private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
-    
+
     private VirtualHost virtualHost;
     private BrokerDatabase database;
-    
 
     public Router() {
         domains.put(QUEUE_DOMAIN, new Domain());
@@ -67,7 +66,7 @@
     public synchronized void bind(Destination destination, DeliveryTarget target) {
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	Domain domain = getDomain(destination);
+            Domain domain = getDomain(destination);
             domain.bind(destination.getName(), target);
         } else {
             for (Destination d : destinationList) {
@@ -79,7 +78,7 @@
     public synchronized void unbind(Destination destination, DeliveryTarget target) {
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	Domain domain = getDomain(destination);
+            Domain domain = getDomain(destination);
             domain.unbind(destination.getName(), target);
         } else {
             for (Destination d : destinationList) {
@@ -88,10 +87,17 @@
         }
     }
 
-    public void route(final BrokerMessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
+    public void route(final MessageDelivery msg, ISourceController<?> controller, boolean autoCreate) {
 
+        //If the message is part of transaction send it to the transaction manager
+        if(msg.getTransactionId() >= 0)
+        {
+            virtualHost.getTransactionManager().newMessage(msg, controller);
+            return;
+        }
+        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg, autoCreate);
-
+        
         //Set up the delivery for persistence:
         msg.beginDispatch(database);
 
@@ -124,20 +130,20 @@
         // Handles routing to composite/multi destinations.
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	Domain domain = getDomain(destination);
+            Domain domain = getDomain(destination);
             Collection<DeliveryTarget> rc = domain.route(destination.getName(), msg);
             // We can auto create queues in the queue domain..
-            if(rc.isEmpty() && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
-            	try {
-					Queue queue = virtualHost.createQueue(destination);
-					rc = new ArrayList<DeliveryTarget>(1);
-					rc.add(queue);
-				} catch (Exception e) {
-					LOG.error("Failed to auto create queue: "+destination.getName()+": "+e);
-					LOG.debug("Failed to auto create queue: "+destination.getName(),e);
-				}
+            if (rc.isEmpty() && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN)) {
+                try {
+                    Queue queue = virtualHost.createQueue(destination);
+                    rc = new ArrayList<DeliveryTarget>(1);
+                    rc.add(queue);
+                } catch (Exception e) {
+                    LOG.error("Failed to auto create queue: " + destination.getName() + ": " + e);
+                    LOG.debug("Failed to auto create queue: " + destination.getName(), e);
+                }
             }
-			return rc;
+            return rc;
         } else {
             HashSet<DeliveryTarget> rc = new HashSet<DeliveryTarget>();
             for (Destination d : destinationList) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Wed Jul  1 06:10:17 2009
@@ -24,11 +24,16 @@
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
+import org.apache.activemq.util.FutureListener;
+import org.apache.activemq.util.ListenableFuture;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.DataByteArrayInputStream;
 import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Keeps track of all the actions the need to be done when a transaction does a
@@ -36,10 +41,13 @@
  */
 public abstract class Transaction {
 
+    private static final Log LOG = LogFactory.getLog(Transaction.class);
+
     public static final byte START_STATE = 0; // can go to: 1,2,3
-    public static final byte IN_USE_STATE = 1; // can go to: 2,3
-    public static final byte PREPARED_STATE = 2; // can go to: 3
-    public static final byte FINISHED_STATE = 3;
+    public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
+    public static final byte PREPARED_STATE = 2; // can go to: 3, 4
+    public static final byte COMMITED_STATE = 3;
+    public static final byte ROLLBACK_STATE = 4;
 
     static final byte TYPE_LOCAL = 0;
     static final byte TYPE_XA = 1;
@@ -48,11 +56,14 @@
     private final TransactionManager manager;
     private final long tid;
     private final IQueue<Long, TxOp> opQueue;
-    private HashSet<TransactionListener> listeners;
+    protected HashSet<TransactionListener> listeners;
+    
+    private TxProcessor processor;
 
     Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
         this.manager = manager;
         this.opQueue = opQueue;
+        opQueue.start();
         this.tid = tid;
     }
 
@@ -74,7 +85,7 @@
      */
     public abstract byte getType();
 
-    public void addMessage(BrokerMessageDelivery m, ISourceController<?> source) {
+    public void addMessage(MessageDelivery m, ISourceController<?> source) {
 
         synchronized (this) {
             switch (state) {
@@ -97,7 +108,8 @@
                 IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
                 //Queue could be null if it was just deleted:
                 if (target != null) {
-                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), this), null);
+                    long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
+                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
                 }
                 break;
             default: {
@@ -111,8 +123,10 @@
         return state;
     }
 
-    public void setState(byte state) {
+    public void setState(byte state, FutureListener<? super Object> listener) {
         this.state = state;
+        ListenableFuture<?> future = manager.persistTransaction(this);
+        future.setFutureListener(listener);
     }
 
     public void prePrepare() throws Exception {
@@ -128,20 +142,30 @@
             xae.errorCode = XAException.XAER_PROTO;
             throw xae;
         }
-
-        //TODO:
     }
 
     protected void fireAfterCommit() throws Exception {
 
         synchronized (this) {
-
+            for (TransactionListener listener : listeners) {
+                listener.onCommit(this);
+            }
         }
     }
 
     public void fireAfterRollback() throws Exception {
         synchronized (this) {
+            for (TransactionListener listener : listeners) {
+                listener.onRollback(this);
+            }
+        }
+    }
 
+    public void fireAfterPrepare() throws Exception {
+        synchronized (this) {
+            for (TransactionListener listener : listeners) {
+                listener.onPrepared(this);
+            }
         }
     }
 
@@ -185,9 +209,9 @@
 
         public <T> T asType(Class<T> type);
 
-        public void onRollback();
+        public void onRollback(ISourceController<?> controller);
 
-        public void onCommit();
+        public void onCommit(ISourceController<?> controller);
 
         public int getLimiterSize();
 
@@ -237,17 +261,16 @@
         }
 
         public final int getLimiterSize() {
-            // TODO Auto-generated method stub
             return message.getFlowLimiterSize();
         }
 
-        public final void onCommit() {
-
+        public final void onCommit(ISourceController<?> controller) {
+            message.clearTransactionId();
+            tx.manager.getVirtualHost().getRouter().route(message, controller, true);
         }
 
-        public final void onRollback() {
-            // TODO Auto-generated method stub
-
+        public final void onRollback(ISourceController<?> controller) {
+            return;
         }
 
         public final boolean isFromStore() {
@@ -280,14 +303,15 @@
         Transaction tx;
         IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
         long queueSequence; //Sequence number of the element on the queue from which to delete.
-        long storeTracking; //Store tracking of this delete op.
+        final long storeTracking; //Store tracking of this delete op.
         private boolean fromStore;
         private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
 
-        TxAck(IQueue<Long, ?> queue, long removalKey, Transaction tx) {
+        TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
             this.queue = queue;
             this.queueSequence = removalKey;
             this.tx = tx;
+            this.storeTracking = storeTracking;
         }
 
         public final short getType() {
@@ -307,28 +331,27 @@
          * 
          * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
          */
-        public final void onCommit() {
+        public final void onCommit(ISourceController<?> controller) {
             queue.remove(queueSequence);
-
         }
 
         /*
          * (non-Javadoc)
          * 
-         * @see
-         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
          */
-        public final int getLimiterSize() {
-            return MEM_SIZE;
+        public final void onRollback(ISourceController<?> controller) {
+            // TODO unaquire the element.
         }
 
         /*
          * (non-Javadoc)
          * 
-         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+         * @see
+         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
          */
-        public final void onRollback() {
-            // TODO unaquire the element.
+        public final int getLimiterSize() {
+            return MEM_SIZE;
         }
 
         public final boolean isFromStore() {
@@ -380,11 +403,12 @@
             baos.readFully(queueBytes);
             AsciiBuffer queueName = new AsciiBuffer(queueBytes);
             queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+            queueSequence = baos.readLong();
 
         }
 
         public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
-            TxAck ret = new TxAck(null, record.getKey(), tx);
+            TxAck ret = new TxAck(null, -1, record.getKey(), tx);
             ret.fromBytes(record.getBuffer().getData());
             return ret;
         }
@@ -416,4 +440,124 @@
             return new TxMessage(delivery, tx);
         }
     }
+
+    protected void startTransactionProcessor()
+    {
+        synchronized(this)
+        {
+            if(processor == null)
+            {
+                processor = new TxProcessor();
+                opQueue.addSubscription(processor);
+            }
+        }
+    }
+    
+    
+    /**
+     * TxProcessor
+     * <p>
+     * Description: The tx processor processes the transaction queue after
+     * commit or rollback.
+     * </p>
+     * 
+     * @author cmacnaug
+     * @version 1.0
+     */
+    private class TxProcessor implements Subscription<TxOp> {
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController,
+         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+         */
+        public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+
+            switch (state) {
+            case COMMITED_STATE: {
+                element.onCommit(controller);
+                if (callback != null) {
+                    callback.acknowledge();
+                }
+                break;
+            }
+            case ROLLBACK_STATE: {
+                element.onRollback(controller);
+                if (callback != null) {
+                    callback.acknowledge();
+                }
+                break;
+            }
+            default: {
+                LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
+            }
+            }
+
+            //If we've reached the end of the op queue
+            if (opQueue.getEnqueuedCount() == 0) {
+                opQueue.shutdown(false);
+            }
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#hasSelector()
+         */
+        public boolean hasSelector() {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#isBrowser()
+         */
+        public boolean isBrowser() {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#isExclusive()
+         */
+        public boolean isExclusive() {
+            return true;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+         * .Object)
+         */
+        public boolean isRemoveOnDispatch(TxOp elem) {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
+         */
+        public boolean matches(TxOp elem) {
+            return true;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
+         * org.apache.activemq.flow.ISourceController,
+         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+         */
+        public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+            add(element, controller, callback);
+            return true;
+        }
+
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul  1 06:10:17 2009
@@ -35,6 +35,7 @@
 import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.SaveableQueueElement;
+import org.apache.activemq.util.ListenableFuture;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
@@ -69,8 +70,8 @@
     private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
     private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
     // Be default we don't page out elements to disk.
-    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-    private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+    private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+    //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
 
     private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
 
@@ -151,6 +152,17 @@
     }
 
     /**
+     * @param msg
+     * @param controller
+     */
+    public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
+        if (msg.getStoreTracking() == -1) {
+            msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
+        }
+        transactions.get(msg.getTransactionId()).addMessage(msg, controller);
+    }
+
+    /**
      * Creates a transaction.
      * 
      * @param xid
@@ -209,7 +221,7 @@
             Transaction tx = loadTransaction(b, queue);
 
             //TODO if we recover a tx that isn't committed then, we should discard it.
-            if (tx.getState() < Transaction.FINISHED_STATE) {
+            if (tx.getState() < Transaction.COMMITED_STATE) {
                 LOG.warn("Recovered unfinished transaction: " + tx);
             }
             transactions.put(tx.getTid(), tx);
@@ -250,26 +262,32 @@
             throw new IOException("Invalid transaction type: " + type);
 
         }
-        tx.setState(state);
+        tx.setState(state, null);
         return tx;
 
     }
 
-    public OperationContext persistTransaction(Transaction tx) throws IOException {
+    public ListenableFuture<?> persistTransaction(Transaction tx) {
 
         //TODO move the serialization into the transaction itself:
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        baos.writeByte(tx.getType());
-        baos.writeByte(tx.getState());
-        baos.writeLong(tx.getTid());
-        if (tx.getType() == Transaction.TYPE_XA) {
-            Buffer xid = ((XATransaction) tx).getXid();
-            // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
-            baos.writeByte(xid.length & 0xFF);
-            baos.write(xid.data, xid.offset, xid.length);
+        try {
+            baos.writeByte(tx.getType());
+            baos.writeByte(tx.getState());
+            baos.writeLong(tx.getTid());
+            if (tx.getType() == Transaction.TYPE_XA) {
+                Buffer xid = ((XATransaction) tx).getXid();
+                // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
+                baos.writeByte(xid.length & 0xFF);
+                baos.write(xid.data, xid.offset, xid.length);
+            }
+            OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+            ctx.requestFlush();
+            return ctx;
+        } catch (IOException ioe) {
+            //Shouldn't happen
+            throw new RuntimeException(ioe);
         }
-
-        return database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
     }
 
     private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
@@ -280,7 +298,10 @@
     }
 
     private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
-        return createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+        IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+        queue.initialize(0, 0, 0, 0);
+        txStore.addQueue(queue.getDescriptor());
+        return queue;
     }
 
     private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
@@ -375,7 +396,7 @@
          * org.apache.activemq.flow.ISourceController, boolean)
          */
         public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
-            database.saveQeueuElement(sqe, source, delayable, TX_OP_MARSHALLER);
+            database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
         }
 
         /*
@@ -390,4 +411,5 @@
             database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
         }
     }
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Wed Jul  1 06:10:17 2009
@@ -37,8 +37,6 @@
     private PersistListener persistListener = null;
     private final int size;
 
-    private long tid = -1;
-
     public interface PersistListener {
         public void onMessagePersisted(OpenWireMessageDelivery delivery);
     }
@@ -105,38 +103,6 @@
         return message.isResponseRequired();
     }
 
-    public MessageRecord createMessageRecord() {
-        MessageRecord record = new MessageRecord();
-        record.setEncoding(ENCODING);
-
-        Buffer bytes;
-        try {
-            bytes = storeWireFormat.marshal(message);
-        } catch (IOException e) {
-            return null;
-        }
-        record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
-        record.setStreamKey((long) 0);
-        record.setMessageId(getMsgId());
-        record.setSize(getFlowLimiterSize());
-        return record;
-    }
-
-    public long getTransactionId() {
-        return tid;
-    }
-    
-    /**
-     * @param tid
-     */
-    public void setTransactionId(long tid) {
-        this.tid  = tid;
-    }
-
-    public void setStoreWireFormat(OpenWireFormat wireFormat) {
-        this.storeWireFormat = wireFormat;
-    }
-
     /*
      * (non-Javadoc)
      * 
@@ -154,5 +120,23 @@
         return message.getMessageId().toString();
     }
 
+    public AsciiBuffer getStoreEncoding() {
+        return ENCODING;
+    }
+    
+    public Buffer getStoreEncoded() {
+        Buffer bytes;
+        try {
+            bytes = storeWireFormat.marshal(message);
+        } catch (IOException e) {
+            return null;
+        }
+        return bytes;
+    }
+    
+
+    public void setStoreWireFormat(OpenWireFormat wireFormat) {
+        this.storeWireFormat = wireFormat;
+    }
     
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Jul  1 06:10:17 2009
@@ -38,6 +38,7 @@
 import org.apache.activemq.apollo.broker.Transaction;
 import org.apache.activemq.apollo.broker.VirtualHost;
 import org.apache.activemq.apollo.broker.XidImpl;
+import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -240,7 +241,7 @@
                 md.setStoreWireFormat(storeWireFormat);
                 TransactionId tid = info.getTransactionId();
                 if (tid != null) {
-                    Transaction t = locateTransaction(tid);
+                    Transaction t = locateTransaction(tid, true);
                     md.setTransactionId(t.getTid());
                 } else {
                     md.setPersistListener(OpenwireProtocolHandler.this);
@@ -367,7 +368,7 @@
             public Response processBeginTransaction(TransactionInfo info) throws Exception {
                 TransactionId tid = info.getTransactionId();
 
-                Transaction t = locateTransaction(tid);
+                Transaction t = locateTransaction(tid, false);
                 if (t == null) {
 
                     Buffer xid = null;
@@ -382,22 +383,62 @@
             }
 
             public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception {
-                TransactionId tid = info.getTransactionId();
-                Transaction t = locateTransaction(tid);
-                t.commit(true, new Transaction.TransactionListener()
-                {
-                    
-                });
+                final TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid, true);
+                
+                TransactionListener listener = null;
+                if (info.isResponseRequired()) {
+                    listener = new TransactionListener() {
+
+                        @Override
+                        public void onCommit(Transaction t) {
+                            transactions.remove(tid);
+                            ack(info);
+                        }
+
+                        @Override
+                        public void onRollback(Transaction t) {
+                            transactions.remove(tid);
+                            ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+                            r.setCorrelationId(info.getCommandId());
+                            connection.write(r);
+                        }
+
+                    };
+                }
+                
+                t.commit(true, listener);
                 transactions.remove(tid);
-                throw new UnsupportedOperationException();
+                return null;
             }
 
-            public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-                TransactionId tid = info.getTransactionId();
-                Transaction t = locateTransaction(tid);
-                t.commit(false, null);
-                transactions.remove(tid);
-                throw new UnsupportedOperationException();
+            public Response processCommitTransactionTwoPhase(final TransactionInfo info) throws Exception {
+                final TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid, true);
+
+                TransactionListener listener = null;
+                if (info.isResponseRequired()) {
+                    listener = new TransactionListener() {
+
+                        @Override
+                        public void onCommit(Transaction t) {
+                            transactions.remove(tid);
+                            ack(info);
+                        }
+
+                        @Override
+                        public void onRollback(Transaction t) {
+                            transactions.remove(tid);
+                            ExceptionResponse r = new ExceptionResponse(new XAException("RolledBack"));
+                            r.setCorrelationId(info.getCommandId());
+                            connection.write(r);
+                        }
+
+                    };
+                }
+
+                t.commit(false, listener);
+                return null;
             }
 
             public Response processEndTransaction(TransactionInfo info) throws Exception {
@@ -405,18 +446,30 @@
                 //in sync with broker transaction state. 
                 //TODO need to investigate whether this should wait for prior transaction
                 //state to flush out?
-                throw new UnsupportedOperationException();
+                new UnsupportedOperationException().printStackTrace();
+                return ack(info);
             }
 
             public Response processForgetTransaction(TransactionInfo info) throws Exception {
                 return processRollbackTransaction(info);
             }
 
-            public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-                TransactionId tid = info.getTransactionId();
-                Transaction t = locateTransaction(tid);
-                t.prepare(null);
-                throw new UnsupportedOperationException();
+            public Response processPrepareTransaction(final TransactionInfo info) throws Exception {
+                final TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid, true);
+
+                TransactionListener listener = null;
+                if (info.isResponseRequired()) {
+                    listener = new TransactionListener() {
+
+                        @Override
+                        public void onPrepared(Transaction t) {
+                            ack(info);
+                        }
+                    };
+                }
+                t.prepare(listener);
+                return null;
             }
 
             public Response processRecoverTransactions(TransactionInfo info) throws Exception {
@@ -424,13 +477,23 @@
                 throw new UnsupportedOperationException();
             }
 
-            public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-                TransactionId tid = info.getTransactionId();
-                Transaction t = locateTransaction(tid);
-                t.rollback(null);
+            public Response processRollbackTransaction(final TransactionInfo info) throws Exception {
+                final TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid, true);
+
+                TransactionListener listener = null;
+                if (info.isResponseRequired()) {
+                    listener = new TransactionListener() {
+
+                        @Override
+                        public void onRollback(Transaction t) {
+                            ack(info);
+                        }
+                    };
+                }
+                t.rollback(listener);
                 transactions.remove(tid);
-                //TODO need to respond to this when the rollback completes
-                throw new UnsupportedOperationException();
+                return null;
             }
 
             // /////////////////////////////////////////////////////////////////
@@ -456,7 +519,7 @@
         };
     }
 
-    private Transaction locateTransaction(TransactionId tid) throws XAException, JMSException {
+    private Transaction locateTransaction(TransactionId tid, boolean expected) throws XAException, JMSException {
         Transaction t;
 
         if (tid.isLocalTransaction()) {
@@ -465,7 +528,7 @@
             t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid));
         }
 
-        if (t == null) {
+        if (t == null && expected) {
             if (tid.isXATransaction()) {
                 XAException e = new XAException("Transaction '" + tid + "' has not been started.");
                 e.errorCode = XAException.XAER_NOTA;
@@ -692,7 +755,7 @@
                 TransactionId tid = info.getTransactionId();
                 Transaction transaction = null;
                 if (tid != null) {
-                    transaction = locateTransaction(tid);
+                    transaction = locateTransaction(tid, true);
                 }
 
                 LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
@@ -877,15 +940,6 @@
         /*
          * (non-Javadoc)
          * 
-         * @see org.apache.activemq.queue.Subscription#getSink()
-         */
-        public IFlowSink<MessageDelivery> getSink() {
-            return this;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
          * @see org.apache.activemq.flow.IFlowSink#add(java.lang.Object,
          * org.apache.activemq.flow.ISourceController)
          */

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Wed Jul  1 06:10:17 2009
@@ -516,15 +516,6 @@
         /*
          * (non-Javadoc)
          * 
-         * @see org.apache.activemq.queue.Subscription#getSink()
-         */
-        public IFlowSink<MessageDelivery> getSink() {
-            return this;
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
          * @see org.apache.activemq.queue.Subscription#hasSelector()
          */
         public boolean hasSelector() {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java Wed Jul  1 06:10:17 2009
@@ -41,6 +41,11 @@
      *         called when this element is persisted
      */
     public boolean requestSaveNotify();
+    
+    /**
+     * @return The size of the element in memory
+     */
+    public int getLimiterSize();
 
     /**
      * Called when the element has been saved.

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Wed Jul  1 06:10:17 2009
@@ -26,7 +26,6 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowResource;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSizeLimiter;
 import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
@@ -49,7 +48,7 @@
     private final LinkedNodeList<SubscriptionNode> readyPollingSubs = new LinkedNodeList<SubscriptionNode>();
 
     private final HashMap<Subscription<V>, SubscriptionNode> subscriptions = new HashMap<Subscription<V>, SubscriptionNode>();
-    private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
+    //private final HashMap<IFlowResource, SubscriptionNode> sinks = new HashMap<IFlowResource, SubscriptionNode>();
 
     private final FlowController<V> sinkController;
     private final IFlowSizeLimiter<V> limiter;
@@ -58,54 +57,6 @@
     protected Mapper<K, V> keyMapper;
     private long directs;
 
-    private final ISourceController<V> sourceControler = new ISourceController<V>() {
-
-        public Flow getFlow() {
-            return sinkController.getFlow();
-        }
-
-        public void elementDispatched(V elem) {
-        }
-
-        public void onFlowBlock(ISinkController<?> sink) {
-        }
-
-        public void onFlowResume(ISinkController<?> sinkController) {
-            IFlowResource sink = sinkController.getFlowResource();
-            synchronized (mutex) {
-                SubscriptionNode node = sinks.get(sink);
-                if (node != null) {
-                    node.unlink();
-                    boolean notify = false;
-                    if (node.cursor == null) {
-                        readyDirectSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
-                    } else {
-                        if (readyPollingSubs.isEmpty()) {
-                            notify = !store.isEmpty();
-                        }
-                        readyPollingSubs.addLast(node);
-                        // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
-                    }
-
-                    if (notify) {
-                        notifyReady();
-                    }
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            return getResourceName();
-        }
-
-        public IFlowResource getFlowResource() {
-            return SharedQueueOld.this;
-        }
-
-    };
-
     private QueueDescriptor queueDescriptor;
 
     public SharedQueueOld(String name, IFlowSizeLimiter<V> limiter) {
@@ -257,7 +208,7 @@
         while (node != null) {
             next = node.getNext();
             if (node.subscription.matches(elem)) {
-                accepted = node.subscription.getSink().offer(elem, sourceControler);
+                accepted = node.subscription.offer(elem, node, null);
                 if (accepted) {
                     if (autoRelease) {
                         sinkController.elementDispatched(elem);
@@ -325,8 +276,7 @@
             }
 
             // The subscription's sink may be full..
-            IFlowSink<V> sink = subNode.subscription.getSink();
-            boolean accepted = sink.offer(storeNode.getValue(), sourceControler);
+            boolean accepted = subNode.subscription.offer(storeNode.getValue(), subNode, null);
 
             synchronized (mutex) {
                 if (accepted) {
@@ -358,7 +308,7 @@
             if (node == null) {
                 node = new SubscriptionNode(subscription);
                 subscriptions.put(subscription, node);
-                sinks.put(subscription.getSink(), node);
+                //sinks.put(subscription.getSink(), node);
                 if (!store.isEmpty()) {
                     readyPollingSubs.addLast(node);
                     notifyReady();
@@ -373,7 +323,7 @@
         synchronized (mutex) {
             SubscriptionNode node = subscriptions.remove(subscription);
             if (node != null) {
-                sinks.remove(subscription.getSink());
+                //sinks.remove(subscription.getSink());
                 node.unlink();
                 return true;
             }
@@ -381,7 +331,7 @@
         }
     }
 
-    private class SubscriptionNode extends LinkedNode<SubscriptionNode> {
+    private class SubscriptionNode extends LinkedNode<SubscriptionNode> implements ISourceController<V> {
         public final Subscription<V> subscription;
         public StoreCursor<K, V> cursor;
 
@@ -434,6 +384,59 @@
         public String toString() {
             return "subscription from " + getResourceName() + " to " + subscription;
         }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.ISourceController#elementDispatched(java.lang.Object)
+         */
+        public void elementDispatched(V elem) {
+            
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.ISourceController#getFlow()
+         */
+        public Flow getFlow() {
+            return sinkController.getFlow();
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.ISourceController#getFlowResource()
+         */
+        public IFlowResource getFlowResource() {
+            return SharedQueueOld.this;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.ISourceController#onFlowBlock(org.apache.activemq.flow.ISinkController)
+         */
+        public void onFlowBlock(ISinkController<?> sinkController) {
+            
+            
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.activemq.flow.ISourceController#onFlowResume(org.apache.activemq.flow.ISinkController)
+         */
+        public void onFlowResume(ISinkController<?> sinkController) {
+            synchronized (mutex) {
+                    unlink();
+                boolean notify = false;
+                if (cursor == null) {
+                    readyDirectSubs.addLast(this);
+                    // System.out.println("Subscription state change: un-ready direct -> ready direct: "+node);
+                } else {
+                    if (readyPollingSubs.isEmpty()) {
+                        notify = !store.isEmpty();
+                    }
+                    readyPollingSubs.addLast(this);
+                    // System.out.println("Subscription state change: un-ready polling -> ready polling: "+node);
+                }
+
+                if (notify) {
+                    notifyReady();
+                }
+            }
+        }
     }
 
     public Mapper<K, V> getKeyMapper() {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Wed Jul  1 06:10:17 2009
@@ -138,8 +138,4 @@
      *         returning false.
      */
     public void add(E element, ISourceController<?> controller, SubscriptionDelivery<E> callback);
-
-    @Deprecated
-    public IFlowSink<E> getSink();
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Wed Jul  1 06:10:17 2009
@@ -18,11 +18,11 @@
 
 import org.apache.activemq.apollo.broker.BrokerMessageDelivery;
 import org.apache.activemq.apollo.broker.Destination;
-import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.activemq.util.buffer.AsciiBuffer;
+import org.apache.activemq.util.buffer.Buffer;
 
 public class StompMessageDelivery extends BrokerMessageDelivery {
 
@@ -37,8 +37,6 @@
     private PersistListener persistListener = null;
     private long tte = Long.MIN_VALUE;
 
-    private long tid = -1;
-
     public interface PersistListener {
         public void onMessagePersisted(StompMessageDelivery delivery);
     }
@@ -143,23 +141,17 @@
         }
     }
 
-    public MessageRecord createMessageRecord() {
-        MessageRecord record = new MessageRecord();
-        record.setEncoding(ENCODING);
-        // TODO: Serialize it..
-        // record.setBuffer()
-        // record.setStreamKey(stream);
-        record.setMessageId(getMsgId());
-        return record;
-    }
-
-    public long getTransactionId() {
-        return tid;
+    public AsciiBuffer getStoreEncoding() {
+        return ENCODING;
     }
-
-    public void setTransactionId(long tid) {
-        this.tid = tid;
+    
+    public Buffer getStoreEncoded() {
+        Buffer bytes;
+        //TODO encode it:
+        //return bytes;
+        throw new UnsupportedOperationException();
     }
+    
 
     public MessageEvaluationContext createMessageEvaluationContext() {
         return new StompMessageEvaluationContext();

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=790047&r1=790046&r2=790047&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Wed Jul  1 06:10:17 2009
@@ -349,10 +349,6 @@
 
         }
 
-        public IFlowSink<MessageDelivery> getSink() {
-            return queue;
-        }
-
         public boolean hasSelector() {
             return false;
         }

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java?rev=790047&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java Wed Jul  1 06:10:17 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.concurrent.Future;
+
+/** 
+ * FutureListener
+ * <p>
+ * Description:
+ * </p>
+ * @author cmacnaug
+ * @version 1.0
+ */
+public interface FutureListener<V> {
+
+    public void onFutureComplete(Future<? extends V> future);
+}

Propchange: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/FutureListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ListenableFuture.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ListenableFuture.java?rev=790047&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ListenableFuture.java (added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/ListenableFuture.java Wed Jul  1 06:10:17 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.concurrent.Future;
+
+/** 
+ * ListenableFuture
+ * <p>
+ * Description:
+ * </p>
+ * @author cmacnaug
+ * @version 1.0
+ */
+public interface ListenableFuture<V> extends Future<V> {
+
+    /**
+     * Adds a listener for this future. The listener is called when the
+     * Future is cancelled, or executed. 
+     * @param listener The listener.
+     */
+    public void setFutureListener(FutureListener<? super V> listener);
+}



Mime
View raw message