activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r355920 - in /incubator/activemq/activemq-core/src: main/java/org/activemq/broker/ main/java/org/activemq/transport/discovery/ main/java/org/activemq/transport/discovery/multicast/ main/java/org/activemq/transport/discovery/rendezvous/ main...
Date Sun, 11 Dec 2005 05:37:30 GMT
Author: chirino
Date: Sat Dec 10 21:37:23 2005
New Revision: 355920

URL: http://svn.apache.org/viewcvs?rev=355920&view=rev
Log:
- Revamped discovery a bit.
  - We now use DiscoveryAgentFactory implementations to create the DiscoveryAgent (so each factory can
be more flexible in how it uses the discovery URI to configure the agent).
  - Using the discovery:tcp://localhost syntax when binding a transport seemed fishy.  A transport
now supports a discoveryUri property that can be configured like "multicast://groupname".
  - You can now programmatically configure the embedded broker that the vm connection factory
starts up.
  - The peer transport switched to using explicit configuration of the embedded broker.

Added:
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
  (with props)
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
  (with props)
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
  (with props)
Removed:
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportServer.java
Modified:
    incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
    incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
    incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
    incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
    incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
    incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
    incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
    incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
    incubator/activemq/activemq-core/src/test/resources/activemq.xml

Modified: incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
(original)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
Sat Dec 10 21:37:23 2005
@@ -34,6 +34,9 @@
 import org.activemq.transport.TransportAcceptListener;
 import org.activemq.transport.TransportFactory;
 import org.activemq.transport.TransportServer;
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,6 +59,10 @@
     private TaskRunnerFactory taskRunnerFactory = null;
     protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
     protected TransportStatusDetector statusDector;
