activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r755682 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/stomp/ test/java/org/apache/activemq/broker/ test/java/org/apache/ac...
Date Wed, 18 Mar 2009 18:04:18 GMT
Author: chirino
Date: Wed Mar 18 18:04:17 2009
New Revision: 755682

URL: http://svn.apache.org/viewvc?rev=755682&view=rev
Log:
Added virtual hosts to the broker model.  Some protocols may be able to take advantage of
it.. if not, they can use the default virtual host.


Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
Removed:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Broker.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerConnection.java
Wed Mar 18 18:04:17 2009
@@ -26,7 +26,7 @@
 
 public class BrokerConnection extends Connection {
     
-    protected Broker broker;
+    protected MessageBroker broker;
     private ProtocolHandler protocolHandler;
 
     public BrokerConnection() {
@@ -41,11 +41,11 @@
         });
     }
     
-    public Broker getBroker() {
+    public MessageBroker getBroker() {
         return broker;
     }
 
-    public void setBroker(Broker broker) {
+    public void setBroker(MessageBroker broker) {
         this.broker = broker;
     }
     

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
Wed Mar 18 18:04:17 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
 
 public interface DeliveryTarget {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Destination.java
Wed Mar 18 18:04:17 2009
@@ -18,6 +18,7 @@
 
 import java.util.Collection;
 
+import org.apache.activemq.broker.Destination;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface Destination {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Domain.java Wed
Mar 18 18:04:17 2009
@@ -18,6 +18,8 @@
 
 import java.util.Collection;
 
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 /**

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=755682&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
Wed Mar 18 18:04:17 2009
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Connection;
+import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.DispatchableTransportServer;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+public class MessageBroker implements TransportAcceptListener {
+
+    public static final int MAX_USER_PRIORITY = 10;
+    public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
+
+    final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
+    private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer,
VirtualHost>();
+    private VirtualHost defaultVirtualHost;
+
+    private TransportServer transportServer;
+    private String bindUri;
+    private String connectUri;
+    private String name;
+    private IDispatcher dispatcher;
+    private final AtomicBoolean stopping = new AtomicBoolean();
+
+    public String getName() {
+        return name;
+    }
+
+    public final void stop() throws Exception {
+        stopping.set(true);
+        transportServer.stop();
+
+        for (Connection connection : clientConnections) {
+            connection.stop();
+        }
+
+        for (VirtualHost virtualHost : virtualHosts.values()) {
+            virtualHost.stop();
+        }
+        dispatcher.shutdown();
+
+    }
+
+    public final void start() throws Exception {
+        dispatcher.start();
+
+        for (VirtualHost virtualHost : virtualHosts.values()) {
+            virtualHost.start();
+        }
+
+        transportServer = TransportFactory.bind(new URI(bindUri));
+        transportServer.setAcceptListener(this);
+        if (transportServer instanceof DispatchableTransportServer) {
+            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
+        }
+        transportServer.start();
+
+    }
+
+    public void onAccept(final Transport transport) {
+        BrokerConnection connection = new BrokerConnection();
+        connection.setBroker(this);
+        connection.setTransport(transport);
+        connection.setPriorityLevels(MAX_PRIORITY);
+        connection.setDispatcher(dispatcher);
+        clientConnections.add(connection);
+        try {
+            connection.start();
+        } catch (Exception e1) {
+            onAcceptError(e1);
+        }
+    }
+
+    public void onAcceptError(Exception error) {
+        System.out.println("Accept error: " + error);
+        error.printStackTrace();
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getBindUri() {
+        return bindUri;
+    }
+
+    public void setBindUri(String uri) {
+        this.bindUri = uri;
+    }
+
+    public boolean isStopping() {
+        return stopping.get();
+    }
+
+    public String getConnectUri() {
+        return connectUri;
+    }
+
+    public void setConnectUri(String connectUri) {
+        this.connectUri = connectUri;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Virtual Host Related Opperations 
+    // /////////////////////////////////////////////////////////////////
+    public VirtualHost getDefaultVirtualHost() {
+        synchronized (virtualHosts) {
+            if( defaultVirtualHost==null ) {
+                defaultVirtualHost = new VirtualHost();
+            }
+            return defaultVirtualHost;
+        }
+    }
+
+    public void setDefaultVirtualHost(VirtualHost defaultVirtualHost) {
+        synchronized (virtualHosts) {
+            this.defaultVirtualHost = defaultVirtualHost;
+        }
+    }
+
+    public void addVirtualHost(VirtualHost host) throws Exception {
+        synchronized (virtualHosts) {
+            // Make sure it's valid.
+            ArrayList<AsciiBuffer> hostNames = host.getHostNames();
+            if (hostNames.isEmpty()) {
+                throw new Exception("Virtual host must be configured with at least one host
name.");
+            }
+            for (AsciiBuffer name : hostNames) {
+                if (virtualHosts.containsKey(name)) {
+                    throw new Exception("Virtual host with host name " + name + " already
exists.");
+                }
+            }
+
+            // Register it.
+            for (AsciiBuffer name : hostNames) {
+                virtualHosts.put(name, host);
+            }
+
+            // The first virtual host defined is the default virtual host.
+            if (virtualHosts.size() == 1) {
+                setDefaultVirtualHost(host);
+            }
+        }
+    }
+
+    public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
+        synchronized (virtualHosts) {
+            for (AsciiBuffer name : host.getHostNames()) {
+                virtualHosts.remove(name);
+            }
+            // Was the default virtual host removed? Set the default to the next virtual
host.
+            if( host == defaultVirtualHost ) {
+                if( virtualHosts.isEmpty() ) {
+                    defaultVirtualHost = null;
+                } else {
+                    defaultVirtualHost = virtualHosts.values().iterator().next();
+                }
+            }
+        }
+    }
+
+    public VirtualHost getVirtualHost(AsciiBuffer name) {
+        synchronized (virtualHosts) {
+            return virtualHosts.get(name);
+        }
+    }
+
+    public synchronized Collection<VirtualHost> getVirtualHosts() {
+        synchronized (virtualHosts) {
+            return new ArrayList<VirtualHost>(virtualHosts.values());
+        }
+    }
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
Wed Mar 18 18:04:17 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker;
 
+import org.apache.activemq.broker.Destination;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface MessageDelivery {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Wed
Mar 18 18:04:17 2009
@@ -18,6 +18,9 @@
 
 import java.util.HashMap;
 
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
@@ -35,7 +38,7 @@
     HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget,
Subscription<MessageDelivery>>();
     private Destination destination;
     private IQueue<AsciiBuffer, MessageDelivery> queue;
-    private Broker broker;
+    private MessageBroker broker;
     
     private Mapper<Integer, MessageDelivery> partitionMapper;
     private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
@@ -65,8 +68,8 @@
     };
     
     private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
-        if (Broker.MAX_PRIORITY > 1) {
-            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100,
1, Broker.MAX_PRIORITY);
+        if (MessageBroker.MAX_PRIORITY > 1) {
+            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100,
1, MessageBroker.MAX_PRIORITY);
             limiter.setPriorityMapper(PRIORITY_MAPPER);
             SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer,
MessageDelivery>(destination.getName().toString(), limiter);
             queue.setKeyMapper(keyExtractor);
@@ -141,11 +144,11 @@
         return true;
     }
 
-    public Broker getBroker() {
+    public MessageBroker getBroker() {
         return broker;
     }
 
-    public void setBroker(Broker broker) {
+    public void setBroker(MessageBroker broker) {
         this.broker = broker;
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/QueueDomain.java
Wed Mar 18 18:04:17 2009
@@ -20,6 +20,10 @@
 import java.util.Collection;
 import java.util.HashMap;
 
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public class QueueDomain implements Domain {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Wed
Mar 18 18:04:17 2009
@@ -20,6 +20,12 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.QueueDomain;
+import org.apache.activemq.broker.TopicDomain;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 final public class Router {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/TopicDomain.java
Wed Mar 18 18:04:17 2009
@@ -20,6 +20,9 @@
 import java.util.Collection;
 import java.util.HashMap;
 
+import org.apache.activemq.broker.DeliveryTarget;
+import org.apache.activemq.broker.Domain;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public class TopicDomain implements Domain {

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=755682&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
(added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
Wed Mar 18 18:04:17 2009
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+/**
+ * @author chirino
+ */
+public class VirtualHost implements Service {
+    
+    private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
+    private Router router = new Router();
+    private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
+
+    public AsciiBuffer getHostName() {
+        if( hostNames.size() > 0 ) {
+            hostNames.get(0);
+        }
+        return null;
+    }
+    
+    public ArrayList<AsciiBuffer> getHostNames() {
+        return hostNames;
+    }
+    public void setHostNames(ArrayList<AsciiBuffer> hostNames) {
+        this.hostNames = hostNames;
+    }
+    
+    public Router getRouter() {
+        return router;
+    }
+    
+    public void start() throws Exception {
+        for (Queue queue : queues.values()) {
+            queue.start();
+        }
+    }
+    public void stop() throws Exception {
+        for (Queue queue : queues.values()) {
+            queue.stop();
+        }
+    }
+
+    public void addQueue(Queue queue) {
+        Domain domain = router.getDomain(queue.getDestination().getDomain());
+        domain.add(queue.getDestination().getName(), queue);
+    }
+
+
+}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Wed Mar 18 18:04:17 2009
@@ -91,7 +91,8 @@
     protected IFlowController<MessageDelivery> inboundController;
     
     protected BrokerConnection connection;
-    private OpenWireFormat wireFormat; 
+    private OpenWireFormat wireFormat;
+    private Router router; 
     
     public void start() throws Exception {
         // Setup the inbound processing..
@@ -137,7 +138,7 @@
                 public Response processAddConsumer(ConsumerInfo info) throws Exception {
                     ConsumerContext ctx = new ConsumerContext(info);
                     consumers.put(info.getConsumerId(), ctx);
-                    connection.getBroker().getRouter().bind(convert(info.getDestination()),
ctx);
+                    router.bind(convert(info.getDestination()), ctx);
                     return ack(command);
                 }
 
@@ -468,7 +469,7 @@
         // Consider doing some caching of this target list. Most producers
         // always send to
         // the same destination.
-        Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(elem);
+        Collection<DeliveryTarget> targets = router.route(elem);
 
         final Message message = ((OpenWireMessageDelivery) elem).getMessage();
         if (targets != null) {
@@ -556,6 +557,7 @@
 
     public void setConnection(BrokerConnection connection) {
         this.connection = connection;
+        this.router = connection.getBroker().getDefaultVirtualHost().getRouter();
     }
 
     public void setWireFormat(WireFormat wireFormat) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/FrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -23,6 +23,8 @@
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.stomp.ProtocolException;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/JmsFrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -25,6 +25,9 @@
 
 import javax.jms.JMSException;
 
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.LegacyFrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQObjectMessage;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/LegacyFrameTranslator.java
Wed Mar 18 18:04:17 2009
@@ -24,6 +24,8 @@
 
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.stomp.FrameTranslator;
+import org.apache.activemq.broker.stomp.StompProtocolHandler;
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Wed Mar 18 18:04:17 2009
@@ -78,6 +78,7 @@
     private SingleFlowRelay<MessageDelivery> outboundQueue;
 
     private HashMap<AsciiBuffer, ConsumerContext> allSentMessageIds = new HashMap<AsciiBuffer,
ConsumerContext>();
+    private Router router;
 
     protected FrameTranslator translator(StompFrame frame) {
         try {
@@ -113,7 +114,7 @@
             public void onStompFrame(StompFrame frame) throws Exception {
                 ConsumerContext ctx = new ConsumerContext(frame);
                 consumers.put(ctx.stompDestination, ctx);
-                connection.getBroker().getRouter().bind(ctx.destination, ctx);
+                router.bind(ctx.destination, ctx);
                 ack(frame);
             }
         });
@@ -407,7 +408,7 @@
         // Consider doing some caching of this target list. Most producers
         // always send to
         // the same destination.
-        Collection<DeliveryTarget> targets = connection.getBroker().getRouter().route(messageDelivery);
+        Collection<DeliveryTarget> targets = router.route(messageDelivery);
         final StompMessageDelivery smd = ((StompMessageDelivery) messageDelivery);
         String receiptId = smd.getReceiptId();
         if (targets != null) {
@@ -481,6 +482,7 @@
 
     public void setConnection(BrokerConnection connection) {
         this.connection = connection;
+        this.router = connection.getBroker().getDefaultVirtualHost().getRouter();
     }
 
     public void setWireFormat(WireFormat wireFormat) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Wed Mar 18 18:04:17 2009
@@ -26,6 +26,11 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageBroker;
+import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.Queue;
+import org.apache.activemq.broker.Router;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.metric.MetricAggregator;
@@ -71,9 +76,9 @@
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate
Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate
Consumer Rate").unit("items");
 
-    protected Broker sendBroker;
-    protected Broker rcvBroker;
-    protected ArrayList<Broker> brokers = new ArrayList<Broker>();
+    protected MessageBroker sendBroker;
+    protected MessageBroker rcvBroker;
+    protected ArrayList<MessageBroker> brokers = new ArrayList<MessageBroker>();
     protected IDispatcher dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
     protected final AtomicBoolean stopping = new AtomicBoolean();
@@ -117,7 +122,7 @@
     }
 
     protected IDispatcher createDispatcher() {
-        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY,
asyncThreadPoolSize);
+        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", MessageBroker.MAX_PRIORITY,
asyncThreadPoolSize);
     }
     
     public void test_1_1_0() throws Exception {
@@ -390,10 +395,10 @@
             dests[i] = bean;
             if (ptp) {
                 Queue queue = createQueue(sendBroker, dests[i]);
-                sendBroker.addQueue(queue);
+                sendBroker.getDefaultVirtualHost().addQueue(queue);
                 if (multibroker) {
                     queue = createQueue(rcvBroker, dests[i]);
-                    rcvBroker.addQueue(queue);
+                    rcvBroker.getDefaultVirtualHost().addQueue(queue);
                 }
             }
         }
@@ -460,7 +465,7 @@
 
     abstract protected RemoteProducer cerateProducer();
 
-    private Queue createQueue(Broker broker, Destination destination) {
+    private Queue createQueue(MessageBroker broker, Destination destination) {
         Queue queue = new Queue();
         queue.setBroker(broker);
         queue.setDestination(destination);
@@ -471,8 +476,8 @@
         return queue;
     }
 
-    private Broker createBroker(String name, String bindURI, String connectUri) {
-        Broker broker = new Broker();
+    private MessageBroker createBroker(String name, String bindURI, String connectUri) {
+        MessageBroker broker = new MessageBroker();
         broker.setName(name);
         broker.setBindUri(bindURI);
         broker.setConnectUri(connectUri);
@@ -482,7 +487,7 @@
 
     private void stopServices() throws Exception {
         stopping.set(true);
-        for (Broker broker : brokers) {
+        for (MessageBroker broker : brokers) {
             broker.stop();
         }
         for (RemoteProducer connection : producers) {
@@ -497,7 +502,7 @@
     }
 
     private void startServices() throws Exception {
-        for (Broker broker : brokers) {
+        for (MessageBroker broker : brokers) {
             broker.start();
         }
         for (RemoteConsumer connection : consumers) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteConsumer.java
Wed Mar 18 18:04:17 2009
@@ -4,6 +4,8 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/RemoteProducer.java
Wed Mar 18 18:04:17 2009
@@ -4,6 +4,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.Connection;
+import org.apache.activemq.broker.Destination;
+import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
 import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
 import org.apache.activemq.flow.IFlowController;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
Wed Mar 18 18:04:17 2009
@@ -10,6 +10,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.RemoteConsumer;
 import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=755682&r1=755681&r2=755682&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
Wed Mar 18 18:04:17 2009
@@ -13,6 +13,7 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.broker.RemoteProducer;
 import org.apache.activemq.broker.Router;
+import org.apache.activemq.broker.openwire.OpenWireMessageDelivery;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;



Mime
View raw message