activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r745113 [1/2] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/network/ src/mai...
Date Tue, 17 Feb 2009 15:12:59 GMT
Author: chirino
Date: Tue Feb 17 15:12:52 2009
New Revision: 745113

URL: http://svn.apache.org/viewvc?rev=745113&view=rev
Log:
Updated to use the new alternative protobuf API.

Modified:
    activemq/activemq-blaze/trunk/pom.xml
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ConsumerProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/ProducerProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsBytesMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMapMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsObjectMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsStreamMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/message/BlazeJmsTextMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/SendRequest.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/ChainedProcessorTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/CompressionProcessorTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/FragmentationProcessorTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/processor/PacketAuditTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/reliable/swp/SwpProcessorTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/MulticastTransportTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/impl/transport/UdpTransportTest.java

Modified: activemq/activemq-blaze/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/pom.xml?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/pom.xml (original)
+++ activemq/activemq-blaze/trunk/pom.xml Tue Feb 17 15:12:52 2009
@@ -91,6 +91,9 @@
       <plugin>
         <groupId>org.apache.activemq.protobuf</groupId>
         <artifactId>activemq-protobuf</artifactId>
+        <configuration>
+          <type>alt</type>
+        </configuration>
         <version>1.0-SNAPSHOT</version>
          <executions>
           <execution>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Feb 17 15:12:52 2009
@@ -16,6 +16,10 @@
  */
 package org.apache.activeblaze;
 
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activeblaze.impl.network.Network;
 import org.apache.activeblaze.impl.network.NetworkFactory;
 import org.apache.activeblaze.impl.processor.ChainedProcessor;
@@ -26,14 +30,13 @@
 import org.apache.activeblaze.impl.reliable.ReliableFactory;
 import org.apache.activeblaze.util.IdGenerator;
 import org.apache.activeblaze.util.PropertyUtil;
