activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r756773 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq: ./ broker/ broker/openwire/ broker/stomp/ broker/store/
Date Fri, 20 Mar 2009 21:48:40 GMT
Author: chirino
Date: Fri Mar 20 21:48:39 2009
New Revision: 756773

URL: http://svn.apache.org/viewvc?rev=756773&view=rev
Log:
Applying patch https://issues.apache.org/activemq/browse/AMQ-2173 Thx Colin!

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/WindowLimiter.java Fri Mar 20 21:48:39 2009
@@ -7,7 +7,7 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.SizeLimiter;
 
-public class WindowLimiter<E> extends SizeLimiter<E>  {
+public class WindowLimiter<E extends MessageDelivery> extends SizeLimiter<E>  {
         final Flow flow;
         final boolean clientMode;
         private int available;
@@ -54,7 +54,7 @@
             remove(credit);
         }
 
-        public int getElementSize(MessageDelivery m) {
+        public int getElementSize(E m) {
             return m.getFlowLimiterSize();
         }
     }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Fri Mar 20 21:48:39 2009
@@ -32,9 +32,6 @@
 
     public AsciiBuffer getProducerId();
 
-    public void setCompletionCallback(Runnable runnable);
-    public Runnable getCompletionCallback();
-
     public <T> T asType(Class<T> type);
 
     public boolean isPersistent();
@@ -46,6 +43,17 @@
     public void setTrackingNumber(long tracking);
     
     public long getTrackingNumber();
+    
+    /**
+     * Returns true if this message requires acknowledgement.
+     */
+    public boolean isResponseRequired();
+    
+    /**
+     * Called when the message's persistence requirements have
+     * been met. This method must not block. 
+     */
+    public void onMessagePersisted();
 
     /**
      * Returns the message's buffer representation.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Fri Mar 20 21:48:39 2009
@@ -26,20 +26,21 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.QueueDomain;
 import org.apache.activemq.broker.TopicDomain;
+import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 final public class Router {
-    
+
     public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
-    
+
     private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
-    
+
     public Router() {
         domains.put(QUEUE_DOMAIN, new QueueDomain());
         domains.put(TOPIC_DOMAIN, new TopicDomain());
     }
-    
+
     public Domain getDomain(AsciiBuffer name) {
         return domains.get(name);
     }
@@ -52,30 +53,61 @@
         return domains.remove(name);
     }
 
-    
     public synchronized void bind(Destination destination, DeliveryTarget dt) {
         Domain domain = domains.get(destination.getDomain());
         domain.bind(destination.getName(), dt);
     }
 
-    public Collection<DeliveryTarget> route(MessageDelivery msg) {
-        return route(msg.getDestination(), msg);
+    public void route(final MessageDelivery msg, ISourceController<?> controller) {
+
+        Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
+
+        // TODO:
+        // Consider doing some caching of this target list. Most producers
+        // always send to the same destination.
+        if (targets != null) {
+
+            if (msg.isResponseRequired()) {
+                // We need to ack the message once we ensure we won't loose it.
+                // We know we won't loose it once it's persisted or delivered to
+                // a consumer
+                // Setup a callback to get notifed once one of those happens.
+                if (!msg.isPersistent()) {
+                    // Let the client know the broker got the message.
+                    msg.onMessagePersisted();
+                }
+            }
+
+            // Deliver the message to all the targets..
+            for (DeliveryTarget dt : targets) {
+                if (dt.match(msg)) {
+                    dt.getSink().add(msg, controller);
+                }
+            }
+
+        } else {
+            // Let the client know we got the message even though there
+            // were no valid targets to deliver the message to.
+            if (msg.isResponseRequired()) {
+                msg.onMessagePersisted();
+            }
+        }
     }
 
     private Collection<DeliveryTarget> route(Destination destination, MessageDelivery msg) {
         // Handles routing to composite/multi destinations.
         Collection<Destination> destinationList = destination.getDestinations();
-        if( destinationList == null ) {
+        if (destinationList == null) {
             Domain domain = domains.get(destination.getDomain());
             return domain.route(destination.getName(), msg);
         } else {
             HashSet<DeliveryTarget> rc = new HashSet<DeliveryTarget>();
             for (Destination d : destinationList) {
                 Collection<DeliveryTarget> t = route(d, msg);
-                if( t!=null ) {
+                if (t != null) {
                     rc.addAll(t);
                 }
-            }            
+            }
             return rc;
         }
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Fri Mar 20 21:48:39 2009
@@ -27,22 +27,30 @@
     private final Message message;
     private Destination destination;
     private AsciiBuffer producerId;
-    private Runnable completionCallback;
     private long tracking;
+    private PersistListener persistListener = null;
+
+    public interface PersistListener {
+        public void onMessagePersisted(OpenWireMessageDelivery delivery);
+    }
 
     public OpenWireMessageDelivery(Message message) {
         this.message = message;
     }
 
+    public void setPersistListener(PersistListener listener) {
+        persistListener = listener;
+    }
+
     public Destination getDestination() {
-        if( destination == null ) {
+        if (destination == null) {
             destination = OpenwireProtocolHandler.convert(message.getDestination());
         }
         return destination;
     }
 
     public int getFlowLimiterSize() {
-        return message.getSize();
+        return 1;
     }
 
     public int getPriority() {
@@ -54,7 +62,7 @@
     }
 
     public AsciiBuffer getProducerId() {
-        if( producerId == null ) {
+        if (producerId == null) {
             producerId = new AsciiBuffer(message.getProducerId().toString());
         }
         return producerId;
@@ -64,20 +72,12 @@
         return message;
     }
 
-    public Runnable getCompletionCallback() {
-        return completionCallback;
-    }
-
-    public void setCompletionCallback(Runnable completionCallback) {
-        this.completionCallback = completionCallback;
-    }
-
     public <T> T asType(Class<T> type) {
-        if( type == Message.class ) {
+        if (type == Message.class) {
             return type.cast(message);
         }
         // TODO: is this right?
-        if( message.getClass().isAssignableFrom(type) ) {
+        if (message.getClass().isAssignableFrom(type)) {
             return type.cast(message);
         }
         return null;
@@ -94,7 +94,7 @@
     public long getTrackingNumber() {
         return tracking;
     }
-    
+
     /**
      * Returns the message's buffer representation.
      * 
@@ -104,4 +104,15 @@
         throw new UnsupportedOperationException("Not yet implemented");
     }
 
+    public final void onMessagePersisted() {
+        if (persistListener != null) {
+            persistListener.onMessagePersisted(this);
+            persistListener = null;
+        }
+    }
+
+    public final boolean isResponseRequired() {
+        return message.isResponseRequired();
+    }
+
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Mar 20 21:48:39 2009
@@ -29,6 +29,7 @@
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.protocol.ProtocolHandler;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BrokerId;
@@ -82,13 +83,13 @@
 import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.wireformat.WireFormat;
 
-public class OpenwireProtocolHandler implements ProtocolHandler {
+public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
 
     protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
     protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
 
     protected final Object inboundMutex = new Object();
-    protected IFlowController<MessageDelivery> inboundController;
+    protected IFlowController<OpenWireMessageDelivery> inboundController;
 
     protected BrokerConnection connection;
     private OpenWireFormat wireFormat;
@@ -97,10 +98,14 @@
     public void start() throws Exception {
         // Setup the inbound processing..
         final Flow flow = new Flow("broker-" + connection.getName() + "-inbound", false);
-        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
-        inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
-            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
-                route(controller, elem);
+        SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+        inboundController = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
+            public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
+                if (elem.isResponseRequired()) {
+                    elem.setPersistListener(OpenwireProtocolHandler.this);
+                }
+                router.route(elem, controller);
+                controller.elementDispatched(elem);
             }
 
             public String toString() {
@@ -341,12 +346,15 @@
             }.start();
         }
     }
+    
 
-    // /////////////////////////////////////////////////////////////////
-    // Internal Support Methods
-    // /////////////////////////////////////////////////////////////////
+    public void onMessagePersisted(OpenWireMessageDelivery delivery) {
+        // TODO This method should not block:
+        // Either add to output queue, or spin off in a separate thread. 
+        ack(delivery.getMessage());
+    }
 
-    private Response ack(Command command) {
+    Response ack(Command command) {
         if (command.isResponseRequired()) {
             Response rc = new Response();
             rc.setCorrelationId(command.getCommandId());
@@ -355,22 +363,26 @@
         return null;
     }
 
-    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
-        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+    // /////////////////////////////////////////////////////////////////
+    // Internal Support Methods
+    // /////////////////////////////////////////////////////////////////
+
+    static class FlowControllableAdapter implements FlowControllable<OpenWireMessageDelivery> {
+        public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery elem) {
         }
 
-        public IFlowSink<MessageDelivery> getFlowSink() {
+        public IFlowSink<OpenWireMessageDelivery> getFlowSink() {
             return null;
         }
 
-        public IFlowSource<MessageDelivery> getFlowSource() {
+        public IFlowSource<OpenWireMessageDelivery> getFlowSource() {
             return null;
         }
     }
 
     class ProducerContext {
 
-        private IFlowController<MessageDelivery> controller;
+        private IFlowController<OpenWireMessageDelivery> controller;
         private String name;
 
         public ProducerContext(final ProducerInfo info) {
@@ -380,7 +392,7 @@
             // producers that request the feature.
             if (info.getWindowSize() > 0) {
                 final Flow flow = new Flow("broker-" + name + "-inbound", false);
-                WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
+                WindowLimiter<OpenWireMessageDelivery> limiter = new WindowLimiter<OpenWireMessageDelivery>(false, flow, info.getWindowSize(), info.getWindowSize() / 2) {
                     @Override
                     protected void sendCredit(int credit) {
                         ProducerAck ack = new ProducerAck(info.getProducerId(), credit);
@@ -388,9 +400,10 @@
                     }
                 };
 
-                controller = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
-                    public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
-                        route(controller, elem);
+                controller = new FlowController<OpenWireMessageDelivery>(new FlowControllableAdapter() {
+                    public void flowElemAccepted(ISourceController<OpenWireMessageDelivery> controller, OpenWireMessageDelivery msg) {
+                        router.route(msg, controller);
+                        controller.elementDispatched(msg);
                     }
 
                     public String toString() {
@@ -409,7 +422,7 @@
         private String name;
         private BooleanExpression selector;
         private boolean durable;
-        
+
         private SingleFlowRelay<MessageDelivery> queue;
         public WindowLimiter<MessageDelivery> limiter;
 
@@ -474,50 +487,6 @@
 
     }
 
-    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
-        // TODO:
-        // Consider doing some caching of this target list. Most producers
-        // always send to
-        // the same destination.
-        Collection<DeliveryTarget> targets = router.route(elem);
-
-        final Message message = ((OpenWireMessageDelivery) elem).getMessage();
-        if (targets != null) {
-
-            if (message.isResponseRequired()) {
-                // We need to ack the message once we ensure we won't loose it.
-                // We know we won't loose it once it's persisted or delivered to
-                // a consumer
-                // Setup a callback to get notifed once one of those happens.
-                if (message.isPersistent()) {
-                    elem.setCompletionCallback(new Runnable() {
-                        public void run() {
-                            ack(message);
-                        }
-                    });
-                } else {
-                    // Let the client know the broker got the message.
-                    ack(message);
-                }
-            }
-
-            // Deliver the message to all the targets..
-            for (DeliveryTarget dt : targets) {
-                if (dt.match(elem)) {
-                    dt.getSink().add(elem, controller);
-                }
-            }
-
-        } else {
-            // Let the client know we got the message even though there
-            // were no valid targets to deliver the message to.
-            if (message.isResponseRequired()) {
-                ack(message);
-            }
-        }
-        controller.elementDispatched(elem);
-    }
-
     static public Destination convert(ActiveMQDestination dest) {
         if (dest.isComposite()) {
             ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
@@ -573,5 +542,4 @@
     public void setWireFormat(WireFormat wireFormat) {
         this.wireFormat = (OpenWireFormat) wireFormat;
     }
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Fri Mar 20 21:48:39 2009
@@ -32,6 +32,11 @@
     private int priority = Integer.MIN_VALUE;
     private AsciiBuffer msgId;
     private long tracking = -1;
+    private PersistListener persistListener = null;
+
+    public interface PersistListener {
+        public void onMessagePersisted(StompMessageDelivery delivery);
+    }
 
     public StompMessageDelivery(StompFrame frame, Destination destiantion) {
         this.frame = frame;
@@ -39,6 +44,10 @@
         this.receiptId = frame.getHeaders().remove(Stomp.Headers.RECEIPT_REQUESTED);
     }
 
+    public void setPersistListener(PersistListener listener) {
+        persistListener = listener;
+    }
+
     public Destination getDestination() {
         return destination;
     }
@@ -48,21 +57,21 @@
     }
 
     public int getPriority() {
-        if( priority == Integer.MIN_VALUE ) {
+        if (priority == Integer.MIN_VALUE) {
             String p = frame.getHeaders().get(Stomp.Headers.Message.PRORITY);
             try {
                 priority = (p == null) ? 4 : Integer.parseInt(p);
             } catch (NumberFormatException e) {
                 priority = 4;
-            } 
+            }
         }
         return priority;
     }
 
     public AsciiBuffer getMsgId() {
-        if( msgId == null ) {
+        if (msgId == null) {
             String p = frame.getHeaders().get(Stomp.Headers.Message.MESSAGE_ID);
-            if( p!=null ) {
+            if (p != null) {
                 msgId = new AsciiBuffer(p);
             }
         }
@@ -82,7 +91,7 @@
     }
 
     public <T> T asType(Class<T> type) {
-        if( type == StompFrame.class ) {
+        if (type == StompFrame.class) {
             return type.cast(frame);
         }
         return null;
@@ -102,22 +111,31 @@
     }
 
     public long getTrackingNumber() {
-        return tracking ;
+        return tracking;
     }
 
     public void setTrackingNumber(long tracking) {
         this.tracking = tracking;
     }
-    
+
     /**
      * Returns the message's buffer representation.
+     * 
      * @return
      */
-    public Buffer getMessageBuffer()
-    {
-        //Todo use asType() instead?
+    public Buffer getMessageBuffer() {
+        // Todo use asType() instead?
         throw new UnsupportedOperationException("not yet implemented");
     }
-   
 
+    public boolean isResponseRequired() {
+        return receiptId != null;
+    }
+
+    public void onMessagePersisted() {
+        if (persistListener != null) {
+            persistListener.onMessagePersisted(this);
+            persistListener = null;
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Fri Mar 20 21:48:39 2009
@@ -57,8 +57,7 @@
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.wireformat.WireFormat;
 
-
-public class StompProtocolHandler implements ProtocolHandler {
+public class StompProtocolHandler implements ProtocolHandler, StompMessageDelivery.PersistListener {
 
     interface ActionHander {
         public void onStompFrame(StompFrame frame) throws Exception;
@@ -68,11 +67,12 @@
     protected final HashMap<String, ConsumerContext> consumers = new HashMap<String, ConsumerContext>();
 
     protected final Object inboundMutex = new Object();
-    protected IFlowController<MessageDelivery> inboundController;
-    
+    protected IFlowController<StompMessageDelivery> inboundController;
+
     protected BrokerConnection connection;
-    
-    // TODO: need to update the FrameTranslator to normalize to new broker API objects instead of to the openwire command set.
+
+    // TODO: need to update the FrameTranslator to normalize to new broker API
+    // objects instead of to the openwire command set.
     private final FrameTranslator translator = new LegacyFrameTranslator();
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/broker/stomp/frametranslator/");
     private SingleFlowRelay<MessageDelivery> outboundQueue;
@@ -90,19 +90,19 @@
         }
         return translator;
     }
-    
+
     public StompProtocolHandler() {
-        actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.CONNECT, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
                 StompFrame response = new StompFrame(Stomp.Responses.CONNECTED);
                 connection.write(response);
             }
         });
-        actionHandlers.put(Stomp.Commands.SEND, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
                 String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
                 Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
-                
+
                 frame.setAction(Stomp.Responses.MESSAGE);
                 StompMessageDelivery md = new StompMessageDelivery(frame, destination);
                 while (!inboundController.offer(md, null)) {
@@ -110,7 +110,7 @@
                 }
             }
         });
-        actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.SUBSCRIBE, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
                 ConsumerContext ctx = new ConsumerContext(frame);
                 consumers.put(ctx.stompDestination, ctx);
@@ -118,51 +118,55 @@
                 ack(frame);
             }
         });
