activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r735986 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/activeb...
Date Tue, 20 Jan 2009 10:40:09 GMT
Author: rajdavies
Date: Tue Jan 20 02:40:08 2009
New Revision: 735986

URL: http://svn.apache.org/viewvc?rev=735986&view=rev
Log:
More fixes for pointcast

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java   (with props)
    activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/
    activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/mcast
    activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/multicast
    activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/static
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java   (with props)
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Tue Jan 20 02:40:08 2009
@@ -26,6 +26,7 @@
 
     AtomicBoolean initialialized = new AtomicBoolean();
     AtomicBoolean started = new AtomicBoolean();
+    AtomicBoolean hasStarted = new AtomicBoolean();
    
     public boolean init() throws Exception {
         boolean result =  this.initialialized.compareAndSet(false, true);
@@ -50,6 +51,7 @@
         }
         boolean result =  this.started.compareAndSet(false, true); 
         this.started.set(true);
+        this.hasStarted.set(true);
         return result;
     }
 
@@ -75,4 +77,7 @@
     public boolean isShutDown() {
        return !isInitialized();
     }
+    public boolean hasStarted() {
+        return hasStarted.get();
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Jan 20 02:40:08 2009
@@ -16,7 +16,6 @@
  */
 package org.apache.activeblaze;
 
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,10 +51,8 @@
     protected AtomicLong sequence = new AtomicLong();
     protected AtomicLong session = new AtomicLong(1);
     protected Processor broadcast;
-    protected Buffer managementURIBuffer;
     protected BlazeConfiguration configuration = new BlazeConfiguration();
     private String id;
-    private InetSocketAddress toAddress;
 
     /**
      * Constructor
@@ -103,13 +100,10 @@
             broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
             URI broadcastURI = new URI(broadcastURIStr);
             URI managementURI = null;
-            if (getConfiguration().getManagementURI() != null) {
+            String managementURIStr = getConfiguration().getManagementURI();
+            if (managementURIStr != null && managementURIStr.length() > 0) {
                 managementURI = new URI(getConfiguration().getManagementURI());
-                this.managementURIBuffer = new Buffer(managementURI.toString());
-            } else {
-                this.managementURIBuffer = new Buffer();
             }
-            this.toAddress = new InetSocketAddress(broadcastURI.getHost(), broadcastURI.getPort());
             Network transport = NetworkFactory.get(broadcastURI, managementURI);
             transport.setName(getId());
             this.broadcast = configureProcess(transport);
@@ -143,7 +137,7 @@
 
     public boolean start() throws Exception {
         boolean result = super.start();
-        if (result) {
+        if (true ||result) {
             this.broadcast.start();
         }
         return result;
@@ -164,15 +158,12 @@
         blazeData.setDestination(new Buffer(destination));
         PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
         packetData.setReliable(true);
-        packetData.setFromAddress(this.managementURIBuffer);
         Packet packet = new Packet(packetData);
-        packet.setTo(this.toAddress);
         this.broadcast.downStream(packet);
     }
 
-    protected synchronized PacketData getPacketData(MessageType type, Message<?> message) {
+    protected final synchronized PacketData getPacketData(MessageType type, Message<?> message) {
         PacketData packetData = new PacketData();
-        packetData.setFromAddress(this.managementURIBuffer);
         packetData.setType(type.getNumber());
         packetData.setProducerId(this.producerId);
         packetData.setSessionId(this.session.get());

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeConfiguration.java Tue Jan 20 02:40:08 2009
@@ -30,12 +30,11 @@
     public static final int DEFAULT_MAX_PACKET_SIZE = 4 * 1024;
     // transport bindings
     private int unicastPort = 0;
-    private String unicastURI = "udp://localhost:0";
     private String broadcastURI = "mcast://224.2.2.2:9999";
-    private String managementURI = "mcast://224.2.2.2:8888";
+    private String managementURI = "";
     private int maxDispatchQueueSize = 10000;
     private int maxPacketSize = DEFAULT_MAX_PACKET_SIZE;
-    //reliability
+    // reliability
     private String reliable = "simple";
 
     /**
@@ -54,21 +53,6 @@
     }
 
     /**
-     * @return the unicastURL
-     */
-    public String getUnicastURI() {
-        return this.unicastURI;
-    }
-
-    /**
-     * @param unicastURI
-     *            the unicastURI to set
-     */
-    public void setUnicastURI(String unicastURI) {
-        this.unicastURI = unicastURI;
-    }
-
-    /**
      * @return the broadcastURL
      */
     public String getBroadcastURI() {
@@ -82,6 +66,7 @@
     public void setBroadcastURI(String broadcastURL) {
         this.broadcastURI = broadcastURL;
     }
+
     /**
      * @return the maxDispatchQueueSize
      */
@@ -129,11 +114,12 @@
 
     /**
      * Copy the configuration
+     * 
      * @return a deep copy of the configuration
      * @throws Exception
      */
     public final BlazeConfiguration copy() throws Exception {
-        Map<String,String>props = PropertyUtil.getProperties(this);
+        Map<String, String> props = PropertyUtil.getProperties(this);
         BlazeConfiguration result = newInstance();
         PropertyUtil.setProperties(result, props);
         return result;
@@ -151,7 +137,8 @@
     }
 
     /**
-     * @param reliable the reliable to set
+     * @param reliable
+     *            the reliable to set
      */
     public void setReliable(String reliable) {
         this.reliable = reliable;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Processor.java Tue Jan 20 02:40:08 2009
@@ -30,6 +30,13 @@
 	 */
 	void downStream(Packet packet) throws Exception;
 	
+	 /**
+     * Send a management packet - this may be on a different address
+     * @param packet
+     * @throws Exception
+     */
+    public void downStreamManagement(Packet packet) throws Exception;
+	
 	/**
 	 * @param packet
 	 * @throws Exception

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Tue Jan 20 02:40:08 2009
@@ -52,7 +52,7 @@
     }
 
     /**
-     * @return
+     * @return the state
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getState()
      */
     public ClusterState getState() {
@@ -239,7 +239,6 @@
             PacketData data = getPacketData(type, message);
             data.setReliable(true);
             data.setResponseRequired(false);
-            data.setFromAddress(this.inboxURI);
             Packet packet = new Packet(data);
             packet.setTo(to.getAddress());
             synchronized (this.messageRequests) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Jan 20 02:40:08 2009
@@ -55,14 +55,13 @@
     private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
     private final String name;
     protected Processor unicast;
-    private BaseTransport groupManagementTransport;
-    private InetSocketAddress toManagementAddress;
     private MemberImpl local;
     private BlazeQueueListener inboxListener;
     protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
     private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
     private Group group;
-    protected Buffer inboxURI;
+    protected Buffer inboxAddress;
+    protected int inBoxPort;
     protected final Object localMutex = new Object();
 
     /**
@@ -76,7 +75,7 @@
     }
 
     /**
-     * @return
+     * @return true if initialized
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
@@ -86,7 +85,7 @@
             String unicastURIStr = getConfiguration().getUnicastURI();
             unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
             URI unicastURI = new URI(unicastURIStr);
-            this.inboxURI = new Buffer(unicastURIStr);
+            
             BaseTransport transport = TransportFactory.get(unicastURI);
             transport.setName(getId() + "-Unicast");
             this.unicast = configureProcess(transport);
@@ -94,16 +93,9 @@
             // if using a port of zero - the port will be assigned automatically,
             // so need to get the potentially new value
             unicastURI = transport.getLocalURI();
-            // append configuration properties
-            String groupManagementURIStr = getConfiguration().getGroupManagementURI();
-            groupManagementURIStr = PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
-            URI groupManagementURI = new URI(groupManagementURIStr);
-            this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
-            this.groupManagementTransport = TransportFactory.get(groupManagementURI);
-            this.groupManagementTransport.setMaxPacketSize(getConfiguration().getMaxPacketSize());
-            this.groupManagementTransport.setPrev(this);
-            this.groupManagementTransport.setName(getId() + "-HeartbeatTransport");
-            this.groupManagementTransport.init();
+            InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(),unicastURI.getPort());
+            this.inboxAddress=new Buffer(addr.getAddress().getAddress());
+            this.inBoxPort=addr.getPort();
             this.local = createLocal(unicastURI);
             this.group = createGroup();
         }
@@ -119,7 +111,7 @@
     }
 
     /**
-     * @return
+     * @return true if shutDown
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
@@ -127,21 +119,20 @@
         boolean result = super.shutDown();
         if (result) {
             this.group.shutDown();
-            this.groupManagementTransport.shutDown();
+           
             this.unicast.shutDown();
         }
         return result;
     }
 
     /**
-     * @return
+     * @return true if started
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
     public boolean start() throws Exception {
         boolean result = super.start();
         if (result) {
-            this.groupManagementTransport.start();
             this.unicast.start();
             this.group.start();
         }
@@ -149,7 +140,7 @@
     }
 
     /**
-     * @return
+     * @return true if stopped
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
@@ -157,7 +148,7 @@
         boolean result = super.stop();
         if (result) {
             this.group.stop();
-            this.groupManagementTransport.stop();
+          
             this.unicast.stop();
         }
         return result;
@@ -187,7 +178,7 @@
 
     /**
      * @return this channel's configuration
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroupConfiguration()
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getConfiguration()
      */
     public BlazeGroupConfiguration getConfiguration() {
         return (BlazeGroupConfiguration) this.configuration;
@@ -291,6 +282,14 @@
                     send(member, key, message);
                     return;
                 } catch (BlazeNoRouteException e) {
+                   LOG.debug("No response - resending to another client",e);
+                   
+                   
+                   
+                   
+                   
+                   
+                   
                 }
             } else {
                 return;
@@ -407,7 +406,6 @@
             synchronized (this.messageRequests) {
                 this.messageRequests.put(packetData.getMessageId(), request);
             }
-            packetData.setFromAddress(this.inboxURI);
             Packet packet = new Packet(packetData);
             packet.setTo((member).getAddress());
             this.unicast.downStream(packet);
@@ -432,7 +430,6 @@
         PacketData data = getPacketData(blazeData.type(), blazeData);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(((MemberImpl) to).getAddress());
         this.unicast.downStream(packet);
@@ -450,7 +447,6 @@
         PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
         data.setReliable(true);
         data.setResponseRequired(true);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
@@ -619,10 +615,8 @@
     public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
-        packet.setTo(this.toManagementAddress);
-        this.groupManagementTransport.downStream(packet);
+        this.broadcast.downStreamManagement(packet);
     }
 
     /**
@@ -642,7 +636,6 @@
             this.messageRequests.put(data.getMessageId(), request);
         }
         data.setReliable(false);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
@@ -660,10 +653,9 @@
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
-        packet.setTo(this.toManagementAddress);
-        this.groupManagementTransport.downStream(packet);
+        
+        this.broadcast.downStreamManagement(packet);
     }
 
     /**
@@ -675,7 +667,6 @@
     public void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(to);
         this.unicast.downStream(packet);
@@ -693,7 +684,6 @@
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(false);
-        data.setFromAddress(this.inboxURI);
         Packet packet = new Packet(data);
         packet.setTo(to.getAddress());
         this.unicast.downStream(packet);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Tue Jan 20 02:40:08 2009
@@ -20,29 +20,12 @@
 
 /**
  * Configuration for a BlazeGroupChannel
- *
+ * 
  */
 public class BlazeGroupConfiguration extends BlazeConfiguration {
-    private String groupManagementURI = "mcast://224.2.2.2:8888";
-    
     private int heartBeatInterval = 800;
-   
-    /**
-     * @return the groupManagementUTI
-     */
-    public String getGroupManagementURI() {
-        return this.groupManagementURI;
-    }
-
-    /**
-     * @param groupManagementURI
-     *            the groupManagementURI to set
-     */
-    public void setGroupManagementURI(String groupManagementURI) {
-        this.groupManagementURI = groupManagementURI;
-    }
+    private String unicastURI = "udp://localhost:0";
 
-    
     /**
      * @return the heartBeatInterval
      */
@@ -51,13 +34,29 @@
     }
 
     /**
-     * @param heartBeatInterval the heartBeatInterval to set
+     * @param heartBeatInterval
+     *            the heartBeatInterval to set
      */
     public void setHeartBeatInterval(int heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
     }
-    
+
     protected BlazeConfiguration newInstance() {
         return new BlazeGroupConfiguration();
     }
+
+    /**
+     * @return the unicastURL
+     */
+    public String getUnicastURI() {
+        return this.unicastURI;
+    }
+
+    /**
+     * @param unicastURI
+     *            the unicastURI to set
+     */
+    public void setUnicastURI(String unicastURI) {
+        this.unicastURI = unicastURI;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Tue Jan 20 02:40:08 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activeblaze.impl.network;
 
+import java.net.InetSocketAddress;
 import java.net.URI;
 import org.apache.activeblaze.ExceptionListener;
 import org.apache.activeblaze.Processor;
@@ -23,18 +24,20 @@
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activemq.protobuf.Buffer;
 
 /**
  * Uses multicast to implement a Network
- *
+ * 
  */
-public class MulticastNetwork  extends DefaultChainedProcessor implements Network, ExceptionListener{
-    
+public class MulticastNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
     private URI uri;
     private URI managementURI;
     private BaseTransport broadcast;
     private BaseTransport management;
     private String name = "";
+    private InetSocketAddress broadcastAddress;
+    private InetSocketAddress managementAddress;
     
     /**
      * @return the name
@@ -42,8 +45,10 @@
     public String getName() {
         return this.name;
     }
+
     /**
-     * @param name the name to set
+     * @param name
+     *            the name to set
      */
     public void setName(String name) {
         this.name = name;
@@ -54,9 +59,7 @@
      * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
      */
     public void setManagementURI(URI uri) {
-     this.managementURI=uri;
-     
-        
+        this.managementURI = uri;
     }
 
     /**
@@ -64,12 +67,9 @@
      * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
      */
     public void setURI(URI uri) {
-        this.uri=uri;
-        
+        this.uri = uri;
     }
 
-    
-   
     /**
      * @return true if initialized
      * @throws Exception
@@ -78,26 +78,27 @@
     public boolean init() throws Exception {
         boolean result = super.init();
         if (result) {
-            this.broadcast  = TransportFactory.get(this.uri);
+            this.broadcast = TransportFactory.get(this.uri);
             this.broadcast.setName(getName() + "-Broadcast");
             this.broadcast.setExceptionListener(this);
             this.broadcast.setPrev(getPrev());
             this.broadcast.setNext(getNext());
+            this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
             this.broadcast.init();
-            if(this.managementURI != null && !this.managementURI.equals(this.uri)){
-                this.management =  TransportFactory.get(this.managementURI);
+            if (this.managementURI != null && !this.managementURI.equals(this.uri)) {
+                this.management = TransportFactory.get(this.managementURI);
                 this.management.setName(getName() + "-Management");
                 this.management.setExceptionListener(this);
                 this.management.setPrev(getPrev());
                 this.management.setNext(getNext());
+                this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI
+                        .getPort());
                 this.management.init();
-                
             }
         }
         return result;
     }
 
-  
     /**
      * @return true if shutDown
      * @throws Exception
@@ -105,10 +106,10 @@
      */
     public boolean shutDown() throws Exception {
         boolean result = super.shutDown();
-        if (this.broadcast!=null) {
+        if (this.broadcast != null) {
             this.broadcast.shutDown();
         }
-        if (this.management!=null) {
+        if (this.management != null) {
             this.management.shutDown();
         }
         return result;
@@ -121,10 +122,10 @@
      */
     public boolean start() throws Exception {
         boolean result = super.start();
-        if (this.broadcast!=null) {
+        if (this.broadcast != null) {
             this.broadcast.start();
         }
-        if (this.management!=null) {
+        if (this.management != null) {
             this.management.start();
         }
         return result;
@@ -137,16 +138,15 @@
      */
     public boolean stop() throws Exception {
         boolean result = super.stop();
-        if (this.broadcast!=null) {
+        if (this.broadcast != null) {
             this.broadcast.stop();
         }
-        if (this.management!=null) {
+        if (this.management != null) {
             this.management.stop();
         }
         return result;
     }
-    
-    
+
     /**
      * @param packet
      * @throws Exception
@@ -154,11 +154,11 @@
      */
     public void downStreamManagement(Packet packet) throws Exception {
         if (this.management != null) {
+            packet.setTo(this.managementAddress);
             this.management.downStream(packet);
-        }else {
-            this.broadcast.downStream(packet);
+        } else {
+            downStream(packet);
         }
-        
     }
 
     /**
@@ -167,8 +167,8 @@
      * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
      */
     public void downStream(Packet packet) throws Exception {
+        packet.setTo(this.broadcastAddress);
         this.broadcast.downStream(packet);
-        
     }
 
     /**
@@ -177,35 +177,35 @@
      */
     public void setExceptionListener(ExceptionListener l) {
         // TODO Auto-generated method stub
-        
     }
+
     /**
      * @param ex
      * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
      */
     public void onException(Exception ex) {
-        if (this.exceptionListener!=null) {
+        if (this.exceptionListener != null) {
             this.exceptionListener.onException(ex);
         }
-        
     }
-    
+
     public void setNext(Processor next) {
         super.setNext(next);
         if (this.management != null) {
             this.management.setNext(next);
         }
-        if(this.broadcast!=null){
+        if (this.broadcast != null) {
             this.broadcast.setNext(next);
         }
     }
-    
+
     public void setPrev(Processor prev) {
         super.setPrev(prev);
         if (this.management != null) {
-            this.management.setPrev(prev);;
+            this.management.setPrev(prev);
+            ;
         }
-        if(this.broadcast!=null){
+        if (this.broadcast != null) {
             this.broadcast.setPrev(prev);
         }
     }
@@ -214,10 +214,8 @@
         if (this.management != null) {
             this.management.setMaxPacketSize(maxPacketSize);
         }
-        if(this.broadcast!=null){
+        if (this.broadcast != null) {
             this.broadcast.setMaxPacketSize(maxPacketSize);
         }
     }
-    
-    
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/Network.java Tue Jan 20 02:40:08 2009
@@ -39,22 +39,19 @@
     /**
      * Set the uri for the <Code>Network</Code> to use
      * @param uri
+     * @throws Exception 
      */
-    public void setURI(URI uri);
+    public void setURI(URI uri) throws Exception;
     
     /**
      * Set the uri for the <Code>Network</Code> to use for management
      * @param uri
+     * @throws Exception 
      */
-    public void setManagementURI(URI uri);
+    public void setManagementURI(URI uri) throws Exception;
     
         
-    /**
-     * Send a management packet - this may be on a different address
-     * @param packet
-     * @throws Exception
-     */
-    public void downStreamManagement(Packet packet) throws Exception;
+   
     
     
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Tue Jan 20 02:40:08 2009
@@ -17,32 +17,46 @@
 package org.apache.activeblaze.impl.network;
 
 import java.net.URI;
+import java.util.Map;
+import org.apache.activeblaze.util.ObjectFinder;
+import org.apache.activeblaze.util.PropertyUtil;
 
 /**
  * create a new Network instance
- *
+ * 
  */
-public class NetworkFactory {
-    
+public abstract class NetworkFactory {
+    private static final ObjectFinder OBJECT_FINDER = new ObjectFinder(
+            "META-INF/services/org/apache/activeblaze/network/");
+
     /**
-     * @param broadcast 
-     * @param management 
+     * @param broadcast
+     * @param management
      * @return the network associated with the URI
      * @throws Exception
      */
     public static Network get(URI broadcast, URI management) throws Exception {
-        Network result = null;
-        String scheme = broadcast.getScheme();
-        scheme = scheme.trim();
-        if (scheme.equalsIgnoreCase("mcast") || scheme.equalsIgnoreCase("multicast")){
-            result = new MulticastNetwork();
+        Network result = findNetwork(broadcast);
+        result.setURI(broadcast);
+        result.setManagementURI(management);
+        configureNetwork(result, broadcast);
+        configureNetwork(result, management);
+        return result;
+    }
+
+    static void configureNetwork(Network network, URI uri) throws Exception {
+        if (network != null && uri != null) {
+            Map<String, String> options = PropertyUtil.parseParameters(uri);
+            PropertyUtil.setProperties(network, options);
         }
-        if(result != null) {
-            result.setURI(broadcast);
-            if(management!=null) {
-                result.setManagementURI(management);
-            }
+    }
+
+    private static Network findNetwork(URI location) throws Exception {
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IllegalArgumentException("Transport scheme not specified: [" + location + "]");
         }
+        Network result = (Network) OBJECT_FINDER.newInstance(scheme);
         return result;
     }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Tue Jan 20 02:40:08 2009
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.URISupport;
+import org.apache.activeblaze.util.URISupport.CompositeData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Uses a list of URIs to create a network
+ * 
+ */
+public class StaticNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
+    private static final Log LOG = LogFactory.getLog(StaticNetwork.class);
+    private List<URI> broadcastURIs = new ArrayList<URI>();
+    private List<URI> managementURIs = new ArrayList<URI>();
+    private List<InetSocketAddress> broadcastAddresses = new ArrayList<InetSocketAddress>();
+    private List<InetSocketAddress> managementAddresses = new ArrayList<InetSocketAddress>();
+    private BaseTransport broadcast;
+    private BaseTransport management;
+    private String name = "";
+    private boolean useFirstFreeAddress;
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name
+     *            the name to set
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @param location
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
+     */
+    public void setManagementURI(URI location) throws Exception {
+        if (location != null) {
+            CompositeData compositeData = URISupport.parseComposite(location);
+            if (compositeData.getComponents() != null) {
+                for (int i = 0; i < compositeData.getComponents().length; i++) {
+                    URI uri = compositeData.getComponents()[i];
+                    if (!this.managementURIs.contains(uri)) {
+                        this.managementURIs.add(uri);
+                        this.managementAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param location
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
+     */
+    public void setURI(URI location) throws Exception {
+        if (location != null) {
+            CompositeData compositeData = URISupport.parseComposite(location);
+            if (compositeData.getComponents() != null) {
+                for (int i = 0; i < compositeData.getComponents().length; i++) {
+                    URI uri = compositeData.getComponents()[i];
+                    if (!this.broadcastURIs.contains(uri)) {
+                        this.broadcastURIs.add(uri);
+                        this.broadcastAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Initialises this network: creates the broadcast transport and, when a
+     * non-empty list of management URIs that differs from the broadcast list
+     * was supplied, a separate management transport. Each transport is given
+     * this processor's prev/next neighbours and reports exceptions to this
+     * object.
+     *
+     * @return true if initialized
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            // createTransport() has already called init() on the transport it returns
+            this.broadcast = createTransport(this.broadcastURIs);
+            this.broadcast.setName(getName() + "-Broadcast");
+            this.broadcast.setExceptionListener(this);
+            this.broadcast.setPrev(getPrev());
+            this.broadcast.setNext(getNext());
+            // no separate management transport when the management URIs are
+            // absent or identical to the broadcast URIs
+            if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs)) {
+                this.management = createTransport(this.managementURIs);
+                this.management.setName(getName() + "-Management");
+                this.management.setExceptionListener(this);
+                this.management.setPrev(getPrev());
+                this.management.setNext(getNext());
+                // NOTE(review): this is a second init() on the same transport
+                // (createTransport() already called it) - confirm that
+                // BaseTransport.init() is safe to call twice
+                this.management.init();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return true if shutDown
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (this.broadcast != null) {
+            this.broadcast.shutDown();
+        }
+        if (this.management != null) {
+            this.management.shutDown();
+        }
+        return result;
+    }
+
+    /**
+     * @return true if started
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (this.broadcast != null) {
+            this.broadcast.start();
+        }
+        if (this.management != null) {
+            this.management.start();
+        }
+        return result;
+    }
+
+    /**
+     * @return true if stopped
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (this.broadcast != null) {
+            this.broadcast.stop();
+        }
+        if (this.management != null) {
+            this.management.stop();
+        }
+        return result;
+    }
+
+    /**
+     * @param packet
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#downStreamManagement(org.apache.activeblaze.impl.processor.Packet)
+     */
+    public void downStreamManagement(Packet packet) throws Exception {
+        if (this.management != null) {
+            for (InetSocketAddress address : this.managementAddresses) {
+                packet.setTo(address);
+                this.management.downStream(packet);
+            }
+        } else {
+            downStream(packet);
+        }
+    }
+
+    /**
+     * @param packet
+     * @throws Exception
+     * @see org.apache.activeblaze.Processor#downStream(org.apache.activeblaze.impl.processor.Packet)
+     */
+    public void downStream(Packet packet) throws Exception {
+        for (InetSocketAddress address : this.broadcastAddresses) {
+            packet.setTo(address);
+            this.broadcast.downStream(packet);
+        }
+    }
+
+    /**
+     * Registers the listener that {@link #onException(Exception)} forwards to.
+     * @param l the listener to notify; null stops forwarding
+     */
+    public void setExceptionListener(ExceptionListener l) {
+        this.exceptionListener = l;
+    }
+
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(Exception ex) {
+        if (this.exceptionListener != null) {
+            this.exceptionListener.onException(ex);
+        }
+    }
+
+    public void setNext(Processor next) {
+        super.setNext(next);
+        if (this.management != null) {
+            this.management.setNext(next);
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setNext(next);
+        }
+    }
+
+    /**
+     * Sets the previous processor in the chain and passes it on to the
+     * management and broadcast transports if they have been created.
+     *
+     * @param prev the processor to use as previous
+     */
+    public void setPrev(Processor prev) {
+        super.setPrev(prev);
+        if (this.management != null) {
+            this.management.setPrev(prev);
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setPrev(prev);
+        }
+    }
+
+    public void setMaxPacketSize(int maxPacketSize) {
+        if (this.management != null) {
+            this.management.setMaxPacketSize(maxPacketSize);
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setMaxPacketSize(maxPacketSize);
+        }
+    }
+
+    /**
+     * Walks the given URIs in list order and returns the first transport that
+     * TransportFactory can create and that initialises without a BindException.
+     *
+     * @param uris candidate URIs, tried in order
+     * @return the transport, already initialised
+     * @throws Exception a BlazeException when no URI produced a transport; a
+     *             BindException when binding fails and useFirstFreeAddress is
+     *             set; or whatever else the factory or init() throws
+     */
+    private BaseTransport createTransport(List<URI> uris) throws Exception {
+        BaseTransport result = null;
+        for (URI uri : uris) {
+            try {
+                result = TransportFactory.get(uri);
+                result.init();
+                LOG.info("using local address: " + uri);
+                break;
+            } catch (BindException e) {
+                // forget the transport that failed to bind so it is never returned
+                result = null;
+                // NOTE(review): useFirstFreeAddress is only read here and this
+                // class defines no setter for it, so unless it is set some other
+                // way a bind failure always moves on to the next URI. The flag
+                // name suggests the opposite sense (keep trying when true,
+                // rethrow when false) - confirm the intended behaviour.
+                if (this.useFirstFreeAddress) {
+                    throw e;
+                }
+            }
+        }
+        if (result == null) {
+            throw new BlazeException("Could not bind to any uri: " + uris);
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/DefaultChainedProcessor.java Tue Jan 20 02:40:08 2009
@@ -143,6 +143,17 @@
         }
         return result;
     }
+    
+    /**
+     * Send a management packet - this may be on a different address
+     * @param packet
+     * @throws Exception
+     */
+    public void downStreamManagement(Packet packet) throws Exception{
+        if (this.next!=null) {
+            this.next.downStreamManagement(packet);
+        }
+    }
 
     public void downStream(Packet packet) throws Exception {
         if (this.next != null) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Tue Jan 20 02:40:08 2009
@@ -26,7 +26,7 @@
  * 
  */
 public final class Packet {
-    final private SocketAddress from;
+    private SocketAddress from;
     private SocketAddress to;
     private String id;
     final private PacketData packetData;
@@ -50,7 +50,7 @@
      * @param data
      */
     public Packet(PacketData data) {
-        this.packetData = data;  
+        this.packetData = data;
         this.from = null;
         this.to = null;
     }
@@ -60,8 +60,9 @@
      * 
      * @param from
      * @param data
+     * @throws Exception
      */
-    public Packet(SocketAddress from, PacketData data) {
+    public Packet(SocketAddress from, PacketData data) throws Exception {
         this.from = from;
         this.packetData = data;
         this.to = null;
@@ -92,7 +93,7 @@
         this.packetData = data;
         this.from = null;
     }
-    
+
     public String toString() {
         StringBuilder builder = new StringBuilder("Packet:");
         builder.append(getId());
@@ -106,7 +107,7 @@
      * @return the id
      */
     public String getId() {
-        if (this.id==null && this.packetData!=null) {
+        if (this.id == null && this.packetData != null) {
             if (this.packetData.hasMessageId()) {
                 this.id = this.packetData.getMessageId().toStringUtf8();
             }
@@ -122,7 +123,10 @@
     }
 
     public Packet clone() {
-        return new Packet(this.id, this.packetData.clone());
+        Packet result = new Packet(this.id, this.packetData.clone());
+        result.to=this.to;
+        result.from=this.from;
+        return result;
     }
 
     /**
@@ -140,7 +144,8 @@
     }
 
     /**
-     * @param to the to to set
+     * @param to
+     *            the to to set
      */
     public void setTo(SocketAddress to) {
         this.to = to;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Tue Jan 20 02:40:08 2009
@@ -98,27 +98,19 @@
                 InputStream stream = IOUtils.getByteBufferInputStream(buffer);
                 PacketData data = PacketData.parseFramed(stream);
                 stream.close();
-                if (MessageType.ACK_DATA.getNumber() == data.getType()) {
+                if (data.getResponse()) {
                     synchronized (this.messageRequests) {
                         SendRequest request = this.messageRequests.remove(data.getCorrelationId());
                         if (request != null) {
-                            MessageType type = MessageType.ACK_DATA;
-                            AckData ack = (AckData) type.createMessage();
-                            ack.mergeFramed(data.getPayload());
-                            request.put(data.getMessageId(), ack);
+                            request.put(data.getMessageId(), data);
                         }
                     }
                 } else {
                     if (data.getResponseRequired()) {
-                        MessageType type = MessageType.ACK_DATA;
-                        AckData ack = (AckData) type.createMessage();
-                        ack.setMessageId(data.getMessageId());
                         PacketData pd = new PacketData();
                         pd.setResponseRequired(false);
                         pd.setCorrelationId(data.getMessageId());
-                        pd.setType(type.getNumber());
-                        pd.setFromAddress(getBufferOfLocalURI());
-                        pd.setPayload(ack.toFramedBuffer());
+                        pd.setResponse(true);
                         Packet packet = new Packet(pd);
                         packet.setTo(address);
                         downStream(packet);
@@ -161,7 +153,7 @@
                 }
             }
         } else {
-            throw new BlazeException("Not started - trying to downStream " + packet);
+            throw new BlazeException(this + " " + outBuffer + " Not started - trying to downStream " + packet);
         }
     }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java Tue Jan 20 02:40:08 2009
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @version $Revision$
+ */
+public class URISupport {
+
+    public static class CompositeData {
+        private String host;
+        private String scheme;
+        private String path;
+        private URI components[];
+        private Map<String, String> parameters;
+        private String fragment;
+
+        public URI[] getComponents() {
+            return components;
+        }
+
+        public String getFragment() {
+            return fragment;
+        }
+
+        public Map<String, String> getParameters() {
+            return parameters;
+        }
+
+        public String getScheme() {
+            return scheme;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public URI toURI() throws URISyntaxException {
+            StringBuffer sb = new StringBuffer();
+            if (scheme != null) {
+                sb.append(scheme);
+                sb.append(':');
+            }
+
+            if (host != null && host.length() != 0) {
+                sb.append(host);
+            } else {
+                sb.append('(');
+                for (int i = 0; i < components.length; i++) {
+                    if (i != 0) {
+                        sb.append(',');
+                    }
+                    sb.append(components[i].toString());
+                }
+                sb.append(')');
+            }
+
+            if (path != null) {
+                sb.append('/');
+                sb.append(path);
+            }
+            if (!parameters.isEmpty()) {
+                sb.append("?");
+                sb.append(createQueryString(parameters));
+            }
+            if (fragment != null) {
+                sb.append("#");
+                sb.append(fragment);
+            }
+            return new URI(sb.toString());
+        }
+    }
+
+    public static Map<String, String> parseQuery(String uri) throws URISyntaxException {
+        try {
+            Map<String, String> rc = new HashMap<String, String>();
+            if (uri != null) {
+                String[] parameters = uri.split("&");
+                for (int i = 0; i < parameters.length; i++) {
+                    int p = parameters[i].indexOf("=");
+                    if (p >= 0) {
+                        String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
+                        String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
+                        rc.put(name, value);
+                    } else {
+                        rc.put(parameters[i], null);
+                    }
+                }
+            }
+            return rc;
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    public static Map<String, String> parseParamters(URI uri) throws URISyntaxException {
+        return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?"));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> emptyMap() {
+        return Collections.EMPTY_MAP;
+    }
+
+    /**
+     * Removes any URI query from the given uri
+     */
+    public static URI removeQuery(URI uri) throws URISyntaxException {
+        return createURIWithQuery(uri, null);
+    }
+
+    /**
+     * Creates a URI with the given query
+     */
+    public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
+        return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
+                       query, uri.getFragment());
+    }
+
+    public static CompositeData parseComposite(URI uri) throws URISyntaxException {
+
+        CompositeData rc = new CompositeData();
+        rc.scheme = uri.getScheme();
+        String ssp = stripPrefix(uri.getSchemeSpecificPart().trim(), "//").trim();
+
+        parseComposite(uri, rc, ssp);
+
+        rc.fragment = uri.getFragment();
+        return rc;
+    }
+
+    /**
+     * Fills in the host, path, components and parameters of <code>rc</code>
+     * from the scheme specific part of a composite URI.
+     *
+     * @param uri the URI being parsed; only used in the error message
+     * @param rc the CompositeData to populate
+     * @param ssp the scheme specific part to parse
+     * @throws URISyntaxException if the numbers of '(' and ')' differ, or if a
+     *             component or the query string cannot be parsed
+     */
+    private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException {
+        String componentString;
+        String params;
+
+        if (!checkParenthesis(ssp)) {
+            throw new URISyntaxException(uri.toString(), "Not a matching number of '(' and ')' parenthesis");
+        }
+
+        int p;
+        int intialParen = ssp.indexOf("(");
+        // the "(a,b)/path?query" form is only recognised when ssp starts with '('
+        if (intialParen == 0) {
+            // NOTE(review): intialParen is 0 in this branch, so substring(0, 0)
+            // makes host the empty string and the '/' search below can never
+            // match - confirm whether "host(a,b)" was meant to be supported
+            rc.host = ssp.substring(0, intialParen);
+            p = rc.host.indexOf("/");
+            if (p >= 0) {
+                rc.path = rc.host.substring(p);
+                rc.host = rc.host.substring(0, p);
+            }
+            // components are everything between the first '(' and the last ')';
+            // whatever follows the last ')' is the optional path and query
+            p = ssp.lastIndexOf(")");
+            componentString = ssp.substring(intialParen + 1, p);
+            params = ssp.substring(p + 1).trim();
+
+        } else {
+            // no leading '(': the whole ssp is treated as the component list and
+            // no path or query is extracted from it
+            componentString = ssp;
+            params = "";
+        }
+
+        String components[] = splitComponents(componentString);
+        rc.components = new URI[components.length];
+        for (int i = 0; i < components.length; i++) {
+            rc.components[i] = new URI(components[i].trim());
+        }
+
+        // text before '?' (minus a leading '/') is the path, text after it the query
+        p = params.indexOf("?");
+        if (p >= 0) {
+            if (p > 0) {
+                rc.path = stripPrefix(params.substring(0, p), "/");
+            }
+            rc.parameters = parseQuery(params.substring(p + 1));
+        } else {
+            if (params.length() > 0) {
+                rc.path = stripPrefix(params, "/");
+            }
+            rc.parameters = emptyMap();
+        }
+    }
+
+    /**
+     * @param str the comma separated component list to split
+     * @return the components; commas nested inside parentheses do not split
+     */
+    private static String[] splitComponents(String str) {
+        List<String> l = new ArrayList<String>();
+
+        int last = 0;
+        int depth = 0;
+        char chars[] = str.toCharArray();
+        for (int i = 0; i < chars.length; i++) {
+            switch (chars[i]) {
+            case '(':
+                depth++;
+                break;
+            case ')':
+                depth--;
+                break;
+            case ',':
+                if (depth == 0) {
+                    String s = str.substring(last, i);
+                    l.add(s);
+                    last = i + 1;
+                }
+                break;
+            default:
+            }
+        }
+
+        String s = str.substring(last);
+        if (s.length() != 0) {
+            l.add(s);
+        }
+
+        String rc[] = new String[l.size()];
+        l.toArray(rc);
+        return rc;
+    }
+
+    public static String stripPrefix(String value, String prefix) {
+        if (value.startsWith(prefix)) {
+            return value.substring(prefix.length());
+        }
+        return value;
+    }
+
+    public static URI stripScheme(URI uri) throws URISyntaxException {
+        return new URI(stripPrefix(uri.getSchemeSpecificPart().trim(), "//"));
+    }
+
+    /**
+     * Builds a URL-encoded query string ("k1=v1&k2=v2") from the given options.
+     * A key mapped to null is written as the bare key without '=', which is
+     * the form parseQuery() reads back as a null value.
+     *
+     * @param options map of String keys to String values
+     * @return the query string without a leading '?', or "" if options is empty
+     * @throws URISyntaxException if the UTF-8 encoding is not supported
+     */
+    public static String createQueryString(Map options) throws URISyntaxException {
+        try {
+            if (options.size() > 0) {
+                StringBuffer rc = new StringBuffer();
+                boolean first = true;
+                for (Iterator iter = options.keySet().iterator(); iter.hasNext();) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        rc.append("&");
+                    }
+                    String key = (String)iter.next();
+                    String value = (String)options.get(key);
+                    rc.append(URLEncoder.encode(key, "UTF-8"));
+                    // URLEncoder.encode(null, ...) throws NullPointerException, and
+                    // parseQuery() produces null values for parameters without '='
+                    if (value != null) {
+                        rc.append("=");
+                        rc.append(URLEncoder.encode(value, "UTF-8"));
+                    }
+                }
+                return rc.toString();
+            } else {
+                return "";
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException)new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    /**
+     * Creates a URI from the original URI and the remaining parameters
+     * 
+     * @throws URISyntaxException
+     */
+    public static URI createRemainingURI(URI originalURI, Map params) throws URISyntaxException {
+        String s = createQueryString(params);
+        if (s.length() == 0) {
+            s = null;
+        }
+        return createURIWithQuery(originalURI, s);
+    }
+
+    public static URI changeScheme(URI bindAddr, String scheme) throws URISyntaxException {
+        return new URI(scheme, bindAddr.getUserInfo(), bindAddr.getHost(), bindAddr.getPort(), bindAddr
+            .getPath(), bindAddr.getQuery(), bindAddr.getFragment());
+    }
+
+    public static boolean checkParenthesis(String str) {
+        boolean result = true;
+        if (str != null) {
+            int open = 0;
+            int closed = 0;
+
+            int i = 0;
+            while ((i = str.indexOf('(', i)) >= 0) {
+                i++;
+                open++;
+            }
+            i = 0;
+            while ((i = str.indexOf(')', i)) >= 0) {
+                i++;
+                closed++;
+            }
+            result = open == closed;
+        }
+        return result;
+    }
+
+    public int indexOfParenthesisMatch(String str) {
+        int result = -1;
+
+        return result;
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/URISupport.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Jan 20 02:40:08 2009
@@ -35,12 +35,12 @@
 }
     message PacketData {   
       optional bool responseRequired = 1;
-      optional int32 type =2;  
-	    optional bytes producerId = 3;
-	    optional bytes fromAddress =4;
-	    optional int64 sessionId = 5;
-      optional int64 messageSequence = 6;
-      optional bool reliable = 7;
+      optional bool reliable = 2;
+      optional bool response = 3;
+      optional int32 type =4;  
+	    optional bytes producerId = 5;
+	    optional int64 sessionId = 6;
+      optional int64 messageSequence = 7;  
       optional int32 numberOfParts= 8;
       optional int32 partNumber= 9;
       optional bytes payload= 10;
@@ -48,14 +48,14 @@
       optional bytes correlationId = 12;
 	  
     }
+    
     message AckData {
      //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
        //| option java_type_method = "MessageType";
-       optional bytes messageId = 1;
-       optional int64 messageSequence =2;
-       optional bytes fromAddress =3;
-       optional int64 sessionId = 4;
-       optional int64 messageSequence = 5;
+       optional int64 startSequence =2;
+       optional int64 endSequence =3;
+       optional bytes fromAddress =4;
+       optional int64 sessionId = 5;
     }
     
     message DestinationData {

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/mcast
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/mcast?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/mcast (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/mcast Tue Jan 20 02:40:08 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.network.MulticastNetwork

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/multicast
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/multicast?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/multicast (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/multicast Tue Jan 20 02:40:08 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.network.MulticastNetwork

Added: activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/static
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/static?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/static (added)
+++ activemq/activemq-blaze/trunk/src/main/resources/META-INF/services/org/apache/activeblaze/network/static Tue Jan 20 02:40:08 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activeblaze.impl.network.StaticNetwork

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazeChannelTest.java Tue Jan 20 02:40:08 2009
@@ -29,10 +29,11 @@
  */
 public class BlazeChannelTest extends TestCase {
     public void testChannel() throws Exception {
-        int count = 10000;
+        int count = 1000;
         final AtomicInteger received = new AtomicInteger();
         String destination = "test.foo";
         BlazeChannelFactory factory = new BlazeChannelFactory();
+        configureChannelFactory(factory, 2);
         BlazeChannel sender = factory.createChannel();
         BlazeChannel receiver = factory.createChannel();
         sender.start();
@@ -49,6 +50,7 @@
         msg.setText("value");
         for (int i = 0; i < count; i++) {
             sender.broadcast(destination, msg);
+            Thread.sleep(100);
         }
         latch.await(10, TimeUnit.SECONDS);
         receiver.stop();
@@ -57,19 +59,20 @@
     }
 
     public void testGroupBroadcast() throws Exception {
-        final int number = 10;
+        final int GROUP_SIZE = 10;
         String destination = "test.foo";
         final AtomicInteger count = new AtomicInteger();
         List<BlazeChannel> channels = new ArrayList<BlazeChannel>();
         BlazeChannelFactory factory = new BlazeChannelFactory();
-        for (int i = 0; i < number; i++) {
+        configureChannelFactory(factory, GROUP_SIZE);
+        for (int i = 0; i < GROUP_SIZE; i++) {
             BlazeChannel channel = factory.createChannel();
             channel.start();
             channels.add(channel);
             channel.addBlazeTopicMessageListener(destination, new BlazeTopicListener() {
                 public void onMessage(BlazeMessage message) {
                     synchronized (count) {
-                        if (count.incrementAndGet() == number) {
+                        if (count.incrementAndGet() == GROUP_SIZE) {
                             count.notifyAll();
                         }
                     }
@@ -80,14 +83,17 @@
         msg.setText("hello");
         channels.get(0).broadcast(destination, msg);
         synchronized (count) {
-            if (count.get() < number) {
+            if (count.get() < GROUP_SIZE) {
                 count.wait(5000);
             }
         }
        
-        assertEquals(number, count.get());
+        assertEquals(GROUP_SIZE, count.get());
         for (BlazeChannel channel : channels) {
             channel.shutDown();
         }
     }
+    
+    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+    }
 }

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java Tue Jan 20 02:40:08 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+/**
+ * 
+ * 
+ */
+public class BlazePointcastChannelTest extends BlazeChannelTest {
+    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+        int portStart = 61616;
+        String uri = "static://(";
+        for (int i = 0; i < groupNumber; i++) {
+            uri += "udp://localhost:" + (portStart + i);
+            uri += ",";
+        }
+        uri += ")";
+        fac.getConfiguration().setManagementURI("");
+        fac.getConfiguration().setBroadcastURI(uri);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/BlazePointcastChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Tue Jan 20 02:40:08 2009
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import junit.framework.TestCase;
+import org.apache.activeblaze.BlazeChannelFactory;
 import org.apache.activeblaze.group.Member;
 
 /**
@@ -27,25 +28,23 @@
  * 
  */
 public class BlazeClusterGroupChannelTest extends TestCase {
-    
-    
-    
-    
     public void XtestOneChannel() throws Exception {
         BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
         BlazeClusterGroupChannel channel = factory.createChannel("test");
         assertEquals(1, channel.getMembers().size());
         channel.start();
-        boolean electionFinished = channel.waitForElection((int) (channel.getConfiguration().getAwaitGroupTimeout()+500));
+        boolean electionFinished = channel
+                .waitForElection((int) (channel.getConfiguration().getAwaitGroupTimeout() + 500));
         assertTrue(electionFinished);
         assertEquals(1, channel.getMembers().size());
         channel.shutDown();
     }
 
-    public void testGroup() throws Exception {
+    public void XtestGroup() throws Exception {
         final int number = 3;
         List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
         BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        configureChannelFactory(factory, number);
         for (int i = 0; i < number; i++) {
             BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
             channel.getConfiguration().setMinimumGroupSize(number);
@@ -84,10 +83,12 @@
         }
     }
 
-    public void XtestWeightedGroup() throws Exception {
+    public void testWeightedGroup() throws Exception {
         final int number = 4;
         List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
         BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        configureChannelFactory(factory, number);
+        configureChannelFactory(factory, number);
         BlazeClusterGroupChannel weightedMaster = null;
         for (int i = 0; i < number; i++) {
             BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
@@ -119,15 +120,17 @@
         }
     }
 
-    public void XtestChangedWeightedGroup() throws Exception {
+    public void testChangedWeightedGroup() throws Exception {
         final int number = 4;
         List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
         BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        configureChannelFactory(factory, number);
         BlazeClusterGroupChannel weightedMaster = null;
         for (int i = 0; i < number; i++) {
             BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
-            channel.addToGroup("test");
+            
             channel.getConfiguration().setMinimumGroupSize(number);
+            channel.addToGroup("changedWeightedTest");
             if (i == number / 2) {
                 channel.getConfiguration().setMasterWeight(10);
                 weightedMaster = channel;
@@ -137,7 +140,7 @@
             channel.start();
             channels.add(channel);
         }
-        channels.get(number - 1).waitForElection(5000);
+        channels.get(number - 1).waitForElection(10000);
         int masterNumber = 0;
         BlazeClusterGroupChannel master = null;
         for (BlazeClusterGroupChannel channel : channels) {
@@ -147,8 +150,8 @@
             }
         }
         assertNotNull(master);
-        assertTrue(master == weightedMaster);
         assertEquals(1, masterNumber);
+        assertTrue(master == weightedMaster);
         channels.get(0).getConfiguration().setMasterWeight(2000);
         Thread.sleep(2000);
         masterNumber = 0;
@@ -194,4 +197,7 @@
         channel.shutDown();
         master.shutDown();
     }
+
+    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+    }
 }

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java?rev=735986&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java Tue Jan 20 02:40:08 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.BlazeChannelFactory;
+
+public class BlazePointcastClusterGroupChannelTest extends BlazeClusterGroupChannelTest {
+    int portNum = 61616;
+
+    protected void configureChannelFactory(BlazeChannelFactory fac, int groupNumber) {
+        String uri = "static://(";
+        for (int i = 0; i < groupNumber; i++) {
+            uri += "udp://localhost:" + (this.portNum++);
+            uri += ",";
+        }
+        uri += ")";
+        fac.getConfiguration().setManagementURI("");
+        fac.getConfiguration().setBroadcastURI(uri);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazePointcastClusterGroupChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Tue Jan 20 02:40:08 2009
@@ -55,7 +55,7 @@
                 }
             });
         }
-        channels.get(0).getAndWaitForMemberByName(channels.get(number-1).getName(), 2000);
+        channels.get(0).getAndWaitForMemberByName(channels.get(number - 1).getName(), 2000);
         BlazeMessage msg = new BlazeMessage();
         msg.setString("test", "hello");
         channels.get(0).send(channels.get(1).getLocalMember(), msg);
@@ -79,7 +79,6 @@
      * @throws Exception
      */
     public void testSendRequestMemberBlazeMessage() throws Exception {
-       
         final int number = 100;
         final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
         final List<BlazeMessage> replies = new ArrayList<BlazeMessage>();
@@ -118,9 +117,8 @@
         request.shutDown();
         reply.shutDown();
     }
-    
-    
-    public void testSendRequestString() throws Exception{
+
+    public void testSendRequestString() throws Exception {
         String destination = "/test/foo";
         final int number = 100;
         final List<BlazeMessage> requests = new ArrayList<BlazeMessage>();
@@ -132,7 +130,7 @@
         BlazeGroupChannelFactory factory = new BlazeGroupChannelFactory();
         final BlazeGroupChannel request = factory.createGroupChannel("request");
         final BlazeGroupChannel reply = factory.createGroupChannel("reply");
-        reply.addBlazeQueueMessageListener(destination,new BlazeQueueListener() {
+        reply.addBlazeQueueMessageListener(destination, new BlazeQueueListener() {
             public void onMessage(BlazeMessage message) {
                 if (!replies.isEmpty()) {
                     BlazeMessage replyMsg = replies.remove(0);
@@ -149,11 +147,11 @@
         reply.addToGroup("test");
         request.start();
         reply.start();
-        Member result = request.getAndWaitForMemberByName("reply",1000);
+        Member result = request.getAndWaitForMemberByName("reply", 1000);
         assertNotNull(result);
         for (int i = 0; i < requests.size(); i++) {
             BlazeMessage requestMsg = requests.get(i);
-            BlazeMessage replyMsg = request.sendRequest(destination, requestMsg,1000);
+            BlazeMessage replyMsg = request.sendRequest(destination, requestMsg, 1000);
             assertNotNull(replyMsg);
         }
         assertTrue(replies.isEmpty());
@@ -161,7 +159,6 @@
         reply.shutDown();
     }
 
-
     /**
      * Test method for
      * {@link org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)}.
@@ -192,7 +189,8 @@
         }
         Thread.sleep(2000);
         BlazeMessage msg = new BlazeMessage("hello");
-        channels.get(0).send(destination, msg);
+        BlazeGroupChannel channel = channels.get(0);
+        channel.send(destination, msg);
         synchronized (count) {
             if (count.get() == 0) {
                 count.wait(5000);
@@ -201,8 +199,8 @@
         // wait a while to check that only one got it
         Thread.sleep(2000);
         assertEquals(1, count.get());
-        for (BlazeChannel channel : channels) {
-            channel.shutDown();
+        for (BlazeChannel c : channels) {
+            c.shutDown();
         }
     }
 }

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java Tue Jan 20 02:40:08 2009
@@ -48,7 +48,6 @@
         packetData.setType(1);
         packetData.setMessageId(new Buffer("foo"));
         packetData.setProducerId(duff);
-        packetData.setFromAddress(duff);
         packetData.setSessionId(1);
         packetData.setMessageSequence(0);
         packetData.setPayload(new Buffer(payload));

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java?rev=735986&r1=735985&r2=735986&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java Tue Jan 20 02:40:08 2009
@@ -56,7 +56,6 @@
             packetData.setType(1);
             packetData.setMessageId(new Buffer("foo"));
             packetData.setProducerId(duff);
-            packetData.setFromAddress(duff);
             packetData.setSessionId(1);
             packetData.setMessageSequence(0);
             packetData.setPayload(new Buffer(payload));



Mime
View raw message