activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r784272 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-disp...
Date Fri, 12 Jun 2009 21:01:24 GMT
Author: chirino
Date: Fri Jun 12 21:01:23 2009
New Revision: 784272

URL: http://svn.apache.org/viewvc?rev=784272&view=rev
Log:
Fleshing out the Broker class a bit.
 - It now supports multiple TransportServers and the associated connect URIs.
 - It now keeps better track of its state and defensively throws IllegalStateExceptions on
some operations if they are used in the wrong state.


Added:
    activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
Removed:
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransportServer.java
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
    activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/Connection.java
Fri Jun 12 21:01:23 2009
@@ -24,12 +24,13 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.transport.DispatchableTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
-abstract public class Connection implements TransportListener {
+abstract public class Connection implements TransportListener, Service {
 
     protected Transport transport;
     protected String name;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Broker.java
Fri Jun 12 21:01:23 2009
@@ -16,143 +16,234 @@
  */
 package org.apache.activemq.apollo.broker;
 
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.apollo.Connection;
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.StoreFactory;
+import org.apache.activemq.dispatch.DispatcherAware;
 import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.transport.DispatchableTransportServer;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
-public class Broker implements TransportAcceptListener {
+public class Broker implements Service {
 
+	static final private Log LOG = LogFactory.getLog(Broker.class);
+	
     public static final int MAX_USER_PRIORITY = 10;
     public static final int MAX_PRIORITY = MAX_USER_PRIORITY + 1;
 
-    final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
+    private final ArrayList<Connection> clientConnections = new ArrayList<Connection>();
+    private final ArrayList<TransportServer> transportServers = new ArrayList<TransportServer>();
+    private final ArrayList<String> connectUris = new ArrayList<String>();
+
     private final LinkedHashMap<AsciiBuffer, VirtualHost> virtualHosts = new LinkedHashMap<AsciiBuffer,
VirtualHost>();
     private VirtualHost defaultVirtualHost;
-
-    private TransportServer transportServer;
-    private String bindUri;
-    private String connectUri;
-    private String name;
+	private String name;
     private IDispatcher dispatcher;
     private BrokerDatabase database;
     
-    private final AtomicBoolean stopping = new AtomicBoolean();
+    private final class BrokerAcceptListener implements TransportAcceptListener {
+		public void onAccept(final Transport transport) {
+		    BrokerConnection connection = new BrokerConnection();
+		    connection.setBroker(Broker.this);
+		    connection.setTransport(transport);
+		    connection.setPriorityLevels(MAX_PRIORITY);
+		    connection.setDispatcher(dispatcher);
+		    clientConnections.add(connection);
+		    try {
+		        connection.start();
+		    } catch (Exception e1) {
+		        onAcceptError(e1);
+		    }
+		}
+
+		public void onAcceptError(Exception error) {
+			LOG.warn("Accept error: " + error);
+			LOG.debug("Accept error details: ", error);
+		}
+	}
+
+	enum State { 
+    	CONFIGURATION, STARTING, RUNNING, STOPPING {
+    		@Override
+    		public boolean isStopping() {
+    			return true;
+    		}
+    	}
+    	, STOPPED {
+    		@Override
+    		public boolean isStopping() {
+    			return true;
+    		}
+    	}, 
+    	UNKNOWN;
+    	
+    	public boolean isStopping() {
+    		return false;
+    	}
+    	
+    };
+    
+    private final AtomicReference<State> state = new AtomicReference<State>(State.CONFIGURATION);
 
-    public String getName() {
-        return name;
+    // /////////////////////////////////////////////////////////////////
+    // Methods of the Service Interface
+    // /////////////////////////////////////////////////////////////////    
+
+    public final void start() throws Exception {
+
+		if ( state.get()!=State.CONFIGURATION ) {
+    		throw new IllegalStateException("Can only start a broker that has never been started
before");
+    	}
+
+		// Don't change the state to STARTING yet as we may need to 
+		// apply some default configuration to this broker instance before it's started.
+		if( dispatcher == null ) {
+			int threads = Runtime.getRuntime().availableProcessors();
+			dispatcher = PriorityDispatcher.createPriorityDispatchPool("Broker: "+getName(), Broker.MAX_PRIORITY,
threads);
+		}
+		if ( database == null ) {
+			Store store = StoreFactory.createStore("kaha-db");
+			database = new BrokerDatabase(store);
+		}
+	    addVirtualHost(getDefaultVirtualHost());
+
+	    // Ok now we are ready to start the broker up....
+		if ( !state.compareAndSet(State.CONFIGURATION, State.STARTING) ) {
+    		throw new IllegalStateException("Can only start a broker that has never been started
before");
+    	}
+    	try {
+		    dispatcher.start();
+		    database.setDispatcher(dispatcher);
+		    database.start();
+
+	    	synchronized(virtualHosts) {
+			    for (VirtualHost virtualHost : virtualHosts.values()) {
+			        virtualHost.start();
+			    }
+	    	}
+	    	
+		    // Startup the transports.
+	    	synchronized(transportServers) {
+			    for (TransportServer server : transportServers) {
+			    	startTransportServer(server);
+			    }
+	    	}
+	    	
+        	state.set(State.RUNNING);
+        	
+    	} catch (Exception e) {
+    		// We should try to avoid falling here... basically means 
+    		// we need to handle failure during the startup to avoid 
+    		// a partially started up broker.
+        	state.set(State.UNKNOWN);
+    	}
+        
     }
 
     public final void stop() throws Exception {
-        stopping.set(true);
-        transportServer.stop();
+    	if ( !state.compareAndSet(State.RUNNING, State.STOPPING) ) {
+    		throw new IllegalStateException("Can only stop a broker that is running");
+    	}
+    	
+    	synchronized(transportServers) {
+	        for ( TransportServer server : transportServers) {
+				stop(server);
+	        }
+    	}
 
         for (Connection connection : clientConnections) {
-            connection.stop();
+        	stop(connection);
         }
 
         for (VirtualHost virtualHost : virtualHosts.values()) {
-            virtualHost.stop();
+        	stop(virtualHost);
         }
-        database.stop();
+        stop(database);
         dispatcher.shutdown();
+    	state.set(State.STOPPED);
 
     }
-
-    public final void start() throws Exception {
-        dispatcher.start();
-        if (database != null) {
-            database.start();
-        } else {
-            throw new Exception("Store not initialized");
-        }
-        addVirtualHost(getDefaultVirtualHost());
-
-        for (VirtualHost virtualHost : virtualHosts.values()) {
-            virtualHost.start();
-        }
-
-        transportServer = TransportFactory.bind(new URI(bindUri));
-        transportServer.setAcceptListener(this);
-        if (transportServer instanceof DispatchableTransportServer) {
-            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
-        }
-        transportServer.start();
-
-    }
-
-    public void onAccept(final Transport transport) {
-        BrokerConnection connection = new BrokerConnection();
-        connection.setBroker(this);
-        connection.setTransport(transport);
-        connection.setPriorityLevels(MAX_PRIORITY);
-        connection.setDispatcher(dispatcher);
-        clientConnections.add(connection);
-        try {
-            connection.start();
-        } catch (Exception e1) {
-            onAcceptError(e1);
-        }
-    }
-
-    public void onAcceptError(Exception error) {
-        System.out.println("Accept error: " + error);
-        error.printStackTrace();
-    }
-
-    public IDispatcher getDispatcher() {
-        return dispatcher;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public void setDispatcher(IDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public BrokerDatabase getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(BrokerDatabase database) {
-        this.database = database;
-    }
+        
+    // /////////////////////////////////////////////////////////////////
+    // Life cycle support operations.
+    // /////////////////////////////////////////////////////////////////    
     
-    public String getBindUri() {
-        return bindUri;
-    }
-
-    public void setBindUri(String uri) {
-        this.bindUri = uri;
-    }
-
     public boolean isStopping() {
-        return stopping.get();
-    }
-
-    public String getConnectUri() {
-        return connectUri;
-    }
-
-    public void setConnectUri(String connectUri) {
-        this.connectUri = connectUri;
+        return state.get().isStopping();
     }
+    
+    // /////////////////////////////////////////////////////////////////
+    // connectUris Related Operations
+    // /////////////////////////////////////////////////////////////////    
+    public List<String> getConnectUris() {
+    	synchronized(connectUris) {
+    		return new ArrayList<String>(connectUris);
+    	}
+	}
+
+	public void addConnectUri(String uri) {
+    	synchronized(connectUris) {
+    		this.connectUris.add(uri);
+    	}
+	}
+	
+	public void removeConnectUri(String uri) {
+    	synchronized(connectUris) {
+    		this.connectUris.remove(uri);
+    	}
+	}
 
+	
     // /////////////////////////////////////////////////////////////////
-    // Virtual Host Related Opperations
+    // transportServers Related Operations
+    // /////////////////////////////////////////////////////////////////    
+    public List<TransportServer> getTransportServers() {
+    	synchronized(transportServers) {
+    		return new ArrayList<TransportServer>(transportServers);
+    	}
+	}
+
+	public void addTransportServer(TransportServer server) {
+    	synchronized(transportServers) {
+    		switch(state.get()) {
+    		case RUNNING:
+    			startTransportServerWrapException(server);
+    		case CONFIGURATION:
+        		this.transportServers.add(server);
+    		default:
+    			throw new IllegalStateException("Cannot add a transport server when broker is: ");
+    		}
+    	}
+	}
+	
+	public void removeTransportServer(TransportServer server) {
+    	synchronized(transportServers) {
+    		switch(state.get()) {
+    		case RUNNING:
+    			stopTransportServerWrapException(server);
+    		case STOPPED:
+    		case CONFIGURATION:
+        		this.transportServers.remove(server);
+    		default:
+    			throw new IllegalStateException("Cannot add a transport server when broker is: ");
+    		}
+    	}
+	}
+
+	// /////////////////////////////////////////////////////////////////
+    // Virtual Host Related Operations
     // /////////////////////////////////////////////////////////////////
     public VirtualHost getDefaultVirtualHost() {
         synchronized (virtualHosts) {
@@ -167,12 +258,15 @@
     }
 
     public void setDefaultVirtualHost(VirtualHost defaultVirtualHost) {
-        synchronized (virtualHosts) {
+    	assertInConfigurationState();
+    	synchronized (virtualHosts) {
             this.defaultVirtualHost = defaultVirtualHost;
         }
     }
 
     public void addVirtualHost(VirtualHost host) throws Exception {
+    	assertInConfigurationState();
+    	
         synchronized (virtualHosts) {
             // Make sure it's valid.
             ArrayList<AsciiBuffer> hostNames = host.getHostNames();
@@ -197,7 +291,8 @@
         }
     }
 
-    public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
+	public synchronized void removeVirtualHost(VirtualHost host) throws Exception {
+    	assertInConfigurationState();
         synchronized (virtualHosts) {
             for (AsciiBuffer name : host.getHostNames()) {
                 virtualHosts.remove(name);
@@ -225,9 +320,84 @@
             return new ArrayList<VirtualHost>(virtualHosts.values());
         }
     }
+    
+    // /////////////////////////////////////////////////////////////////
+    // Property Accessors
+    // /////////////////////////////////////////////////////////////////
+
+    public String getName() {
+        return name;
+    }
+    public void setName(String name) {
+    	assertInConfigurationState();
+        this.name = name;
+    }
+
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+    public void setDispatcher(IDispatcher dispatcher) {
+    	assertInConfigurationState();
+        this.dispatcher = dispatcher;
+    }
 
+    public BrokerDatabase getDatabase() {
+        return database;
+    }
+    public void setDatabase(BrokerDatabase database) {
+    	assertInConfigurationState();
+        this.database = database;
+    }
     public void setStore(Store store) {
-        database = new BrokerDatabase(store, dispatcher);
+    	assertInConfigurationState();
+        database = new BrokerDatabase(store);
     }
+ 
+    // /////////////////////////////////////////////////////////////////
+    // Helper Methods
+    // /////////////////////////////////////////////////////////////////
 
+    private void assertInConfigurationState() {
+		if( state.get() != State.CONFIGURATION ) {
+			throw new IllegalStateException("Broker is not in the configuration state.");
+		}
+	}
+    
+    /**
+     * Helper method to help stop broker services and log error if they fail to start.
+     * @param server
+     */
+    private void stop(Service server) {
+		try {
+			server.stop();
+		} catch (Exception e) {
+			LOG.warn("Could not stop "+server+": "+e);
+			LOG.debug("Could not stop "+server+" due to: ", e);
+		}
+	}
+
+    private void startTransportServerWrapException(TransportServer server) {
+		try {
+			startTransportServer(server);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+    
+	private void startTransportServer(TransportServer server) throws Exception {
+		server.setAcceptListener(new BrokerAcceptListener());
+		if (server instanceof DispatcherAware ) {
+			((DispatcherAware) server).setDispatcher(dispatcher);
+		}
+		server.start();
+	}
+
+    private void stopTransportServerWrapException(TransportServer server) {
+		try {
+			server.stop();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+   
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
Fri Jun 12 21:01:23 2009
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.Service;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.FatalStoreException;
@@ -36,6 +37,7 @@
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.VoidCallback;
+import org.apache.activemq.dispatch.DispatcherAware;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
 import org.apache.activemq.flow.Flow;
@@ -51,7 +53,7 @@
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
 
-public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase>
{
+public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.OperationBase>
implements Service, DispatcherAware {
 
     private static final boolean DEBUG = false;
     private final Store store;
@@ -61,7 +63,7 @@
     private final FlowController<OperationBase> storeController;
     private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
 
-    private final IDispatcher dispatcher;
+    private IDispatcher dispatcher;
     private Thread flushThread;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
@@ -91,9 +93,8 @@
         public void onDatabaseException(IOException ioe);
     }
 
-    public BrokerDatabase(Store store, IDispatcher dispatcher) {
+    public BrokerDatabase(Store store) {
         this.store = store;
-        this.dispatcher = dispatcher;
         this.opQueue = new LinkedNodeList<OperationBase>();
         storeLimiter = new SizeLimiter<OperationBase>(FLUSH_QUEUE_SIZE, 0) {
 
@@ -1097,4 +1098,11 @@
         return store.allocateStoreTracking();
     }
 
+	public IDispatcher getDispatcher() {
+		return dispatcher;
+	}
+	public void setDispatcher(IDispatcher dispatcher) {
+		this.dispatcher = dispatcher;
+	}
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
Fri Jun 12 21:01:23 2009
@@ -37,6 +37,7 @@
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.transport.TransportFactory;
 
 public abstract class BrokerTestBase extends TestCase {
 
@@ -442,7 +443,7 @@
                 }
             }
         });
-        consumer.setUri(new URI(rcvBroker.getConnectUri()));
+        consumer.setUri(new URI(rcvBroker.getConnectUris().get(0)));
         consumer.setDestination(destination);
         consumer.setName("consumer" + (i + 1));
         consumer.setTotalConsumerRate(totalConsumerRate);
@@ -462,7 +463,7 @@
                 }
             }
         });
-        producer.setUri(new URI(sendBroker.getConnectUri()));
+        producer.setUri(new URI(sendBroker.getConnectUris().get(0)));
         producer.setProducerId(id + 1);
         producer.setName("producer" + (id + 1));
         producer.setDestination(destination);
@@ -477,8 +478,8 @@
     private Broker createBroker(String name, String bindURI, String connectUri) throws Exception
{
         Broker broker = new Broker();
         broker.setName(name);
-        broker.setBindUri(bindURI);
-        broker.setConnectUri(connectUri);
+        broker.addTransportServer(TransportFactory.bind(new URI(bindURI)));
+        broker.addConnectUri(connectUri);
         broker.setDispatcher(dispatcher);
         broker.setStore(createStore(broker));
         return broker;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/SharedQueueTest.java
Fri Jun 12 21:01:23 2009
@@ -21,9 +21,9 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerDatabase;
 import org.apache.activemq.apollo.broker.BrokerQueueStore;
-import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
@@ -66,7 +66,8 @@
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
         dispatcher.start();
-        database = new BrokerDatabase(createStore(), dispatcher);
+        database = new BrokerDatabase(createStore());
+        database.setDispatcher(dispatcher);
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);

Added: activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java?rev=784272&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-dispatcher/src/main/java/org/apache/activemq/dispatch/DispatcherAware.java
Fri Jun 12 21:01:23 2009
@@ -0,0 +1,13 @@
+package org.apache.activemq.dispatch;
+
+/**
+ * Handy interface to signal classes which would like an IDispatcher instance
+ * injected into them.
+ *  
+ * @author chirino
+ */
+public interface DispatcherAware {
+
+	public void setDispatcher(IDispatcher dispatcher);
+	
+}

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Fri Jun 12 21:01:23 2009
@@ -206,7 +206,7 @@
                 BrokerInfo brokerInfo = new BrokerInfo();
                 brokerInfo.setBrokerId(new BrokerId(connection.getBroker().getName()));
                 brokerInfo.setBrokerName(connection.getBroker().getName());
-                brokerInfo.setBrokerURL(connection.getBroker().getBindUri());
+                brokerInfo.setBrokerURL(connection.getBroker().getConnectUris().get(0));
                 connection.write(brokerInfo);
                 return ack(info);
             }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
Fri Jun 12 21:01:23 2009
@@ -93,7 +93,8 @@
     protected void startServices() throws Exception {
         dispatcher = createDispatcher();
         dispatcher.start();
-        database = new BrokerDatabase(createStore(), dispatcher);
+        database = new BrokerDatabase(createStore());
+        database.setDispatcher(dispatcher);
         database.start();
         queueStore = new BrokerQueueStore();
         queueStore.setDatabase(database);

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockBroker.java
Fri Jun 12 21:01:23 2009
@@ -21,11 +21,11 @@
 import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.dispatch.DispatcherAware;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.dispatch.PriorityDispatcher;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.Commands.Destination;
-import org.apache.activemq.transport.DispatchableTransportServer;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -125,8 +125,8 @@
 
         transportServer = TransportFactory.bind(new URI(uri));
         transportServer.setAcceptListener(this);
-        if (transportServer instanceof DispatchableTransportServer) {
-            ((DispatchableTransportServer) transportServer).setDispatcher(dispatcher);
+        if (transportServer instanceof DispatcherAware) {
+            ((DispatcherAware) transportServer).setDispatcher(dispatcher);
         }
         transportServer.start();
 

Modified: activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java?rev=784272&r1=784271&r2=784272&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-transport/src/main/java/org/apache/activemq/transport/DispatchableTransport.java
Fri Jun 12 21:01:23 2009
@@ -16,11 +16,9 @@
  */
 package org.apache.activemq.transport;
 
-import org.apache.activemq.dispatch.IDispatcher;
+import org.apache.activemq.dispatch.DispatcherAware;
 
-public interface DispatchableTransport extends Transport {
-
-    public void setDispatcher(IDispatcher dispatcher);
+public interface DispatchableTransport extends Transport, DispatcherAware {
 
     public void setDispatchPriority(int priority);
 



Mime
View raw message