+    private DiscoveryAgent discoveryAgent;
+    private URI discoveryUri;
+
+    private URI connectUri;
 
     /**
      * @return Returns the connections.
@@ -160,18 +167,30 @@
     public void start() throws Exception {
         getServer().start();
         log.info("Accepting connection on: "+getServer().getConnectURI());
+
+        DiscoveryAgent da = getDiscoveryAgent();
+        if( da!=null ) {
+            da.registerService(getConnectUri().toString());
+            da.start();
+        }
+
         this.statusDector.start();
     }
 
     public void stop() throws Exception {
+        ServiceStopper ss = new ServiceStopper();
+        if( discoveryAgent!=null ) {
+            ss.stop(discoveryAgent);
+        }
         if (server != null) {
-            server.stop();
+            ss.stop(server);
         }
         this.statusDector.stop();
         for (Iterator iter = connections.iterator(); iter.hasNext();) {
             ConnectionContext context = (ConnectionContext) iter.next();
-            context.getConnection().stop();
+            ss.stop(context.getConnection());
         }
+        ss.throwFirstException();
     }
 
     // Implementation methods
@@ -209,6 +228,46 @@
             throw new IllegalArgumentException("You must specify the broker property. Maybe
this connector should be added to a broker?");
         }
         return TransportFactory.bind(broker.getBrokerId().getBrokerId(),uri);
+    }
+    
+    public DiscoveryAgent getDiscoveryAgent() throws IOException {
+        if( discoveryAgent==null ) {
+            discoveryAgent = createDiscoveryAgent();
+        }
+        return discoveryAgent;
+    }
+
+    protected DiscoveryAgent createDiscoveryAgent() throws IOException {
+        if( discoveryUri!=null ) {
+            return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
+        }
+        return null;
+    }
+
+    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
+        this.discoveryAgent = discoveryAgent;
+    }
+
+    public URI getDiscoveryUri() {
+        return discoveryUri;
+    }
+
+    public void setDiscoveryUri(URI discoveryUri) {
+        this.discoveryUri = discoveryUri;
+    }
+
+    public URI getConnectUri() throws IOException, URISyntaxException {
+        if( connectUri==null ) {
+            if( getServer().getConnectURI()==null ) {
+                throw new IllegalStateException("The transportConnector has not been started.");
+            }
+            connectUri = getServer().getConnectURI();
+        }
+        return connectUri;
+    }
+
+    public void setConnectUri(URI transportUri) {
+        this.connectUri = transportUri;
     }
 
 }

Modified: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
(original)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -20,40 +20,60 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.Map;
 
 import org.activeio.FactoryFinder;
-import org.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
 import org.activemq.util.IOExceptionSupport;
-import org.activemq.util.IntrospectionSupport;
-import org.activemq.util.URISupport;
-import org.activemq.util.URISupport.CompositeData;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 public abstract class DiscoveryAgentFactory {
 
     static final private FactoryFinder discoveryAgentFinder = new FactoryFinder("META-INF/services/org/activemq/transport/discoveryagent/");
   
-    
-    public static DiscoveryAgent createDiscoveryAgent(String type) throws IOException {
-        try {
-            return (DiscoveryAgent)discoveryAgentFinder.newInstance(type);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Could not create discovery agent: "+type, e);
+    static final private ConcurrentHashMap discoveryAgentFactorys = new ConcurrentHashMap();
+
+    /**
+     * @param uri
+     * @return
+     * @throws IOException
+     */
+    private static DiscoveryAgentFactory findDiscoveryAgentFactory(URI uri) throws IOException
{
+        String scheme = uri.getScheme();
+        if( scheme == null )
+            throw new IOException("DiscoveryAgent scheme not specified: [" + uri + "]");
+        DiscoveryAgentFactory daf = (DiscoveryAgentFactory) discoveryAgentFactorys.get(scheme);
+        if (daf == null) {
+            // Try to load if from a META-INF property.
+            try {
+                daf = (DiscoveryAgentFactory) discoveryAgentFinder.newInstance(scheme);
+                discoveryAgentFactorys.put(scheme, daf);
+            }
+            catch (Throwable e) {
+                throw IOExceptionSupport.create("DiscoveryAgent scheme NOT recognized: ["
+ scheme + "]", e);
+            }
         }
+        return daf;
     }
     
     public static DiscoveryAgent createDiscoveryAgent(URI uri) throws IOException {
-        try {
-            String type = ( uri.getScheme() == null ) ? uri.getPath() : uri.getScheme();
-            DiscoveryAgent rc = (DiscoveryAgent) discoveryAgentFinder.newInstance(type);
-            Map options = URISupport.parseParamters(uri);
-            IntrospectionSupport.setProperties(rc, options);
-            if( rc.getClass() == SimpleDiscoveryAgent.class ) {
-                CompositeData data = URISupport.parseComposite(uri);
-                ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
-            }
-            return rc;
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Could not create discovery agent: "+uri, e);
-        }
-    }   
+        DiscoveryAgentFactory tf = findDiscoveryAgentFactory(uri);
+        return tf.doCreateDiscoveryAgent(uri);
+
+    }
+
+    abstract protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException;
+//    {
+//        try {
+//            String type = ( uri.getScheme() == null ) ? uri.getPath() : uri.getScheme();
+//            DiscoveryAgent rc = (DiscoveryAgent) discoveryAgentFinder.newInstance(type);
+//            Map options = URISupport.parseParamters(uri);
+//            IntrospectionSupport.setProperties(rc, options);
+//            if( rc.getClass() == SimpleDiscoveryAgent.class ) {
+//                CompositeData data = URISupport.parseComposite(uri);
+//                ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
+//            }
+//            return rc;
+//        } catch (Throwable e) {
+//            throw IOExceptionSupport.create("Could not create discovery agent: "+uri, e);
+//        }
+//    }   
 }

Modified: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
(original)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -20,15 +20,13 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.activemq.transport.Transport;
-import org.activemq.transport.TransportFactory;
 import org.activemq.transport.TransportServer;
 import org.activemq.transport.failover.FailoverTransportFactory;
 import org.activemq.util.IntrospectionSupport;
