activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r752825 [1/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/reliable/swp/ main/java/org/apa...
Date Thu, 12 Mar 2009 10:12:44 GMT
Author: rajdavies
Date: Thu Mar 12 10:12:30 2009
New Revision: 752825

URL: http://svn.apache.org/viewvc?rev=752825&view=rev
Log:
performance improvements for JMS

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessageTransformation.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/jms/perf/SimpleTopicTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -30,6 +30,7 @@
 import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.util.IdGenerator;
 import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.DestinationData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
@@ -50,6 +51,7 @@
     protected Buffer producerId;
     protected Processor broadcast;
     protected BlazeConfiguration configuration = new BlazeConfiguration();
+    protected BlazeMessageProcessor blazeMessageProcessor;
     private String id;
 
     /**
@@ -188,13 +190,15 @@
         BlazeDataBuffer blazeData = msg.getContent().freeze();
         PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
         packetData.setReliable(true);
+        packetData.setDestinationData(destination.getData());
+        packetData.setPayloadType(msg.getType());
         Packet packet = new Packet(packetData.freeze());
         this.broadcast.downStream(packet);
     }
 
     protected final synchronized PacketDataBean getPacketData(MessageType type, MessageBuffer message) {
         PacketDataBean packetData = new PacketDataBean();
-        packetData.setType(type);
+        packetData.setMessageType(type);
         packetData.setProducerId(this.producerId);
         packetData.setPayload(message.toUnframedBuffer());
         packetData.setMessageId(new Buffer(this.idGenerator.generateId()));
@@ -207,7 +211,7 @@
     }
 
     protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
-        MessageType type = data.getType();
+        MessageType type = data.getMessageType();
         if (type == MessageType.BLAZE_DATA) {
             doProcessBlazeData(data);
         }
@@ -225,6 +229,21 @@
     public void setConfiguration(BlazeConfiguration configuration) {
         this.configuration = configuration;
     }
+    
+
+    /**
+     * @return the blazeMessageProcessor
+     */
+    public BlazeMessageProcessor getBlazeMessageProcessor(){
+        return this.blazeMessageProcessor;
+    }
+
+    /**
+     * @param blazeMessageProcessor the blazeMessageProcessor to set
+     */
+    public void setBlazeMessageProcessor(BlazeMessageProcessor blazeMessageProcessor){
+        this.blazeMessageProcessor = blazeMessageProcessor;
+    }
 
     /**
      * @param ex
@@ -235,45 +254,53 @@
     }
 
     protected void doProcessBlazeData(PacketData data) throws Exception {
-        BlazeMessage message = buildBlazeMessage(data);
-        dispatch(message);
+        dispatch(data);
     }
 
     protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
-        BlazeMessage message = null;
+        BlazeMessage result = null;
+        if (this.blazeMessageProcessor != null) {
+            result = this.blazeMessageProcessor.processBlazeMessage(data);
+        }else {
+            
         if (data != null) {
+            DestinationData destination = data.getDestinationData();
             Buffer payload = data.getPayload();
             BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
             String fromId = null;
             if (data.hasProducerId()) {
                 fromId = data.getProducerId().toStringUtf8();
             }
-            message = createMessage(fromId);
-            if( blazeData.hasDestinationData() ) {
-                message.setDestination(blazeData.getDestinationData());
-            }
-            message.setFromId(fromId);
+            result = createMessage(fromId);
+            result.setDestination(destination);
+            result.setFromId(fromId);
             if (data.hasMessageId()) {
-                message.setMessageId(data.getMessageId().toStringUtf8());
+                result.setMessageId(data.getMessageId().toStringUtf8());
             }
             if (data.hasCorrelationId()) {
-                message.setCorrelationId(data.getCorrelationId().toStringUtf8());
+                result.setCorrelationId(data.getCorrelationId().toStringUtf8());
             }
-            message.setTimeStamp(blazeData.getTimestamp());
-            message.setContent(blazeData);
+            result.setTimeStamp(blazeData.getTimestamp());
+            result.setType(data.getPayloadType());
+            result.setContent(blazeData);
         }
-        return message;
+        }
+        return result;
     }
 
     protected BlazeMessage createMessage(String fromId) {
         return new BlazeMessage();
     }
 
-    protected final void dispatch(BlazeMessage message) {
-        if (message != null) {
-            Buffer destination = message.getContent().getDestinationData().getName();
+    protected final void dispatch(PacketData data) throws Exception {
+        if (data != null) {
+            BlazeMessage message = null;
+            Buffer destination = data.getDestinationData().getName();
             for (SubscriptionHolder entry : this.topicMessageListeners) {
                 if (entry.getSubscription().matches(destination)) {
+                    if (message == null) {
+                        message = buildBlazeMessage(data);
+                    }
                     entry.getListener().onMessage(message);
                 }
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Thu Mar 12 10:12:30 2009
@@ -89,7 +89,7 @@
  * <code>NullPointerException</code>.
  * 
  */
-public class BlazeMessage implements Map<String, Object> {
+public class BlazeMessage implements Map<String, Object>{
     private static final String DEFAULT_TEXT_PAYLOAD = "DEFAULT_TEXT_PAYLOAD";
     private static final String DEFAULT_BYTES_PAYLOAD = "DEFAULT_BYTES_PAYLOAD";
     private static final String DEFAULT_OBJECT_PAYLOAD = "DEFAULT_OBJECT_PAYLOAD";
@@ -107,13 +107,13 @@
     private transient boolean persistent;
     private transient int type;
     private BlazeData content;
-
+    
     /**
      * Default Constructor
      */
     public BlazeMessage() {
     }
-
+    
     /**
      * Constructor - Utility to construct a message with a text <Code>String</Code> payload
      * 
@@ -122,7 +122,7 @@
     public BlazeMessage(String text) {
         setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
-
+    
     /**
      * Constructor - Utility to construct a message with a byte[] array payload
      * 
@@ -131,7 +131,7 @@
     public BlazeMessage(byte[] data) {
         setBytesValue(DEFAULT_BYTES_PAYLOAD, data);
     }
-
+    
     /**
      * Constructor - Utility to construct a message with an object payload
      * 
@@ -140,275 +140,269 @@
     public BlazeMessage(Object data) {
         setObject(data);
     }
-
+    
     /**
      * Utility method for setting a default <Code>String</Code> payload
      * 
      * @param text
      */
-    public void setText(String text) {
+    public void setText(String text){
         setStringValue(DEFAULT_TEXT_PAYLOAD, text);
     }
-
+    
     /**
      * Utility method used for when a BlazeMessage is only carrying a text String
      * 
      * @return text the default text
      * @throws Exception
      */
-    public String getText() throws Exception {
+    public String getText() throws Exception{
         return getStringValue(DEFAULT_TEXT_PAYLOAD);
     }
-
+    
     /**
      * Utility method for setting a default <Code>byte[]</Code> payload
      * 
      * @param payload
      */
-    public void setBytes(byte[] payload) {
+    public void setBytes(byte[] payload){
         setBytesValue(DEFAULT_BYTES_PAYLOAD, payload);
     }
-
+    
     /**
      * Utility method used for when a BlazeMessage is only carrying an Object
      * 
      * @return the object stored as the default object payload
      * @throws Exception
      */
-    public Object getObject() throws Exception {
+    public Object getObject() throws Exception{
         Buffer buffer = getBufferValue(DEFAULT_OBJECT_PAYLOAD);
         return IOUtils.getObject(buffer);
     }
-
+    
     /**
      * Utility method for setting a default <Code>Object</Code> payload
      * 
      * @param payload
      */
-    public void setObject(Object payload) {
+    public void setObject(Object payload){
         try {
             put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
         } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
     }
-
+    
     /**
      * Utility method used for when a BlazeMessage is only carrying a byte[] array
      * 
      * @return the default byte[] payload
      * @throws Exception
      */
-    public byte[] getBytes() throws Exception {
+    public byte[] getBytes() throws Exception{
         return getBytesValue(DEFAULT_BYTES_PAYLOAD);
     }
-
+    
     /**
      * @return the destination
      */
-    public Destination getDestination() {
+    public Destination getDestination(){
         initializeReading();
         return this.destination;
     }
-
+    
     /**
-     * @param destination
-     *            the destination to set
+     * @param destination the destination to set
      */
-    public void setDestination(Destination destination) {
+    public void setDestination(Destination destination){
         this.destination = destination;
     }
-
+    
     /**
      * @param destination
      */
-    public void setDestination(DestinationData destination) {
-        this.destination = new Destination(destination);
+    public void setDestination(DestinationData destinationData){
+        if (destinationData != null) {
+            this.destination = new Destination(destinationData);
+        }
     }
-
+    
     /**
      * The id of the channel that sent the message
      * 
      * @return the fromId
      */
-    public String getFromId() {
+    public String getFromId(){
         initializeReading();
         return this.fromId;
     }
-
+    
     /**
-     * @param fromId
-     *            the fromId to set
+     * @param fromId the fromId to set
      */
-    public void setFromId(String fromId) {
+    public void setFromId(String fromId){
         this.fromId = fromId;
     }
-
+    
     /**
      * @return the messageId
      */
-    public String getMessageId() {
+    public String getMessageId(){
         initializeReading();
         return this.messageId;
     }
-
+    
     /**
-     * @param messageId
-     *            the messageId to set
+     * @param messageId the messageId to set
      */
-    public void setMessageId(String messageId) {
+    public void setMessageId(String messageId){
         this.messageId = messageId;
     }
-
+    
     /**
      * @return the correlationId
      */
-    public String getCorrelationId() {
+    public String getCorrelationId(){
         initializeReading();
         return this.correlationId;
     }
-
+    
     /**
-     * @param correlationId
-     *            the correlationId to set
+     * @param correlationId the correlationId to set
      */
-    public void setCorrelationId(String correlationId) {
+    public void setCorrelationId(String correlationId){
         this.correlationId = correlationId;
     }
-
+    
     /**
      * @return the timeStamp
      */
-    public long getTimeStamp() {
+    public long getTimeStamp(){
         initializeReading();
         return this.timeStamp;
     }
-
+    
     /**
-     * @param timeStamp
-     *            the timeStamp to set
+     * @param timeStamp the timeStamp to set
      */
-    public void setTimeStamp(long timeStamp) {
+    public void setTimeStamp(long timeStamp){
         this.timeStamp = timeStamp;
     }
-
+    
     /**
      * @return the replyTo
      */
-    public Destination getReplyTo() {
+    public Destination getReplyTo(){
         initializeReading();
         return this.replyTo;
     }
-
+    
     /**
-     * @param replyTo
-     *            the replyTo to set
+     * @param replyTo the replyTo to set
      */
-    public void setReplyTo(Destination replyTo) {
+    public void setReplyTo(Destination replyTo){
         this.replyTo = replyTo;
     }
-
+    
     /**
-     * @param replyTo
-     *            the replyTo to set
+     * @param replyTo the replyTo to set
      */
-    public void setReplyTo(DestinationData replyTo) {
+    public void setReplyTo(DestinationData replyTo){
         this.replyTo = new Destination(replyTo);
     }
-
+    
     /**
      * @return the expiration
      */
-    public long getExpiration() {
+    public long getExpiration(){
         initializeReading();
         return this.expiration;
     }
-
+    
     /**
-     * @param expiration
-     *            the expiration to set
+     * @param expiration the expiration to set
      */
-    public void setExpiration(long expiration) {
+    public void setExpiration(long expiration){
         this.expiration = expiration;
     }
-
+    
     /**
      * @return the redeliveryCounter
      */
-    public int getRedeliveryCounter() {
+    public int getRedeliveryCounter(){
         initializeReading();
         return this.redeliveryCounter;
     }
-
+    
     /**
-     * @param redeliveryCounter
-     *            the redeliveryCounter to set
+     * @param redeliveryCounter the redeliveryCounter to set
      */
-    public void setRedeliveryCounter(int redeliveryCounter) {
+    public void setRedeliveryCounter(int redeliveryCounter){
         this.redeliveryCounter = redeliveryCounter;
     }
-
+    
     /**
      * @return the priority
      */
-    public int getPriority() {
+    public int getPriority(){
         initializeReading();
         return this.priority;
     }
-
+    
     /**
-     * @param priority
-     *            the priority to set
+     * @param priority the priority to set
      */
-    public void setPriority(int priority) {
+    public void setPriority(int priority){
         this.priority = priority;
     }
-
+    
     /**
      * @return the persistent
      */
-    public boolean isPersistent() {
+    public boolean isPersistent(){
         initializeReading();
         return this.persistent;
     }
-
+    
     /**
-     * @param persistent
-     *            the persistent to set
+     * @param persistent the persistent to set
      */
-    public void setPersistent(boolean persistent) {
+    public void setPersistent(boolean persistent){
         this.persistent = persistent;
     }
-
+    
     /**
      * @return the type
      */
-    public String getMessageType() {
+    public String getMessageType(){
         initializeReading();
         return this.messageType;
     }
-
+    
     /**
-     * @param type
-     *            the type to set
+     * @param type the type to set
      */
-    public void setMessageType(String type) {
+    public void setMessageType(String type){
         this.messageType = type;
     }
     
     /**
      * Get the type
+     * 
      * @return the type
      */
-    public int getType() {
-        initializeReading();
+    public int getType(){    
         return this.type;
     }
-
+    
+    public void setType(int type){       
+        this.type = type;
+    }
+    
     /**
      * @return a copy of this message
      */
-    public BlazeMessage clone() {
+    public BlazeMessage clone(){
         BlazeMessage copy = new BlazeMessage();
         try {
             copy(copy);
@@ -417,24 +411,22 @@
         }
         return copy;
     }
-
+    
     /**
      * clear the contents of this message
      */
-    public void clear() {
+    public void clear(){
         this.map.clear();
     }
-
+    
     /**
      * Returns the <CODE>boolean</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>boolean</CODE>
+     * @param name the name of the <CODE>boolean</CODE>
      * @return the <CODE>boolean</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public boolean getBooleanValue(String name) throws BlazeMessageFormatException {
+    public boolean getBooleanValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -448,17 +440,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a boolean from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>byte</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>byte</CODE>
+     * @param name the name of the <CODE>byte</CODE>
      * @return the <CODE>byte</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public byte getByteValue(String name) throws BlazeMessageFormatException {
+    public byte getByteValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -472,17 +462,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a byte from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>short</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>short</CODE>
+     * @param name the name of the <CODE>short</CODE>
      * @return the <CODE>short</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public short getShortValue(String name) throws BlazeMessageFormatException {
+    public short getShortValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -499,17 +487,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the Unicode character value with the specified name.
      * 
-     * @param name
-     *            the name of the Unicode character
+     * @param name the name of the Unicode character
      * @return the Unicode character value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public char getCharValue(String name) throws BlazeMessageFormatException {
+    public char getCharValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -520,17 +506,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a short from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>int</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>int</CODE>
+     * @param name the name of the <CODE>int</CODE>
      * @return the <CODE>int</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public int getIntValue(String name) throws BlazeMessageFormatException {
+    public int getIntValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -550,17 +534,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read an int from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>long</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>long</CODE>
+     * @param name the name of the <CODE>long</CODE>
      * @return the <CODE>long</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public long getLongValue(String name) throws BlazeMessageFormatException {
+    public long getLongValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -583,17 +565,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a long from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>float</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>float</CODE>
+     * @param name the name of the <CODE>float</CODE>
      * @return the <CODE>float</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public float getFloatValue(String name) throws BlazeMessageFormatException {
+    public float getFloatValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -607,17 +587,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a float from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>double</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>double</CODE>
+     * @param name the name of the <CODE>double</CODE>
      * @return the <CODE>double</CODE> value with the specified name
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public double getDoubleValue(String name) throws BlazeMessageFormatException {
+    public double getDoubleValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -634,18 +612,16 @@
         }
         throw new BlazeMessageFormatException(" cannot read a double from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the <CODE>String</CODE> value with the specified name.
      * 
-     * @param name
-     *            the name of the <CODE>String</CODE>
+     * @param name the name of the <CODE>String</CODE>
      * @return the <CODE>String</CODE> value with the specified name; if there is no item by this name, a null value
      *         is returned
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public String getStringValue(String name) throws BlazeMessageFormatException {
+    public String getStringValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value == null) {
@@ -656,17 +632,15 @@
         }
         return value.toString();
     }
-
+    
     /**
      * Returns the byte array value with the specified name.
      * 
-     * @param name
-     *            the name of the byte array
+     * @param name the name of the byte array
      * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public byte[] getBytesValue(String name) throws BlazeMessageFormatException {
+    public byte[] getBytesValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof byte[]) {
@@ -674,17 +648,15 @@
         }
         throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns a Buffer with the specified name.
      * 
-     * @param name
-     *            the name of the byte array
+     * @param name the name of the byte array
      * @return the byte array value with the specified name; if there is no item by this name, a null value is returned.
-     * @throws BlazeMessageFormatException
-     *             if this type conversion is invalid.
+     * @throws BlazeMessageFormatException if this type conversion is invalid.
      */
-    public Buffer getBufferValue(String name) throws BlazeMessageFormatException {
+    public Buffer getBufferValue(String name) throws BlazeMessageFormatException{
         initializeReading();
         Object value = this.map.get(name);
         if (value instanceof Buffer) {
@@ -692,7 +664,7 @@
         }
         throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
     }
-
+    
     /**
      * Returns the value of the object with the specified name.
      * <P>
@@ -702,36 +674,34 @@
      * <P>
      * Note that byte values are returned as <CODE>byte[]</CODE>, not <CODE>Byte[]</CODE>.
      * 
-     * @param name
-     *            the name of the Java object
+     * @param name the name of the Java object
      * @return a copy of the Java object value with the specified name, in objectified format (for example, if the
      *         object was set as an <CODE>int</CODE>, an <CODE>Integer</CODE> is returned); if there is no item by
      *         this name, a null value is returned
      */
-    public Object getObjectValue(String name) {
+    public Object getObjectValue(String name){
         initializeReading();
         return this.map.get(name);
     }
-
+    
     /**
      * Returns an <CODE>Enumeration</CODE> of all the names in the <CODE>BlazeMessage</CODE> object.
      * 
      * @return an enumeration of all the names in this <CODE>BlazeMessage</CODE>
      */
-    public Enumeration<String> getNames() {
+    public Enumeration<String> getNames(){
         initializeReading();
         return Collections.enumeration(this.map.keySet());
     }
-
+    
     /**
      * put a key,value pair into the message
      * 
      * @param name
-     * @param value
-     *            must be a supported primitive, or map of supported primitives
+     * @param value must be a supported primitive, or map of supported primitives
      * @return the previous value associated with the key
      */
-    public Object put(String name, Object value) {
+    public Object put(String name,Object value){
         initializeWriting();
         if (name == null) {
             throw new IllegalArgumentException("The name of the property cannot be null.");
@@ -742,136 +712,115 @@
         checkValidObject(value);
         return this.map.put(name, value);
     }
-
+    
     /**
      * Sets a <CODE>boolean</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>boolean</CODE>
-     * @param value
-     *            the <CODE>boolean</CODE> value to set in the Map
+     * @param name the name of the <CODE>boolean</CODE>
+     * @param value the <CODE>boolean</CODE> value to set in the Map
      */
-    public void setBooleanValue(String name, boolean value) {
+    public void setBooleanValue(String name,boolean value){
         initializeWriting();
         put(name, value ? Boolean.TRUE : Boolean.FALSE);
     }
-
+    
     /**
      * Sets a <CODE>byte</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>byte</CODE>
-     * @param value
-     *            the <CODE>byte</CODE> value to set in the Map
+     * @param name the name of the <CODE>byte</CODE>
+     * @param value the <CODE>byte</CODE> value to set in the Map
      */
-    public void setByteValue(String name, byte value) {
+    public void setByteValue(String name,byte value){
         initializeWriting();
         put(name, Byte.valueOf(value));
     }
-
+    
     /**
      * Sets a <CODE>short</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>short</CODE>
-     * @param value
-     *            the <CODE>short</CODE> value to set in the Map
+     * @param name the name of the <CODE>short</CODE>
+     * @param value the <CODE>short</CODE> value to set in the Map
      */
-    public void setShortValue(String name, short value) {
+    public void setShortValue(String name,short value){
         initializeWriting();
         put(name, Short.valueOf(value));
     }
-
+    
     /**
      * Sets a Unicode character value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the Unicode character
-     * @param value
-     *            the Unicode character value to set in the Map
+     * @param name the name of the Unicode character
+     * @param value the Unicode character value to set in the Map
      */
-    public void setCharValue(String name, char value) {
+    public void setCharValue(String name,char value){
         initializeWriting();
         put(name, Character.valueOf(value));
     }
-
+    
     /**
      * Sets an <CODE>int</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>int</CODE>
-     * @param value
-     *            the <CODE>int</CODE> value to set in the Map
+     * @param name the name of the <CODE>int</CODE>
+     * @param value the <CODE>int</CODE> value to set in the Map
      */
-    public void setIntValue(String name, int value) {
+    public void setIntValue(String name,int value){
         initializeWriting();
         put(name, Integer.valueOf(value));
     }
-
+    
     /**
      * Sets a <CODE>long</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>long</CODE>
-     * @param value
-     *            the <CODE>long</CODE> value to set in the Map
+     * @param name the name of the <CODE>long</CODE>
+     * @param value the <CODE>long</CODE> value to set in the Map
      */
-    public void setLongValue(String name, long value) {
+    public void setLongValue(String name,long value){
         initializeWriting();
         put(name, Long.valueOf(value));
     }
-
+    
     /**
      * Sets a <CODE>float</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>float</CODE>
-     * @param value
-     *            the <CODE>float</CODE> value to set in the Map
+     * @param name the name of the <CODE>float</CODE>
+     * @param value the <CODE>float</CODE> value to set in the Map
      */
-    public void setFloatValue(String name, float value) {
+    public void setFloatValue(String name,float value){
         initializeWriting();
         put(name, new Float(value));
     }
-
+    
     /**
      * Sets a <CODE>double</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>double</CODE>
-     * @param value
-     *            the <CODE>double</CODE> value to set in the Map
+     * @param name the name of the <CODE>double</CODE>
+     * @param value the <CODE>double</CODE> value to set in the Map
      */
-    public void setDoubleValue(String name, double value) {
+    public void setDoubleValue(String name,double value){
         initializeWriting();
         put(name, new Double(value));
     }
-
+    
     /**
      * Sets a <CODE>String</CODE> value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the <CODE>String</CODE>
-     * @param value
-     *            the <CODE>String</CODE> value to set in the Map
+     * @param name the name of the <CODE>String</CODE>
+     * @param value the <CODE>String</CODE> value to set in the Map
      */
-    public void setStringValue(String name, String value) {
+    public void setStringValue(String name,String value){
         initializeWriting();
         put(name, value);
     }
-
+    
     /**
      * Sets a byte array value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the byte array
-     * @param value
-     *            the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
+     * @param name the name of the byte array
+     * @param value the byte array value to set in the Map; the array is copied so that the value for <CODE>name </CODE>
      *            will not be altered by future modifications
-     * @throws NullPointerException
-     *             if the name is null, or if the name is an empty string.
+     * @throws NullPointerException if the name is null, or if the name is an empty string.
      */
-    public void setBytesValue(String name, byte[] value) {
+    public void setBytesValue(String name,byte[] value){
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -879,18 +828,15 @@
             this.map.remove(name);
         }
     }
-
+    
     /**
      * Sets a Buffer value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the byte array
-     * @param value
-     *            the Buffer value to set in the Map
-     * @throws NullPointerException
-     *             if the name is null, or if the name is an empty string.
+     * @param name the name of the byte array
+     * @param value the Buffer value to set in the Map
+     * @throws NullPointerException if the name is null, or if the name is an empty string.
      */
-    public void setBufferValue(String name, Buffer value) {
+    public void setBufferValue(String name,Buffer value){
         initializeWriting();
         if (value != null) {
             put(name, value);
@@ -898,26 +844,22 @@
             this.map.remove(name);
         }
     }
-
+    
     /**
      * Sets a portion of the byte array value with the specified name into the Map.
      * 
-     * @param name
-     *            the name of the byte array
-     * @param value
-     *            the byte array value to set in the Map
-     * @param offset
-     *            the initial offset within the byte array
-     * @param length
-     *            the number of bytes to use
+     * @param name the name of the byte array
+     * @param value the byte array value to set in the Map
+     * @param offset the initial offset within the byte array
+     * @param length the number of bytes to use
      */
-    public void setBytesValue(String name, byte[] value, int offset, int length) {
+    public void setBytesValue(String name,byte[] value,int offset,int length){
         initializeWriting();
         byte[] data = new byte[length];
         System.arraycopy(value, offset, data, 0, length);
         put(name, data);
     }
-
+    
     /**
     * Find out if the message contains a key. This isn't recursive.
      * 
@@ -925,11 +867,11 @@
      * @return true if the message contains the key
      * 
      */
-    public boolean containsKey(Object key) {
+    public boolean containsKey(Object key){
         initializeReading();
         return this.map.containsKey(key.toString());
     }
-
+    
     /**
      * Find out if the message contains a value
      * 
@@ -937,61 +879,60 @@
      * @return true if the value exists
      * 
      */
-    public boolean containsValue(Object value) {
+    public boolean containsValue(Object value){
         initializeReading();
         return this.map.containsValue(value);
     }
-
+    
     /**
      * @return a set of Map.Entry values
      * 
      */
-    public Set<java.util.Map.Entry<String, Object>> entrySet() {
+    public Set<java.util.Map.Entry<String, Object>> entrySet(){
         initializeReading();
         return this.map.entrySet();
     }
-
+    
     /**
      * Retrieve the object associated with the key
      * 
      * @param key
      * @return the object
      */
-    public Object get(Object key) {
+    public Object get(Object key){
         initializeReading();
         return getObjectValue(key.toString());
     }
-
+    
     /**
      * @return true if the message is empty
      * 
      */
-    public boolean isEmpty() {
+    public boolean isEmpty(){
         initializeReading();
         return this.map.isEmpty();
     }
-
+    
     /**
      * @return a Set of all the keys
      */
-    public Set<String> keySet() {
+    public Set<String> keySet(){
         initializeReading();
         return this.map.keySet();
     }
-
+    
     /**
      * Add all entries in a Map to the message
      * 
-     * @param t
-     *            the map
+     * @param t the map
      * 
      */
-    public void putAll(Map<? extends String, ? extends Object> t) {
+    public void putAll(Map<? extends String, ? extends Object> t){
         for (Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
             put(entry.getKey(), entry.getValue());
         }
     }
-
+    
     /**
      * Remove a key/value pair from the message
      * 
@@ -999,45 +940,46 @@
      * @return the value removed or null
      * 
      */
-    public Object remove(Object key) {
+    public Object remove(Object key){
         setContent(null);
         return this.map.remove(key.toString());
     }
-
+    
     /**
      * @return the number of entries in the message
      */
-    public int size() {
+    public int size(){
         initializeReading();
         return this.map.size();
     }
-
+    
     /**
      * @return a Collection of the values in the message
      */
-    public Collection<Object> values() {
+    public Collection<Object> values(){
         initializeReading();
         return this.map.values();
     }
     
     /**
      * check if a named value exists in the message
+     * 
      * @param name
     * @return true if value exists
      */
-    public boolean valueExists(String name) {
+    public boolean valueExists(String name){
         return this.map.containsKey(name);
     }
-
-    protected void initializeReading() {
+    
+    protected void initializeReading(){
         loadContent();
     }
-
-    protected void initializeWriting() {
+    
+    protected void initializeWriting(){
         setContent(null);
     }
-
-    protected void checkValidObject(Object value) throws IllegalArgumentException {
+    
+    protected void checkValidObject(Object value) throws IllegalArgumentException{
         boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short
                 || value instanceof Integer || value instanceof Long;
         valid = valid || value instanceof Float || value instanceof Double || value instanceof Character
@@ -1053,37 +995,37 @@
             throw new IllegalArgumentException("Not a valid message value: " + value);
         }
     }
-
-    /** 
+    
+    /**
      * @return pretty print
      * @see java.lang.Object#toString()
      */
-    public String toString() {
+    public String toString(){
         return super.toString() + "MQBlazeMessage{ " + "map = " + this.map + " }";
     }
-
-    protected void copy(BlazeMessage copy) throws BlazeException {
+    
+    protected void copy(BlazeMessage copy) throws BlazeException{
         storeContent();
         copy.content = this.content;
     }
-
+    
     /**
      * @return the content data
      */
-    public BlazeData getContent() {
+    public BlazeData getContent(){
         return this.content;
     }
-
+    
     /**
      * Set the content data
      * 
      * @param content
      */
-    public void setContent(BlazeData content) {
+    public void setContent(BlazeData content){
         this.content = content;
     }
-
-    protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
+    
+    protected void marshallMap(MapDataBean mapData,String name,Object value) throws BlazeRuntimeException{
         if (value != null) {
             if (value.getClass() == Boolean.class) {
                 BoolTypeBean type = new BoolTypeBean();
@@ -1152,8 +1094,8 @@
             }
         }
     }
-
-    protected Map<String, Object> unmarshall(MapData mapData) {
+    
+    protected Map<String, Object> unmarshall(MapData mapData){
         Map<String, Object> result = new ConcurrentHashMap<String, Object>();
         if (mapData.hasBoolType()) {
             for (BoolType type : mapData.getBoolTypeList()) {
@@ -1218,11 +1160,11 @@
         }
         return result;
     }
-
+    
     /**
      * Store content into a BlazeData object for serialization
      */
-    public void storeContent() {
+    public void storeContent(){
         if (getContent() == null) {
             BlazeDataBean bd = new BlazeDataBean();
             MapDataBean mapData = new MapDataBean();
@@ -1230,9 +1172,6 @@
                 marshallMap(mapData, entry.getKey().toString(), entry.getValue());
             }
             bd.setMapData(mapData);
-            if (this.destination != null) {
-                bd.setDestinationData(this.destination.getData());
-            }
             if (this.replyTo != null) {
                 bd.setReplyToData(this.replyTo.getData());
             }
@@ -1253,22 +1192,18 @@
             bd.setRedeliveryCounter(this.redeliveryCounter);
             bd.setPriority(this.priority);
             bd.setPersistent(this.persistent);
-            bd.setType(this.type);
             this.content = bd;
         }
     }
-
+    
     /**
      * Builds the message body from data
      * 
      */
-    protected void loadContent() throws BlazeRuntimeException {
+    protected void loadContent() throws BlazeRuntimeException{
         BlazeData data = getContent();
         if (data != null && this.map.isEmpty()) {
             this.map = unmarshall(data.getMapData());
-            if (data.hasDestinationData()) {
-                this.destination = new Destination(data.getDestinationData());
-            }
             if (data.hasReplyToData()) {
                 this.replyTo = new Destination(data.getReplyToData());
             }
@@ -1281,7 +1216,7 @@
             if (data.hasCorrelationId()) {
                 this.correlationId = data.getCorrelationId().toStringUtf8();
             }
-            if(data.hasMessageType()) {
+            if (data.hasMessageType()) {
                 this.messageType = data.getMessageType().toStringUtf8();
             }
             this.timeStamp = data.getTimestamp();
@@ -1289,10 +1224,7 @@
             this.redeliveryCounter = data.getRedeliveryCounter();
             this.priority = data.getPriority();
             this.persistent = data.getPersistent();
-            this.type=data.getType();
-           
         }
     }
-
-   
+    
 }
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java?rev=752825&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessageProcessor.java Thu Mar 12 10:12:30 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze;
+
+import org.apache.activeblaze.wire.PacketData;
+
+/**
+ * BlazeMessageProcessor - build a BlazeMessage from PacketData
+ *
+ */
+public interface BlazeMessageProcessor{
+    
+    /**
+     * Process a PacketData that is of a BlazeMessage type
+     * @param data
+     * @return the built BlazeMessage
+     */
+    BlazeMessage processBlazeMessage(PacketData data);
+
+}

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -24,7 +24,6 @@
 import org.apache.activeblaze.group.MemberImpl;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.AckData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.activeblaze.wire.AckData.AckDataBuffer;
@@ -164,7 +163,7 @@
     protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
         if (isStarted()) {
             processRequest(correlationId, data);
-            MessageType type = data.getType();
+            MessageType type = data.getMessageType();
             switch(type) {
             case BLAZE_DATA:
                 doProcessBlazeData(data);
@@ -244,7 +243,7 @@
             this.unicast.downStream(packet);
             PacketDataBuffer response = request.get(timeout);
             if (response != null) {
-                type = response.getType();
+                type = response.getMessageType();
                 result = type.parseUnframed(response.getPayload());
             }
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Thu Mar 12 10:12:30 2009
@@ -18,9 +18,11 @@
 
 import java.util.List;
 import java.util.Set;
+
 import org.apache.activeblaze.BlazeChannel;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.BlazeMessageProcessor;
 import org.apache.activeblaze.Destination;
 import org.apache.activeblaze.Subscription;
 
@@ -283,4 +285,14 @@
      * @throws Exception
      */
     public List<String> getGroups() throws Exception;
+
+    /**
+     * @param processor
+     */
+    public void setBlazeMessageProcessor(BlazeMessageProcessor processor);
+    
+    /**
+     * @return BlazeMessageProcessor
+     */
+    public BlazeMessageProcessor getBlazeMessageProcessor();
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Thu Mar 12 10:12:30 2009
@@ -55,25 +55,26 @@
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 /**
  * <P>
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
- * communication
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
  * 
  */
-public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel{
     private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
     private String name;
     protected Processor unicast;
     private MemberImpl local;
     private BlazeMessageListener inboxListener;
-    protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(10000);
+    protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(
+            10000);
     private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
     private Group group;
     protected Buffer inboxAddress;
     protected int inBoxPort;
     protected final Object localMutex = new Object();
-
+    
     /**
      * Constructor
      * 
@@ -83,12 +84,12 @@
         super();
         this.name = name;
     }
-
+    
     /**
      * @throws Exception
      * @see org.apache.activeblaze.Service#init()
      */
-    public void doInit() throws Exception {
+    public void doInit() throws Exception{
         super.doInit();
         String unicastURIStr = getConfiguration().getUnicastURI();
         unicastURIStr = PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
@@ -106,8 +107,8 @@
         this.local = createLocal(unicastURI);
         this.group = createGroup();
     }
-
-    protected final Processor configureProcess(ChainedProcessor transport, String reliability) throws Exception {
+    
+    protected final Processor configureProcess(ChainedProcessor transport,String reliability) throws Exception{
         int maxPacketSize = getConfiguration().getMaxPacketSize();
         CompressionProcessor result = new CompressionProcessor();
         result.setPrev(this);
@@ -121,25 +122,25 @@
         result.setEnd(transport);
         return result;
     }
-
-    protected ChainedProcessor getReliability(String reliability) throws Exception {
+    
+    protected ChainedProcessor getReliability(String reliability) throws Exception{
         DefaultChainedProcessor reliable = ReliableFactory.get(reliability);
         return reliable;
     }
-
-    protected MemberImpl createLocal(URI uri) throws Exception {
+    
+    protected MemberImpl createLocal(URI uri) throws Exception{
         return new MemberImpl(getId(), getName(), 0, 0, uri);
     }
-
-    protected Group createGroup() {
+    
+    protected Group createGroup(){
         return new Group(this);
     }
-
+    
     /**
      * @throws Exception
      * @see org.apache.activeblaze.Service#shutDown()
      */
-    public void doShutDown() throws Exception {
+    public void doShutDown() throws Exception{
         super.doShutDown();
         if (this.group != null) {
             this.group.shutDown();
@@ -148,43 +149,43 @@
             this.unicast.shutDown();
         }
     }
-
+    
     /**
      * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
-    public void doStart() throws Exception {
+    public void doStart() throws Exception{
         super.doStart();
         this.unicast.start();
         this.group.start();
     }
-
+    
     /**
      * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
-    public void doStop() throws Exception {
+    public void doStop() throws Exception{
         super.doStop();
         this.group.stop();
         this.unicast.stop();
     }
-
+    
     /**
      * @return the name
      */
-    public String getName() {
+    public String getName(){
         synchronized (this.localMutex) {
             return this.name;
         }
     }
-
+    
     /**
      * set the name
      * 
      * @param name
      * @see org.apache.activeblaze.group.BlazeGroupChannel#setName(java.lang.String)
      */
-    public void setName(String name) {
+    public void setName(String name){
         synchronized (this.localMutex) {
             this.name = name;
             if (this.local != null) {
@@ -192,88 +193,87 @@
             }
         }
     }
-
+    
     /**
      * @return the inboxListener
      */
-    public BlazeMessageListener getInboxListener() {
+    public BlazeMessageListener getInboxListener(){
         return this.inboxListener;
     }
-
+    
     /**
-     * @param inboxListener
-     *            the inboxListener to set
+     * @param inboxListener the inboxListener to set
      */
-    public void setInboxListener(BlazeMessageListener inboxListener) {
+    public void setInboxListener(BlazeMessageListener inboxListener){
         this.inboxListener = inboxListener;
     }
-
+    
     /**
      * @return this channel's configuration
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getConfiguration()
      */
-    public BlazeGroupConfiguration getConfiguration() {
+    public BlazeGroupConfiguration getConfiguration(){
         return (BlazeGroupConfiguration) this.configuration;
     }
-
+    
     /**
      * @return the member for this channel
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
      */
-    public MemberImpl getLocalMember() {
+    public MemberImpl getLocalMember(){
         synchronized (this.localMutex) {
             return this.local;
         }
     }
-
-    protected void setLocalMember(MemberImpl local) {
+    
+    protected void setLocalMember(MemberImpl local){
         synchronized (this.localMutex) {
             this.local = local;
         }
     }
-
+    
     /**
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void addMemberChangedListener(MemberChangedListener l) throws Exception {
+    public void addMemberChangedListener(MemberChangedListener l) throws Exception{
         init();
         this.group.addMemberChangedListener(l);
     }
-
+    
     /**
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
+    public void removeMemberChangedListener(MemberChangedListener l) throws Exception{
         init();
         this.group.removeMemberChangedListener(l);
     }
-
+    
     /**
      * @param id
      * @return the Member
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
      */
-    public Member getMemberById(String id) throws Exception {
+    public Member getMemberById(String id) throws Exception{
         init();
         return this.group.getMemberById(id);
     }
-
+    
     /**
      * @param name
      * @return the Member
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
      */
-    public Member getMemberByName(String name) throws Exception {
+    public Member getMemberByName(String name) throws Exception{
         init();
         return this.group.getMemberByName(name);
     }
-
+    
     /**
      * Will wait for a member to advertise itself if not available
      * 
@@ -282,44 +282,42 @@
      * @return the member or null
      * @throws Exception
      */
-    public Member getAndWaitForMemberByName(String name, int timeout) throws Exception {
+    public Member getAndWaitForMemberByName(String name,int timeout) throws Exception{
         init();
         return this.group.getAndWaitForMemberByName(name, timeout);
     }
-
+    
     /**
      * @return the members
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
      */
-    public Set<Member> getMembers() throws Exception {
+    public Set<Member> getMembers() throws Exception{
         init();
         return this.group.getMembers();
     }
-
+    
     /**
      * Send a message to a member of the group - in a round-robin fashion
      * 
      * @param destination
      * @param message
      * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
-     *      org.apache.activeblaze.BlazeMessage)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
      */
-    public void send(String destination, BlazeMessage message) throws Exception {
+    public void send(String destination,BlazeMessage message) throws Exception{
         send(new Destination(destination, false), message);
     }
-
+    
     /**
      * Send a message to a member of the group - in a round-robin fashion
      * 
      * @param destination
      * @param message
      * @throws Exception
-     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String,
-     *      org.apache.activeblaze.BlazeMessage)
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
      */
-    public void send(Destination destination, BlazeMessage message) throws Exception {
+    public void send(Destination destination,BlazeMessage message) throws Exception{
         while (true) {
             MemberImpl member = getQueueDestination(destination.getName());
             if (member != null) {
@@ -334,7 +332,7 @@
             }
         }
     }
-
+    
     /**
      * @param member
      * @param message
@@ -342,10 +340,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public void send(Member member, BlazeMessage message) throws Exception {
+    public void send(Member member,BlazeMessage message) throws Exception{
         send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
     }
-
+    
     /**
      * @param member
      * @param message
@@ -354,10 +352,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(Member member,BlazeMessage message) throws Exception{
         return sendRequest(member, message, 0);
     }
-
+    
     /**
      * @param member
      * @param message
@@ -367,10 +365,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+    public BlazeMessage sendRequest(Member member,BlazeMessage message,int timeout) throws Exception{
         return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
     }
-
+    
     /**
      * @param destination
      * @param message
@@ -379,10 +377,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(String destination,BlazeMessage message) throws Exception{
         return sendRequest(new Destination(destination, false), message, 0);
     }
-
+    
     /**
      * @param destination
      * @param message
@@ -391,10 +389,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage)
      */
-    public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception {
+    public BlazeMessage sendRequest(Destination destination,BlazeMessage message) throws Exception{
         return sendRequest(destination, message, 0);
     }
-
+    
     /**
      * @param destination
      * @param message
@@ -404,10 +402,10 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
+    public BlazeMessage sendRequest(String destination,BlazeMessage message,int timeout) throws Exception{
         return sendRequest(new Destination(destination, false), message, timeout);
     }
-
+    
     /**
      * @param destination
      * @param message
@@ -417,7 +415,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
      *      org.apache.activeblaze.BlazeMessage, int)
      */
-    public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception {
+    public BlazeMessage sendRequest(Destination destination,BlazeMessage message,int timeout) throws Exception{
         Buffer key = destination.getName();
         long deadline = 0;
         long waitTime = timeout;
@@ -444,7 +442,7 @@
         }
         return null;
     }
-
+    
     /**
      * send Request
      * 
@@ -455,15 +453,18 @@
      * @return the response
      * @throws Exception
      */
-    public BlazeMessage sendRequest(MemberImpl member, Buffer destinationName, BlazeMessage message, int timeout)
-            throws Exception {
+    public BlazeMessage sendRequest(MemberImpl member,Buffer destinationName,BlazeMessage message,int timeout)
+            throws Exception{
         BlazeMessage result = null;
         if (member != null) {
             SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
-            message.setDestination(new Destination(destinationName, false));
+            Destination dest = new Destination(destinationName, false);
+            message.setDestination(dest);
             message.storeContent();
             BlazeDataBuffer blazeData = message.getContent().freeze();
             PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+            packetData.setDestinationData(dest.getData());
+            packetData.setPayloadType(message.getType());
             synchronized (this.messageRequests) {
                 this.messageRequests.put(packetData.getMessageId(), request);
             }
@@ -475,7 +476,7 @@
         }
         return result;
     }
-
+    
     /**
      * @param to
      * @param response
@@ -484,18 +485,23 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
-    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+    public void sendReply(Member to,BlazeMessage response,String correlationId) throws Exception{
         response.storeContent();
+        Destination dest = response.getDestination();
         BlazeDataBuffer blazeData = response.getContent().freeze();
         PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData);
         data.setCorrelationId(new Buffer(correlationId));
+        if (dest != null) {
+            data.setDestinationData(dest.getData());
+        }
+        data.setPayloadType(response.getType());
         data.setReliable(true);
         Packet packet = new Packet(data.freeze());
         packet.setTo(((MemberImpl) to).getAddress());
         this.unicast.downStream(packet);
     }
-
-    protected void send(MemberImpl member, Buffer destinationName, BlazeMessage message) throws Exception {
+    
+    protected void send(MemberImpl member,Buffer destinationName,BlazeMessage message) throws Exception{
         Destination dest = new Destination(destinationName, false);
         message.setDestination(dest);
         message.storeContent();
@@ -503,11 +509,13 @@
         PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze());
         data.setReliable(true);
         data.setResponseRequired(true);
+        data.setDestinationData(dest.getData());
+        data.setPayloadType(message.getType());
         Packet packet = new Packet(data.freeze());
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }
-
+    
     /**
      * @param destination
      * @param l
@@ -515,63 +523,62 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
         init();
-        SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+        SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
+        
+        this.queueMessageListeners.add(key);
         
-            this.queueMessageListeners.add(key);
-       
         buildLocal();
     }
-
+    
     /**
-     * @param subscription 
+     * @param subscription
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
      *      org.apache.activeblaze.group.BlazeMessageListener)
      */
-    public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception {
+    public void addBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
         init();
         SubscriptionHolder key = new SubscriptionHolder(subscription, l);
-            this.queueMessageListeners.add(key);
-       
+        this.queueMessageListeners.add(key);
+        
         buildLocal();
     }
-
+    
     /**
      * @param destination
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception {
+    public void removeBlazeQueueMessageListener(String destination,BlazeMessageListener l) throws Exception{
         init();
-        SubscriptionHolder key = new SubscriptionHolder(destination, false,l);
+        SubscriptionHolder key = new SubscriptionHolder(destination, false, l);
+        
+        this.queueMessageListeners.remove(key);
         
-       
-             this.queueMessageListeners.remove(key);
-       
         buildLocal();
-       
+        
     }
-
+    
     /**
      * @param subscription
      * @param l
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
      */
-    public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception {
+    public void removeBlazeQueueMessageListener(Subscription subscription,BlazeMessageListener l) throws Exception{
         init();
-       
+        
         SubscriptionHolder key = new SubscriptionHolder(subscription, l);
-            this.queueMessageListeners.remove(key);
-       
+        this.queueMessageListeners.remove(key);
+        
         buildLocal();
-       
+        
     }
-
+    
     /**
      * @param destination
      * @param l
@@ -579,59 +586,59 @@
      * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
      *      org.apache.activeblaze.BlazeMessageListener)
      */
-    public void addBlazeTopicMessageListener(String destination, BlazeMessageListener l) throws Exception {
+    public void addBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
         init();
         super.addBlazeTopicMessageListener(destination, l);
         buildLocal();
     }
-
+    
     /**
      * @param destination
      * @return the removed <Code>BlazeTopicListener</Code>
      * @throws Exception
      * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
      */
-    public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception {
+    public void removeBlazeTopicMessageListener(String destination,BlazeMessageListener l) throws Exception{
         init();
-         super.removeBlazeTopicMessageListener(destination,l);
+        super.removeBlazeTopicMessageListener(destination, l);
         buildLocal();
-       
+        
     }
-
+    
     /**
      * @param groupName
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addToGroup(java.lang.String)
      */
-    public void addToGroup(String groupName) throws Exception {
+    public void addToGroup(String groupName) throws Exception{
         init();
         this.local.addToGroup(groupName);
     }
-
+    
     /**
      * @param groupName
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeFromGroup(java.lang.String)
      */
-    public void removeFromGroup(String groupName) throws Exception {
+    public void removeFromGroup(String groupName) throws Exception{
         init();
         this.local.removeFromGroup(groupName);
     }
-
+    
     /**
      * @return the groups
      * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroups()
      */
-    public List<String> getGroups() throws Exception {
+    public List<String> getGroups() throws Exception{
         init();
         return this.local.getGroups();
     }
-
-    protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
+    
+    protected void processData(String id,Buffer correlationId,PacketDataBuffer data) throws Exception{
         if (isStarted()) {
             if (!processRequest(correlationId, data)) {
-                MessageType type = data.getType();
+                MessageType type = data.getMessageType();
                 if (type == MessageType.BLAZE_DATA) {
                     doProcessBlazeData(data);
                 } else if (type == MessageType.MEMBER_DATA) {
@@ -640,8 +647,8 @@
             }
         }
     }
-
-    protected boolean processRequest(Buffer correlationId, PacketDataBuffer value) {
+    
+    protected boolean processRequest(Buffer correlationId,PacketDataBuffer value){
         boolean result = false;
         if (correlationId != null) {
             SendRequest<PacketDataBuffer> request = null;
@@ -655,61 +662,61 @@
         }
         return result;
     }
-
-    protected void doProcessBlazeData(PacketData data) throws Exception {
-        BlazeMessage message = buildBlazeMessage(data);
-        if (message.getContent().getDestinationData().getTopic()) {
-            dispatch(message);
+    
+    protected void doProcessBlazeData(PacketData data) throws Exception{
+        
+        if (data.hasDestinationData()&&data.getDestinationData().getTopic()) {
+            dispatch(data);
         } else {
-            Buffer destinationName = message.getContent().getDestinationData().getName();
+            
+            Buffer destinationName = data.getDestinationData().getName();
+            BlazeMessage message = buildBlazeMessage(data);
             if (this.inboxListener != null && this.producerId.equals(destinationName)) {
                 this.inboxListener.onMessage(message);
             } else {
                 
-                    int index=0;
-                    for (SubscriptionHolder entry : this.queueMessageListeners) {
-                        if (entry.getSubscription().matches(destinationName)) {
-                           entry.getListener().onMessage(message);
-                           this.queueMessageListeners.remove(index);
-                           this.queueMessageListeners.add(entry);
-                        }
-                        index++;
+                int index = 0;
+                for (SubscriptionHolder entry : this.queueMessageListeners) {
+                    if (entry.getSubscription().matches(destinationName)) {
+                        entry.getListener().onMessage(message);
+                        this.queueMessageListeners.remove(index);
+                        this.queueMessageListeners.add(entry);
                     }
-                   
-               
+                    index++;
+                }
                 
             }
         }
     }
-
-    protected Group getGroup() {
+    
+    protected Group getGroup(){
         return this.group;
     }
-
-    protected BlazeMessage createMessage(String fromId) {
+    
+    protected BlazeMessage createMessage(String fromId){
         Member member = this.group.getMemberById(fromId);
         BlazeMessage message = new BlazeGroupMessage(member);
         return message;
     }
-
-    protected final void doProcessMemberData(PacketData data) throws Exception {
+    
+    protected final void doProcessMemberData(PacketData data) throws Exception{
         Buffer payload = data.getPayload();
         MemberDataBuffer memberData = MemberDataBuffer.parseUnframed(payload);
         this.group.processMember(memberData);
     }
-
+    
     /**
      * @param messageType
      * @param message
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, MessageBuffer message) throws Exception {
+    public void broadcastMessage(MessageType messageType,MessageBuffer message) throws Exception{
         PacketDataBean data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data.freeze());
         this.broadcast.downStreamManagement(packet);
     }
-
+    
     /**
      * send a message
      * 
@@ -719,7 +726,8 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,  MessageBuffer message) throws Exception {
+    public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member,MessageType messageType,
+            MessageBuffer message) throws Exception{
         SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
         PacketDataBean data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
@@ -731,7 +739,7 @@
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }
-
+    
     /**
      * broadcast a general message
      * 
@@ -740,28 +748,28 @@
      * @param correlationId
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, MessageBuffer message, String correlationId) throws Exception {
+    public void broadcastMessage(MessageType messageType,MessageBuffer message,String correlationId) throws Exception{
         PacketDataBean data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
         Packet packet = new Packet(data.freeze());
         this.broadcast.downStreamManagement(packet);
     }
-
+    
     /**
      * @param to
      * @param messageType
      * @param message
      * @throws Exception
      */
-    public void sendMessage(InetSocketAddress to, MessageType messageType, MessageBuffer message) throws Exception {
+    public void sendMessage(InetSocketAddress to,MessageType messageType,MessageBuffer message) throws Exception{
         PacketDataBean data = getPacketData(messageType, message);
         data.setReliable(false);
         Packet packet = new Packet(data.freeze());
         packet.setTo(to);
         this.unicast.downStream(packet);
     }
-
+    
     /**
      * @param to
      * @param messageType
@@ -769,8 +777,8 @@
      * @param correlationId
      * @throws Exception
      */
-    public void sendReply(MemberImpl to, MessageType messageType, MessageBuffer message, String correlationId)
-            throws Exception {
+    public void sendReply(MemberImpl to,MessageType messageType,MessageBuffer message,String correlationId)
+            throws Exception{
         PacketDataBean data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(false);
@@ -778,8 +786,8 @@
         packet.setTo(to.getAddress());
         this.unicast.downStream(packet);
     }
-
-    protected MemberImpl getQueueDestination(Buffer destination) {
+    
+    protected MemberImpl getQueueDestination(Buffer destination){
         // choose a member
         MemberImpl result = null;
         Map<Subscription, List<MemberImpl>> map = this.group.getQueueMap();
@@ -801,8 +809,8 @@
         }
         return result;
     }
-
-    protected void buildLocal() {
+    
+    protected void buildLocal(){
         if (isInitialized()) {
             try {
                 synchronized (this.localMutex) {
@@ -818,7 +826,7 @@
                     for (SubscriptionHolder s : this.queueMessageListeners) {
                         bean.addSubscriptionData(s.getSubscription().getData());
                     }
-                   
+                    
                     MemberImpl result = new MemberImpl(bean.freeze());
                     
                     this.group.processMemberUpdate(this.local, result);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java Thu Mar 12 10:12:30 2009
@@ -59,7 +59,7 @@
 
     void processInBound(Packet packet) throws Exception {
         PacketData packetData = packet.getPacketData();
-        MessageType type = packetData.getType();
+        MessageType type = packetData.getMessageType();
         if (type == MessageType.CONTROL_DATA) {
             if (this.replayBuffer.isEmpty()) {
                 // send back a control message
@@ -119,7 +119,7 @@
                         PacketDataBean pd = new PacketDataBean();
                         pd.setResponseRequired(false);
                         pd.setPayload(nack.freeze().toUnframedBuffer());
-                        pd.setType(MessageType.NACK_DATA);
+                        pd.setMessageType(MessageType.NACK_DATA);
                         Packet nackPacket = new Packet(pd.freeze());
                         nackPacket.setTo(this.peerAddress);
                         this.swp.sendDownStream(nackPacket);
@@ -160,7 +160,7 @@
             PacketDataBean pd = new PacketDataBean();
             pd.setResponseRequired(false);
             pd.setPayload(ack.freeze().toUnframedBuffer());
-            pd.setType(MessageType.ACK_DATA);
+            pd.setMessageType(MessageType.ACK_DATA);
             ackPacket = new Packet(pd.freeze());
             ackPacket.setTo(this.peerAddress);
             this.lastAckTime = System.currentTimeMillis();

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java Thu Mar 12 10:12:30 2009
@@ -96,7 +96,7 @@
         Packet result = null;
         PacketData data = packet.getPacketData();
         if (data != null) {
-            MessageType type = data.getType();
+            MessageType type = data.getMessageType();
             if (type == MessageType.ACK_DATA) {
                 AckDataBuffer ackData = AckDataBuffer.parseUnframed(data.getPayload());
                 long start = ackData.getStartSequence();
@@ -158,7 +158,7 @@
                 PacketDataBean pd = new PacketDataBean();
                 pd.setResponseRequired(false);
                 pd.setPayload(control.freeze().toUnframedBuffer());
-                pd.setType(MessageType.CONTROL_DATA);
+                pd.setMessageType(MessageType.CONTROL_DATA);
                 ackPacket = new Packet(pd.freeze());
                 ackPacket.setTo(this.peerAddress);
                 LOG.debug(this + " Sent Control message " + control);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Thu Mar 12 10:12:30 2009
@@ -169,7 +169,7 @@
         pd.setCorrelationId(data.getMessageId());
         pd.setResponse(true);
         pd.setPayload(ackData.freeze().toUnframedBuffer());
-        pd.setType(MessageType.ACK_DATA);
+        pd.setMessageType(MessageType.ACK_DATA);
         Packet packet = new Packet(pd.freeze());
         return packet;
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java?rev=752825&r1=752824&r2=752825&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnection.java Thu Mar 12 10:12:30 2009
@@ -18,6 +18,7 @@
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -33,16 +34,20 @@
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicSession;
+
 import org.apache.activeblaze.BlazeMessageListener;
+import org.apache.activeblaze.BlazeMessageProcessor;
 import org.apache.activeblaze.Subscription;
 import org.apache.activeblaze.group.BlazeGroupChannel;
+import org.apache.activeblaze.jms.message.BlazeJmsMessage;
 import org.apache.activeblaze.util.IdGenerator;
+import org.apache.activeblaze.wire.PacketData;
 /**
  * Implementation of a JMS Connection
  * 
  */
 public class BlazeJmsConnection implements Connection, TopicConnection, QueueConnection,
-        org.apache.activeblaze.ExceptionListener {
+        org.apache.activeblaze.ExceptionListener,BlazeMessageProcessor{
     protected final BlazeGroupChannel channel;
     protected final IdGenerator tempDestinationGenerator = new IdGenerator("");
     private String clientId;
@@ -55,7 +60,8 @@
     protected BlazeJmsConnection(BlazeGroupChannel channel) {
         this.channel = channel;
         this.channel.setExceptionListener(this);
-        this.clientId = channel.getName();
+        this.clientId = channel.getName(); 
+        this.channel.setBlazeMessageProcessor(this);
     }
 
     /**
@@ -354,4 +360,30 @@
     public void setConsumerMaxDispatchQueueDepth(int consumerMaxDispatchQueueDepth) {
         this.consumerMaxDispatchQueueDepth = consumerMaxDispatchQueueDepth;
     }
+
+    /**
+     * @param data 
+     * @return a BlazeMessage
+     * 
+     */
+    public BlazeJmsMessage processBlazeMessage(PacketData data){
+        BlazeJmsMessage result = null;
+        /*
+        int type = message.getType();
+        if (type == BlazeJmsMessage.JmsMessageType.BYTES.ordinal()) {
+            result = new BlazeJmsBytesMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.MAP.ordinal()) {
+            result = new BlazeJmsMapMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.OBJECT.ordinal()) {
+            result = new BlazeJmsObjectMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.STREAM.ordinal()) {
+            result = new BlazeJmsStreamMessage();
+        } else if (type == BlazeJmsMessage.JmsMessageType.TEXT.ordinal()) {
+            result = new BlazeJmsTextMessage();
+        } else {
+            result = new BlazeJmsMessage();
+        }
+        */
+        return result;
+    }
 }



Mime
View raw message