activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r787345 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-flow/src/main/java/org/apache/activemq/flow/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ active...
Date Mon, 22 Jun 2009 18:28:39 GMT
Author: cmacnaug
Date: Mon Jun 22 18:28:39 2009
New Revision: 787345

URL: http://svn.apache.org/viewvc?rev=787345&view=rev
Log: (empty)

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -17,10 +17,13 @@
 package org.apache.activemq.apollo.broker;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.wireformat.WireFormat;
@@ -39,26 +42,100 @@
 
     public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
 
-    public interface ConsumerContext extends Subscription<MessageDelivery>, IFlowSink<MessageDelivery>
{
-    	
+    /**
+     * ClientContext
+     * <p>
+     * Description: Base interface describing a channel on a physical
+     * connection.
+     * </p>
+     * 
+     * @author cmacnaug
+     * @version 1.0
+     */
+    public interface ClientContext {
+        public ClientContext getParent();
+
+        public Collection<ClientContext> getChildren();
+
+        public void addChild(ClientContext context);
+
+        public void removeChild(ClientContext context);
+
+        public void close();
+
+    }
+
+    public abstract class AbstractClientContext<E extends MessageDelivery> extends
AbstractLimitedFlowResource<E> implements ClientContext {
+        protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
+        protected final ClientContext parent;
+        protected boolean closed = false;
+
+        public AbstractClientContext(String name, ClientContext parent) {
+            super(name);
+            this.parent = parent;
+            if (parent != null) {
+                parent.addChild(this);
+            }
+        }
+
+        public ClientContext getParent() {
+            return parent;
+        }
+
+        public void addChild(ClientContext child) {
+            if (!closed) {
+                children.add(child);
+            }
+        }
+
+        public void removeChild(ClientContext child) {
+            if (!closed) {
+                children.remove(child);
+            }
+        }
+
+        public Collection<ClientContext> getChildren() {
+            return children;
+        }
+
+        public void close() {
+
+            closed = true;
+            
+            for (ClientContext c : children) {
+                c.close();
+            }
+
+            if (parent != null) {
+                parent.removeChild(this);
+            }
+
+            super.close();
+        }
+    }
+
+    public interface ConsumerContext extends ClientContext, Subscription<MessageDelivery>,
IFlowSink<MessageDelivery> {
+
         public String getConsumerId();
-        
+
         public Destination getDestination();
 
         public String getSelector();
-        
+
         public BooleanExpression getSelectorExpression();
-        
+
         public boolean isDurable();
-        
+
         public String getSubscriptionName();
-        
+
         /**
-         * If the destination does not exist, should it automatically be created? 
+         * If the destination does not exist, should it automatically be
+         * created?
+         * 
          * @return
          */
         public boolean autoCreateDestination();
-        
+
     }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Mon Jun 22 18:28:39 2009
@@ -120,4 +120,9 @@
     public synchronized IFlowController<E> getFlowController(Flow flow) {
         return openControllers.get(flow);
     }
+    
+    public synchronized void close()
+    {
+        
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -91,6 +91,8 @@
 
 public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
 
+    protected final HashMap<ConnectionId, ClientContext> connections = new HashMap<ConnectionId,
ClientContext>();
+    protected final HashMap<SessionId, ClientContext> sessions = new HashMap<SessionId,
ClientContext>();
     protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId,
ProducerContext>();
     protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId,
ConsumerContext>();
 
@@ -111,48 +113,101 @@
             // /////////////////////////////////////////////////////////////////
             // Methods that keep track of the client state
             // /////////////////////////////////////////////////////////////////
-            public Response processAddConnection(ConnectionInfo info) throws Exception {
-                connection.setName(info.getClientId());
+            public Response processAddConnection(final ConnectionInfo info) throws Exception
{
+                if (!connections.containsKey(info.getConnectionId())) {
+
+                    ClientContext connection = new AbstractClientContext<MessageDelivery>(info.getConnectionId().toString(),
null) {
+                        ConnectionInfo connectionInfo = info;
+
+                        public void close() {
+                            super.close();
+                            connections.remove(connectionInfo.getConnectionId());
+                        }
+                    };
+                    connections.put(info.getConnectionId(), connection);
+                }
                 return ack(info);
             }
 
-            public Response processAddSession(SessionInfo info) throws Exception {
+            public Response processAddSession(final SessionInfo info) throws Exception {
+                ClientContext connection = connections.get(info.getSessionId().getParentId());
+                if (connection == null) {
+                    throw new IllegalStateException(host.getHostName() + " Cannot add a session
to a connection that had not been registered: " + info.getSessionId().getParentId());
+                }
+
+                if (!sessions.containsKey(info.getSessionId())) {
+
+                    ClientContext session = new AbstractClientContext<MessageDelivery>(info.getSessionId().toString(),
connection) {
+                        SessionInfo sessioninfo = info;
+
+                        public void close() {
+                            super.close();
+                            sessions.remove(sessioninfo.getSessionId());
+                        }
+                    };
+
+                    sessions.put(info.getSessionId(), session);
+                }
+
                 return ack(info);
             }
 
             public Response processAddProducer(ProducerInfo info) throws Exception {
-                producers.put(info.getProducerId(), new ProducerContext(info));
+                ClientContext session = sessions.get(info.getProducerId().getParentId());
+                if (session == null) {
+                    throw new IllegalStateException(host.getHostName() + " Cannot add a producer
to a session that had not been registered: " + info.getProducerId().getParentId());
+                }
+                if (!producers.containsKey(info.getProducerId())) {
+                    ProducerContext producer = new ProducerContext(info, session);
+                }
                 return ack(info);
             }
 
             public Response processAddConsumer(ConsumerInfo info) throws Exception {
-                ConsumerContext ctx = new ConsumerContext(info);
-                consumers.put(info.getConsumerId(), ctx);
-                ctx.start();
+                ClientContext session = sessions.get(info.getConsumerId().getParentId());
+                if (session == null) {
+                    throw new IllegalStateException(host.getHostName() + " Cannot add a consumer
to a session that had not been registered: " + info.getConsumerId().getParentId());
+                }
+
+                if (!consumers.containsKey(info.getConsumerId())) {
+                    ConsumerContext ctx = new ConsumerContext(info, session);
+                    ctx.start();
+                }
+
                 return ack(info);
             }
 
             public Response processRemoveConnection(RemoveInfo remove, ConnectionId info,
long arg1) throws Exception {
+                ClientContext cc = connections.get(info);
+                if (cc != null) {
+                    cc.close();
+                }
                 ack(remove);
                 return null;
             }
 
             public Response processRemoveSession(RemoveInfo remove, SessionId info, long
arg1) throws Exception {
+                ClientContext cc = sessions.get(info);
+                if (cc != null) {
+                    cc.close();
+                }
                 ack(remove);
                 return null;
             }
 
             public Response processRemoveProducer(RemoveInfo remove, ProducerId info) throws
Exception {
-                producers.remove(info);
-                //TODO add close logic?
+                ClientContext cc = producers.get(info);
+                if (cc != null) {
+                    cc.close();
+                }
                 ack(remove);
                 return null;
             }
 
             public Response processRemoveConsumer(RemoveInfo remove, ConsumerId info, long
arg1) throws Exception {
-                ConsumerContext ctx = consumers.remove(info);
-                if (ctx != null) {
-                    ctx.stop();
+                ClientContext cc = consumers.get(info);
+                if (cc != null) {
+                    cc.close();
                 }
                 ack(remove);
                 return null;
@@ -344,8 +399,8 @@
     }
 
     public void onCommand(Object o) {
-    	boolean responseRequired=false;
-    	int commandId=0;
+        boolean responseRequired = false;
+        int commandId = 0;
         try {
             Command command = (Command) o;
             commandId = command.getCommandId();
@@ -360,8 +415,7 @@
             } else {
                 connection.onException(e);
             }
-        }
-        catch (Throwable t) {
+        } catch (Throwable t) {
             if (responseRequired) {
                 ExceptionResponse response = new ExceptionResponse(t);
                 response.setCorrelationId(commandId);
@@ -406,15 +460,17 @@
     // Internal Support Methods
     // /////////////////////////////////////////////////////////////////
 
-    class ProducerContext extends AbstractLimitedFlowResource<OpenWireMessageDelivery>
{
+    class ProducerContext extends AbstractClientContext<OpenWireMessageDelivery> {
 
         protected final Object inboundMutex = new Object();
         private IFlowController<OpenWireMessageDelivery> controller;
-        private String name;
+        private final ProducerInfo info;
 
-        public ProducerContext(final ProducerInfo info) {
-            super(info.getProducerId().toString());
-            final Flow flow = new Flow("broker-" + name + "-inbound", false);
+        public ProducerContext(final ProducerInfo info, ClientContext parent) {
+            super(info.getProducerId().toString(), parent);
+            this.info = info;
+            producers.put(info.getProducerId(), this);
+            final Flow flow = new Flow("broker-" + super.getResourceName() + "-inbound",
false);
 
             // Openwire only uses credit windows at the producer level for
             // producers that request the feature.
@@ -445,9 +501,14 @@
 
             super.onFlowOpened(controller);
         }
+
+        public void close() {
+            super.close(); // AbstractClientContext.close(): closes children and detaches from the parent
+            producers.remove(info.getProducerId()); // registry is keyed by ProducerId, not ProducerInfo
+        }
     }
 
-    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements
ProtocolHandler.ConsumerContext, Service {
+    class ConsumerContext extends AbstractClientContext<MessageDelivery> implements
ProtocolHandler.ConsumerContext {
 
         private final ConsumerInfo info;
         private String name;
@@ -462,9 +523,11 @@
         private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
         private BrokerSubscription brokerSubscription;
 
-        public ConsumerContext(final ConsumerInfo info) throws Exception {
+        public ConsumerContext(final ConsumerInfo info, ClientContext parent) throws Exception
{
+            super(info.getConsumerId().toString(), parent);
             this.info = info;
             this.name = info.getConsumerId().toString();
+            consumers.put(info.getConsumerId(), this);
 
             Flow flow = new Flow("broker-" + name + "-outbound", false);
             selector = parseSelector(info);
@@ -476,7 +539,9 @@
             };
 
             isQueueReceiver = info.getDestination().isQueue();
-
+            if (info.getSubscriptionName() != null) {
+                isDurable = true;
+            }
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
             controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities()
- 1));
@@ -488,10 +553,6 @@
             brokerSubscription.connect(this);
         }
 
-        public void stop() throws Exception {
-            brokerSubscription.disconnect(this);
-        }
-
         public boolean offer(final MessageDelivery message, ISourceController<?> source,
SubscriptionDeliveryCallback callback) {
             if (!controller.offer(message, source)) {
                 return false;
@@ -514,6 +575,9 @@
             md.setDestination(msg.getDestination());
             // Add to the pending list if persistent and we are durable:
             if (callback != null) {
+                if (callback.isRedelivery()) {
+                    md.setRedeliveryCounter(1);
+                }
                 synchronized (this) {
                     Object old = pendingMessages.put(msg.getMessageId(), callback);
                     if (old != null) {
@@ -530,26 +594,29 @@
         public void ack(MessageAck info) {
             // TODO: The pending message queue could probably be optimized to
             // avoid having to create a new list here.
-            LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
-            synchronized (this) {
-                MessageId id = info.getLastMessageId();
-                if (isDurable() || isQueueReceiver())
-                    while (!pendingMessageIds.isEmpty()) {
-                        MessageId pendingId = pendingMessageIds.getFirst();
-                        SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
-                        acked.add(callback);
-                        pendingMessageIds.removeFirst();
-                        if (pendingId.equals(id)) {
-                            break;
+            //if(info.isStandardAck())
+            {
+                LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
+                synchronized (this) {
+                    MessageId id = info.getLastMessageId();
+                    if (isDurable() || isQueueReceiver())
+                        while (!pendingMessageIds.isEmpty()) {
+                            MessageId pendingId = pendingMessageIds.getFirst();
+                            SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+                            acked.add(callback);
+                            pendingMessageIds.removeFirst();
+                            if (pendingId.equals(id)) {
+                                break;
+                            }
                         }
-                    }
-                limiter.onProtocolCredit(info.getMessageCount());
-            }
+                    limiter.onProtocolCredit(info.getMessageCount());
+                }
 
-            // Delete outside of synchronization on queue to avoid contention
-            // with enqueueing threads.
-            for (SubscriptionDeliveryCallback callback : acked) {
-                callback.acknowledge();
+                // Delete outside of synchronization on queue to avoid contention
+                // with enqueueing threads.
+                for (SubscriptionDeliveryCallback callback : acked) {
+                    callback.acknowledge();
+                }
             }
         }
 
@@ -584,7 +651,7 @@
         public boolean isExclusive() {
             return info.isExclusive();
         }
-        
+
         /*
          * (non-Javadoc)
          * 
@@ -722,6 +789,36 @@
             return info.getConsumerId().toString();
         }
 
+        public void close() {
+            brokerSubscription.disconnect(this);
+
+            if (isDurable() || isQueueReceiver()) {
+                LinkedList<SubscriptionDeliveryCallback> unacquired = null;
+
+                synchronized (this) {
+
+                    unacquired = new LinkedList<SubscriptionDeliveryCallback>();
+                    while (!pendingMessageIds.isEmpty()) {
+                        MessageId pendingId = pendingMessageIds.getLast();
+                        SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+                        unacquired.add(callback);
+                        pendingMessageIds.removeLast();
+                    }
+                    limiter.onProtocolCredit(unacquired.size());
+                }
+
+                if (unacquired != null) {
+                    // Delete outside of synchronization on queue to avoid contention
+                    // with enqueueing threads.
+                    for (SubscriptionDeliveryCallback callback : unacquired) {
+                        callback.unacquire(controller);
+                    }
+                }
+            }
+
+            super.close();
+            consumers.remove(info.getConsumerId());
+        }
     }
 
     private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException
{

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
Mon Jun 22 18:28:39 2009
@@ -1318,11 +1318,11 @@
         assertNotNull(m);
         assertEquals(m.getMessageId(), message1.getMessageId());
 
-        assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2);
+        assertEquals(countMessagesInQueue(connection, connectionInfo, destination), 2);
         connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
-        assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2);
+        assertEquals(countMessagesInQueue(connection, connectionInfo, destination), 2);
         connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
-        assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 1);
+        assertEquals(countMessagesInQueue(connection, connectionInfo, destination), 1);
 
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
Mon Jun 22 18:28:39 2009
@@ -96,7 +96,11 @@
 
     protected abstract void requestDispatch();
 
-    protected abstract void acknowledge(QueueElement<V> elem);
+    protected abstract Object getMutex();
+
+    protected abstract void onElementRemoved(QueueElement<V> qe);
+
+    protected abstract void onElementReenqueued(QueueElement<V> qe, ISourceController<?>
controller);
 
     /**
      * Adds an element to the queue.
@@ -250,6 +254,11 @@
         // sequence number beyond the queue's limit.
         long sequence = -1;
 
+        //When an element on the queue is reenqueued, this
+        //is set to indicate that the cursor should go back 
+        //and consider the element:
+        long reenqueueSequence = -1;
+
         // The cursor is holding references for all
         // elements between first and last inclusive:
         QueueElement<V> firstRef = null;
@@ -543,6 +552,12 @@
         public final QueueElement<V> getNext() {
 
             try {
+
+                if (reenqueueSequence != -1) {
+                    reset(reenqueueSequence);
+                    reenqueueSequence = -1;
+                }
+
                 if (atEnd()) {
                     updateCurrent(null);
                     return null;
@@ -640,6 +655,16 @@
             return true;
         }
 
+        public void onElementUnacquired(QueueElement<V> qe) {
+            if (qe.sequence < sequence) {
+                if (reenqueueSequence >= 0) {
+                    reenqueueSequence = Math.min(reenqueueSequence, qe.sequence);
+                } else {
+                    reenqueueSequence = qe.sequence;
+                }
+            }
+        }
+
         /**
          * 
          */
@@ -667,6 +692,10 @@
                 return true;
             }
 
+            if (reenqueueSequence != -1) {
+                return false;
+            }
+
             if (sequence > limit) {
                 return true;
             }
@@ -806,7 +835,26 @@
         }
 
         public final void acknowledge() {
-            queue.acknowledge(this);
+            synchronized (queue.getMutex()) {
+                delete();
+            }
+        }
+
+        public void unacquire(ISourceController<?> source) {
+            synchronized (queue.getMutex()) {
+                if (owner != null) {
+                    owner = null;
+                    if (!deleted) {
+                        redelivered = true;
+                        //TODO need to account for this memory space, and check 
+                        //load/unload:
+                        for (Cursor<V> c : queue.openCursors) {
+                            c.onElementUnacquired(this);
+                        }
+                    }
+                }
+                queue.requestDispatch();
+            }
         }
 
         public final boolean delete() {
@@ -819,24 +867,16 @@
                 if (saved) {
                     queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
                 }
+
                 elem = null;
                 unload(null);
+
+                queue.onElementRemoved(this);
                 return true;
             }
             return false;
         }
 
-        public final void unacquire(ISourceController<?> source) {
-            owner = null;
-            if (isExpired()) {
-                acknowledge();
-            } else {
-                // TODO reset all cursors beyond this sequence number
-                // back to this element
-                throw new UnsupportedOperationException("Not yet implemented");
-            }
-        }
-
         /**
          * Attempts to unlink this element from the queue
          */
@@ -916,6 +956,10 @@
 
         }
 
+        public final boolean isRedelivery() {
+            return redelivered;
+        }
+
         private boolean unlinkable() {
             return softRefs == 0 && !loaded;
         }
@@ -954,6 +998,8 @@
                     }
                     this.size = re.getElementSize();
                     this.expiration = re.getExpiration();
+                    //TODO Need to add redelivery to the store:
+                    //this.redelivered = re.getRedelivered();
                     // If the loader asked to add a soft ref do it:
                     if (softRef) {
                         addSoftRef();
@@ -1005,7 +1051,7 @@
         }
 
         public final boolean isPagedOut() {
-            return elem == null;
+            return elem == null && !deleted;
         }
 
         public final boolean isLoaded() {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
Mon Jun 22 18:28:39 2009
@@ -117,13 +117,22 @@
         queue = new CursoredQueue<E>(persistencePolicy, expirationMapper, controller.getFlow(),
queueDescriptor, queueStore, this) {
 
             @Override
-            protected void acknowledge(QueueElement<E> qe) {
+            protected Object getMutex() {
+                return ExclusivePersistentQueue.this;
+            }
+
+            @Override
+            protected void onElementRemoved(QueueElement<E> qe) {
+                synchronized (ExclusivePersistentQueue.this) {
+                    limiter.remove(1, qe.getLimiterSize());
+                }
+            }
+
+            @Override
+            protected void onElementReenqueued(QueueElement<E> qe, ISourceController<?>
source) {
                 synchronized (ExclusivePersistentQueue.this) {
-                    E elem = qe.getElement();
-                    if (qe.delete()) {
-                        if (!qe.isAcquired()) {
-                            controller.elementDispatched(elem);
-                        }
+                    if (isDispatchReady()) {
+                        notifyReady();
                     }
                 }
             }
@@ -185,7 +194,8 @@
 
     public synchronized boolean removeSubscription(Subscription<E> sub) {
         if (sub == subscription) {
-            sub = null;
+            subscription = null;
+            cursor.reset(queue.getFirstSequence());
             return true;
         } else {
             return false;
@@ -280,7 +290,6 @@
                 // See if the sink has room:
                 qe.setAcquired(subscription);
                 if (subscription.offer(qe.elem, sourceController, callback)) {
-                    controller.elementDispatched(qe.getElement());
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {
                         qe.acknowledge();

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
Mon Jun 22 18:28:39 2009
@@ -142,9 +142,16 @@
                 queue = new CursoredQueue<V>(persistencePolicy, expirationMapper, flow, queueDescriptor, store, mutex) {
 
                     @Override
-                    protected void acknowledge(QueueElement<V> elem) {
-                        SharedQueue.this.acknowledge(elem);
+                    protected void onElementRemoved(QueueElement<V> elem) {
+                        synchronized (mutex) {
+                            //If the element wasn't acquired release space:
+                            sizeLimiter.remove(1, elem.getLimiterSize());
+                        }
+                    }
 
+                    @Override
+                    protected Object getMutex() {
+                        return mutex;
                     }
 
                     @Override
@@ -156,6 +163,15 @@
                     protected void requestDispatch() {
                         notifyReady();
                     }
+
+                    @Override
+                    protected void onElementReenqueued(QueueElement<V> qe, ISourceController<?> controller) {
+                        synchronized (SharedQueue.this) {
+                            if (isDispatchReady()) {
+                                notifyReady();
+                            }
+                        }
+                    }
                 };
 
                 queue.initialize(sequenceMin, sequenceMax, count, size);
@@ -287,16 +303,6 @@
         return inputController;
     }
 
-    final void acknowledge(QueueElement<V> qe) {
-        synchronized (mutex) {
-            qe.delete();
-            //If the element wasn't acqired release space:
-            if (!qe.isAcquired()) {
-                sizeLimiter.remove(1, qe.getLimiterSize());
-            }
-        }
-    }
-
     /**
      * Starts this queue.
      */
@@ -430,9 +436,6 @@
                         SubscriptionContext nextConsumer = consumer.getNext();
                         switch (consumer.offer(next)) {
                         case ACCEPTED:
-                            if (DEBUG)
-                                System.out.println("Dispatched " + next.getElement() + " to " + consumer);
-
                             // Rotate list so this one is last next time:
                             sharedConsumers.rotate();
                             interested = true;
@@ -539,6 +542,10 @@
                 } else {
                     cursor.reset(queue.getFirstSequence());
                 }
+                
+                if (DEBUG)
+                    System.out.println("Starting " + this + " at " + cursor);
+
 
                 updateDispatchList();
             }
@@ -686,7 +693,7 @@
 
             // Check for expiration:
             if (qe.isExpired()) {
-                acknowledge(qe);
+                qe.acknowledge();
                 return ACCEPTED;
             }
 
@@ -696,9 +703,11 @@
             // See if the sink has room:
             qe.setAcquired(sub);
             if (sub.offer(qe.elem, this, callback)) {
-                if (!sub.isBrowser()) {
+                if (DEBUG)
+                    System.out.println("Dispatched " + qe.getElement() + " to " + this);
 
-                    sizeLimiter.remove(1, qe.getLimiterSize());
+                
+                if (!sub.isBrowser()) {
 
                     // If remove on dispatch acknowledge now:
                     if (callback == null) {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
Mon Jun 22 18:28:39 2009
@@ -22,28 +22,33 @@
 public interface Subscription<E> {
 
     public interface SubscriptionDeliveryCallback {
-        
+
         /**
          * If {@link Subscription#isBrowser()} returns false this method
-         * indicates that the Subscription is finished with the element
-         * and that it can be removed from the queue. 
+         * indicates that the Subscription is finished with the element and that
+         * it can be removed from the queue.
          */
         public void acknowledge();
 
         /**
-         * Indicates that the subscription no longer has interest in
-         * the element and that it should be placed back on the queue. 
+         * Indicates that the subscription no longer has interest in the element
+         * and that it should be placed back on the queue.
          * 
-         * The provided source controller will be blocked if there 
-         * is not enough space available on the queue to
-         * reenqueue the element.
+         * The provided source controller will be blocked if there is not enough
+         * space available on the queue to reenqueue the element.
          * 
-         * It is illegal to call this method after a prior call to 
-         * {@link #acknowledge()}. 
+         * It is illegal to call this method after a prior call to
+         * {@link #acknowledge()}.
          * 
-         * @param source The source controller.
+         * @param sourceController
+         *            The source controller.
          */
         public void unacquire(ISourceController<?> sourceController);
+
+        /**
+         * @return Returns true if the delivery is a redelivery
+         */
+        public boolean isRedelivery();
     }
 
     /**
@@ -52,25 +57,25 @@
      * @return true if the element should be removed on dispatch
      */
     public boolean isRemoveOnDispatch(E elem);
-    
+
     /**
-     * @return True if this is a subscription browser. 
+     * @return True if this is a subscription browser.
      */
     public boolean isBrowser();
-    
+
     /**
-     * Indicates that the subscription is exclusive. When there at least one 
-     * exclusive subscription on a shared queue, the queue will dispatch to
-     * only one such consumer while there is at least one connected.
+     * Indicates that the subscription is exclusive. When there is at least one
+     * exclusive subscription on a shared queue, the queue will dispatch to only
+     * one such consumer while there is at least one connected.
      * 
      * @return True if the Subscription is exclusive.
      */
     public boolean isExclusive();
 
     /**
-     * Returns true if the Subscription has a selector. If true
-     * is returned the {@link #matches(Object)} will be called
-     * prior to an attempt to offer the message to {@link Subscription}
+     * Returns true if the Subscription has a selector. If true is returned the
+     * {@link #matches(Object)} will be called prior to an attempt to offer the
+     * message to {@link Subscription}
      * 
      * @return true if this {@link Subscription} has a selector.
      */
@@ -96,7 +101,8 @@
      *            The queue's controller, which must be used if the offered
      *            element exceeds the subscription's buffer limits.
      * @param callback
-     *            The {@link SubscriptionDeliveryCallback} associated with the element
+     *            The {@link SubscriptionDeliveryCallback} associated with the
+     *            element
      * 
      * @return true if the element was accepted false otherwise, if false is
      *         returned the caller must have called
@@ -104,7 +110,7 @@
      *         returning false.
      */
     public boolean offer(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
-    
+
     /**
      * Pushes an item to the subscription. If the subscription is not remove on
      * dispatch, then it must call acknowledge method on the callback when it
@@ -116,7 +122,8 @@
      *            The queue's controller, which must be used if the added
      *            element exceeds the subscription's buffer limits.
      * @param callback
-     *            The {@link SubscriptionDeliveryCallback} associated with the element
+     *            The {@link SubscriptionDeliveryCallback} associated with the
+     *            element
      * @return true if the element was accepted false otherwise, if false is
      *         returned the caller must have called
      *         {@link ISourceController#onFlowBlock(ISinkController)} prior to

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -33,6 +33,7 @@
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.apollo.broker.ProtocolHandler;
 import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.broker.ProtocolHandler.AbstractClientContext;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.BooleanExpression;
@@ -259,7 +260,7 @@
         }
     }
 
-    class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery> implements ProtocolHandler.ConsumerContext {
+    class ConsumerContext extends AbstractClientContext<MessageDelivery> implements ProtocolHandler.ConsumerContext {
 
         private BooleanExpression selector;
         private String selectorString;
@@ -277,6 +278,7 @@
         private boolean durable;
 
         public ConsumerContext(final StompFrame subscribe) throws Exception {
+            super(subscribe.getHeaders().get(Stomp.Headers.Subscribe.ID), null);
             translator = translator(subscribe);
 
             Map<String, String> headers = subscribe.getHeaders();



Mime
View raw message