-import org.activemq.util.URISupport;
 import org.activemq.util.URISupport.CompositeData;
 
 /**
@@ -48,33 +46,34 @@
     }
 
     public TransportServer doBind(String brokerId,URI location) throws IOException{
-        try{
-            CompositeData compositData=URISupport.parseComposite(location);
-            URI[] components=compositData.getComponents();
-            if(components.length!=1){
-                throw new IOException("Invalid location: "+location
-                                +", the location must have 1 and only 1 composite URI in
it - components = "
-                                +components.length);
-            }
-            Map parameters=new HashMap(compositData.getParameters());
-            DiscoveryTransportServer server=new DiscoveryTransportServer(TransportFactory.bind(brokerId,components[0]));
-            IntrospectionSupport.setProperties(server,parameters,"discovery");
-            DiscoveryAgent discoveryAgent=DiscoveryAgentFactory.createDiscoveryAgent(server.getDiscovery());
-            // Use the host name to configure the group of the discovery agent.
-            if(!parameters.containsKey("discovery.group")){
-                if(compositData.getHost()!=null){
-                    parameters.put("discovery.group",compositData.getHost());
-                }
-            }
-            if(!parameters.containsKey("discovery.brokerName")){
-                parameters.put("discovery.brokerName",brokerId);
-            }
-            IntrospectionSupport.setProperties(discoveryAgent,parameters,"discovery.");
-            server.setDiscoveryAgent(discoveryAgent);
-            return server;
-        }catch(URISyntaxException e){
-            throw new IOException("Invalid location: "+location);
-        }
+        throw new IOException("Invalid server URI: "+location);
+//        try{
+//            CompositeData compositData=URISupport.parseComposite(location);
+//            URI[] components=compositData.getComponents();
+//            if(components.length!=1){
+//                throw new IOException("Invalid location: "+location
+//                                +", the location must have 1 and only 1 composite URI in
it - components = "
+//                                +components.length);
+//            }
+//            Map parameters=new HashMap(compositData.getParameters());
+//            DiscoveryTransportServer server=new DiscoveryTransportServer(TransportFactory.bind(brokerId,components[0]));
+//            IntrospectionSupport.setProperties(server,parameters,"discovery");
+//            DiscoveryAgent discoveryAgent=DiscoveryAgentFactory.createDiscoveryAgent(server.getDiscovery());
+//            // Use the host name to configure the group of the discovery agent.
+//            if(!parameters.containsKey("discovery.group")){
+//                if(compositData.getHost()!=null){
+//                    parameters.put("discovery.group",compositData.getHost());
+//                }
+//            }
+//            if(!parameters.containsKey("discovery.brokerName")){
+//                parameters.put("discovery.brokerName",brokerId);
+//            }
+//            IntrospectionSupport.setProperties(discoveryAgent,parameters,"discovery.");
+//            server.setDiscoveryAgent(discoveryAgent);
+//            return server;
+//        }catch(URISyntaxException e){
+//            throw new IOException("Invalid location: "+location);
+//        }
     }
 
 }

Added: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
(added)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,46 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.multicast;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+
+public class MulticastDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
+        try {
+            
+            Map options = URISupport.parseParamters(uri);
+            MulticastDiscoveryAgent rc = new MulticastDiscoveryAgent();
+            rc.setGroup(uri.getHost());
+            IntrospectionSupport.setProperties(rc, options);
+            return rc;
+            
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
+        }
+    }
+}

Propchange: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
(added)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,45 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.rendezvous;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+
+public class RendezvousDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
+        try {
+            Map options = URISupport.parseParamters(uri);
+            RendezvousDiscoveryAgent rc = new RendezvousDiscoveryAgent();
+            rc.setGroup(uri.getHost());
+            IntrospectionSupport.setProperties(rc, options);
+            return rc;
+            
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
+        }
+    }
+}

Propchange: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
(added)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,51 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.simple;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+import org.activemq.util.URISupport.CompositeData;
+
+public class SimpleDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+    protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
+        try {
+            
+            CompositeData data = URISupport.parseComposite(uri);
+            Map options = URISupport.parseParamters(uri);
+
+            SimpleDiscoveryAgent rc = new SimpleDiscoveryAgent();
+            rc.setGroup(uri.getHost());
+            IntrospectionSupport.setProperties(rc, options);
+            rc.setServices(data.getComponents());
+            
+            return rc;
+            
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
+        }
+    }
+}

Propchange: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
(original)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -23,20 +23,23 @@
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+
+import org.activemq.broker.BrokerService;
+import org.activemq.broker.TransportConnector;
+import org.activemq.broker.BrokerFactory.BrokerFactoryHandler;
 import org.activemq.transport.Transport;
 import org.activemq.transport.TransportFactory;
 import org.activemq.transport.TransportServer;
 import org.activemq.transport.vm.VMTransportFactory;
 import org.activemq.util.IOExceptionSupport;
 import org.activemq.util.IdGenerator;
+import org.activemq.util.IntrospectionSupport;
 import org.activemq.util.URISupport;
+
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
 
 public class PeerTransportFactory extends TransportFactory {
 
-    VMTransportFactory vmTransportFactory = new VMTransportFactory();
-
     final public static ConcurrentHashMap brokers = new ConcurrentHashMap();
 
     final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
@@ -45,13 +48,14 @@
   
     private IdGenerator idGenerator = new IdGenerator("peer-");
 
+    
     public Transport doConnect(URI location) throws Exception {
-        location = convertURI(location);
+        VMTransportFactory vmTransportFactory = createTransportFactory(location);
         return vmTransportFactory.doConnect(location);
     }
 
     public Transport doCompositeConnect(URI location) throws Exception {
-        location = convertURI(location);
+        VMTransportFactory vmTransportFactory = createTransportFactory(location);
         return vmTransportFactory.doCompositeConnect(location);
     }
 
@@ -60,7 +64,7 @@
      * @return the converted URI
      * @throws URISyntaxException
      */
