activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmacn...@apache.org
Subject svn commit: r783177 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/main/java/org/apache/activemq/openwire/ activ...
Date Wed, 10 Jun 2009 01:51:31 GMT
Author: cmacnaug
Date: Wed Jun 10 01:51:30 2009
New Revision: 783177

URL: http://svn.apache.org/viewvc?rev=783177&view=rev
Log: (empty)

Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java
  (with props)
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerSubscription.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerSubscription.java
Wed Jun 10 01:51:30 2009
@@ -24,6 +24,8 @@
 
     public void disconnect(Subscription<MessageDelivery> subscription);
     
+    public Destination getDestination();
+    
     @SuppressWarnings("serial")
     public class UserAlreadyConnectedException extends Exception {
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Destination.java
Wed Jun 10 01:51:30 2009
@@ -17,26 +17,31 @@
 package org.apache.activemq.broker;
 
 import java.util.Collection;
+import java.util.HashSet;
 
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface Destination {
 
     AsciiBuffer getDomain();
+
     AsciiBuffer getName();
+
     Collection<Destination> getDestinations();
 
     public class SingleDestination implements Destination {
 
         private AsciiBuffer domain;
         private AsciiBuffer name;
-        
+
         public SingleDestination() {
         }
+
         public SingleDestination(AsciiBuffer domain, AsciiBuffer name) {
             setDomain(domain);
             setName(name);
         }
+
         public SingleDestination(String domain, String name) {
             setDomain(domain);
             setName(name);
@@ -53,51 +58,47 @@
         public AsciiBuffer getName() {
             return name;
         }
+
         public void setDomain(AsciiBuffer domain) {
             this.domain = domain;
         }
+
         public void setName(AsciiBuffer name) {
             this.name = name;
         }
-        
+
         private void setName(String name) {
             setName(new AsciiBuffer(name));
         }
+
         private void setDomain(String domain) {
             setDomain(new AsciiBuffer(domain));
         }
 
-//        public ActiveMQDestination asActiveMQDestination() {
-//            if(domain.equals(Router.TOPIC_DOMAIN))
-//            {
-//                return new ActiveMQTopic(name.toString());
-//            }
-//            else if(domain.equals(Router.QUEUE_DOMAIN))
-//            {
-//                return new ActiveMQQueue(name.toString());
-//            }
-//            return null;
-//        }
+        //        public ActiveMQDestination asActiveMQDestination() {
+        //            if(domain.equals(Router.TOPIC_DOMAIN))
+        //            {
+        //                return new ActiveMQTopic(name.toString());
+        //            }
+        //            else if(domain.equals(Router.QUEUE_DOMAIN))
+        //            {
+        //                return new ActiveMQQueue(name.toString());
+        //            }
+        //            return null;
+        //        }
     }
-    
-    public class MultiDestination implements Destination {
 
-        private Collection<Destination> destinations;
+    public class MultiDestination implements Destination {
 
+        private final HashSet<Destination> destinations = new HashSet<Destination>();
+        
         public MultiDestination() {
         }
 
-        public MultiDestination(Collection<Destination> destinations) {
-            this.destinations=destinations;
-        }
 
         public Collection<Destination> getDestinations() {
             return destinations;
         }
-        
-        public void setDestinations(Collection<Destination> destinations) {
-            this.destinations = destinations;
-        }
 
         public AsciiBuffer getDomain() {
             return null;
@@ -106,12 +107,21 @@
         public AsciiBuffer getName() {
             return null;
         }
+
+        public void add(Destination d) {
+            destinations.add(d);
+            
+        }
         
-//        public ActiveMQDestination asActiveMQDestination() {
-//            throw new UnsupportedOperationException("Not yet implemented");
-//        }
+        public void remove(Destination d)
+        {
+            destinations.remove(d);
+        }
+
+        //        public ActiveMQDestination asActiveMQDestination() {
+        //            throw new UnsupportedOperationException("Not yet implemented");
+        //        }
 
     }
-    
-    
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/DurableSubscription.java
Wed Jun 10 01:51:30 2009
@@ -38,9 +38,16 @@
         this.queue = queue;
         this.destination = destination;
         this.selector = selector;
+        //TODO If a durable subscribes to a queue 
         this.host.getRouter().bind(destination, this);
     }
-    
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+     */
+    public Destination getDestination() {
+        return destination;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery,
org.apache.activemq.flow.ISourceController)

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java?rev=783177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java
Wed Jun 10 01:51:30 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.queue.Subscription;
+
+/**
+ * MultiSubscription
+ * <p>
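+ * Description: subscription for a composite destination; it is bound to the router on connect and unbound on disconnect.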
+ * </p>
+ * 
+ * @author cmacnaug
+ * @version 1.0
+ */
+public class MultiSubscription implements BrokerSubscription, DeliveryTarget {
+
+    private final Destination destination;
+    private final VirtualHost host;
+    private final BooleanExpression selector;
+    private Subscription<MessageDelivery> connectedSub;
+
+    MultiSubscription(VirtualHost host, Destination destination, BooleanExpression selector)
{
+        this.destination = destination;
+        this.host = host;
+        this.selector = selector;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+     */
+    public final void deliver(MessageDelivery message, ISourceController<?> source)
{
+        Subscription<MessageDelivery> s = connectedSub;
+        if (s != null) {
+            s.add(message, source, null);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.DeliveryTarget#hasSelector()
+     */
+    public boolean hasSelector() {
+        return selector != null;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.activemq
+     * .queue.Subscription)
+     */
+    public synchronized void connect(Subscription<MessageDelivery> subsription) throws
UserAlreadyConnectedException {
+        connectedSub = subsription;
+        host.getRouter().bind(destination, this);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache.activemq
+     * .queue.Subscription)
+     */
+    public synchronized void disconnect(Subscription<MessageDelivery> context) {
+        host.getRouter().unbind(destination, this);
+        connectedSub = null;
+    }
+
+    public boolean matches(MessageDelivery message) {
+        if (selector == null) {
+            return true;
+        }
+
+        MessageEvaluationContext selectorContext = message.createMessageEvaluationContext();
+        selectorContext.setDestination(destination);
+        try {
+            return (selector.matches(selectorContext));
+        } catch (FilterException e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+     */
+    public Destination getDestination() {
+        return destination;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/MultiSubscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Queue.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Queue.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Queue.java
Wed Jun 10 01:51:30 2009
@@ -126,6 +126,13 @@
         public void disconnect(Subscription<MessageDelivery> context) {
             queue.removeSubscription(subscription);
         }
+        
+        /* (non-Javadoc)
+         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+         */
+        public Destination getDestination() {
+            return queue.getDestination();
+        }
     }
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/Router.java
Wed Jun 10 01:51:30 2009
@@ -57,13 +57,27 @@
     }
 
     public synchronized void bind(Destination destination, DeliveryTarget target) {
-        Domain domain = domains.get(destination.getDomain());
-        domain.bind(destination.getName(), target);
+        Collection<Destination> destinationList = destination.getDestinations();
+        if (destinationList == null) {
+            Domain domain = domains.get(destination.getDomain());
+            domain.bind(destination.getName(), target);
+        } else {
+            for (Destination d : destinationList) {
+                bind(d, target);
+            }
+        }
     }
-    
+
     public synchronized void unbind(Destination destination, DeliveryTarget target) {
-        Domain domain = domains.get(destination.getDomain());
-        domain.unbind(destination.getName(), target);
+        Collection<Destination> destinationList = destination.getDestinations();
+        if (destinationList == null) {
+            Domain domain = domains.get(destination.getDomain());
+            domain.unbind(destination.getName(), target);
+        } else {
+            for (Destination d : destinationList) {
+                unbind(d, target);
+            }
+        }
     }
 
     public void route(final BrokerMessageDelivery msg, ISourceController<?> controller)
{
@@ -73,8 +87,7 @@
         //Set up the delivery for persistence:
         msg.beginDispatch(database);
 
-        try
-        {
+        try {
             // TODO:
             // Consider doing some caching of this sub list. Most producers
             // always send to the same destination.
@@ -85,9 +98,7 @@
                     target.deliver(msg, controller);
                 }
             }
-        }
-        finally
-        {
+        } finally {
             try {
                 msg.finishDispatch(controller);
             } catch (IOException ioe) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/TopicSubscription.java
Wed Jun 10 01:51:30 2009
@@ -22,7 +22,7 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.queue.Subscription;
 
-public class TopicSubscription implements BrokerSubscription, DeliveryTarget {
+class TopicSubscription implements BrokerSubscription, DeliveryTarget {
 
     protected final BooleanExpression selector;
     protected final Destination destination;
@@ -30,16 +30,23 @@
     private final VirtualHost host;
 
     TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector)
{
-       this.host = host;
-       this.selector = selector;
-       this.destination = destination;
+        this.host = host;
+        this.selector = selector;
+        this.destination = destination;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery,
org.apache.activemq.flow.ISourceController)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
      */
     public final void deliver(MessageDelivery message, ISourceController<?> source)
{
-        connectedSub.add(message, source, null);
+        Subscription<MessageDelivery> s = connectedSub;
+        if (s != null) {
+            s.add(message, source, null);
+        }
     }
 
     /*
@@ -72,6 +79,7 @@
      */
     public synchronized void disconnect(Subscription<MessageDelivery> context) {
         host.getRouter().unbind(destination, this);
+        connectedSub = null;
     }
 
     public boolean matches(MessageDelivery message) {
@@ -88,4 +96,13 @@
             return false;
         }
     }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+     */
+    public Destination getDestination() {
+        return destination;
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/VirtualHost.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/broker/VirtualHost.java
Wed Jun 10 01:51:30 2009
@@ -98,11 +98,11 @@
         for (Queue queue : queues.values()) {
             queue.shutdown(true);
         }
-        
+
         for (AbstractFlowQueue<MessageDelivery> queue : queueStore.getDurableQueues())
{
             queue.shutdown(true);
         }
-        
+
         started = false;
     }
 
@@ -133,23 +133,30 @@
     public BrokerSubscription createSubscription(ConsumerContext consumer) {
         Destination destination = consumer.getDestination();
         BrokerSubscription sub = null;
-        if(destination.getDomain().equals(Router.TOPIC_DOMAIN))
-        {
-            if (consumer.isDurable()) {
-                sub = durableSubs.get(consumer.getSubscriptionName());
-                if (sub == null) {
-                    ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
-                    queue.start();
-                    DurableSubscription dsub = new DurableSubscription(this, destination,
consumer.getSelectorExpression(), queue);
-                    durableSubs.put(consumer.getSubscriptionName(), dsub);
-                    sub = dsub;
-                }
-            } else if (consumer.getDestination().getDomain().equals(Router.TOPIC_DOMAIN))
{
-                sub = new TopicSubscription(this, destination, consumer.getSelectorExpression());
+
+        if (consumer.isDurable()) {
+            DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
+            if (dsub == null) {
+                ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+                queue.start();
+                dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(),
queue);
+                durableSubs.put(consumer.getSubscriptionName(), dsub);
             }
+            sub = dsub;
         } else {
-            Queue queue = queues.get(destination.getName());
-            sub = new Queue.QueueSubscription(queue);
+            if(destination.getDestinations() != null)
+            {
+                sub = new MultiSubscription(this, destination, consumer.getSelectorExpression());
+            }
+            else
+            {
+                if (destination.getDomain().equals(Router.TOPIC_DOMAIN)) {
+                    sub = new TopicSubscription(this, destination, consumer.getSelectorExpression());
+                } else {
+                    Queue queue = queues.get(destination.getName());
+                    sub = new Queue.QueueSubscription(queue);
+                }
+            }
         }
         return sub;
     }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Wed Jun 10 01:51:30 2009
@@ -166,6 +166,7 @@
 
                     OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
                     md.setStoreWireFormat(storeWireFormat);
+                    md.setPersistListener(OpenwireProtocolHandler.this);
 
                     // Only producers that are not using a window will block,
                     // and if it blocks.
@@ -663,11 +664,11 @@
     static public Destination convert(ActiveMQDestination dest) {
         if (dest.isComposite()) {
             ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
-            ArrayList<Destination> d = new ArrayList<Destination>();
+            Destination.MultiDestination md = new Destination.MultiDestination();
             for (int i = 0; i < compositeDestinations.length; i++) {
-                d.add(convert(compositeDestinations[i]));
+                md.add(convert(compositeDestinations[i]));
             }
-            return new Destination.MultiDestination(d);
+            return md;
         }
         AsciiBuffer domain;
         if (dest.isQueue()) {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
Wed Jun 10 01:51:30 2009
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.WireFormatNegotiator;
@@ -652,6 +653,7 @@
         }
 
         transport = new WireFormatNegotiator(transport, this, 1);
+        transport = new ResponseCorrelator(transport);
         return transport;
     }
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/flow/TestWireFormatFactory.java
Wed Jun 10 01:51:30 2009
@@ -1,72 +1,13 @@
 package org.apache.activemq.flow;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.ObjectStreamWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
 public class TestWireFormatFactory implements WireFormatFactory {
 
-    static public class TestWireFormat implements WireFormat {
-
-        public void marshal(Object value, DataOutput out) throws IOException {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-            oos.writeObject(value);
-            oos.close();
-            
-            byte[] data = baos.toByteArray();
-            out.writeInt(data.length);
-            out.write(data);
-        }
-
-        public Object unmarshal(DataInput in) throws IOException {
-            byte data[] = new byte[in.readInt()];
-            in.readFully(data);
-            
-            ByteArrayInputStream is = new ByteArrayInputStream(data);
-            ObjectInputStream ois = new ObjectInputStream(is);
-            try {
-                return ois.readObject();
-            } catch (ClassNotFoundException e) {
-                throw IOExceptionSupport.create(e);
-            }
-        }
-
-        public int getVersion() {
-            return 0;
-        }
-        public void setVersion(int version) {
-        }
-
-        public boolean inReceive() {
-            return false;
-        }
-
-        public ByteSequence marshal(Object value) throws IOException {
-            return null;
-        }
-        public Object unmarshal(ByteSequence data) throws IOException {
-            return null;
-        }
-
-        public Transport createTransportFilters(Transport transport, Map options) {
-           return transport;
-        }
-    }
-
 	public WireFormat createWireFormat() {
-		return new TestWireFormat();
+		return new ObjectStreamWireFormat();
 	}	
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Wed Jun 10 01:51:30 2009
@@ -531,11 +531,11 @@
     static public Destination convert(ActiveMQDestination dest) {
         if (dest.isComposite()) {
             ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
-            ArrayList<Destination> d = new ArrayList<Destination>();
+            Destination.MultiDestination md = new Destination.MultiDestination();
             for (int i = 0; i < compositeDestinations.length; i++) {
-                d.add(convert(compositeDestinations[i]));
+                md.add(convert(compositeDestinations[i]));
             }
-            return new Destination.MultiDestination(d);
+            return md;
         }
         AsciiBuffer domain;
         if (dest.isQueue()) {

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFactory.java
Wed Jun 10 01:51:30 2009
@@ -243,7 +243,6 @@
         transport = compositeConfigure(transport, wf, options);
 
         transport = new MutexTransport(transport);
-//        transport = new ResponseCorrelator(transport);
 
         return transport;
     }

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/MultiWireFormatFactory.java
Wed Jun 10 01:51:30 2009
@@ -26,7 +26,6 @@
 import java.util.Map;
 
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.util.ByteArrayInputStream;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java?rev=783177&r1=783176&r2=783177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
Wed Jun 10 01:51:30 2009
@@ -23,12 +23,18 @@
 
 public interface StatefulWireFormat extends WireFormat{
 
+    
     /**
      * Writes a command to the sub buffer, returning false if
-     * the command couldn't entirely fit into the sub. 
-     * @param command
-     * @param sub
-     * @return
+     * the command couldn't entirely fit into the target buffer. In this case
+     * the caller should supply an additional buffer or drain the current one
+     * until the command is fully marshalled.
+     *  
+     * @param command The command to marshal.
+     * @param target The target buffer.
+     * @return true if the command was fully marshalled. 
+     * 
+     * @throws IOException if there is an error writing the buffer. 
      */
     public boolean marshal(Object command, ByteBuffer target) throws IOException;
     



Mime
View raw message