-import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.Message;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.activemq.protobuf.MessageBuffer;
 /**
  * <P>
  * A <CODE>BlazeChannel</CODE> handles all client communication, either unicast,
@@ -182,29 +185,29 @@
     public synchronized void broadcast(Destination destination, BlazeMessage msg) throws Exception {
         msg.setDestination(destination);
         msg.storeContent();
-        BlazeData blazeData = msg.getContent();
-        PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        BlazeDataBuffer blazeData = msg.getContent().freeze();
+        PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
         packetData.setReliable(true);
-        Packet packet = new Packet(packetData);
+        Packet packet = new Packet(packetData.freeze());
         this.broadcast.downStream(packet);
     }
 
-    protected final synchronized PacketData getPacketData(MessageType type, Message<?> message) {
-        PacketData packetData = new PacketData();
-        packetData.setType(type.getNumber());
+    protected final synchronized PacketDataBean getPacketData(MessageType type, MessageBuffer message) {
+        PacketDataBean packetData = new PacketDataBean();
+        packetData.setType(type);
         packetData.setProducerId(this.producerId);
-        packetData.setPayload(message.toFramedBuffer());
+        packetData.setPayload(message.toUnframedBuffer());
         packetData.setMessageId(new Buffer(this.idGenerator.generateId()));
         return packetData;
     }
 
     public void upStream(Packet packet) throws Exception {
-        PacketData data = packet.getPacketData();
+        PacketDataBuffer data = packet.getPacketData();
         processData(packet.getId(), data.getCorrelationId(), data);
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
-        MessageType type = MessageType.valueOf(data.getType());
+    protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
+        MessageType type = data.getType();
         if (type == MessageType.BLAZE_DATA) {
             doProcessBlazeData(data);
         }
@@ -239,16 +242,16 @@
     protected final BlazeMessage buildBlazeMessage(PacketData data) throws Exception {
         BlazeMessage message = null;
         if (data != null) {
-            MessageType type = MessageType.BLAZE_DATA;
-            BlazeData blazeData = (BlazeData) type.createMessage();
             Buffer payload = data.getPayload();
-            blazeData.mergeFramed(payload);
+            BlazeDataBuffer blazeData = BlazeDataBuffer.parseUnframed(payload);
             String fromId = null;
             if (data.hasProducerId()) {
                 fromId = data.getProducerId().toStringUtf8();
             }
             message = createMessage(fromId);
-            message.setDestination(blazeData.getDestinationData());
+            if( blazeData.hasDestinationData() ) {
+                message.setDestination(blazeData.getDestinationData());
+            }
             message.setFromId(fromId);
             if (data.hasMessageId()) {
                 message.setMessageId(data.getMessageId().toStringUtf8());

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Tue Feb 17 15:12:52 2009
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.activeblaze.util.IOUtils;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.BoolType;
@@ -38,6 +39,19 @@
 import org.apache.activeblaze.wire.MapData;
 import org.apache.activeblaze.wire.ShortType;
 import org.apache.activeblaze.wire.StringType;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBean;
+import org.apache.activeblaze.wire.BoolType.BoolTypeBean;
+import org.apache.activeblaze.wire.BufferType.BufferTypeBean;
+import org.apache.activeblaze.wire.ByteType.ByteTypeBean;
+import org.apache.activeblaze.wire.BytesType.BytesTypeBean;
+import org.apache.activeblaze.wire.CharType.CharTypeBean;
+import org.apache.activeblaze.wire.DoubleType.DoubleTypeBean;
+import org.apache.activeblaze.wire.FloatType.FloatTypeBean;
+import org.apache.activeblaze.wire.IntType.IntTypeBean;
+import org.apache.activeblaze.wire.LongType.LongTypeBean;
+import org.apache.activeblaze.wire.MapData.MapDataBean;
+import org.apache.activeblaze.wire.ShortType.ShortTypeBean;
+import org.apache.activeblaze.wire.StringType.StringTypeBean;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -1069,66 +1083,66 @@
         this.content = content;
     }
 
-    protected void marshallMap(MapData mapData, String name, Object value) throws BlazeRuntimeException {
+    protected void marshallMap(MapDataBean mapData, String name, Object value) throws BlazeRuntimeException {
         if (value != null) {
             if (value.getClass() == Boolean.class) {
-                BoolType type = new BoolType();
+                BoolTypeBean type = new BoolTypeBean();
                 type.setName(name);
                 type.setValue(((Boolean) value).booleanValue());
                 mapData.addBoolType(type);
             } else if (value.getClass() == Byte.class) {
-                ByteType type = new ByteType();
+                ByteTypeBean type = new ByteTypeBean();
                 type.setName(name);
                 type.setValue(((Byte) value).byteValue());
                 mapData.addByteType(type);
             } else if (value.getClass() == Character.class) {
-                CharType type = new CharType();
+                CharTypeBean type = new CharTypeBean();
                 type.setName(name);
                 type.setValue(value.toString());
                 mapData.addCharType(type);
             } else if (value.getClass() == Short.class) {
-                ShortType type = new ShortType();
+                ShortTypeBean type = new ShortTypeBean();
                 type.setName(name);
                 type.setValue(((Short) value).shortValue());
                 mapData.addShortType(type);
             } else if (value.getClass() == Integer.class) {
-                IntType type = new IntType();
+                IntTypeBean type = new IntTypeBean();
                 type.setName(name);
                 type.setValue(((Integer) value).intValue());
                 mapData.addIntType(type);
             } else if (value.getClass() == Long.class) {
-                LongType type = new LongType();
+                LongTypeBean type = new LongTypeBean();
                 type.setName(name);
                 type.setValue(((Long) value).longValue());
                 mapData.addLongType(type);
             } else if (value.getClass() == Float.class) {
-                FloatType type = new FloatType();
+                FloatTypeBean type = new FloatTypeBean();
                 type.setName(name);
                 type.setValue(((Float) value).floatValue());
                 mapData.addFloatType(type);
             } else if (value.getClass() == Double.class) {
-                DoubleType type = new DoubleType();
+                DoubleTypeBean type = new DoubleTypeBean();
                 type.setName(name);
                 type.setValue(((Double) value).doubleValue());
                 mapData.addDoubleType(type);
             } else if (value.getClass() == byte[].class) {
-                BytesType type = new BytesType();
+                BytesTypeBean type = new BytesTypeBean();
                 type.setName(name);
                 type.setValue(new Buffer((byte[]) value));
                 mapData.addBytesType(type);
             } else if (value.getClass() == String.class) {
-                StringType type = new StringType();
+                StringTypeBean type = new StringTypeBean();
                 type.setName(name);
                 type.setValue(value.toString());
                 mapData.addStringType(type);
             } else if (value.getClass() == Buffer.class) {
-                BufferType type = new BufferType();
+                BufferTypeBean type = new BufferTypeBean();
                 type.setName(name);
                 type.setValue((Buffer) value);
             } else if (value instanceof Map) {
                 Map<String, Key> subMap = (Map<String, Key>) value;
                 for (Map.Entry<String, Key> entry : subMap.entrySet()) {
-                    MapData md = new MapData();
+                    MapDataBean md = new MapDataBean();
                     md.setName(name);
                     marshallMap(md, entry.getKey().toString(), entry.getValue());
                     mapData.addMapType(md);
@@ -1210,8 +1224,8 @@
      */
     public void storeContent() {
         if (getContent() == null) {
-            BlazeData bd = new BlazeData();
-            MapData mapData = new MapData();
+            BlazeDataBean bd = new BlazeDataBean();
+            MapDataBean mapData = new MapDataBean();
             for (Map.Entry<String, Object> entry : this.map.entrySet()) {
                 marshallMap(mapData, entry.getKey().toString(), entry.getValue());
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Destination.java Tue Feb 17 15:12:52 2009
@@ -17,6 +17,8 @@
 package org.apache.activeblaze;
 
 import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.DestinationData.DestinationDataBean;
+import org.apache.activeblaze.wire.DestinationData.DestinationDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -25,13 +27,13 @@
  */
 public class Destination {
     
-    private final DestinationData data;
+    private DestinationDataBuffer data;
     
     /**
      * Default Constructor
      */
     public Destination() {
-        this.data = new DestinationData();
+        this.data = new DestinationDataBean().freeze();
     }
     
     /**
@@ -39,7 +41,7 @@
      * @param data
      */
     public Destination(DestinationData data) {
-        this.data= data;
+        this.data=data.freeze();
     }
     
     /**
@@ -67,10 +69,7 @@
      * @param temporary
      */
     public Destination(String name,boolean topic,boolean temporary) {
-        this.data=new DestinationData();
-        this.data.setName(new Buffer(name));
-        this.data.setTopic(topic);
-        this.data.setTemporary(temporary);
+        this(new Buffer(name), topic, temporary);
     }
     
     /**
@@ -97,10 +96,11 @@
      * @param temporary
      */
     public Destination(Buffer name,boolean topic,boolean temporary) {
-        this.data=new DestinationData();
-        this.data.setName(name);
-        this.data.setTopic(topic);
-        this.data.setTemporary(temporary);
+        DestinationDataBean bean=new DestinationDataBean();
+        bean.setName(name);
+        bean.setTopic(topic);
+        bean.setTemporary(temporary);
+        this.data = bean.freeze();
     }
     
    
@@ -114,7 +114,7 @@
      * @param name the name to set
      */
     public void setName(Buffer name) {
-        this.data.setName(name);
+        this.data = this.data.copy().setName(name).freeze();
     }
     /**
      * @return the topic
@@ -126,7 +126,7 @@
      * @param topic the topic to set
      */
     public void setTopic(boolean topic) {
-        this.data.setTopic(topic);
+        this.data = this.data.copy().setTopic(topic).freeze();
     }
     /**
      * @return the temporary
@@ -138,7 +138,7 @@
      * @param temporary the temporary to set
      */
     public void setTemporary(boolean temporary) {
-        this.data.setTemporary(temporary);
+        this.data = this.data.copy().setTemporary(temporary).freeze();
     }
     
     /**
@@ -150,7 +150,7 @@
     /**
      * @return the data
      */
-    public  DestinationData getData() {
+    public  DestinationDataBuffer getData() {
         
         return this.data;
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/Subscription.java Tue Feb 17 15:12:52 2009
@@ -18,7 +18,8 @@
 
 import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.wire.DestinationData;
-import org.apache.activeblaze.wire.SubscriptionData;
+import org.apache.activeblaze.wire.DestinationData.DestinationDataBean;
+import org.apache.activeblaze.wire.SubscriptionData.SubscriptionDataBean;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -26,13 +27,13 @@
  * 
  */
 public class Subscription {
-    private final SubscriptionData data;
+    private final SubscriptionDataBean data;
 
     /**
      * Default Constructor
      */
     public Subscription() {
-        this.data = new SubscriptionData();
+        this.data = new SubscriptionDataBean();
     }
 
     /**
@@ -40,7 +41,7 @@
      * 
      * @param data
      */
-    public Subscription(SubscriptionData data) {
+    public Subscription(SubscriptionDataBean data) {
         this.data = data;
     }
 
@@ -50,7 +51,7 @@
      * @param destination
      */
     public Subscription(String destination) {
-        this.data = new SubscriptionData();
+        this.data = new SubscriptionDataBean();
         Destination dest = new Destination(destination);
         this.data.setDestinationData(dest.getData());
     }
@@ -62,7 +63,7 @@
      * @param topic
      */
     public Subscription(String destination, boolean topic) {
-        this.data = new SubscriptionData();
+        this.data = new SubscriptionDataBean();
         Destination dest = new Destination(destination,topic);
         this.data.setDestinationData(dest.getData());
         setTopic(topic);
@@ -74,15 +75,15 @@
      * @param destination
      */
     public Subscription(Buffer destination) {
-        this.data = new SubscriptionData();
+        this.data = new SubscriptionDataBean();
         Destination dest = new Destination(destination);
         this.data.setDestinationData(dest.getData());
     }
 
     /**
-     * @return the underlying SubscriptionData
+     * @return the underlying SubscriptionDataBean
      */
-    public SubscriptionData getData() {
+    public SubscriptionDataBean getData() {
         return this.data;
     }
 
@@ -178,8 +179,11 @@
     /**
      * @return the Destination
      */
-    public DestinationData getDestinationData() {
-        return this.data.getDestinationData();
+    public DestinationDataBean getDestinationData() {
+        if( !this.data.hasDestinationData() ) {
+            this.data.setDestinationData(new DestinationDataBean());
+        }
+        return this.data.getDestinationData().copy();
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Tue Feb 17 15:12:52 2009
@@ -17,17 +17,22 @@
 package org.apache.activeblaze.cluster;
 
 import java.net.URI;
+
 import org.apache.activeblaze.group.BlazeGroupChannelImpl;
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.ElectionMessage;
+import org.apache.activeblaze.wire.AckData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.AckData.AckDataBuffer;
+import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBuffer;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -156,20 +161,26 @@
         return this.clusterGroup.isElectionFinished();
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
         if (isStarted()) {
             processRequest(correlationId, data);
-            MessageType type = MessageType.valueOf(data.getType());
-            if (type == MessageType.BLAZE_DATA) {
+            MessageType type = data.getType();
+            switch(type) {
+            case BLAZE_DATA:
                 doProcessBlazeData(data);
-            } else if (type == MessageType.MEMBER_DATA) {
+                break;
+            case MEMBER_DATA:
                 doProcessMemberData(data);
-            } else if (type.equals(MessageType.ELECTION_MESSAGE)) {
+                break;
+            case ELECTION_MESSAGE:
                 doProcessElectionData(id, data);
-            } else if (type.equals(MessageType.STATE_DATA)) {
+                break;
+            case STATE_DATA:
                 doProcessStateData(data);
-            } else {
-                LOG.error("Unknown message type " + data);
+                break;
+            default:
+                LOG.error("Unexpected message type: " + type);
+                LOG.error("was: " + AckDataBuffer.parseUnframed(data.getPayload()));
             }
         }
     }
@@ -183,8 +194,8 @@
         synchronized (this.localMutex) {
             MemberImpl local = super.getLocalMember();
             long oldWeight = local.getMasterWeight();
-            local.getData().setMasterWeight(getConfiguration().getMasterWeight());
             if (oldWeight != getConfiguration().getMasterWeight()) {
+                local.setMasterWeight(getConfiguration().getMasterWeight());
                 // weight changed - so do an election in case the master changed
                 this.clusterGroup.scheduleElection();
             }
@@ -198,10 +209,8 @@
     }
 
     protected void doProcessElectionData(String id, PacketData data) throws Exception {
-        MessageType type = MessageType.ELECTION_MESSAGE;
-        ElectionMessage electionMessage = (ElectionMessage) type.createMessage();
         Buffer payload = data.getPayload();
-        electionMessage.mergeFramed(payload);
+        ElectionMessageBuffer electionMessage = ElectionMessageBuffer.parseUnframed(payload);
         ClusterGroup group = (ClusterGroup) getGroup();
         group.processElectionMessage(electionMessage, id);
     }
@@ -220,24 +229,23 @@
      * @return
      * @throws Exception
      */
-    protected Message<?> sendRequest(MemberImpl to, MessageType type, Message<?> message, int timeout) throws Exception {
-        Message<?> result = null;
+    protected MessageBuffer sendRequest(MemberImpl to, MessageType type, MessageBuffer message, int timeout) throws Exception {
+        MessageBuffer result = null;
         if (to != null) {
-            SendRequest request = new SendRequest();
-            PacketData data = getPacketData(type, message);
+            SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
+            PacketDataBean data = getPacketData(type, message);
             data.setReliable(true);
             data.setResponseRequired(false);
-            Packet packet = new Packet(data);
+            Packet packet = new Packet(data.freeze());
             packet.setTo(to.getAddress());
             synchronized (this.messageRequests) {
                 this.messageRequests.put(data.getMessageId(), request);
             }
             this.unicast.downStream(packet);
-            data = (PacketData) request.get(timeout);
-            if (data != null) {
-                type = MessageType.valueOf(data.getType());
-                result = type.createMessage();
-                result.mergeFramed(data.getPayload());
+            PacketDataBuffer response = request.get(timeout);
+            if (response != null) {
+                type = response.getType();
+                result = type.parseUnframed(response.getPayload());
             }
         }
         return result;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Tue Feb 17 15:12:52 2009
@@ -25,12 +25,14 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.ElectionType;
 import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBean;
+import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -220,15 +222,15 @@
         }
     }
 
-    void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
-        MemberImpl from = new MemberImpl(msg.getMember());
+    void processElectionMessage(ElectionMessageBuffer msg, String correlationId) throws Exception {
+        MemberImpl from = new MemberImpl(msg.getMember().freeze());
         if (!from.getId().equals(getLocalMember().getId())) {
             LOG.debug(getLocalMember() + " Election message " + msg.getElectionType() + " from " + from);
             if (msg.getElectionType().equals(ElectionType.ELECTION)) {
-                ElectionMessage reply = new ElectionMessage();
+                ElectionMessageBean reply = new ElectionMessageBean();
                 reply.setElectionType(ElectionType.ANSWER);
                 reply.setMember(this.channel.getLocalMember().getData());
-                this.channel.sendReply(from, msg.type(), reply, correlationId);
+                this.channel.sendReply(from, MessageType.ELECTION_MESSAGE, reply.freeze(), correlationId);
                 // election(null, false);
             } else if (msg.getElectionType().equals(ElectionType.MASTER)) {
                 if (isValidMaster(from)) {
@@ -244,10 +246,10 @@
 
     void broadcastElectionType(ElectionType type) throws Exception {
         if (isStarted()) {
-            ElectionMessage msg = new ElectionMessage();
+            ElectionMessageBean msg = new ElectionMessageBean();
             msg.setMember(this.channel.getLocalMember().getData());
             msg.setElectionType(type);
-            this.channel.broadcastMessage(MessageType.ELECTION_MESSAGE, msg);
+            this.channel.broadcastMessage(MessageType.ELECTION_MESSAGE, msg.freeze());
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Tue Feb 17 15:12:52 2009
@@ -32,6 +32,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.group.Member;
@@ -43,6 +44,8 @@
 import org.apache.activeblaze.wire.StateData;
 import org.apache.activeblaze.wire.StateKeyData;
 import org.apache.activeblaze.wire.StateType;
+import org.apache.activeblaze.wire.StateData.StateDataBean;
+import org.apache.activeblaze.wire.StateData.StateDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -116,10 +119,10 @@
         stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
         stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
         stateKey.setTimeToLive(getTimeToLive());
-        StateData stateData = new StateData();
+        StateDataBean stateData = new StateDataBean();
         stateData.setKeyData(stateKey.getKeyData());
         stateData.setLockWrite(true);
-        sendMasterRequest(stateData);
+        sendMasterRequest(stateData.freeze());
     }
 
     /**
@@ -147,10 +150,10 @@
         stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
         stateKey.setTimeToLive(getTimeToLive());
         stateKey.setLockLeaseTime(leaseTime);
-        StateData stateData = new StateData();
+        StateDataBean stateData = new StateDataBean();
         stateData.setKeyData(stateKey.getKeyData());
         stateData.setLockWrite(true);
-        sendMasterRequest(stateData);
+        sendMasterRequest(stateData.freeze());
     }
 
     /**
@@ -269,12 +272,12 @@
             stateKey.setReleaseLockOnExit(releaseLockOnExit);
             stateKey.setTimeToLive(timeToLive);
             stateKey.setLockLeaseTime(leaseTime);
-            StateData stateData = new StateData();
+            StateDataBean stateData = new StateDataBean();
             stateData.setKeyData(stateKey.getKeyData());
             stateData.setStateType(StateType.INSERT);
             stateData.setMapWrite(true);
             stateData.setValue(IOUtils.getBuffer(value));
-            resultValue = sendMasterRequest(stateData);
+            resultValue = sendMasterRequest(stateData.freeze());
         } catch (Exception e) {
             if (e instanceof ClusterUpdateException) {
                 throw (ClusterUpdateException) e;
@@ -318,12 +321,12 @@
     public Object remove(java.lang.Object key) {
         checkStatus();
         StateKey stateKey = new StateKey(this.channel.getLocalMember(), key.toString());
-        StateData stateData = new StateData();
+        StateDataBean stateData = new StateDataBean();
         stateData.setKeyData(stateKey.getKeyData());
         stateData.setMapWrite(true);
         stateData.setStateType(StateType.DELETE);
         try {
-            return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), stateData.type(), stateData,
+            return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), MessageType.STATE_DATA, stateData.freeze(),
                     getRequestTimeout());
         } catch (Exception e) {
             throw new BlazeRuntimeException(e);
@@ -553,11 +556,11 @@
                 // was the master before the new node started - so
                 // we take responsibility for updating the new node
                 for (StateValue value : this.localMap.values()) {
-                    StateData newStateData = value.getData().clone();
+                    StateDataBean newStateData = value.getData().copy();
                     newStateData.setMapWrite(false);
                     newStateData.setMapUpdate(true);
                     newStateData.setStateType(StateType.SYNC);
-                    broadcastStateUpdate(newStateData, "");
+                    broadcastStateUpdate(newStateData.freeze(), "");
                 }
             }
         } catch (Exception e) {
@@ -621,10 +624,8 @@
     }
 
     private void doProcessStateData(PacketData data) throws Exception {
-        MessageType type = MessageType.STATE_DATA;
-        StateData stateData = (StateData) type.createMessage();
         Buffer payload = data.getPayload();
-        stateData.mergeFramed(payload);
+        StateDataBuffer stateData = StateDataBuffer.parseUnframed(payload);
         String correlationId = "";
         if (data.hasMessageId()) {
             correlationId = data.getMessageId().toStringUtf8();
@@ -644,14 +645,15 @@
         }
     }
 
-    protected void processLockWrite(StateData stateData, String correlationId) throws Exception {
+    protected void processLockWrite(StateDataBuffer buffer, String correlationId) throws Exception {
+        // reset values for when we broadcast an update
+        StateDataBean stateData = buffer.copy();
         if (this.channel.waitForElection(0)) {
-            // reset values for when we broadcast an update
             stateData.setLockUpdate(true);
             stateData.setLockWrite(false);
             StateValue stateValue = new StateValue(stateData);
             boolean newLock = stateValue.getKey().isLocked();
-            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember().freeze());
             long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
             if (this.channel.isMaster()) {
                 StateKey originalKey = getKey(stateValue.getKey().getKey());
@@ -662,19 +664,18 @@
                             Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
                             stateReply.getData().setError(true);
                             stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
-                                    .type(), stateReply.getData(), correlationId);
+                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA, stateReply.getData().freeze(), correlationId);
                         } else {
                             originalKey.setLocked(newLock);
                             originalKey.setOwner(newOwner);
                             originalKey.setLockExpiration(newLockExpiration);
-                            broadcastStateUpdate(stateData, correlationId);
+                            broadcastStateUpdate(stateData.freeze(), correlationId);
                         }
                     } else {
                         originalKey.setLocked(newLock);
                         originalKey.setOwner(newOwner);
                         originalKey.setLockExpiration(newLockExpiration);
-                        broadcastStateUpdate(stateData, correlationId);
+                        broadcastStateUpdate(stateData.freeze(), correlationId);
                     }
                 }
             } else {
@@ -683,17 +684,17 @@
                 Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
                 stateReply.getData().setError(true);
                 stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
-                        stateReply.getData(), correlationId);
+                this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA,
+                        stateReply.getData().freeze(), correlationId);
             }
         }
     }
 
     protected void processLockUpdate(StateData stateData, String correlationId) throws Exception {
         if (this.channel.waitForElection(0)) {
-            StateValue stateValue = new StateValue(stateData);
+            StateValue stateValue = new StateValue(stateData.copy());
             boolean newLock = stateValue.getKey().isLocked();
-            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember().freeze());
             long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
             if (!this.channel.isMaster()) {
                 StateKey originalKey = getKey(stateValue.getKey().getKey());
@@ -707,7 +708,7 @@
     }
 
     protected void processMapOperations(StateData data, String correlationId) throws Exception {
-        StateValue stateValue = new StateValue(data);
+        StateValue stateValue = new StateValue(data.copy());
         StateKey key = stateValue.getKey();
         StateType stateType = stateValue.getData().getStateType();
         if (stateType != null) {
@@ -724,30 +725,33 @@
                             } else {
                                 old = this.localMap.remove(key.getKey());
                             }
-                            StateData newStateData = data.clone();
+                            StateDataBean newStateData = data.copy();
                             newStateData.clearOldvalue();
                             if (old != null && old.getData().getValue() != null) {
                                 newStateData.setOldvalue(old.getData().getValue());
                             }
                             newStateData.setMapWrite(false);
                             newStateData.setMapUpdate(true);
-                            broadcastStateUpdate(newStateData, correlationId);
+                            StateDataBuffer buffer = newStateData.freeze();
+                            StateDataBuffer t = StateDataBuffer.parseUnframed(buffer.toUnframedBuffer());
+                            
+                            broadcastStateUpdate(buffer, correlationId);
                             fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), false);
                         } else {
                             StateValue stateReply = stateValue.copy();
                             Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
                             stateReply.getData().setValue(IOUtils.getBuffer(reply));
                             stateReply.getData().setError(true);
-                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
-                                    .type(), stateReply.getData(), correlationId);
+                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(),
+                                    MessageType.STATE_DATA, stateReply.getData().freeze(), correlationId);
                         }
                     } else {
                         if (insert) {
                             this.localMap.put(key.getKey(), stateValue);
-                            StateData newStateData = data.clone();
+                            StateDataBean newStateData = data.copy();
                             newStateData.setMapWrite(false);
                             newStateData.setMapUpdate(true);
-                            broadcastStateUpdate(newStateData, correlationId);
+                            broadcastStateUpdate(newStateData.freeze(), correlationId);
                             fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
                         } else {
                             // this shouldn't happen - as we are trying to remove
@@ -763,17 +767,17 @@
                     Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
                     stateReply.getData().setError(true);
                     stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                    this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
-                            stateReply.getData(), correlationId);
+                    this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA,
+                            stateReply.getData().freeze(), correlationId);
                 }
             }
         }
     }
 
-    protected void processMapUpdate(StateData data) throws Exception {
+    protected void processMapUpdate(StateDataBuffer data) throws Exception {
         StateKeyData skd = data.getKeyData();
-        StateKey key = new StateKey(skd);
-        StateValue stateValue = new StateValue(data);
+        StateKey key = new StateKey(skd.copy());
+        StateValue stateValue = new StateValue(data.copy());
         boolean containsKey = this.localMap.containsKey(key.getKey());
         if (this.channel.waitForElection(0)) {
             boolean insert = data.getStateType().equals(StateType.SYNC) || data.getStateType().equals(StateType.INSERT);
@@ -790,7 +794,7 @@
                             old = this.localMap.put(key.getKey(), stateValue);
                         } else {
                             old = this.localMap.remove(key.getKey());
-                            StateData copy = data.clone();
+                            StateDataBean copy = data.copy();
                             copy.clearValue();
                             copy.clearOldvalue();
                             stateValue = new StateValue(copy);
@@ -893,8 +897,8 @@
                     value.getData().setExpired(true);
                     value.getData().clearMapWrite();
                     value.getData().setMapUpdate(true);
-                    broadcastStateUpdate(value.getData(), "");
-                    fireMapChanged(new MemberImpl(value.getData().getKeyData().getMember()), k, old.getValue(), null,
+                    broadcastStateUpdate(value.getData().freeze(), "");
+                    fireMapChanged(new MemberImpl(value.getData().getKeyData().getMember().freeze()), k, old.getValue(), null,
                             true);
                 }
             }
@@ -912,7 +916,7 @@
                 StateValue copy = value.copy();
                 copy.getData().setStateType(StateType.DELETE);
                 copy.getData().setLockExpired(true);
-                broadcastStateUpdate(copy.getData(), "");
+                broadcastStateUpdate(copy.getData().freeze(), "");
             }
         }
     }
@@ -942,10 +946,10 @@
         }
     }
 
-    protected void broadcastStateUpdate(StateData value, String correlationId) {
+    protected void broadcastStateUpdate(StateDataBuffer value, String correlationId) {
         if (isStarted()) {
             try {
-                this.channel.broadcastMessage(value.type(), value, correlationId);
+                this.channel.broadcastMessage(MessageType.STATE_DATA, value, correlationId);
             } catch (Exception e) {
                 if (isStarted()) {
                     LOG.error("Failed to send StateData " + value, e);
@@ -954,13 +958,13 @@
         }
     }
 
-    protected Object sendMasterRequest(StateData stateData) throws Exception {
+    protected Object sendMasterRequest(StateDataBuffer stateData) throws Exception {
         int retryCount = 0;
         MemberImpl master = null;
         while (retryCount < 5) {
             this.channel.waitForElection(0);
             master = (MemberImpl) this.channel.getMaster();
-            StateData resultData = (StateData) this.channel.sendRequest(master, stateData.type(), stateData,
+            StateData resultData = (StateData) this.channel.sendRequest(master, MessageType.STATE_DATA, stateData,
                     getRequestTimeout());
             retryCount++;
             if (resultData != null) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java Tue Feb 17 15:12:52 2009
@@ -18,12 +18,14 @@
 
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
 import org.apache.activeblaze.util.AsyncGroupRequest;
-import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.ElectionType;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -97,10 +99,10 @@
                 if (this.group.channel.getId().equals(member.getId())) {
                     doCall = true;
                 } else if (doCall) {
-                    ElectionMessage msg = new ElectionMessage();
+                    ElectionMessageBean msg = new ElectionMessageBean();
                     msg.setMember(this.group.getLocalMember().getData());
                     msg.setElectionType(ElectionType.ELECTION);
-                    this.group.channel.sendMessage(request, member, msg.type(), msg);
+                    this.group.channel.sendMessage(request, member, MessageType.ELECTION_MESSAGE, msg.freeze());
                 }
             }
             boolean result = request.isSuccess(this.group.getConfiguration().getAwaitGroupTimeout());

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java Tue Feb 17 15:12:52 2009
@@ -18,8 +18,9 @@
 
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.wire.MemberData;
 import org.apache.activeblaze.wire.StateKeyData;
+import org.apache.activeblaze.wire.MemberData.MemberDataBean;
+import org.apache.activeblaze.wire.StateKeyData.StateKeyDataBean;
 
 /**
  * Holds information about a StateKey
@@ -27,7 +28,7 @@
  */
 class StateKey {
     private MemberImpl owner;
-    private StateKeyData keyData;
+    private StateKeyDataBean keyData;
 
     /**
      * Constructor
@@ -37,19 +38,18 @@
      */
      StateKey(MemberImpl owner, String key) {
         this.owner = owner;
-        this.keyData = new StateKeyData();
+        this.keyData = new StateKeyDataBean();
         this.keyData.setKey(key);
         this.keyData.setMember(owner.getData());
     }
      
-     StateKey(StateKeyData keyData) throws Exception{
+     StateKey(StateKeyDataBean keyData) throws Exception{
          this.keyData=keyData;
-         MemberData memberData = keyData.getMember();
-         this.owner=new MemberImpl(memberData);
+         this.owner=new MemberImpl(keyData.getMember().freeze());
      }
      
      StateKey copy() throws Exception {
-         return new StateKey(this.keyData.clone());
+         return new StateKey(this.keyData.copy());
      }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java Tue Feb 17 15:12:52 2009
@@ -17,7 +17,7 @@
 package org.apache.activeblaze.cluster;
 
 import org.apache.activeblaze.util.IOUtils;
-import org.apache.activeblaze.wire.StateData;
+import org.apache.activeblaze.wire.StateData.StateDataBean;
 
 /**
  * Holds information about the Value in the Map
@@ -25,7 +25,7 @@
  */
 class StateValue {
     private final StateKey key;
-    private final StateData data;
+    private final StateDataBean data;
     private Object value;
 
     /**
@@ -34,19 +34,19 @@
      * @param key
      * @param value
      */
-    StateValue(StateKey key, Object value, StateData data) {
+    StateValue(StateKey key, Object value, StateDataBean data) {
         this.key = key;
         this.value = value;
         this.data = data;
     }
 
-    StateValue(StateData data) throws Exception {
-        this.key = new StateKey(data.getKeyData());
+    StateValue(StateDataBean data) throws Exception {
+        this.key = new StateKey(data.getKeyData().copy());
         this.data = data;
     }
 
     StateValue copy() throws Exception {
-        return new StateValue(this.data.clone());
+        return new StateValue(this.data.copy());
     }
 
     /**
@@ -72,7 +72,7 @@
     /**
      * @return the data
      */
-    StateData getData() {
+    StateDataBean getData() {
         return this.data;
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Feb 17 15:12:52 2009
@@ -16,6 +16,13 @@
  */
 package org.apache.activeblaze.group;
 
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
 import org.apache.activeblaze.BlazeChannelImpl;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageListener;
@@ -37,20 +44,17 @@
 import org.apache.activeblaze.util.LRUCache;
 import org.apache.activeblaze.util.PropertyUtil;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.BlazeData;
-import org.apache.activeblaze.wire.MemberData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.BlazeData.BlazeDataBuffer;
+import org.apache.activeblaze.wire.MemberData.MemberDataBean;
+import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
 /**
  * <P>
  * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
@@ -63,7 +67,7 @@
     protected Processor unicast;
     private MemberImpl local;
     private BlazeMessageListener inboxListener;
-    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
+    protected Map<Buffer, SendRequest<PacketDataBuffer>> messageRequests = new LRUCache<Buffer, SendRequest<PacketDataBuffer>>(10000);
     private final List<SubscriptionHolder> queueMessageListeners = new CopyOnWriteArrayList<SubscriptionHolder>();
     private Group group;
     protected Buffer inboxAddress;
@@ -455,20 +459,19 @@
             throws Exception {
         BlazeMessage result = null;
         if (member != null) {
-            SendRequest request = new SendRequest();
+            SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
+            message.setDestination(new Destination(destinationName, false));
             message.storeContent();
-            BlazeData blazeData = message.getContent();
-            Destination dest = new Destination(destinationName, false);
-            blazeData.setDestinationData(dest.getData());
-            PacketData packetData = getPacketData(blazeData.type(), blazeData);
+            BlazeDataBuffer blazeData = message.getContent().freeze();
+            PacketDataBean packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
             synchronized (this.messageRequests) {
                 this.messageRequests.put(packetData.getMessageId(), request);
             }
-            Packet packet = new Packet(packetData);
+            Packet packet = new Packet(packetData.freeze());
             packet.setTo((member).getAddress());
             this.unicast.downStream(packet);
-            packetData = (PacketData) request.get(timeout);
-            result = buildBlazeMessage(packetData);
+            PacketDataBuffer response = request.get(timeout);
+            result = buildBlazeMessage(response);
         }
         return result;
     }
@@ -483,29 +486,24 @@
      */
     public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
         response.storeContent();
-        BlazeData blazeData = response.getContent();
-        PacketData data = getPacketData(blazeData.type(), blazeData);
+        BlazeDataBuffer blazeData = response.getContent().freeze();
+        PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, blazeData);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         packet.setTo(((MemberImpl) to).getAddress());
         this.unicast.downStream(packet);
     }
 
-    protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
-        message.storeContent();
-        BlazeData blazeData = message.getContent();
-        send(member, destination, blazeData);
-    }
-
-    protected void send(MemberImpl member, Buffer destinationName, BlazeData blazeData) throws Exception {
+    protected void send(MemberImpl member, Buffer destinationName, BlazeMessage message) throws Exception {
         Destination dest = new Destination(destinationName, false);
-        blazeData.clearDestinationData();
-        blazeData.setDestinationData(dest.getData());
-        PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        message.setDestination(dest);
+        message.storeContent();
+        
+        PacketDataBean data = getPacketData(MessageType.BLAZE_DATA, message.getContent().freeze());
         data.setReliable(true);
         data.setResponseRequired(true);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }
@@ -630,10 +628,10 @@
         return this.local.getGroups();
     }
 
-    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+    protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
         if (isStarted()) {
             if (!processRequest(correlationId, data)) {
-                MessageType type = MessageType.valueOf(data.getType());
+                MessageType type = data.getType();
                 if (type == MessageType.BLAZE_DATA) {
                     doProcessBlazeData(data);
                 } else if (type == MessageType.MEMBER_DATA) {
@@ -643,10 +641,10 @@
         }
     }
 
-    protected boolean processRequest(Buffer correlationId, Message<?> value) {
+    protected boolean processRequest(Buffer correlationId, PacketDataBuffer value) {
         boolean result = false;
         if (correlationId != null) {
-            SendRequest request = null;
+            SendRequest<PacketDataBuffer> request = null;
             synchronized (this.messageRequests) {
                 request = this.messageRequests.remove(correlationId);
             }
@@ -695,10 +693,8 @@
     }
 
     protected final void doProcessMemberData(PacketData data) throws Exception {
-        MessageType type = MessageType.MEMBER_DATA;
-        MemberData memberData = (MemberData) type.createMessage();
         Buffer payload = data.getPayload();
-        memberData.mergeFramed(payload);
+        MemberDataBuffer memberData = MemberDataBuffer.parseUnframed(payload);
         this.group.processMember(memberData);
     }
 
@@ -707,10 +703,10 @@
      * @param message
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
-        PacketData data = getPacketData(messageType, message);
+    public void broadcastMessage(MessageType messageType, MessageBuffer message) throws Exception {
+        PacketDataBean data = getPacketData(messageType, message);
         data.setReliable(false);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         this.broadcast.downStreamManagement(packet);
     }
 
@@ -723,16 +719,15 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,
-            Message<?> message) throws Exception {
-        SendRequest request = new SendRequest();
-        PacketData data = getPacketData(messageType, message);
+    public void sendMessage(AsyncGroupRequest asyncRequest, MemberImpl member, MessageType messageType,  MessageBuffer message) throws Exception {
+        SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
+        PacketDataBean data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
         synchronized (this.messageRequests) {
             this.messageRequests.put(data.getMessageId(), request);
         }
         data.setReliable(false);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         packet.setTo(member.getAddress());
         this.unicast.downStream(packet);
     }
@@ -745,11 +740,11 @@
      * @param correlationId
      * @throws Exception
      */
-    public void broadcastMessage(MessageType messageType, Message<?> message, String correlationId) throws Exception {
-        PacketData data = getPacketData(messageType, message);
+    public void broadcastMessage(MessageType messageType, MessageBuffer message, String correlationId) throws Exception {
+        PacketDataBean data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(true);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         this.broadcast.downStreamManagement(packet);
     }
 
@@ -759,10 +754,10 @@
      * @param message
      * @throws Exception
      */
-    public void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message) throws Exception {
-        PacketData data = getPacketData(messageType, message);
+    public void sendMessage(InetSocketAddress to, MessageType messageType, MessageBuffer message) throws Exception {
+        PacketDataBean data = getPacketData(messageType, message);
         data.setReliable(false);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         packet.setTo(to);
         this.unicast.downStream(packet);
     }
@@ -774,12 +769,12 @@
      * @param correlationId
      * @throws Exception
      */
-    public void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
+    public void sendReply(MemberImpl to, MessageType messageType, MessageBuffer message, String correlationId)
             throws Exception {
-        PacketData data = getPacketData(messageType, message);
+        PacketDataBean data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));
         data.setReliable(false);
-        Packet packet = new Packet(data);
+        Packet packet = new Packet(data.freeze());
         packet.setTo(to.getAddress());
         this.unicast.downStream(packet);
     }
@@ -811,22 +806,32 @@
         if (isInitialized()) {
             try {
                 synchronized (this.localMutex) {
-                    MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
-                    result.getData().clearSubscriptionData();
+                    
+                    MemberDataBean bean = getLocalMember().getData().copy();
+                    bean.clearSubscriptionData();
                     // add topic destinations
                     for (SubscriptionHolder s : this.topicMessageListeners) {
-                        result.getData().addSubscriptionData(s.getSubscription().getData());
+                        bean.addSubscriptionData(s.getSubscription().getData());
                     }
                     // add Queue Destinations
                     
                     for (SubscriptionHolder s : this.queueMessageListeners) {
-                        result.getData().addSubscriptionData(s.getSubscription().getData());
+                        bean.addSubscriptionData(s.getSubscription().getData());
                     }
                    
+                    MemberImpl result = new MemberImpl(bean.freeze());
+                    
                     this.group.processMemberUpdate(this.local, result);
-                    result.getData().setSubscriptionsChanged(true);
+                    
+                    bean = bean.copy();
+                    bean.setSubscriptionsChanged(true);
+                    result = new MemberImpl(bean.freeze());
                     this.group.broadcastHeartBeat(result);
-                    result.getData().clearSubscriptionsChanged();
+                    
+                    bean = bean.copy();
+                    bean.clearSubscriptionsChanged();
+                    result = new MemberImpl(bean.freeze());
+                    
                     this.local = result;
                     this.group.updateLocal(this.local);
                 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Feb 17 15:12:52 2009
@@ -16,14 +16,6 @@
  */
 package org.apache.activeblaze.group;
 
-import org.apache.activeblaze.BaseService;
-import org.apache.activeblaze.Subscription;
-import org.apache.activeblaze.wire.MemberData;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.SubscriptionData;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,10 +24,19 @@
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.Subscription;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.SubscriptionData;
+import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * Maintains members of a group
  * 
@@ -255,7 +256,17 @@
      */
     public void doStop() throws Exception {
         if (this.heartBeatTimer != null) {
-            this.heartBeatTimer.cancel();
+            // Make sure we shutdown the timer before shutting down the down stream 
+            // processors to avoid the timer getting errors.
+            final CountDownLatch done = new CountDownLatch(1);
+            this.heartBeatTimer.schedule(new TimerTask(){
+                @Override
+                public void run() {
+                    heartBeatTimer.cancel();
+                    done.countDown();
+                }}, 0);
+            done.await();
+            this.heartBeatTimer = null;
         }
         if (this.checkMemberShipTimer != null) {
             this.checkMemberShipTimer.cancel();
@@ -273,7 +284,7 @@
      * @throws Exception
      * @return Member if a new member else null
      */
-    protected final MemberImpl processMember(MemberData data) throws Exception {
+    protected final MemberImpl processMember(MemberDataBuffer data) throws Exception {
         MemberImpl result = null;
         MemberImpl old = null;
         MemberImpl member = new MemberImpl(data);
@@ -283,7 +294,7 @@
                 processMemberStarted(member);
                 if (!member.getId().equals(this.channel.getId())) {
                     this.channel.sendMessage(member.getAddress(), MessageType.MEMBER_DATA, this.channel
-                            .getLocalMember().getData());
+                            .getLocalMember().getData().freeze());
                 }
                 result = member;
             } else {
@@ -351,6 +362,9 @@
 
     private void processDestinationsForStarted(MemberImpl member) {
         List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
+        if( subscriptionList == null ) {
+            return;
+        }
         for (SubscriptionData subData : subscriptionList) {
             Map<Subscription, List<MemberImpl>> map = null;
             if (subData.getDestinationData().getTopic()) {
@@ -358,7 +372,7 @@
             } else {
                 map = this.queueMap;
             }
-            Subscription key = new Subscription(subData);
+            Subscription key = new Subscription(subData.copy());
             List<MemberImpl> members = map.get(key);
             if (members == null) {
                 members = new CopyOnWriteArrayList<MemberImpl>();
@@ -370,6 +384,9 @@
 
     private void processDestinationsForStopped(MemberImpl member) {
         List<SubscriptionData> subscriptionList = member.getData().getSubscriptionDataList();
+        if( subscriptionList == null ) {
+            return;
+        }
         for (SubscriptionData subData : subscriptionList) {
             Map<Subscription, List<MemberImpl>> map = null;
             if (subData.getDestinationData().getTopic()) {
@@ -377,7 +394,7 @@
             } else {
                 map = this.queueMap;
             }
-            Subscription key = new Subscription(subData);
+            Subscription key = new Subscription(subData.copy());
             List<MemberImpl> members = map.get(key);
             if (members != null) {
                 members.remove(member);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Tue Feb 17 15:12:52 2009
@@ -21,8 +21,11 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.activeblaze.impl.destination.DestinationMatch;
 import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MemberData.MemberDataBean;
+import org.apache.activeblaze.wire.MemberData.MemberDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -30,7 +33,7 @@
  * 
  */
 public class MemberImpl implements Member, Comparable<MemberImpl> {
-    private final MemberData data;
+    private MemberDataBuffer data;
     private final InetSocketAddress socketAddress;
     private final Buffer socketAddressAsBuffer;
 
@@ -48,14 +51,15 @@
         InetAddress addr = InetAddress.getByName(localURI.getHost());
         this.socketAddress = new InetSocketAddress(addr, localURI.getPort());
         this.socketAddressAsBuffer = new Buffer(this.socketAddress.toString());
-        this.data = new MemberData();
-        this.data.setId(id);
-        this.data.setName(name);
-        this.data.setMasterWeight(masterWeight);
-        this.data.setRefinedWeight(refinedWeight);
-        this.data.setStartTime(System.currentTimeMillis());
-        this.data.setInetAddress(new Buffer(addr.getHostAddress()));
-        this.data.setPort(localURI.getPort());
+        MemberDataBean bean = new MemberDataBean();
+        bean.setId(id);
+        bean.setName(name);
+        bean.setMasterWeight(masterWeight);
+        bean.setRefinedWeight(refinedWeight);
+        bean.setStartTime(System.currentTimeMillis());
+        bean.setInetAddress(new Buffer(addr.getHostAddress()));
+        bean.setPort(localURI.getPort());
+        this.data=bean.freeze();
     }
 
     /**
@@ -64,7 +68,7 @@
      * @param data
      * @throws Exception
      */
-    public MemberImpl(MemberData data) throws Exception {
+    public MemberImpl(MemberDataBuffer data) throws Exception {
         this.data = data;
         InetAddress addr = InetAddress.getByName(data.getInetAddress().toStringUtf8());
         this.socketAddress = new InetSocketAddress(addr, data.getPort());
@@ -83,7 +87,11 @@
      * @param name
      */
     public void setName(String name) {
-        this.data.setName(name);
+        this.data = this.data.copy().setName(name).freeze();
+    }
+
+    public void setMasterWeight(long masterWeight) {
+        this.data = this.data.copy().setMasterWeight(masterWeight).freeze();
     }
 
     /**
@@ -94,7 +102,7 @@
     }
 
     void setId(String id) {
-        this.data.setId(id);
+        this.data = this.data.copy().setId(id).freeze();
     }
 
     /**
@@ -138,7 +146,7 @@
      * @param value
      */
     public void setTimeStamp(long value) {
-        this.data.setTimeStamp(value);
+        this.data = this.data.copy().setTimeStamp(value).freeze();
     }
 
     /**
@@ -176,7 +184,7 @@
     /**
      * @return the data
      */
-    public MemberData getData() {
+    public MemberDataBuffer getData() {
         return this.data;
     }
 
@@ -210,7 +218,7 @@
      */
     public void addToGroup(String groupName) {
         {//synchronized (this.data) {
-            this.data.addGroups(new Buffer(groupName));
+            this.data = this.data.copy().addGroups(new Buffer(groupName)).freeze();
         }
     }
 
@@ -242,7 +250,11 @@
     protected boolean isInSameGroup(MemberImpl other) {
         { //synchronized (other.data) {
             {  // synchronized (this.data) {
-                for (Buffer b : this.data.getGroupsList()) {
+                List<Buffer> list = this.data.getGroupsList();
+                if( list == null ) {
+                    return false;
+                }
+                for (Buffer b : list) {
                     for (Buffer o : other.data.getGroupsList()) {
                         if (DestinationMatch.isMatch(b, o)) {
                             return true;
@@ -253,4 +265,6 @@
         }
         return false;
     }
+
+
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/network/StaticNetwork.java Tue Feb 17 15:12:52 2009
@@ -83,8 +83,7 @@
                     URI uri = compositeData.getComponents()[i];
                     if (!this.broadcastURIs.contains(uri)) {
                         this.broadcastURIs.add(uri);
-                        this.broadcastAddresses.add(new InetSocketAddress(uri
-                                .getHost(), uri.getPort()));
+                        this.broadcastAddresses.add(new InetSocketAddress(uri.getHost(), uri.getPort()));
                     }
                 }
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/CompressionProcessor.java Tue Feb 17 15:12:52 2009
@@ -23,6 +23,9 @@
 import java.util.zip.Deflater;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 
 /**
@@ -55,7 +58,8 @@
     }
 
     public void downStream(Packet packet) throws Exception {
-        Buffer data = packet.getPacketData().getPayload();
+        PacketDataBuffer packetData = packet.getPacketData();
+        Buffer data = packetData.getPayload();
         if (data != null && data.length >= this.compressionLimit) {
             ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(data.length);
             try {
@@ -64,10 +68,11 @@
                 gzipOut.close();
                 bytesOut.close();
                 byte[] result = bytesOut.toByteArray();
-                packet.getPacketData().clearPayload();
-                // need to clone to get sizing correct
-                packet = packet.clone();
-                packet.getPacketData().setPayload(new Buffer(result));
+                
+                PacketDataBean bean = packetData.copy();
+                bean.setPayload(new Buffer(result));
+                
+                packet.setPacketData(bean.freeze());
             } catch (IOException e) {
                 fireException("Failed to deflate packet", e);
             }
@@ -76,8 +81,8 @@
     }
 
     public void upStream(Packet packet) throws Exception {
-        Buffer data = packet.getPacketData().getPayload();
-        ;
+        PacketDataBuffer packetData = packet.getPacketData();
+        Buffer data = packetData.getPayload();
         if (CompressionProcessor.isCompressed(data)) {
             InputStream bytesIn = data.newInput();
             try {
@@ -92,10 +97,12 @@
                 bytesIn.close();
                 byte[] result = bytesOut.toByteArray();
                 bytesOut.close();
-                packet.getPacketData().clearPayload();
-                // need to clone to get sizing correct
-                packet = packet.clone();
-                packet.getPacketData().setPayload(new Buffer(result));
+                
+                
+                PacketDataBean bean = packetData.copy();
+                bean.setPayload(new Buffer(result));
+                packet.setPacketData(bean.freeze());
+                
             } catch (IOException e) {
                 fireException("Failed to inflate packet", e);
             }
@@ -106,8 +113,8 @@
     static boolean isCompressed(Buffer data) {
         boolean result = false;
         if (data != null && data.length > 2) {
-            int ch1 = (int) (data.byteAt(data.offset) & 0xff);
-            int ch2 = (int) (data.byteAt(data.offset + 1) & 0xff);
+            int ch1 = (int) (data.byteAt(0) & 0xff);
+            int ch2 = (int) (data.byteAt(1) & 0xff);
             int magic = (ch1 | (ch2 << 8));
             result = (magic == GZIPInputStream.GZIP_MAGIC);
         }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Tue Feb 17 15:12:52 2009
@@ -20,6 +20,9 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.activeblaze.wire.PacketData.PacketDataBean;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,12 +43,17 @@
   
         
     public void downStream(Packet packet) throws Exception {
-        int size = packet.getPacketData().serializedSizeUnframed();
+        PacketDataBuffer packetData = packet.getPacketData();
+        int size = packetData.serializedSizeUnframed();
         if (size > this.getMaxPacketSize()) {
-            Buffer payload = packet.getPacketData().getPayload();
-            packet.getPacketData().clearPayload();
-            packet=packet.clone();
-            int headerSize = packet.getPacketData().serializedSizeUnframed();
+            
+            Buffer payload = packetData.getPayload();
+            
+            PacketDataBean bean = packetData.copy();
+            bean.clearPayload();
+            PacketDataBuffer headerPacket = bean.freeze();
+            
+            int headerSize = headerPacket.serializedSizeUnframed();
             int fragmentSize = getMaxPacketSize()-headerSize;
             int length = payload.length;
             
@@ -59,14 +67,18 @@
                 Buffer nextPayload = new Buffer(payload.data,offset,fragmentSize);
                 offset += fragmentSize;
                 Packet next = packet.clone();
-                next.getPacketData().setPayload(nextPayload);
-                next.getPacketData().setNumberOfParts(numberOfParts);
-                next.getPacketData().setPartNumber(partNumber);
+                
+                PacketDataBean bean2 = next.getPacketData().copy();
+                bean2.setPayload(nextPayload);
+                bean2.setNumberOfParts(numberOfParts);
+                bean2.setPartNumber(partNumber);
                 partNumber++;
+                next.setPacketData(bean2.freeze());
+                
                 super.downStream(next);
                 
             }
-        }else {
+        } else {
             super.downStream(packet);
         }
     }
@@ -82,11 +94,13 @@
                 value.add(packet.getPacketData().getPartNumber(),packet);
                 if (value.size()==packet.getPacketData().getNumberOfParts()) {
                     Packet result = packet.clone();
-                    result.getPacketData().clearPayload();
-                    result.getPacketData().clearNumberOfParts();
-                    result.getPacketData().clearPartNumber();
-                    result.getPacketData().setNumberOfParts(1);
-                    result.getPacketData().setPartNumber(0);
+                    PacketDataBean bean = result.getPacketData().copy();
+                    bean.clearPayload();
+                    bean.clearNumberOfParts();
+                    bean.clearPartNumber();
+                    bean.setNumberOfParts(1);
+                    bean.setPartNumber(0);
+                    
                     int size=0;
                     for (Packet p:value) {
                         size+= p.getPacketData().getPayload().length;
@@ -98,8 +112,10 @@
                         System.arraycopy(src.data, src.offset, data, offset, src.length);
                         offset+=src.length;
                     }
-                    result.getPacketData().setPayload(new Buffer(data));
+                    bean.setPayload(new Buffer(data));
                     this.cache.remove(packet.getId());
+                    
+                    result.setPacketData(bean.freeze());
                     super.upStream(result);
                 }
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=745113&r1=745112&r2=745113&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Tue Feb 17 15:12:52 2009
@@ -20,6 +20,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
 
 /**
  * Wrapper for PacketData
@@ -29,7 +30,7 @@
     private SocketAddress from;
     private SocketAddress to;
     private String id;
-    final private PacketData packetData;
+    private PacketDataBuffer packetData;
 
     /**
      * Internal Constructor
@@ -37,7 +38,7 @@
      * @param id
      * @param data
      */
-    private Packet(String id, PacketData data) {
+    private Packet(String id, PacketDataBuffer data) {
         this.id = id;
         this.packetData = data;
         this.from = null;
@@ -49,7 +50,7 @@
      * 
      * @param data
      */
-    public Packet(PacketData data) {
+    public Packet(PacketDataBuffer data) {
         this.packetData = data;
         this.from = null;
         this.to = null;
@@ -62,7 +63,7 @@
      * @param data
      * @throws Exception
      */
-    public Packet(SocketAddress from, PacketData data) throws Exception {
+    public Packet(SocketAddress from, PacketDataBuffer data) throws Exception {
         this.from = from;
         this.packetData = data;
         this.to = null;
@@ -75,7 +76,7 @@
      * @param toPort
      * @param data
      */
-    public Packet(InetAddress toAddress, int toPort, PacketData data) {
+    public Packet(InetAddress toAddress, int toPort, PacketDataBuffer data) {
         this.to = new InetSocketAddress(toAddress, toPort);
         this.packetData = data;
         this.from = null;
@@ -88,7 +89,7 @@
      * @param toPort
      * @param data
      */
-    public Packet(String toAddress, int toPort, PacketData data) {
+    public Packet(String toAddress, int toPort, PacketDataBuffer data) {
         this.to = new InetSocketAddress(toAddress, toPort);
         this.packetData = data;
         this.from = null;
@@ -125,7 +126,7 @@
     /**
      * @return the packetData
      */
-    public PacketData getPacketData() {
+    public PacketDataBuffer getPacketData() {
         return this.packetData;
     }
 
@@ -136,7 +137,7 @@
      * @see java.lang.Object#clone()
      */
     public Packet clone() {
-        Packet result = new Packet(this.id, this.packetData.clone());
+        Packet result = new Packet(this.id, packetData);
         result.to = this.to;
         result.from = this.from;
         return result;
@@ -193,4 +194,8 @@
     public boolean isResponseRequired() {
         return this.packetData.getResponseRequired();
     }
+
+    public void setPacketData(PacketDataBuffer packetData) {
+        this.packetData = packetData;
+    }
 }



Mime
View raw message