-        actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.UNSUBSCRIBE, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.ACK, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.ACK, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
                 frame.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID);
             }
         });
-        actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.DISCONNECT, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        
-        actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander(){
+
+        actionHandlers.put(Stomp.Commands.ABORT_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.BEGIN_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
-        actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander(){
+        actionHandlers.put(Stomp.Commands.COMMIT_TRANSACTION, new ActionHander() {
             public void onStompFrame(StompFrame frame) throws Exception {
             }
         });
     }
-    
+
     public void start() throws Exception {
         // Setup the inbound processing..
-        final Flow inboundFlow = new Flow("broker-"+connection.getName()+"-inbound", false);
-        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
-        inboundController = new FlowController<MessageDelivery>(new FlowControllableAdapter() {
-            public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
-                route(controller, elem);
+        final Flow inboundFlow = new Flow("broker-" + connection.getName() + "-inbound", false);
+        SizeLimiter<StompMessageDelivery> inLimiter = new SizeLimiter<StompMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+        inboundController = new FlowController<StompMessageDelivery>(new FlowControllableAdapter() {
+            public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
+                if (elem.isResponseRequired()) {
+                    elem.setPersistListener(StompProtocolHandler.this);
+                }
+                router.route(elem, controller);
+                controller.elementDispatched(elem);
             }
-        
+
             public String toString() {
                 return inboundFlow.getFlowName();
             }
-        }, inboundFlow, limiter, inboundMutex);
-        
-        Flow outboundFlow = new Flow("broker-"+connection.getName()+"-outbound", false);
-        limiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
-        outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), limiter);
+        }, inboundFlow, inLimiter, inboundMutex);
+
+        Flow outboundFlow = new Flow("broker-" + connection.getName() + "-outbound", false);
+        SizeLimiter<MessageDelivery> outLimiter = new SizeLimiter<MessageDelivery>(connection.getOutputWindowSize(), connection.getOutputWindowSize());
+        outboundQueue = new SingleFlowRelay<MessageDelivery>(outboundFlow, outboundFlow.getFlowName(), outLimiter);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
             public void drain(final MessageDelivery message, final ISourceController<MessageDelivery> controller) {
                 StompFrame msg = message.asType(StompFrame.class);
@@ -180,19 +184,19 @@
     }
 
     public void onCommand(Object o) {
-        StompFrame command = (StompFrame)o;
+        StompFrame command = (StompFrame) o;
         try {
             String action = command.getAction();
             ActionHander actionHander = actionHandlers.get(action);
-            if( actionHander == null ) {
-                throw new IOException("Unsupported command: "+action);
+            if (actionHander == null) {
+                throw new IOException("Unsupported command: " + action);
             }
             actionHander.onStompFrame(command);
         } catch (Exception error) {
             try {
-                
+
                 error.printStackTrace();
-                
+
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
                 error.printStackTrace(stream);
@@ -215,11 +219,11 @@
             }
         }
     }
-    
+
     public void onException(Exception error) {
-        if( !connection.isStopping() ) {
+        if (!connection.isStopping()) {
             try {
-                
+
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
                 error.printStackTrace(stream);
@@ -227,7 +231,7 @@
 
                 sendError(error.getMessage(), baos.toByteArray());
                 connection.stop();
-                
+
             } catch (Exception ignore) {
             }
         }
@@ -236,15 +240,15 @@
     // /////////////////////////////////////////////////////////////////
     // Internal Support Methods
     // /////////////////////////////////////////////////////////////////
-    static class FlowControllableAdapter implements FlowControllable<MessageDelivery> {
-        public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
+    static class FlowControllableAdapter implements FlowControllable<StompMessageDelivery> {
+        public void flowElemAccepted(ISourceController<StompMessageDelivery> controller, StompMessageDelivery elem) {
         }
 
-        public IFlowSink<MessageDelivery> getFlowSink() {
+        public IFlowSink<StompMessageDelivery> getFlowSink() {
             return null;
         }
 
-        public IFlowSource<MessageDelivery> getFlowSource() {
+        public IFlowSource<StompMessageDelivery> getFlowSource() {
             return null;
         }
     }
@@ -260,14 +264,14 @@
         private String stompDestination;
         private Destination destination;
         private String ackMode;
-        
+
         private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
 
         private boolean durable;
 
         public ConsumerContext(final StompFrame subscribe) throws Exception {
             translator = translator(subscribe);
-            
+
             Map<String, String> headers = subscribe.getHeaders();
             stompDestination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
             destination = translator.convertToDestination(StompProtocolHandler.this, stompDestination);
@@ -278,17 +282,17 @@
                 ackMode = StompSubscription.CLIENT_ACK;
             } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
                 ackMode = StompSubscription.INDIVIDUAL_ACK;
-                sendError(StompSubscription.INDIVIDUAL_ACK+" not supported.");
+                sendError(StompSubscription.INDIVIDUAL_ACK + " not supported.");
                 connection.stop();
                 return;
             } else {
                 ackMode = StompSubscription.AUTO_ACK;
             }
-            
+
             selector = parseSelector(subscribe);
 
-            if( ackMode != StompSubscription.AUTO_ACK ) {
-                Flow flow = new Flow("broker-"+subscriptionId+"-outbound", false);
+            if (ackMode != StompSubscription.AUTO_ACK) {
+                Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
                 limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
                     public int getElementSize(MessageDelivery m) {
                         return 1;
@@ -298,8 +302,8 @@
                 queue.setDrain(new IFlowDrain<MessageDelivery>() {
                     public void drain(final MessageDelivery message, ISourceController<MessageDelivery> controller) {
                         StompFrame frame = message.asType(StompFrame.class);
-                        if (ackMode == StompSubscription.CLIENT_ACK || ackMode==StompSubscription.INDIVIDUAL_ACK) {
-                            synchronized(allSentMessageIds) {
+                        if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
+                            synchronized (allSentMessageIds) {
                                 AsciiBuffer msgId = message.getMsgId();
                                 sentMessageIds.put(msgId, msgId);
                                 allSentMessageIds.put(msgId, ConsumerContext.this);
@@ -311,26 +315,26 @@
             } else {
                 queue = outboundQueue;
             }
-            
+
         }
 
         public void ack(StompFrame info) throws Exception {
-            if (ackMode == StompSubscription.CLIENT_ACK || ackMode==StompSubscription.INDIVIDUAL_ACK) {
+            if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
                 int credits = 0;
-                synchronized(allSentMessageIds) {
+                synchronized (allSentMessageIds) {
                     AsciiBuffer mid = new AsciiBuffer(info.getHeaders().get(Stomp.Headers.Ack.MESSAGE_ID));
                     for (Iterator<AsciiBuffer> iterator = sentMessageIds.keySet().iterator(); iterator.hasNext();) {
                         AsciiBuffer next = iterator.next();
                         iterator.remove();
                         allSentMessageIds.remove(next);
                         credits++;
-                        if( next.equals(mid) ) {
+                        if (next.equals(mid)) {
                             break;
                         }
                     }
-                        
+
                 }
-                synchronized(queue) {
+                synchronized (queue) {
                     limiter.onProtocolCredit(credits);
                 }
 
@@ -351,27 +355,29 @@
             if (stompMessage == null) {
                 return false;
             }
-            
+
             return true;
-            
-//          TODO: implement selector bits.
-//            Message msg = message.asType(Message.class);
-//            if (msg == null) {
-//                return false;
-//            }
-//
-//            // TODO: abstract the Selector bits so that it is not openwire specific.
-//            MessageEvaluationContext selectorContext = new MessageEvaluationContext();
-//            selectorContext.setMessageReference(msg);
-//            selectorContext.setDestination(msg.getDestination());
-//            try {
-//                return (selector == null || selector.matches(selectorContext));
-//            } catch (JMSException e) {
-//                e.printStackTrace();
-//                return false;
-//            }
+
+            // TODO: implement selector bits.
+            // Message msg = message.asType(Message.class);
+            // if (msg == null) {
+            // return false;
+            // }
+            //
+            // // TODO: abstract the Selector bits so that it is not openwire
+            // specific.
+            // MessageEvaluationContext selectorContext = new
+            // MessageEvaluationContext();
+            // selectorContext.setMessageReference(msg);
+            // selectorContext.setDestination(msg.getDestination());
+            // try {
+            // return (selector == null || selector.matches(selectorContext));
+            // } catch (JMSException e) {
+            // e.printStackTrace();
+            // return false;
+            // }
         }
-        
+
         public boolean isDurable() {
             return durable;
         }
@@ -381,11 +387,11 @@
         }
 
     }
-    
+
     private void sendError(String message) {
         sendError(message, StompFrame.NO_DATA);
     }
-    
+
     private void sendError(String message, String details) {
         try {
             sendError(message, details.getBytes("UTF-8"));
@@ -393,6 +399,7 @@
             throw new RuntimeException(e);
         }
     }
+
     private void sendError(String message, byte[] details) {
         HashMap<String, String> headers = new HashMap<String, String>();
         headers.put(Stomp.Headers.Error.MESSAGE, message);
@@ -400,9 +407,16 @@
         connection.write(errorMessage);
     }
 
-    private void ack(StompFrame frame) {
+    //Callback from MessageDelivery when message's persistence guarantees are met. 
+    public void onMessagePersisted(StompMessageDelivery delivery) {
+        //TODO this method must not block:
+        ack(delivery.getStomeFame());
+    }
+
+    void ack(StompFrame frame) {
         ack(frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
     }
+
     private void ack(String receiptId) {
         if (receiptId != null) {
             StompFrame receipt = new StompFrame();
@@ -413,49 +427,6 @@
         }
     }
 
-    protected void route(ISourceController<MessageDelivery> controller, MessageDelivery messageDelivery) {
-        // TODO:
-        // Consider doing some caching of this target list. Most producers
-        // always send to
-        // the same destination.
-        Collection<DeliveryTarget> targets = router.route(messageDelivery);
-        final StompMessageDelivery smd = ((StompMessageDelivery) messageDelivery);
-        String receiptId = smd.getReceiptId();
-        if (targets != null) {
-            if (receiptId!=null) {
-                // We need to ack the message once we ensure we won't loose it.
-                // We know we won't loose it once it's persisted or delivered to
-                // a consumer
-                // Setup a callback to get notifed once one of those happens.
-                if (messageDelivery.isPersistent()) {
-                    messageDelivery.setCompletionCallback(new Runnable() {
-                        public void run() {
-                            ack(smd.getStomeFame());
-                        }
-                    });
-                } else {
-                    // Let the client know the broker got the message.
-                    ack(smd.getStomeFame());
-                }
-            }
-
-            // Deliver the message to all the targets..
-            for (DeliveryTarget dt : targets) {
-                if (dt.match(messageDelivery)) {
-                    dt.getSink().add(messageDelivery, controller);
-                }
-            }
-
-        } else {
-            // Let the client know we got the message even though there
-            // were no valid targets to deliver the message to.
-            if (receiptId!=null) {
-                ack(receiptId);
-            }
-        }
-        controller.elementDispatched(messageDelivery);
-    }
-
     static public Destination convert(ActiveMQDestination dest) {
         if (dest.isComposite()) {
             ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
@@ -476,11 +447,11 @@
         }
         return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
     }
-    
+
     private static BooleanExpression parseSelector(StompFrame frame) throws InvalidSelectorException {
         BooleanExpression rc = null;
         String selector = frame.getHeaders().get(Stomp.Headers.Subscribe.SELECTOR);
-        if( selector !=null ) { 
+        if (selector != null) {
             rc = SelectorParser.parse(selector);
         }
         return rc;
@@ -512,5 +483,4 @@
         // TODO Auto-generated method stub
         return null;
     }
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=756773&r1=756772&r2=756773&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Fri Mar 20 21:48:39 2009
@@ -366,8 +366,7 @@
         }
 
         public void onCommit() {
-            // Notify that we've saved the message.
-            delivery.getCompletionCallback().run();
+            delivery.onMessagePersisted();
         }
 
     }



Mime
View raw message