-    private URI convertURI(URI location) throws IOException {
+    private VMTransportFactory createTransportFactory(URI location) throws IOException {
         try {
             String group = location.getHost();
             String broker = location.getPath();
@@ -72,23 +76,34 @@
                 broker = idGenerator.generateSanitizedId();
             }
             
-            Map brokerOptions = new HashMap(URISupport.parseParamters(location));
-            if (!brokerOptions.containsKey("brokerName")){
-                brokerOptions.put("brokerName", broker);
-            }
+            final Map brokerOptions = new HashMap(URISupport.parseParamters(location));
             if (!brokerOptions.containsKey("persistent")){
                 brokerOptions.put("persistent", "false");
             }
-            
-            Map serverNetworkOptions = new HashMap();
-            serverNetworkOptions.put("discovery.group", group);
-            String serverDiscoveryOptions = URISupport.createQueryString(serverNetworkOptions);
-            
-            Map networkOptions = new HashMap();
-            networkOptions.put("group", group);
-            String discoveryOptions = URISupport.createQueryString(networkOptions);
-            location = new URI("vm:broker:(discovery:(tcp://localhost:0)?" + serverDiscoveryOptions+",network:multicast?"+discoveryOptions+")?"+URISupport.createQueryString(brokerOptions));
-            return location;
+                        
+            final URI finalLocation = new URI("vm://"+broker);
+            final String finalBroker = broker;
+            final String finalGroup = group;
+            VMTransportFactory rc = new VMTransportFactory() {
+                public Transport doConnect(URI ignore) throws Exception {
+                    return super.doConnect(finalLocation);
+                };
+                public Transport doCompositeConnect(URI ignore) throws Exception {
+                    return super.doCompositeConnect(finalLocation);
+                };
+            };
+            rc.setBrokerFactoryHandler(new BrokerFactoryHandler(){
+                public BrokerService createBroker(URI brokerURI) throws Exception {
+                    BrokerService service = new BrokerService();
+                    IntrospectionSupport.setProperties(service, brokerOptions);
+                    service.setBrokerName(finalBroker);
+                    TransportConnector c = service.addConnector("tcp://localhost:0");
+                    c.setDiscoveryUri(new URI("multicast://"+finalGroup));
+                    service.addNetworkConnector("multicast://"+finalGroup);
+                    return service;
+                }
+            });
+            return rc;
             
         } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);

Modified: incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
(original)
+++ incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -28,6 +28,7 @@
 import org.activemq.broker.BrokerRegistry;
 import org.activemq.broker.BrokerService;
 import org.activemq.broker.TransportConnector;
+import org.activemq.broker.BrokerFactory.BrokerFactoryHandler;
 import org.activemq.transport.MarshallingTransportFilter;
 import org.activemq.transport.Transport;
 import org.activemq.transport.TransportFactory;
@@ -46,6 +47,8 @@
     final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
     final public static ConcurrentHashMap servers = new ConcurrentHashMap();
 
+    BrokerFactoryHandler brokerFactoryHandler;
+    
     public Transport doConnect(URI location) throws Exception {
         return VMTransportServer.configure(doCompositeConnect(location));
     }
@@ -87,7 +90,11 @@
             BrokerService broker = BrokerRegistry.getInstance().lookup(host);
             if (broker == null) {
                 try {
-                    broker = BrokerFactory.createBroker(brokerURI);
+                    if( brokerFactoryHandler !=null ) {
+                        broker = brokerFactoryHandler.createBroker(brokerURI);
+                    } else {
+                        broker = BrokerFactory.createBroker(brokerURI);
+                    }
                     broker.start();
                 }
                 catch (URISyntaxException e) {
@@ -151,6 +158,14 @@
                 ServiceSupport.dispose(broker);
             }
         }
+    }
+
+    public BrokerFactoryHandler getBrokerFactoryHandler() {
+        return brokerFactoryHandler;
+    }
+
+    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
+        this.brokerFactoryHandler = brokerFactoryHandler;
     }
 
 }

Modified: incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
(original)
+++ incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.multicast.MulticastDiscoveryAgent
+class=org.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory

Modified: incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
(original)
+++ incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.rendezvous.RendezvousDiscoveryAgent
+class=org.activemq.transport.discovery.rendezvous.RendezvousDiscoveryAgentFactory

Modified: incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
(original)
+++ incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgent
+class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgentFactory

Modified: incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
(original)
+++ incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgent
+class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgentFactory

Modified: incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
(original)
+++ incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
Sat Dec 10 21:37:23 2005
@@ -18,7 +18,9 @@
 **/
 package org.activemq.network;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
@@ -48,8 +50,7 @@
     protected void setUp() throws Exception {
         
         super.setUp();
-        String brokerId = broker.getBrokerName();
-        connector = new TransportConnector(broker.getBroker(),TransportFactory.bind(brokerId,new URI(getLocalURI())));
+        connector = createConnector();
         connector.start();
         
         remotePersistenceAdapter = createRemotePersistenceAdapter(true);
@@ -57,9 +58,32 @@
         remoteBroker = createRemoteBroker(remotePersistenceAdapter);
         remoteBroker.start();
         BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
-        brokerId = remoteBroker.getBrokerName();
-        remoteConnector = new TransportConnector(remoteBroker.getBroker(),TransportFactory.bind(brokerId,new URI(getRemoteURI())));
+        remoteConnector = createRemoteConnector();
         remoteConnector.start();
+    }
+
+    /**
+     * Builds the connector for the remote broker, bound under the remote broker's own name.
+     * @return a connector wrapping the remote broker and its bound transport server
+     * @throws Exception if binding the transport or creating the connector fails
+     * @throws URISyntaxException if {@link #getRemoteURI()} is not a valid URI
+     */
+    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
+        return new TransportConnector(remoteBroker.getBroker(),
+                TransportFactory.bind(remoteBroker.getBrokerName(),
+                        new URI(getRemoteURI())));
+    }
+
+    /**
+     * Builds the connector for the local broker, bound under the local broker's name.
+     * @return a connector wrapping the local broker and its bound transport server
+     * @throws Exception on any other failure while creating the connector
+     * @throws IOException presumably when the transport server cannot be bound (unverified here)
+     * @throws URISyntaxException if {@link #getLocalURI()} is not a valid URI
+     */
+    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
+        return new TransportConnector(broker.getBroker(),
+                TransportFactory.bind(broker.getBrokerName(), new URI(getLocalURI())));
     }
 
     protected String getRemoteURI() {

Modified: incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
(original)
+++ incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
Sat Dec 10 21:37:23 2005
@@ -18,7 +18,9 @@
 **/
 package org.activemq.transport.discovery;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import javax.jms.DeliveryMode;
 
@@ -110,15 +112,27 @@
     }
     
     protected String getLocalURI() {
-        return "discovery://tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+        return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
     }
     
     protected String getRemoteURI() {
-        return "discovery://tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+        return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
     }
     
+    protected TransportConnector createConnector() throws Exception, IOException, URISyntaxException {
+        TransportConnector x = super.createConnector();
+        x.setDiscoveryUri(new URI("multicast://default"));
+        return x;
+    }
+    
+    protected TransportConnector createRemoteConnector() throws Exception, IOException, URISyntaxException {
+        TransportConnector x = super.createRemoteConnector();
+        x.setDiscoveryUri(new URI("multicast://default"));
+        return x;
+    }
+        
     protected StubConnection createFailoverConnection() throws Exception {        
-        URI failoverURI = new URI("discovery://multicast");
+        URI failoverURI = new URI("discovery:multicast://default");
         Transport transport = TransportFactory.connect(failoverURI);
         StubConnection connection = new StubConnection(transport);
         connections.add(connection);

Modified: incubator/activemq/activemq-core/src/test/resources/activemq.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/resources/activemq.xml?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/test/resources/activemq.xml (original)
+++ incubator/activemq/activemq-core/src/test/resources/activemq.xml Sat Dec 10 21:37:23 2005
@@ -1,21 +1,18 @@
 <!-- START SNIPPET: xbean -->
 <beans xmlns="http://activemq.org/config/1.0">
 
-  <broker useJmx="true">
+  <broker useJmx="false">
   
     <persistenceAdapter>
       <journaledJDBC journalLogFiles="5" dataDirectory="foo"/>
     </persistenceAdapter>
   
     <transportConnectors>
-      <transportConnector uri="discovery:tcp://localhost:0"/>
+      <transportConnector uri="tcp://localhost:0" discoveryUri="rendezvous://default"/>
     </transportConnectors>
     
     <networkConnectors>
-      <networkConnector uri="rendezvous"/>
-      <!--  
-      <networkConnector uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
-      -->
+      <networkConnector uri="rendezvous://default"/>
     </networkConnectors>
     
   </broker>



Mime
View raw message