activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r741060 [1/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/network/ main/java/org/apache/activeblaze/impl/reliable/swp/ main/java/or...
Date Thu, 05 Feb 2009 09:44:02 GMT
Author: rajdavies
Date: Thu Feb  5 09:44:01 2009
New Revision: 741060

URL: http://svn.apache.org/viewvc?rev=741060&view=rev
Log:
Updated JMS support and added JMS perf test

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConsumer.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeMessageDispatcher.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeQueueMessageDispatcher.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeTopicMessageDispatcher.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java   (contents, props changed)
      - copied, changed from r739885, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfConsumer.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfProducer.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/PerfRate.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java   (with props)
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicSubscriber.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jndi/JNDIStorable.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Thu Feb  5 09:44:01 2009
@@ -24,6 +24,7 @@
  */
 public abstract class BaseService implements Service {
     AtomicBoolean initialialized = new AtomicBoolean();
+    AtomicBoolean shutDown = new AtomicBoolean();
     AtomicBoolean started = new AtomicBoolean();
 
     protected abstract void doInit() throws Exception;
@@ -35,8 +36,9 @@
     protected abstract void doStop() throws Exception;
 
     public final boolean init() throws Exception {
-        boolean result = this.initialialized.compareAndSet(false, true);
+        boolean result = this.initialialized.compareAndSet(false, true) || this.shutDown.get();
         this.initialialized.set(true);
+        this.shutDown.set(false);
         if (result) {
             doInit();
         }
@@ -45,8 +47,8 @@
 
     public final boolean shutDown() throws Exception {
         stop();
-        boolean result = this.initialialized.compareAndSet(true, false);
-        this.initialialized.set(false);
+        boolean result = this.shutDown.compareAndSet(false, true);
+        this.shutDown.set(true);
         if (result) {
             doShutDown();
         }
@@ -54,9 +56,9 @@
     }
 
     public final boolean start() throws Exception {
-        if (!this.initialialized.get()) {
+        
             init();
-        }
+      
         boolean result = this.started.compareAndSet(false, true);
         this.started.set(true);
         if (result) {
@@ -87,6 +89,6 @@
     }
 
     public final boolean isShutDown() {
-        return !isInitialized();
+        return this.shutDown.get();
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannel.java Thu Feb  5 09:44:01 2009
@@ -34,6 +34,14 @@
      * @throws Exception 
      */
     public void broadcast(String destination, BlazeMessage msg) throws Exception;
+    
+    /**
+     * Broadcast a message
+     * @param destination
+     * @param msg
+     * @throws Exception 
+     */
+    public void broadcast(Destination destination, BlazeMessage msg) throws Exception;
 
     /**
      * @return the configuration

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Thu Feb  5 09:44:01 2009
@@ -37,17 +37,23 @@
 
 /**
  * <P>
- * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast, broadcast or multicast with other
- * peers in the Blaze network
+ * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast,
+ * broadcast or multicast with other peers in the Blaze network
  * 
  * 
  */
-public class BlazeChannelImpl extends DefaultChainedProcessor implements BlazeChannel, ExceptionListener {
+public class BlazeChannelImpl extends DefaultChainedProcessor implements
+        BlazeChannel, ExceptionListener {
     protected Map<Subscription, BlazeMessageListener> topicessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
+
     protected final IdGenerator idGenerator = new IdGenerator();
+
     protected Buffer producerId;
+
     protected Processor broadcast;
+
     protected BlazeConfiguration configuration = new BlazeConfiguration();
+
     private String id;
 
     /**
@@ -61,22 +67,34 @@
         this.producerId = new Buffer(this.id);
     }
 
+    /**
+     * @return the id
+     * @see org.apache.activeblaze.BlazeChannel#getId()
+     */
     public String getId() {
         return this.id;
     }
 
     /**
+     * @return the name
+     */
+    public String getName() {
+        return getId();
+    }
+
+    /**
      * @param destination
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+    public void addBlazeTopicMessageListener(String destination,
+            BlazeMessageListener l) throws Exception {
         Subscription sub = new Subscription(destination);
         this.topicessageListenerMap.put(sub, l);
     }
-    
+
     /**
      * @param subscription
      * @param l
@@ -84,8 +102,8 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
-       
+    public void addBlazeTopicMessageListener(Subscription subscription,
+            BlazeMessageListener l) throws Exception {
         this.topicessageListenerMap.put(subscription, l);
     }
 
@@ -94,42 +112,48 @@
      * 
      * @return the TopicListener
      * @throws Exception
-     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String destination)
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String
+     *      destination)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception {
+    public BlazeMessageListener removeBlazeTopicMessageListener(
+            String destination) throws Exception {
         Subscription key = new Subscription(destination);
         return this.topicessageListenerMap.remove(key);
     }
-    
+
     /**
      * @param key
      * 
      * @return the TopicListener
      * @throws Exception
-     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String destination)
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(String
+     *      destination)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription key) throws Exception {
-      
+    public BlazeMessageListener removeBlazeTopicMessageListener(Subscription key)
+            throws Exception {
         return this.topicessageListenerMap.remove(key);
     }
 
     public void doInit() throws Exception {
         super.doInit();
         String broadcastURIStr = getConfiguration().getBroadcastURI();
-        broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(broadcastURIStr, getConfiguration());
+        broadcastURIStr = PropertyUtil.addPropertiesToURIFromBean(
+                broadcastURIStr, getConfiguration());
         URI broadcastURI = new URI(broadcastURIStr);
         URI managementURI = null;
         String managementURIStr = getConfiguration().getManagementURI();
         if (managementURIStr != null && managementURIStr.length() > 0) {
             managementURI = new URI(getConfiguration().getManagementURI());
         }
-        Network transport = NetworkFactory.get(broadcastURI, managementURI, getConfiguration().getReliableBroadcast());
-        transport.setName(getId());
+        Network transport = NetworkFactory.get(broadcastURI, managementURI,
+                getConfiguration().getReliableBroadcast());
+        transport.setName(getName());
         this.broadcast = configureProcess(transport);
         this.broadcast.init();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport) throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport)
+            throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -142,14 +166,18 @@
         return result;
     }
 
-    protected ChainedProcessor getReliability(String reliability) throws Exception {
+    protected ChainedProcessor getReliability(String reliability)
+            throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
 
     public void doShutDown() throws Exception {
         super.doShutDown();
-        this.broadcast.shutDown();
+        Processor p = this.broadcast;
+        if (p != null) {
+            p.shutDown();
+        }
     }
 
     public void doStart() throws Exception {
@@ -162,8 +190,14 @@
         this.broadcast.stop();
     }
 
-    public synchronized void broadcast(String destination, BlazeMessage msg) throws Exception {
-        msg.setDestination(new Destination(destination));
+    public void broadcast(String destination, BlazeMessage msg)
+            throws Exception {
+        broadcast(new Destination(destination), msg);
+    }
+
+    public synchronized void broadcast(Destination destination, BlazeMessage msg)
+            throws Exception {
+        msg.setDestination(destination);
         msg.storeContent();
         BlazeData blazeData = msg.getContent();
         PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
@@ -172,7 +206,8 @@
         this.broadcast.downStream(packet);
     }
 
-    protected final synchronized PacketData getPacketData(MessageType type, Message<?> message) {
+    protected final synchronized PacketData getPacketData(MessageType type,
+            Message<?> message) {
         PacketData packetData = new PacketData();
         packetData.setType(type.getNumber());
         packetData.setProducerId(this.producerId);
@@ -186,7 +221,8 @@
         processData(packet.getId(), data.getCorrelationId(), data);
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketData data)
+            throws Exception {
         MessageType type = MessageType.valueOf(data.getType());
         if (type == MessageType.BLAZE_DATA) {
             doProcessBlazeData(data);
@@ -199,6 +235,7 @@
 
     /**
      * Set the configuration
+     * 
      * @param configuration
      */
     public void setConfiguration(BlazeConfiguration configuration) {
@@ -218,7 +255,8 @@
         dispatch(message);
     }
 
-    protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
+    protected final BlazeMessage buildBlazeMessage(PacketData data)
+            throws Exception {
         BlazeMessage message = null;
         if (data != null) {
             MessageType type = MessageType.BLAZE_DATA;
@@ -236,7 +274,9 @@
                 message.setMessageId(data.getMessageId().toStringUtf8());
             }
             if (data.hasCorrelationId()) {
-                message.setCorrelationId(data.getCorrelationId().toStringUtf8());
+                message
+                        .setCorrelationId(data.getCorrelationId()
+                                .toStringUtf8());
             }
             message.setTimeStamp(blazeData.getTimestamp());
             message.setContent(blazeData);
@@ -250,8 +290,10 @@
 
     protected final void dispatch(BlazeMessage message) {
         if (message != null) {
-            Buffer destination = message.getContent().getDestinationData().getName();
-            for (Map.Entry<Subscription, BlazeMessageListener> entry : this.topicessageListenerMap.entrySet()) {
+            Buffer destination = message.getContent().getDestinationData()
+                    .getName();
+            for (Map.Entry<Subscription, BlazeMessageListener> entry : this.topicessageListenerMap
+                    .entrySet()) {
                 if (entry.getKey().matches(destination)) {
                     entry.getValue().onMessage(message);
                 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Thu Feb  5 09:44:01 2009
@@ -85,12 +85,13 @@
     private transient String fromId;
     private transient String messageId;
     private transient String correlationId;
-    private transient String type;
+    private transient String messageType;
     private transient long timeStamp;
     private transient long expiration;
     private transient int redeliveryCounter;
     private transient int priority;
     private transient boolean persistent;
+    private transient int type;
     private BlazeData content;
 
     /**
@@ -192,6 +193,7 @@
      * @return the destination
      */
     public Destination getDestination() {
+        initializeReading();
         return this.destination;
     }
 
@@ -216,6 +218,7 @@
      * @return the fromId
      */
     public String getFromId() {
+        initializeReading();
         return this.fromId;
     }
 
@@ -231,6 +234,7 @@
      * @return the messageId
      */
     public String getMessageId() {
+        initializeReading();
         return this.messageId;
     }
 
@@ -246,6 +250,7 @@
      * @return the correlationId
      */
     public String getCorrelationId() {
+        initializeReading();
         return this.correlationId;
     }
 
@@ -261,6 +266,7 @@
      * @return the timeStamp
      */
     public long getTimeStamp() {
+        initializeReading();
         return this.timeStamp;
     }
 
@@ -276,6 +282,7 @@
      * @return the replyTo
      */
     public Destination getReplyTo() {
+        initializeReading();
         return this.replyTo;
     }
 
@@ -299,6 +306,7 @@
      * @return the expiration
      */
     public long getExpiration() {
+        initializeReading();
         return this.expiration;
     }
 
@@ -314,6 +322,7 @@
      * @return the redeliveryCounter
      */
     public int getRedeliveryCounter() {
+        initializeReading();
         return this.redeliveryCounter;
     }
 
@@ -329,6 +338,7 @@
      * @return the priority
      */
     public int getPriority() {
+        initializeReading();
         return this.priority;
     }
 
@@ -344,6 +354,7 @@
      * @return the persistent
      */
     public boolean isPersistent() {
+        initializeReading();
         return this.persistent;
     }
 
@@ -358,16 +369,26 @@
     /**
      * @return the type
      */
-    public String getType() {
-        return this.type;
+    public String getMessageType() {
+        initializeReading();
+        return this.messageType;
     }
 
     /**
      * @param type
      *            the type to set
      */
-    public void setType(String type) {
-        this.type = type;
+    public void setMessageType(String type) {
+        this.messageType = type;
+    }
+    
+    /**
+     * Get the type
+     * @return the type
+     */
+    public int getType() {
+        initializeReading();
+        return this.type;
     }
 
     /**
@@ -1019,6 +1040,10 @@
         }
     }
 
+    /** 
+     * @return pretty print
+     * @see java.lang.Object#toString()
+     */
     public String toString() {
         return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
     }
@@ -1184,7 +1209,7 @@
      * Store content into a BlazeData object for serialization
      */
     public void storeContent() {
-        if (getContent() == null && !this.map.isEmpty()) {
+        if (getContent() == null) {
             BlazeData bd = new BlazeData();
             MapData mapData = new MapData();
             for (Map.Entry<String, Object> entry : this.map.entrySet()) {
@@ -1206,14 +1231,15 @@
             if (this.fromId != null) {
                 bd.setFromId(new Buffer(this.fromId));
             }
-            if (this.type != null) {
-                bd.setType(new Buffer(this.type));
+            if (this.messageType != null) {
+                bd.setMessageType(new Buffer(this.messageType));
             }
-            bd.setTimestamp(getTimeStamp());
-            bd.setExpiration(getExpiration());
-            bd.setRedeliveryCounter(getRedeliveryCounter());
-            bd.setPriority(getPriority());
-            bd.setPersistent(isPersistent());
+            bd.setTimestamp(this.timeStamp);
+            bd.setExpiration(this.expiration);
+            bd.setRedeliveryCounter(this.redeliveryCounter);
+            bd.setPriority(this.priority);
+            bd.setPersistent(this.persistent);
+            bd.setType(this.type);
             this.content = bd;
         }
     }
@@ -1241,15 +1267,18 @@
             if (data.hasCorrelationId()) {
                 this.correlationId = data.getCorrelationId().toStringUtf8();
             }
-            if(data.hasType()) {
-                this.type = data.getType().toStringUtf8();
+            if(data.hasMessageType()) {
+                this.messageType = data.getMessageType().toStringUtf8();
             }
             this.timeStamp = data.getTimestamp();
             this.expiration = data.getExpiration();
             this.redeliveryCounter = data.getRedeliveryCounter();
             this.priority = data.getPriority();
             this.persistent = data.getPersistent();
+            this.type=data.getType();
            
         }
     }
+
+   
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java Thu Feb  5 09:44:01 2009
@@ -63,7 +63,7 @@
      */
     public Subscription(String destination, boolean topic) {
         this.data = new SubscriptionData();
-        Destination dest = new Destination(destination);
+        Destination dest = new Destination(destination,topic);
         this.data.setDestinationData(dest.getData());
         setTopic(topic);
     }
@@ -233,6 +233,20 @@
     public void setTemporary(boolean temporary) {
         this.getDestinationData().setTemporary(temporary);
     }
+    
+    /**
+     * @return noLocal
+     */
+    public boolean isNoLocal() {
+        return this.data.getNoLocal();
+    }
+    
+    /**
+     * @param noLocal
+     */
+    public void setNoLocal(boolean noLocal) {
+        this.data.setNoLocal(noLocal);
+    }
 
     public int hashCode() {
         return this.data.getDestinationData().getName().hashCode();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Thu Feb  5 09:44:01 2009
@@ -21,6 +21,7 @@
 import org.apache.activeblaze.BlazeChannel;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.Destination;
 import org.apache.activeblaze.Subscription;
 
 /**
@@ -78,6 +79,15 @@
      * @throws Exception
      */
     public void send(String destination, BlazeMessage message) throws Exception;
+    
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion
+     * 
+     * @param destination
+     * @param message
+     * @throws Exception
+     */
+    public void send(Destination destination, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
@@ -89,6 +99,17 @@
      * @throws Exception
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception;
+    
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+     * for a response
+     * 
+     * @param destination
+     * @param message
+     * @return a response
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
@@ -103,6 +124,20 @@
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception;
 
+    
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+     * for a response
+     * 
+     * @param destination
+     * @param message
+     * @param timeout -
+     *            time in milliseconds to wait for a response
+     * @return a response, or null if timed out
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception;
+
     /**
      * Send a response message to an original message - for request/response
      * 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Thu Feb  5 09:44:01 2009
@@ -53,20 +53,34 @@
 
 /**
  * <P>
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
+ * communication
  * 
  */
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
-    private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements
+        BlazeGroupChannel {
+    private static final Log LOG = LogFactory
+            .getLog(BlazeGroupChannelImpl.class);
+
     private String name;
+
     protected Processor unicast;
+
     private MemberImpl local;
+
     private BlazeMessageListener inboxListener;
-    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
+
+    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(
+            10000);
+
     private Map<Subscription, BlazeMessageListener> queueMessageListenerMap = new ConcurrentHashMap<Subscription, BlazeMessageListener>();
+
     private Group group;
+
     protected Buffer inboxAddress;
+
     protected int inBoxPort;
+
     protected final Object localMutex = new Object();
 
     /**
@@ -86,23 +100,27 @@
     public void doInit() throws Exception {
         super.doInit();
         String unicastURIStr = getConfiguration().getUnicastURI();
-        unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+        unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr,
+                getConfiguration());
         URI unicastURI = new URI(unicastURIStr);
         BaseTransport transport = TransportFactory.get(unicastURI);
         transport.setName(getId() + "-Unicast");
-        this.unicast = configureProcess(transport, getConfiguration().getReliableUnicast());
+        this.unicast = configureProcess(transport, getConfiguration()
+                .getReliableUnicast());
         this.unicast.init();
         // if using a port of zero - the port will be assigned automatically,
         // so need to get the potentially new value
         unicastURI = transport.getLocalURI();
-        InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(), unicastURI.getPort());
+        InetSocketAddress addr = new InetSocketAddress(unicastURI.getHost(),
+                unicastURI.getPort());
         this.inboxAddress = new Buffer(addr.getAddress().getAddress());
         this.inBoxPort = addr.getPort();
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
 
-    protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
+    protected final Processor configureProcess(ChainedProcessor transport,
+            String reliability) throws Exception {
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -117,7 +135,8 @@
         return result;
     }
 
-    protected ChainedProcessor getReliability(String reliability) throws Exception {
+    protected ChainedProcessor getReliability(String reliability)
+            throws Exception {
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
@@ -136,8 +155,12 @@
      */
     public void doShutDown() throws Exception {
         super.doShutDown();
-        this.group.shutDown();
-        this.unicast.shutDown();
+        if (this.group != null) {
+            this.group.shutDown();
+        }
+        if (this.unicast != null) {
+            this.unicast.shutDown();
+        }
     }
 
     /**
@@ -164,7 +187,9 @@
      * @return the name
      */
     public String getName() {
-        return this.name;
+        synchronized (this.localMutex) {
+            return this.name;
+        }
     }
 
     /**
@@ -174,7 +199,12 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
      */
     public void setName(String name) {
-        this.name = name;
+        synchronized (this.localMutex) {
+            this.name = name;
+            if (this.local != null) {
+                this.local.setName(name);
+            }
+        }
     }
 
     /**
@@ -221,7 +251,8 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void addMemberChangedListener(MemberChangedListener l) throws Exception {
+    public void addMemberChangedListener(MemberChangedListener l)
+            throws Exception {
         init();
         this.group.addMemberChangedListener(l);
     }
@@ -231,7 +262,8 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
+    public void removeMemberChangedListener(MemberChangedListener l)
+            throws Exception {
         init();
         this.group.removeMemberChangedListener(l);
     }
@@ -266,7 +298,8 @@
      * @return the member or null
      * @throws Exception
      */
-    public Member getAndWaitForMemberByName(String name, int timeout) throws Exception {
+    public Member getAndWaitForMemberByName(String name, int timeout)
+            throws Exception {
         init();
         return this.group.getAndWaitForMemberByName(name, timeout);
     }
@@ -287,15 +320,29 @@
      * @param destination
      * @param message
      * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
      */
     public void send(String destination, BlazeMessage message) throws Exception {
-        Buffer buffer = new Buffer(destination);
+        send(new Destination(destination, false), message);
+    }
+
+    /**
+     * Send a message to a member of the group - in a round-robin fashion
+     * 
+     * @param destination
+     * @param message
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
+     */
+    public void send(Destination destination, BlazeMessage message)
+            throws Exception {
         while (true) {
-            MemberImpl member = getQueueDestination(buffer);
+            MemberImpl member = getQueueDestination(destination.getName());
             if (member != null) {
                 try {
-                    send(member, buffer, message);
+                    send(member, destination.getName(), message);
                     return;
                 } catch (BlazeNoRouteException e) {
                     LOG.debug("No response - resending to another client", e);
@@ -314,7 +361,8 @@
      *      org.apache.activeblaze.BlazeMessage)
      */
     public void send(Member member, BlazeMessage message) throws Exception {
-        send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
+        send((MemberImpl) member, new Buffer(member.getInBoxDestination()),
+                message);
     }
 
     /**
@@ -325,7 +373,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(Member member, BlazeMessage message)
+            throws Exception {
         return sendRequest(member, message, 0);
     }
 
@@ -338,8 +387,23 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
-        return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
+    public BlazeMessage sendRequest(Member member, BlazeMessage message,
+            int timeout) throws Exception {
+        return sendRequest((MemberImpl) member, new Buffer(member
+                .getInBoxDestination()), message, timeout);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @return the response
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message)
+            throws Exception {
+        return sendRequest(new Destination(destination, false), message, 0);
     }
 
     /**
@@ -350,7 +414,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(Destination destination,
+            BlazeMessage message) throws Exception {
         return sendRequest(destination, message, 0);
     }
 
@@ -358,13 +423,29 @@
      * @param destination
      * @param message
      * @param timeout
+     * @return the response
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage, int)
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message,
+            int timeout) throws Exception {
+        return sendRequest(new Destination(destination, false), message,
+                timeout);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @param timeout
      * @return the response
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
-        Buffer key = new Buffer(destination);
+    public BlazeMessage sendRequest(Destination destination,
+            BlazeMessage message, int timeout) throws Exception {
+        Buffer key = destination.getName();
         long deadline = 0;
         long waitTime = timeout;
         if (timeout > 0) {
@@ -374,14 +455,16 @@
             MemberImpl member = getQueueDestination(key);
             if (member != null) {
                 try {
-                    BlazeMessage result = sendRequest(member, key, message, (int) waitTime);
+                    BlazeMessage result = sendRequest(member, key, message,
+                            (int) waitTime);
                     if (result != null) {
                         return result;
                     }
                 } catch (BlazeNoRouteException e) {
                 } finally {
                     if (timeout > 0) {
-                        waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0);
+                        waitTime = (int) Math.max(deadline
+                                - System.currentTimeMillis(), 0);
                     }
                 }
             } else {
@@ -401,8 +484,8 @@
      * @return the response
      * @throws Exception
      */
-    public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName, BlazeMessage message, int timeout)
-            throws Exception {
+    public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName,
+            BlazeMessage message, int timeout) throws Exception {
         BlazeMessage result = null;
         if (member != null) {
             SendRequest request = new SendRequest();
@@ -431,7 +514,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
-    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+    public void sendReply(Member to, BlazeMessage response, String correlationId)
+            throws Exception {
         response.storeContent();
         BlazeData blazeData = response.getContent();
         PacketData data = getPacketData(blazeData.type(), blazeData);
@@ -442,13 +526,15 @@
         this.unicast.downStream(packet);
     }
 
-    protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
+    protected void send(MemberImpl member, Buffer destination,
+            BlazeMessage message) throws Exception {
         message.storeContent();
         BlazeData blazeData = message.getContent();
         send(member, destination, blazeData);
     }
 
-    protected void send(MemberImpl member, Buffer destinationName, BlazeData blazeData) throws Exception {
+    protected void send(MemberImpl member, Buffer destinationName,
+            BlazeData blazeData) throws Exception {
         Destination dest = new Destination(destinationName, false);
         blazeData.clearDestinationData();
         blazeData.setDestinationData(dest.getData());
@@ -467,7 +553,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(String destination,
+            BlazeMessageListener l) throws Exception {
         init();
         Subscription key = new Subscription(destination, false);
         this.queueMessageListenerMap.put(key, l);
@@ -481,7 +568,8 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(Subscription key, BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(Subscription key,
+            BlazeMessageListener l) throws Exception {
         init();
         this.queueMessageListenerMap.put(key, l);
         buildLocal();
@@ -493,7 +581,8 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(String destination) throws Exception {
+    public BlazeMessageListener removeBlazeQueueMessageListener(
+            String destination) throws Exception {
         init();
         Subscription key = new Subscription(destination, false);
         BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
@@ -507,7 +596,8 @@
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription key) throws Exception {
+    public BlazeMessageListener removeBlazeQueueMessageListener(Subscription key)
+            throws Exception {
         init();
         BlazeMessageListener result = this.queueMessageListenerMap.remove(key);
         buildLocal();
@@ -521,7 +611,8 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+    public void addBlazeTopicMessageListener(String destination,
+            BlazeMessageListener l) throws Exception {
         init();
         super.addBlazeTopicMessageListener(destination, l);
         buildLocal();
@@ -533,9 +624,11 @@
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
      */
-    public BlazeMessageListener removeBlazeTopicMessageListener(String destination) throws Exception {
+    public BlazeMessageListener removeBlazeTopicMessageListener(
+            String destination) throws Exception {
         init();
-        BlazeMessageListener result = super.removeBlazeTopicMessageListener(destination);
+        BlazeMessageListener result = super
+                .removeBlazeTopicMessageListener(destination);
         buildLocal();
         return result;
     }
@@ -570,7 +663,8 @@
         return this.local.getGroups();
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketData data)
+            throws Exception {
         if (isStarted()) {
             if (!processRequest(correlationId, data)) {
                 MessageType type = MessageType.valueOf(data.getType());
@@ -603,11 +697,14 @@
         if (message.getContent().getDestinationData().getTopic()) {
             dispatch(message);
         } else {
-            Buffer destinationName = message.getContent().getDestinationData().getName();
-            if (this.inboxListener != null && this.producerId.equals(destinationName)) {
+            Buffer destinationName = message.getContent().getDestinationData()
+                    .getName();
+            if (this.inboxListener != null
+                    && this.producerId.equals(destinationName)) {
                 this.inboxListener.onMessage(message);
             } else {
-                for (Map.Entry<Subscription, BlazeMessageListener> entry : this.queueMessageListenerMap.entrySet()) {
+                for (Map.Entry<Subscription, BlazeMessageListener> entry : this.queueMessageListenerMap
+                        .entrySet()) {
                     if (entry.getKey().matches(destinationName)) {
                         entry.getValue().onMessage(message);
                         break;
@@ -640,7 +737,8 @@
      * @param message
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
+    public void broadcastMessage(MessageType messageType, Message<?> message)
+            throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data);
@@ -656,8 +754,8 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,
-            Message<?> message) throws Exception {
+    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member,
+            MessageType messageType, Message<?> message) throws Exception {
         SendRequest request = new SendRequest();
         PacketData data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
@@ -678,7 +776,8 @@
      * @param correlationId
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message, String correlationId) throws Exception {
+    public void broadcastMessage(MessageType messageType, Message<?> message,
+            String correlationId) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
@@ -692,7 +791,8 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message) throws Exception {
+    public void sendMessage(InetSocketAddress to, MessageType messageType,
+            Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data);
@@ -707,8 +807,8 @@
      * @param correlationId
      * @throws Exception
      */
-    public void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
-            throws Exception {
+    public void sendReply(MemberImpl to, MessageType messageType,
+            Message<?> message, String correlationId) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(false);
@@ -744,7 +844,8 @@
         if (isInitialized()) {
             try {
                 synchronized (this.localMutex) {
-                    MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
+                    MemberImpl result = new MemberImpl(getLocalMember()
+                            .getData().clone());
                     result.getData().clearSubscriptionData();
                     // add topic destinations
                     for (Subscription s : this.topicessageListenerMap.keySet()) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Thu Feb  5 09:44:01 2009
@@ -16,6 +16,14 @@
  */
 package org.apache.activeblaze.group;
 
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.SubscriptionData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -27,15 +35,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import org.apache.activeblaze.BaseService;
-import org.apache.activeblaze.Subscription;
-import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.MemberData;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.SubscriptionData;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
  * Maintains members of a group
@@ -390,6 +389,7 @@
     }
 
     protected void processMemberUpdate(MemberImpl oldMember, MemberImpl newMember) throws Exception {
+        //check for deltas
         processDestinationsForStopped(oldMember);
         processDestinationsForStarted(newMember);
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Thu Feb  5 09:44:01 2009
@@ -75,7 +75,15 @@
      * @return the name
      */
     public String getName() {
-        return this.data.getName();
+       return this.data.getName();
+    }
+    
+    /**
+     * Set the name
+     * @param name
+     */
+    public void setName(String name) {
+        this.data.setName(name);
     }
 
     /**

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java?rev=741060&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java Thu Feb  5 09:44:01 2009
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.impl.network;
+
+import org.apache.activeblaze.ExceptionListener;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.processor.ChainedProcessor;
+import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
+import org.apache.activeblaze.impl.reliable.ReliableFactory;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+/**
+ * Base class for network broadcast protocols
+ * 
+ */
+public abstract class BaseNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
+    protected static final Log LOG = LogFactory.getLog(BaseNetwork.class);
+    protected URI broadcastURI;
+    protected URI managementURI;
+    protected ChainedProcessor broadcast;
+    protected ChainedProcessor management;
+    protected String name = "";
+    protected InetSocketAddress broadcastAddress;
+    protected InetSocketAddress managementAddress;
+    protected String reliability = "simple";
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @param name
+     *            the name to set
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @param uri
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
+     */
+    public void setManagementURI(URI uri) throws Exception {
+        this.managementURI = uri;
+    }
+
+    /**
+     * @param uri
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
+     */
+    public void setURI(URI uri) throws Exception {
+        this.broadcastURI = uri;
+    }
+
+    /**
+     * @return the reliable protocol used
+     * @see org.apache.activeblaze.impl.network.Network#getReliability()
+     */
+    public String getReliability() {
+        return this.reliability;
+    }
+
+    /**
+     * @param reliability
+     * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
+     */
+    public void setReliability(String reliability) {
+        this.reliability = reliability;
+    }
+
+    /**
+     * Initialize the network: set up the broadcast chain and, if doCreateManagement() is true, the management chain
+     * 
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public void doInit() throws Exception {
+        super.doInit();
+        BaseTransport transport = createTransport();
+        transport.setName(getName() + "-Broadcast");
+        transport.setExceptionListener(this);
+        if (getReliability() == null || getReliability().length() == 0) {
+            LOG.warn("No reliability set for broadcast");
+            this.broadcast = transport;
+        } else {
+            this.broadcast = ReliableFactory.get(getReliability());
+            this.broadcast.setNext(transport);
+            transport.setPrev(this.broadcast);
+        }
+        this.broadcast.setPrev(getPrev());
+        this.broadcast.init();
+        if (doCreateManagement()) {
+            BaseTransport managementTransport = TransportFactory.get(this.managementURI);
+            managementTransport.setName(getName() + "-Management");
+            managementTransport.setExceptionListener(this);
+            if (getReliability() == null || getReliability().length() == 0) {
+                LOG.warn("No reliability set for management");
+                this.management = managementTransport;
+            } else {
+                this.management = ReliableFactory.get(getReliability());
+                this.management.setNext(managementTransport);
+                managementTransport.setPrev(this.management);
+            }
+            this.management.setPrev(getPrev());
+            this.management.init();
+        }
+    }
+
+    /**
+     * Shut down this network, then the broadcast and management processors if they were created
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public void doShutDown() throws Exception {
+        super.doShutDown();
+        if (this.broadcast != null) {
+            this.broadcast.shutDown();
+        }
+        if (this.management != null) {
+            this.management.shutDown();
+        }
+    }
+
+    /**
+     * Start this network, then the broadcast and management processors if they were created
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public void doStart() throws Exception {
+        super.doStart();
+        if (this.broadcast != null) {
+            this.broadcast.start();
+        }
+        if (this.management != null) {
+            this.management.start();
+        }
+    }
+
+    /**
+     * Stop this network, then the broadcast and management processors if they were created
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public void doStop() throws Exception {
+        super.doStop();
+        if (this.broadcast != null) {
+            this.broadcast.stop();
+        }
+        if (this.management != null) {
+            this.management.stop();
+        }
+    }
+
+   
+
+    /**
+     * @param l
+     * @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
+     */
+    public void setExceptionListener(ExceptionListener l) {
+        this.exceptionListener = l;
+    }
+
+    /**
+     * @param ex
+     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     */
+    public void onException(Exception ex) {
+        if (this.exceptionListener != null) {
+            this.exceptionListener.onException(ex);
+        }
+    }
+
+    public void setNext(Processor next) {
+        super.setNext(next);
+        if (this.management != null) {
+            this.management.setNext(next);
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setNext(next);
+        }
+    }
+
+    public void setPrev(Processor prev) {
+        super.setPrev(prev);
+        if (this.management != null) {
+            this.management.setPrev(prev);
+            ;
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setPrev(prev);
+        }
+    }
+
+    public void setMaxPacketSize(int maxPacketSize) {
+        if (this.management != null) {
+            this.management.setMaxPacketSize(maxPacketSize);
+        }
+        if (this.broadcast != null) {
+            this.broadcast.setMaxPacketSize(maxPacketSize);
+        }
+    }
+
+    protected abstract BaseTransport createTransport() throws Exception;
+
+    protected abstract BaseTransport createManagementTransport() throws Exception;
+
+    protected abstract boolean doCreateManagement();
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/BaseNetwork.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/MulticastNetwork.java Thu Feb  5 09:44:01 2009
@@ -17,13 +17,7 @@
 package org.apache.activeblaze.impl.network;
 
 import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.activeblaze.ExceptionListener;
-import org.apache.activeblaze.Processor;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
 
@@ -31,135 +25,19 @@
  * Uses multicast to implement a Network
  * 
  */
-public class MulticastNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
-    private URI uri;
-    private URI managementURI;
-    private ChainedProcessor broadcast;
-    private ChainedProcessor management;
-    private String name = "";
-    private InetSocketAddress broadcastAddress;
-    private InetSocketAddress managementAddress;
-    private String reliability = "simple";
-
-    /**
-     * @return the name
-     */
-    public String getName() {
-        return this.name;
-    }
-
-    /**
-     * @param name
-     *            the name to set
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /**
-     * @param uri
-     * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
-     */
-    public void setManagementURI(URI uri) {
-        this.managementURI = uri;
-    }
-
-    /**
-     * @param uri
-     * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
-     */
-    public void setURI(URI uri) {
-        this.uri = uri;
-    }
-
+public class MulticastNetwork extends BaseNetwork {
     /**
-     * @return the reliable protocol used
-     * @see org.apache.activeblaze.impl.network.Network#getReliability()
+     * Constructor
      */
-    public String getReliability() {
-        return this.reliability;
+    public MulticastNetwork() {
+        this.reliability = "simple";
     }
-
-    /**
-     * @param reliability
-     * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
-     */
-    public void setReliability(String reliability) {
-        this.reliability = reliability;
-    }
-
-    /**
-     * initialize the network
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#init()
-     */
+    
     public void doInit() throws Exception {
         super.doInit();
-        this.broadcast = ReliableFactory.get(getReliability());
-        BaseTransport transport = TransportFactory.get(this.uri);
-        transport.setName(getName() + "-Broadcast");
-        transport.setExceptionListener(this);
-        this.broadcast.setPrev(getPrev());
-        this.broadcast.setNext(transport);
-        transport.setPrev(this.broadcast);
-        this.broadcastAddress = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
-        this.broadcast.init();
-        if (this.managementURI != null && !this.managementURI.equals(this.uri)) {
-            this.management = ReliableFactory.get(getReliability());
-            BaseTransport managementTransport = TransportFactory.get(this.managementURI);
-            managementTransport.setName(getName() + "-Management");
-            managementTransport.setExceptionListener(this);
-            this.management.setPrev(getPrev());
-            this.management.setNext(managementTransport);
-            managementTransport.setPrev(this.management);
-            this.managementAddress = new InetSocketAddress(this.managementURI.getHost(), this.managementURI.getPort());
-            this.management.init();
-        }
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#shutDown()
-     */
-    public void doShutDown() throws Exception {
-        super.doShutDown();
-        if (this.broadcast != null) {
-            this.broadcast.shutDown();
-        }
-        if (this.management != null) {
-            this.management.shutDown();
-        }
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#start()
-     */
-    public void doStart() throws Exception {
-        super.doStart();
-        if (this.broadcast != null) {
-            this.broadcast.start();
-        }
-        if (this.management != null) {
-            this.management.start();
-        }
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#stop()
-     */
-    public void doStop() throws Exception {
-        super.doStop();
-        if (this.broadcast != null) {
-            this.broadcast.stop();
-        }
-        if (this.management != null) {
-            this.management.stop();
+        this.broadcastAddress = new InetSocketAddress(this.broadcastURI.getHost(),this.broadcastURI.getPort());
+        if (this.managementURI!=null) {
+            this.managementAddress = new InetSocketAddress(this.managementURI.getHost(),this.managementURI.getPort());
         }
     }
 
@@ -188,50 +66,33 @@
     }
 
     /**
-     * @param l
-     * @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.BaseNetwork#createManagementTransport()
      */
-    public void setExceptionListener(ExceptionListener l) {
-        // TODO Auto-generated method stub
+    @Override
+    protected BaseTransport createManagementTransport() throws Exception {
+        BaseTransport managementTransport = TransportFactory.get(this.managementURI);
+        return managementTransport;
     }
 
     /**
-     * @param ex
-     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.impl.network.BaseNetwork#createTransport()
      */
-    public void onException(Exception ex) {
-        if (this.exceptionListener != null) {
-            this.exceptionListener.onException(ex);
-        }
-    }
-
-    public void setNext(Processor next) {
-        super.setNext(next);
-        if (this.management != null) {
-            this.management.setNext(next);
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setNext(next);
-        }
-    }
-
-    public void setPrev(Processor prev) {
-        super.setPrev(prev);
-        if (this.management != null) {
-            this.management.setPrev(prev);
-            ;
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setPrev(prev);
-        }
+    @Override
+    protected BaseTransport createTransport() throws Exception {
+        BaseTransport transport = TransportFactory.get(this.broadcastURI);
+        return transport;
     }
 
-    public void setMaxPacketSize(int maxPacketSize) {
-        if (this.management != null) {
-            this.management.setMaxPacketSize(maxPacketSize);
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setMaxPacketSize(maxPacketSize);
-        }
+    /**
+     * @return
+     * @see org.apache.activeblaze.impl.network.BaseNetwork#doCreateManagement()
+     */
+    @Override
+    protected boolean doCreateManagement() {
+        return this.managementURI != null && !this.managementURI.equals(this.broadcastURI);
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/NetworkFactory.java Thu Feb  5 09:44:01 2009
@@ -32,12 +32,14 @@
     /**
      * @param broadcast
      * @param management
+     * @param reliability 
      * @return the network associated with the URI
      * @throws Exception
      */
     public static Network get(URI broadcast, URI management,String reliability) throws Exception {
         Network result = findNetwork(broadcast);
         result.setURI(broadcast);
+        result.setReliability(reliability);
         result.setManagementURI(management);
         configureNetwork(result, broadcast);
         configureNetwork(result, management);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Thu Feb  5 09:44:01 2009
@@ -16,70 +16,35 @@
  */
 package org.apache.activeblaze.impl.network;
 
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.activeblaze.BlazeException;
-import org.apache.activeblaze.ExceptionListener;
-import org.apache.activeblaze.Processor;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
-import org.apache.activeblaze.impl.processor.DefaultChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
-import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.impl.transport.BaseTransport;
 import org.apache.activeblaze.impl.transport.TransportFactory;
 import org.apache.activeblaze.util.URISupport;
 import org.apache.activeblaze.util.URISupport.CompositeData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 /**
  * Uses a list of URI's to create a network
  * 
  */
-public class StaticNetwork extends DefaultChainedProcessor implements Network, ExceptionListener {
-    private static final Log LOG = LogFactory.getLog(StaticNetwork.class);
+public class StaticNetwork extends BaseNetwork {
+    private InetSocketAddress localAddress;
+    private InetSocketAddress localManagementAddress;
     private List<URI> broadcastURIs = new ArrayList<URI>();
     private List<URI> managementURIs = new ArrayList<URI>();
     private List<InetSocketAddress> broadcastAddresses = new ArrayList<InetSocketAddress>();
     private List<InetSocketAddress> managementAddresses = new ArrayList<InetSocketAddress>();
-    private ChainedProcessor broadcast;
-    private ChainedProcessor management;
-    private String name = "";
     private boolean useFirstFreeAddress;
-    private String reliability = "swp";
-
-    /**
-     * @return the name
-     */
-    public String getName() {
-        return this.name;
-    }
-
-    /**
-     * @param name
-     *            the name to set
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
 
     /**
-     * @return the reliability protocol used
-     * @see org.apache.activeblaze.impl.network.Network#getReliability()
+     * Constructor
      */
-    public String getReliability() {
-        return this.reliability;
-    }
-
-    /**
-     * @param reliability
-     * @see org.apache.activeblaze.impl.network.Network#setReliability(java.lang.String)
-     */
-    public void setReliability(String reliability) {
-        this.reliability = reliability;
+    public StaticNetwork() {
+        this.reliability = "swp";
     }
 
     /**
@@ -88,6 +53,7 @@
      * @see org.apache.activeblaze.impl.network.Network#setManagementURI(java.net.URI)
      */
     public void setManagementURI(URI location) throws Exception {
+        super.setManagementURI(location);
         if (location != null) {
             CompositeData compositeData = URISupport.parseComposite(location);
             if (compositeData.getComponents() != null) {
@@ -95,7 +61,8 @@
                     URI uri = compositeData.getComponents()[i];
                     if (!this.managementURIs.contains(uri)) {
                         this.managementURIs.add(uri);
-                        this.managementAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
+                        this.managementAddresses.add(new InetSocketAddress(uri
+                                .getHost(), uri.getPort()));
                     }
                 }
             }
@@ -108,6 +75,7 @@
      * @see org.apache.activeblaze.impl.network.Network#setURI(java.net.URI)
      */
     public void setURI(URI location) throws Exception {
+        super.setURI(location);
         if (location != null) {
             CompositeData compositeData = URISupport.parseComposite(location);
             if (compositeData.getComponents() != null) {
@@ -115,7 +83,8 @@
                     URI uri = compositeData.getComponents()[i];
                     if (!this.broadcastURIs.contains(uri)) {
                         this.broadcastURIs.add(uri);
-                        this.broadcastAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
+                        this.broadcastAddresses.add(new InetSocketAddress(uri
+                                .getHost(), uri.getPort()));
                     }
                 }
             }
@@ -127,74 +96,6 @@
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public void doInit() throws Exception {
-        super.doInit();
-        this.broadcast = ReliableFactory.get(getReliability());
-        BaseTransport transport = createTransport(this.broadcastURIs);
-        transport.setName(getName() + "-Broadcast");
-        transport.setExceptionListener(this);
-        this.broadcast.setPrev(getPrev());
-        this.broadcast.setNext(transport);
-        transport.setPrev(this.broadcast);
-        this.broadcast.init();
-        if (!this.managementURIs.isEmpty() && !this.managementURIs.equals(this.broadcastURIs)) {
-            this.management = ReliableFactory.get(getReliability());
-                BaseTransport managementTransport =createTransport(this.managementURIs);
-                managementTransport.setName(getName() + "-Management");
-                managementTransport.setExceptionListener(this);
-            this.management.setPrev(getPrev());
-            this.management.setNext(managementTransport);
-            managementTransport.setPrev(this.management);
-            this.management.init();
-        }
-        
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#shutDown()
-     */
-    public void doShutDown() throws Exception {
-        super.doShutDown();
-        if (this.broadcast != null) {
-            this.broadcast.shutDown();
-        }
-        if (this.management != null) {
-            this.management.shutDown();
-        }
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#start()
-     */
-    public void doStart() throws Exception {
-        super.doStart();
-        if (this.broadcast != null) {
-            this.broadcast.start();
-        }
-        if (this.management != null) {
-            this.management.start();
-        }
-    }
-
-    /**
-     * 
-     * @throws Exception
-     * @see org.apache.activeblaze.Service#stop()
-     */
-    public void doStop() throws Exception {
-        super.doStop();
-        if (this.broadcast != null) {
-            this.broadcast.stop();
-        }
-        if (this.management != null) {
-            this.management.stop();
-        }
-    }
-
     /**
      * @param packet
      * @throws Exception
@@ -204,7 +105,12 @@
         if (this.management != null) {
             for (InetSocketAddress address : this.managementAddresses) {
                 packet.setTo(address);
-                this.management.downStream(packet);
+                if (!address.equals(this.localManagementAddress)) {
+                    this.management.downStream(packet);
+                } else {
+                    // route to-self
+                    upStream(packet);
+                }
             }
         } else {
             downStream(packet);
@@ -219,65 +125,45 @@
     public void downStream(Packet packet) throws Exception {
         for (InetSocketAddress address : this.broadcastAddresses) {
             packet.setTo(address);
-            this.broadcast.downStream(packet);
-        }
-    }
-
-    /**
-     * @param l
-     * @see org.apache.activeblaze.Processor#setExceptionListener(org.apache.activeblaze.ExceptionListener)
-     */
-    public void setExceptionListener(ExceptionListener l) {
-        // TODO Auto-generated method stub
-    }
-
-    /**
-     * @param ex
-     * @see org.apache.activeblaze.ExceptionListener#onException(java.lang.Exception)
-     */
-    public void onException(Exception ex) {
-        if (this.exceptionListener != null) {
-            this.exceptionListener.onException(ex);
+            if (!address.equals(this.localAddress)) {
+                this.broadcast.downStream(packet);
+            } else {
+                // route to-self
+                upStream(packet);
+            }
         }
     }
 
-    public void setNext(Processor next) {
-        super.setNext(next);
-        if (this.management != null) {
-            this.management.setNext(next);
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setNext(next);
-        }
+    protected boolean doCreateManagement() {
+        return !this.managementURIs.isEmpty()
+                && !this.managementURIs.equals(this.broadcastURIs);
     }
 
-    public void setPrev(Processor prev) {
-        super.setPrev(prev);
-        if (this.management != null) {
-            this.management.setPrev(prev);
-            ;
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setPrev(prev);
+    protected BaseTransport createTransport() throws Exception {
+        BaseTransport result = createTransport(this.broadcastURIs);
+        if (result != null) {
+            this.localAddress = new InetSocketAddress(result.getLocalURI()
+                    .getHost(), result.getLocalURI().getPort());
         }
+        return result;
     }
 
-    public void setMaxPacketSize(int maxPacketSize) {
-        if (this.management != null) {
-            this.management.setMaxPacketSize(maxPacketSize);
-        }
-        if (this.broadcast != null) {
-            this.broadcast.setMaxPacketSize(maxPacketSize);
+    protected BaseTransport createManagementTransport() throws Exception {
+        BaseTransport result = createTransport(this.managementURIs);
+        if (result != null) {
+            this.localManagementAddress = new InetSocketAddress(result
+                    .getLocalURI().getHost(), result.getLocalURI().getPort());
         }
+        return result;
     }
 
     private BaseTransport createTransport(List<URI> uris) throws Exception {
         BaseTransport result = null;
-        for (URI uri : uris) {
+        for (final URI uri : uris) {
             try {
                 result = TransportFactory.get(uri);
                 result.init();
-                LOG.info("using local address: " + uri);
+                LOG.info(getName() + " using local address: " + uri);
                 break;
             } catch (BindException e) {
                 result = null;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Thu Feb  5 09:44:01 2009
@@ -33,7 +33,6 @@
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 /**
  * state on a request
  * 
@@ -73,16 +72,17 @@
         try {
             this.replayBuffer.addPacket(packet);
             if (LOG.isDebugEnabled()) {
-                LOG.debug(this + " Sending " + packet.getMessageSequence() + "  - buffer size = "
-                        + this.replayBuffer.size());
+                LOG.debug(this + " Sending " + packet + "  - buffer size = " + this.replayBuffer.size());
             }
             int windowSize = this.replayBuffer.getBufferSize();
             if (windowSize >= this.maxWindowSize) {
                 if (!this.full.await(this.rtt, TimeUnit.MILLISECONDS)) {
-                    throw new BlazeNoRouteException("No route to " + packet.getTo());
+                    this.replayBuffer.clear();
+                    throw new BlazeNoRouteException("No route to "+packet.getTo());
                 }
             }
         } catch (InterruptedException e) {
+            //ignore - we are shutting down
         } finally {
             this.lock.unlock();
         }
@@ -109,7 +109,7 @@
                 }
                 this.lastAckId = ackData.getId();
                 this.lastAckTime = System.currentTimeMillis();
-                if (this.replayBuffer.getBufferSize() < this.maxWindowSize) {
+                if (this.replayBuffer.getBufferSize() <= this.maxWindowSize) {
                     this.lock.lock();
                     try {
                         this.full.signalAll();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=741060&r1=741059&r2=741060&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Thu Feb  5 09:44:01 2009
@@ -30,18 +30,31 @@
  */
 public abstract class BaseTransport extends ThreadChainedProcessor {
     private static final Log LOG = LogFactory.getLog(BaseTransport.class);
+
     static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+
     private URI localURI;
+
     private Buffer bufferOfLocalURI;
+
     private int bufferSize = DEFAULT_BUFFER_SIZE;
+
     private int soTimeout = 2000;
+
     private int timeToLive = 1;
+
     private boolean loopBack = false;
+
     protected final PacketAudit audit = new PacketAudit();
+
     private boolean broadcast = true;
+
     private boolean enableAudit = false;
+
     private int maxDispatchQueueSize = 10000;
+
     private LinkedBlockingQueue<Packet> dispatchQueue;
+
     private Thread dispatchQueueThread;
 
     public void doInit() throws Exception {
@@ -50,7 +63,8 @@
         if (this.localURI != null) {
             this.bufferOfLocalURI = new Buffer(this.localURI.toString());
         }
-        this.dispatchQueue = new LinkedBlockingQueue<Packet>(getMaxDispatchQueueSize());
+        this.dispatchQueue = new LinkedBlockingQueue<Packet>(
+                getMaxDispatchQueueSize());
     }
 
     public void doShutDown() throws Exception {
@@ -68,7 +82,8 @@
                 }
             }
         };
-        this.dispatchQueueThread = new Thread(runable, getLocalURI() + "-DispatchQueue");
+        this.dispatchQueueThread = new Thread(runable, getLocalURI()
+                + "-DispatchQueue");
         this.dispatchQueueThread.setDaemon(true);
         this.dispatchQueueThread.start();
     }
@@ -208,7 +223,8 @@
     }
 
     public String toString() {
-        return this.localURI != null ? this.localURI.toString() : " Uninitialized Transport";
+        return this.localURI != null ? this.localURI.toString()
+                : " Uninitialized Transport";
     }
 
     /**



Mime
View raw message