activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r1504961 [3/11] - in /activemq/activemq-blaze/trunk: ./ src/main/java/org/apache/activeblaze/ src/main/java/org/apache/activeblaze/cluster/ src/main/java/org/apache/activeblaze/group/ src/main/java/org/apache/activeblaze/impl/destination/ s...
Date Fri, 19 Jul 2013 18:44:24 GMT
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Fri Jul 19 18:44:21 2013
@@ -18,27 +18,22 @@ package org.apache.activeblaze.cluster;
 
 import java.net.URI;
 
+import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.group.BlazeGroupChannelImpl;
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
-import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.util.SendRequest;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.AckData.AckDataBuffer;
-import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBuffer;
-import org.apache.activeblaze.wire.PacketData.PacketDataBean;
-import org.apache.activeblaze.wire.PacketData.PacketDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.MessageBuffer;
+import org.apache.activeblaze.wire.Election;
+import org.apache.activeblaze.wire.MemberImpl;
+import org.apache.activeblaze.wire.Packet;
+import org.apache.activeblaze.wire.PacketType;
+import org.apache.activeblaze.wire.StateValue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
- * (elected leader) for the group
- * 
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point
+ * communication and maintains a coordinator (elected leader) for the group
  */
 public class BlazeClusterGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeClusterGroupChannel {
     private static final Log LOG = LogFactory.getLog(BlazeClusterGroupChannelImpl.class);
@@ -47,8 +42,6 @@ public class BlazeClusterGroupChannelImp
 
     /**
      * Constructor
-     * 
-     * @param name
      */
     public BlazeClusterGroupChannelImpl(String name) {
         super(name);
@@ -88,8 +81,6 @@ public class BlazeClusterGroupChannelImp
     }
 
     /**
-     * @param l
-     * @throws Exception
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addMasterChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
      */
     public void addMasterChangedListener(MasterChangedListener l) throws Exception {
@@ -99,7 +90,6 @@ public class BlazeClusterGroupChannelImp
 
     /**
      * @return Member
-     * @throws Exception
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
      */
     public Member getMaster() throws Exception {
@@ -111,7 +101,6 @@ public class BlazeClusterGroupChannelImp
 
     /**
      * @return true if the Master
-     * @throws Exception
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
      */
     public boolean isMaster() throws Exception {
@@ -122,9 +111,9 @@ public class BlazeClusterGroupChannelImp
     }
 
     /**
+     *
      * @param l
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
      */
     public void removeMasterChangedListener(MasterChangedListener l) throws Exception {
         if (this.clusterGroup != null) {
@@ -133,18 +122,15 @@ public class BlazeClusterGroupChannelImp
     }
 
     /**
-     * @return the configuration
-     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getCoordinatedGroupConfiguration()
+     *
+     * @return
      */
     public BlazeClusterGroupConfiguration getConfiguration() {
         return (BlazeClusterGroupConfiguration) this.configuration;
     }
 
     /**
-     * @param timeout
      * @return true if election is finished
-     * @throws Exception
-     * @see {@link org.apache.activeblaze.group.BlazeGroupChannel#waitForElection(int)
      */
     public boolean waitForElection(int timeout) throws Exception {
         init();
@@ -153,33 +139,24 @@ public class BlazeClusterGroupChannelImp
 
     /**
      * @return true if there is elections have finished
-     * @throws Exception
      */
     public boolean isElectionFinished() throws Exception {
         init();
         return this.clusterGroup.isElectionFinished();
     }
 
-    protected void processData(String id, Buffer correlationId, PacketDataBuffer data) throws Exception {
+    protected void processPacket(Packet packet) throws Exception {
         if (isStarted()) {
-            processRequest(correlationId, data);
-            MessageType type = data.getMessageType();
-            switch(type) {
-            case BLAZE_DATA:
-                doProcessBlazeData(data);
-                break;
-            case MEMBER_DATA:
-                doProcessMemberData(data);
-                break;
-            case ELECTION_MESSAGE:
-                doProcessElectionData(id, data);
-                break;
-            case STATE_DATA:
-                doProcessStateData(data);
-                break;
-            default:
-                LOG.error("Unexpected message type: " + type);
-                LOG.error("was: " + AckDataBuffer.parseUnframed(data.getPayload()));
+            processRequest(packet);
+            if (packet instanceof BlazeMessage) {
+            } else if (packet.getPacketType() == PacketType.MEMBER.getNumber()) {
+                doProcessMember((MemberImpl) packet);
+            } else if (packet.getPacketType() == PacketType.ELECTION.getNumber()) {
+                doProcessElection((Election) packet);
+            } else if (packet.getPacketType() == PacketType.STATE.getNumber()) {
+                doProcessState((StateValue) packet);
+            } else {
+                LOG.error("Unexpected message type: " + packet);
             }
         }
     }
@@ -207,45 +184,29 @@ public class BlazeClusterGroupChannelImp
         return this.clusterGroup;
     }
 
-    protected void doProcessElectionData(String id, PacketData data) throws Exception {
-        Buffer payload = data.getPayload();
-        ElectionMessageBuffer electionMessage = ElectionMessageBuffer.parseUnframed(payload);
+    protected void doProcessElection(Election election) throws Exception {
         ClusterGroup group = (ClusterGroup) getGroup();
-        group.processElectionMessage(electionMessage, id);
+        group.processElectionMessage(election);
     }
 
-    protected void doProcessStateData(PacketData data) throws Exception {
-        this.state.processStateData(data);
+    protected void doProcessState(StateValue state) throws Exception {
+        this.state.processState(state);
     }
 
     /**
      * send Request
-     * 
-     * @param member
-     * @param destination
-     * @param message
-     * @param timeout
-     * @return
-     * @throws Exception
      */
-    protected MessageBuffer sendRequest(MemberImpl to, MessageType type, MessageBuffer message, int timeout) throws Exception {
-        MessageBuffer result = null;
+    protected Packet sendRequest(MemberImpl to, Packet packet, int timeout) throws Exception {
+        Packet result = null;
         if (to != null) {
-            SendRequest<PacketDataBuffer> request = new SendRequest<PacketDataBuffer>();
-            PacketDataBean data = getPacketData(type, message);
-            data.setReliable(true);
-            data.setResponseRequired(false);
-            Packet packet = new Packet(data.freeze());
+            SendRequest request = new SendRequest();
             packet.setTo(to.getAddress());
+            packet.setId(this.idGenerator.generateId());
             synchronized (this.messageRequests) {
-                this.messageRequests.put(data.getMessageId(), request);
+                this.messageRequests.put(packet.getId(), request);
             }
             this.unicast.downStream(packet);
-            PacketDataBuffer response = request.get(timeout);
-            if (response != null) {
-                type = response.getMessageType();
-                result = type.parseUnframed(response.getPayload());
-            }
+            result = request.get(timeout);
         }
         return result;
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java Fri Jul 19 18:44:21 2013
@@ -21,15 +21,14 @@ import org.apache.activeblaze.group.Blaz
 
 /**
  * Configuration for a BlazeCoordinatedGroupChannel
- *
  */
-public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration{
+public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration {
     private long masterWeight = 0;
     private long refinedMasterWeight = 0;
     private int minimumGroupSize = 1;
-    private int  awaitGroupTimeout = Math.max(getHeartBeatInterval()*2,5000);
-    
-    
+    private int awaitGroupTimeout = Math.max(getHeartBeatInterval() * 2, 5000);
+
+
     /**
      * @return the masterWeight
      */
@@ -43,7 +42,7 @@ public class BlazeClusterGroupConfigurat
     public void setMasterWeight(long masterWeight) {
         this.masterWeight = masterWeight;
     }
-    
+
     /**
      * @return the refinedMasterWeight
      */
@@ -85,7 +84,7 @@ public class BlazeClusterGroupConfigurat
     public void setAwaitGroupTimeout(int awaitGroupTimeout) {
         this.awaitGroupTimeout = awaitGroupTimeout;
     }
-    
+
     protected BlazeConfiguration newInstance() {
         return new BlazeClusterGroupConfiguration();
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Fri Jul 19 18:44:21 2013
@@ -28,17 +28,13 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
-import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.wire.ElectionType;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBean;
-import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBuffer;
+import org.apache.activeblaze.wire.Election;
+import org.apache.activeblaze.wire.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Implementation of Group State
- * 
  */
 public class ClusterGroup extends Group {
     static final Log LOG = LogFactory.getLog(ClusterGroup.class);
@@ -51,11 +47,6 @@ public class ClusterGroup extends Group 
 
     /**
      * Constructor
-     * 
-     * @param local
-     * @param channel
-     * @param transport
-     * @param config
      */
     protected ClusterGroup(BlazeClusterGroupChannelImpl channel) {
         super(channel);
@@ -65,29 +56,35 @@ public class ClusterGroup extends Group 
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#start()
      */
     public void doStart() throws Exception {
         super.doStart();
         this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-                    public Thread newThread(Runnable runnable) {
-                        Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
-                        thread.setDaemon(true);
-                        return thread;
-                    }
-                });
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
         election(null, true);
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.Service#stop()
      */
     public void doStop() throws Exception {
         super.doStop();
         if (this.electionExecutor != null) {
+            List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
+            for (Runnable r : list) {
+                ElectionService es = (ElectionService) r;
+                if (es != null) {
+                    es.stop();
+                    this.electionExecutor.remove(es);
+                }
+            }
             this.electionExecutor.shutdownNow();
             synchronized (this.electionFinished) {
                 this.electionFinished.notifyAll();
@@ -95,6 +92,11 @@ public class ClusterGroup extends Group 
         }
     }
 
+    /**
+     * shutDown
+     *
+     * @see org.apache.activeblaze.group.Group#doShutDown()
+     */
     public void doShutDown() throws Exception {
         super.doShutDown();
         setMaster(null);
@@ -116,9 +118,6 @@ public class ClusterGroup extends Group 
 
     /**
      * Process a new member
-     * 
-     * @param data
-     * @throws Exception
      */
     protected void processMemberStarted(MemberImpl member) throws Exception {
         if (!member.equals(getLocalMember())) {
@@ -157,7 +156,7 @@ public class ClusterGroup extends Group 
             synchronized (this.electionFinished) {
                 this.electionFinished.set(false);
             }
-            if (this.members.size() >= getConfiguration().getMinimumGroupSize())
+            if (this.members.size() >= getConfiguration().getMinimumGroupSize()) {
                 synchronized (this.electionExecutor) {
                     // remove any queued election tasks
                     List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
@@ -169,9 +168,10 @@ public class ClusterGroup extends Group 
                         }
                     }
                 }
-            ElectionService es = new ElectionService(this, member, memberStarted);
-            es.start();
-            this.electionExecutor.execute(es);
+                ElectionService es = new ElectionService(this, member, memberStarted);
+                es.start();
+                this.electionExecutor.execute(es);
+            }
         }
     }
 
@@ -202,9 +202,6 @@ public class ClusterGroup extends Group 
 
     /**
      * Remove a listener for membership changes
-     * 
-     * @param l
-     * @throws Exception
      */
     protected void removeMasterChangedListener(MasterChangedListener l) {
         this.listeners.remove(l);
@@ -222,17 +219,17 @@ public class ClusterGroup extends Group 
         }
     }
 
-    void processElectionMessage(ElectionMessageBuffer msg, String correlationId) throws Exception {
-        MemberImpl from = new MemberImpl(msg.getMember().freeze());
-        if (!from.getId().equals(getLocalMember().getId())) {
+    void processElectionMessage(Election msg) throws Exception {
+        MemberImpl from = this.members.get(msg.getMemberId());
+        if (from != null && !from.getId().equals(getLocalMember().getId())) {
             LOG.debug(getLocalMember() + " Election message " + msg.getElectionType() + " from " + from);
-            if (msg.getElectionType().equals(ElectionType.ELECTION)) {
-                ElectionMessageBean reply = new ElectionMessageBean();
-                reply.setElectionType(ElectionType.ANSWER);
-                reply.setMember(this.channel.getLocalMember().getData());
-                this.channel.sendReply(from, MessageType.ELECTION_MESSAGE, reply.freeze(), correlationId);
+            if (msg.getElectionType().equals(Election.ElectionType.ELECTION)) {
+                Election reply = new Election();
+                reply.setElectionType(Election.ElectionType.ANSWER);
+                reply.setMemberId(getLocalMember().getId());
+                this.channel.sendReply(from, reply, msg.getCorrelationId());
                 // election(null, false);
-            } else if (msg.getElectionType().equals(ElectionType.MASTER)) {
+            } else if (msg.getElectionType().equals(Election.ElectionType.MASTER)) {
                 if (isValidMaster(from)) {
                     setMaster(from);
                     setElectionFinished(true);
@@ -244,12 +241,12 @@ public class ClusterGroup extends Group 
         }
     }
 
-    void broadcastElectionType(ElectionType type) throws Exception {
+    void broadcastElectionType(Election.ElectionType type) throws Exception {
         if (isStarted()) {
-            ElectionMessageBean msg = new ElectionMessageBean();
-            msg.setMember(this.channel.getLocalMember().getData());
+            Election msg = new Election();
+            msg.setMemberId(this.channel.getLocalMember().getId());
             msg.setElectionType(type);
-            this.channel.broadcastMessage(MessageType.ELECTION_MESSAGE, msg.freeze());
+            this.channel.broadcastManagementMessage(msg);
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java Fri Jul 19 18:44:21 2013
@@ -20,7 +20,6 @@ import org.apache.activeblaze.BlazeRunti
 
 /**
  * Exception raised when updating Cluster State
- * 
  */
 public class ClusterNotMasterException extends BlazeRuntimeException {
     private static final long serialVersionUID = -5543167215616832680L;
@@ -36,10 +35,9 @@ public class ClusterNotMasterException e
     /**
      * Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
      * be initialized by a call to {@link #initCause}.
-     * 
-     * @param message
-     *            the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
-     *            method.
+     *
+     * @param message the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+     *                method.
      */
     public ClusterNotMasterException(String message) {
         super(message);
@@ -47,15 +45,13 @@ public class ClusterNotMasterException e
 
     /**
      * Constructs a new exception with the specified detail message and cause.
-     * <p>
+     * <p/>
      * Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
      * this exception's detail message.
-     * 
-     * @param message
-     *            the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
-     * @param cause
-     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
-     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     *
+     * @param message the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+     * @param cause   the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *                value is permitted, and indicates that the cause is nonexistent or unknown.)
      */
     public ClusterNotMasterException(String message, Throwable cause) {
         super(message, cause);
@@ -66,10 +62,9 @@ public class ClusterNotMasterException e
      * <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
      * <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
      * throwables (for example, {@link java.security.PrivilegedActionException}).
-     * 
-     * @param cause
-     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
-     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     *
+     * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *              value is permitted, and indicates that the cause is nonexistent or unknown.)
      */
     public ClusterNotMasterException(Throwable cause) {
         super(cause);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Fri Jul 19 18:44:21 2013
@@ -37,25 +37,18 @@ import org.apache.activeblaze.BaseServic
 import org.apache.activeblaze.BlazeRuntimeException;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberChangedListener;
-import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.util.IOUtils;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.PacketData;
-import org.apache.activeblaze.wire.StateData;
-import org.apache.activeblaze.wire.StateKeyData;
-import org.apache.activeblaze.wire.StateType;
-import org.apache.activeblaze.wire.StateData.StateDataBean;
-import org.apache.activeblaze.wire.StateData.StateDataBuffer;
-import org.apache.activemq.protobuf.Buffer;
+import org.apache.activeblaze.wire.MemberImpl;
+import org.apache.activeblaze.wire.StateKey;
+import org.apache.activeblaze.wire.StateValue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * <P>
- * A <CODE>ClusterState</CODE> is a distributed collaboration implementation that is used to shared state and process
- * messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
+ * <p/>
+ * A <CODE>ClusterState</CODE> is a distributed collaboration implementation
+ * that is used to shared state and process messages amongst a distributed group
+ * of other <CODE>Group</CODE> instances. Membership of a group is handled
  * automatically using discovery.
- * 
  */
 public class ClusterState extends BaseService implements Map<String, Object>, MemberChangedListener {
     final static Log LOG = LogFactory.getLog(ClusterState.class);
@@ -71,7 +64,7 @@ public class ClusterState extends BaseSe
     private ExecutorService stateChangedExecutor;
     private Timer expirationTimer;
     private int maxDispatchQueueSize = 10000;
-    private LinkedBlockingQueue<PacketData> dispatchQueue;
+    private LinkedBlockingQueue<StateValue> dispatchQueue;
     private Thread dispatchQueueThread;
 
     protected ClusterState(BlazeClusterGroupChannelImpl channel) {
@@ -80,8 +73,7 @@ public class ClusterState extends BaseSe
 
     /**
      * Test to see if we own the key
-     * 
-     * @param key
+     *
      * @return true if the owner of the key
      */
     public boolean isOwner(String key) {
@@ -95,8 +87,7 @@ public class ClusterState extends BaseSe
 
     /**
      * Get the owner of a Key
-     * 
-     * @param key
+     *
      * @return the owner or null if the key doesnn't exist
      */
     public Member getOwner(String key) {
@@ -109,9 +100,6 @@ public class ClusterState extends BaseSe
 
     /**
      * Unlock a key
-     * 
-     * @param key
-     * @throws Exception
      */
     public void unlock(String key) throws Exception {
         StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
@@ -119,17 +107,14 @@ public class ClusterState extends BaseSe
         stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
         stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
         stateKey.setTimeToLive(getTimeToLive());
-        StateDataBean stateData = new StateDataBean();
-        stateData.setKeyData(stateKey.getKeyData());
-        stateData.setLockWrite(true);
-        sendMasterRequest(stateData.freeze());
+        StateValue state = new StateValue();
+        state.setKey(stateKey);
+        state.setLockWrite(true);
+        sendMasterRequest(state);
     }
 
     /**
      * Lock a Key
-     * 
-     * @param key
-     * @throws Exception
      */
     public void lock(String key) throws Exception {
         lock(key, getLockTimeToLive());
@@ -137,10 +122,6 @@ public class ClusterState extends BaseSe
 
     /**
      * Lock a Key
-     * 
-     * @param key
-     * @param leaseTime
-     * @throws Exception
      */
     public void lock(String key, long leaseTime) throws Exception {
         checkStatus();
@@ -150,14 +131,13 @@ public class ClusterState extends BaseSe
         stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
         stateKey.setTimeToLive(getTimeToLive());
         stateKey.setLockLeaseTime(leaseTime);
-        StateDataBean stateData = new StateDataBean();
-        stateData.setKeyData(stateKey.getKeyData());
-        stateData.setLockWrite(true);
-        sendMasterRequest(stateData.freeze());
+        StateValue state = new StateValue();
+        state.setKey(stateKey);
+        state.setLockWrite(true);
+        sendMasterRequest(state);
     }
 
     /**
-     * 
      * @see java.util.Map#clear()
      */
     public void clear() {
@@ -173,8 +153,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param key
-     * @return
+     * @return true if it contains the key
      * @see java.util.Map#containsKey(java.lang.Object)
      */
     public boolean containsKey(java.lang.Object key) {
@@ -182,17 +161,16 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param value
-     * @return
+     * @return true if it contains the value
      * @see java.util.Map#containsValue(java.lang.Object)
      */
     public boolean containsValue(java.lang.Object value) {
-        StateValue sv = new StateValue(null, value, null);
+        StateValue sv = new StateValue(null, value);
         return this.localMap.containsValue(sv);
     }
 
     /**
-     * @return
+     * @return a Set
      * @see java.util.Map#entrySet()
      */
     public Set<java.util.Map.Entry<String, Object>> entrySet() {
@@ -210,8 +188,6 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param key
-     * @return
      * @see java.util.Map#get(java.lang.Object)
      */
     public Object get(java.lang.Object key) {
@@ -224,7 +200,6 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @return
      * @see java.util.Map#isEmpty()
      */
     public boolean isEmpty() {
@@ -232,7 +207,6 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @return
      * @see java.util.Map#keySet()
      */
     public Set<String> keySet() {
@@ -240,9 +214,6 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param key
-     * @param value
-     * @return
      * @see java.util.Map#put(java.lang.Object, java.lang.Object)
      */
     public Object put(String key, Object value) {
@@ -251,20 +222,12 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param key
-     * @param value
-     * @param lock
-     * @param removeOnExit
-     * @param releaseLockOnExit
-     * @param timeToLive
-     * @param leaseTime
-     * @return
-     * @throws BlazeRuntimeException
+     * @return the old value
      */
     public Object put(String key, Object value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit,
-            long timeToLive, long leaseTime) throws BlazeRuntimeException {
+                      long timeToLive, long leaseTime) throws BlazeRuntimeException {
         checkStatus();
-        Object resultValue = null;
+        StateValue resultValue = null;
         try {
             StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
             stateKey.setLocked(lock);
@@ -272,23 +235,22 @@ public class ClusterState extends BaseSe
             stateKey.setReleaseLockOnExit(releaseLockOnExit);
             stateKey.setTimeToLive(timeToLive);
             stateKey.setLockLeaseTime(leaseTime);
-            StateDataBean stateData = new StateDataBean();
-            stateData.setKeyData(stateKey.getKeyData());
-            stateData.setStateType(StateType.INSERT);
-            stateData.setMapWrite(true);
-            stateData.setValue(IOUtils.getBuffer(value));
-            resultValue = sendMasterRequest(stateData.freeze());
+            StateValue state = new StateValue();
+            state.setKey(stateKey);
+            state.setType(StateValue.StateType.INSERT);
+            state.setMapWrite(true);
+            state.setValue(value);
+            resultValue = sendMasterRequest(state);
         } catch (Exception e) {
             if (e instanceof ClusterUpdateException) {
                 throw (ClusterUpdateException) e;
             }
             throw new ClusterUpdateException(e);
         }
-        return resultValue;
+        return resultValue != null ? resultValue.getOldValue() : null;
     }
 
     /**
-     * @param t
      * @see java.util.Map#putAll(java.util.Map)
      */
     public void putAll(Map<? extends String, ? extends Object> t) {
@@ -298,43 +260,33 @@ public class ClusterState extends BaseSe
 
     /**
      * put all
-     * 
-     * @param t
-     * @param lock
-     * @param removeOnExit
-     * @param releaseLockOnExit
-     * @param timeToLive
-     * @param lockTimeToLive
      */
     public void putAll(Map<? extends String, ? extends Object> t, boolean lock, boolean removeOnExit,
-            boolean releaseLockOnExit, long timeToLive, long lockTimeToLive) {
+                       boolean releaseLockOnExit, long timeToLive, long lockTimeToLive) {
         for (java.util.Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
             put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive);
         }
     }
 
     /**
-     * @param key
-     * @return
+     * @return the old object
      * @see java.util.Map#remove(java.lang.Object)
      */
     public Object remove(java.lang.Object key) {
         checkStatus();
         StateKey stateKey = new StateKey(this.channel.getLocalMember(), key.toString());
-        StateDataBean stateData = new StateDataBean();
-        stateData.setKeyData(stateKey.getKeyData());
-        stateData.setMapWrite(true);
-        stateData.setStateType(StateType.DELETE);
+        StateValue state = new StateValue();
+        state.setKey(stateKey);
+        state.setMapWrite(true);
+        state.setType(StateValue.StateType.DELETE);
         try {
-            return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), MessageType.STATE_DATA, stateData.freeze(),
-                    getRequestTimeout());
+            return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), state, getRequestTimeout());
         } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
     }
 
     /**
-     * @return
      * @see java.util.Map#size()
      */
     public int size() {
@@ -342,7 +294,6 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @return
      * @see java.util.Map#values()
      */
     public Collection<Object> values() {
@@ -365,8 +316,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param alwaysLock
-     *            the alwaysLock to set
+     * @param alwaysLock the alwaysLock to set
      */
     public void setAlwaysLock(boolean alwaysLock) {
         this.alwaysLock = alwaysLock;
@@ -380,8 +330,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param removeOwnedObjectsOnExit
-     *            the removeOwnedObjectsOnExit to set
+     * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
      */
     public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
         this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
@@ -395,8 +344,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param releaseLockOnExit
-     *            the releaseLockOnExit to set
+     * @param releaseLockOnExit the releaseLockOnExit to set
      */
     public void setReleaseLockOnExit(boolean releaseLockOnExit) {
         this.releaseLockOnExit = releaseLockOnExit;
@@ -410,8 +358,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param timeToLive
-     *            the timeToLive to set
+     * @param timeToLive the timeToLive to set
      */
     public void setTimeToLive(int timeToLive) {
         this.timeToLive = timeToLive;
@@ -425,8 +372,7 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param lockTimeToLive
-     *            the lockTimeToLive to set
+     * @param lockTimeToLive the lockTimeToLive to set
      */
     public void setLockTimeToLive(int lockTimeToLive) {
         this.lockTimeToLive = lockTimeToLive;
@@ -440,22 +386,24 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param requestTimeout
-     *            the requestTimeout to set
+     * @param requestTimeout the requestTimeout to set
      */
     public void setRequestTimeout(int requestTimeout) {
         this.requestTimeout = requestTimeout;
     }
 
+    @Override
     public void doInit() throws Exception {
         this.channel.addMemberChangedListener(this);
-        this.dispatchQueue = new LinkedBlockingQueue<PacketData>(getMaxDispatchQueueSize());
+        this.dispatchQueue = new LinkedBlockingQueue<StateValue>(getMaxDispatchQueueSize());
     }
 
+    @Override
     public void doShutDown() throws Exception {
         this.channel.removeMemberChangedListener(this);
     }
 
+    @Override
     public void doStart() throws Exception {
         this.stateChangedExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
@@ -489,6 +437,7 @@ public class ClusterState extends BaseSe
         this.dispatchQueueThread.start();
     }
 
+    @Override
     public void doStop() throws Exception {
         if (this.dispatchQueueThread != null) {
             this.dispatchQueueThread.interrupt();
@@ -511,8 +460,6 @@ public class ClusterState extends BaseSe
 
     /**
      * Add a <Code>ClusterStateChangedListener</Code>
-     * 
-     * @param l
      */
     public void addClusterStateChangedListener(ClusterStateChangedListener l) {
         this.clusterStateChangedListeners.add(l);
@@ -520,8 +467,6 @@ public class ClusterState extends BaseSe
 
     /**
      * Add a <Code>ClusterStateChangedListener</Code>
-     * 
-     * @param l
      */
     public void removeClusterStateChangedListener(ClusterStateChangedListener l) {
         this.clusterStateChangedListeners.remove(l);
@@ -535,17 +480,16 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * @param maxDispatchQueueSize
-     *            the maxDispatchQueueSize to set
+     * @param maxDispatchQueueSize the maxDispatchQueueSize to set
      */
     public void setMaxDispatchQueueSize(int maxDispatchQueueSize) {
         this.maxDispatchQueueSize = maxDispatchQueueSize;
     }
 
     /**
-     * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
-     * 
-     * @param member
+     * Implementation of org.apache.activeblaze.group.MemberChangedListener for
+     * listening to membership changes
+     *
      * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
      */
     public void memberStarted(Member member) {
@@ -555,12 +499,12 @@ public class ClusterState extends BaseSe
                 // even though we may no longer be the master - we
                 // was the master before the new node started - so
                 // we take responsibility for updating the new node
-                for (StateValue value : this.localMap.values()) {
-                    StateDataBean newStateData = value.getData().copy();
-                    newStateData.setMapWrite(false);
-                    newStateData.setMapUpdate(true);
-                    newStateData.setStateType(StateType.SYNC);
-                    broadcastStateUpdate(newStateData.freeze(), "");
+                for (StateValue state : this.localMap.values()) {
+                    state = state.clone();
+                    state.setMapWrite(false);
+                    state.setMapUpdate(true);
+                    state.setType(StateValue.StateType.SYNC);
+                    broadcastStateUpdate(state, "");
                 }
             }
         } catch (Exception e) {
@@ -569,9 +513,9 @@ public class ClusterState extends BaseSe
     }
 
     /**
-     * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
-     * 
-     * @param member
+     * Implementation of org.apache.activeblaze.group.MemberChangedListener for
+     * listening to membership changes
+     *
      * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
      */
     public void memberStopped(Member member) {
@@ -602,99 +546,93 @@ public class ClusterState extends BaseSe
         }
     }
 
-    protected void processStateData(PacketData data) throws Exception {
+    protected void processState(StateValue state) throws Exception {
         if (!isStopped()) {
-            this.dispatchQueue.put(data);
+            this.dispatchQueue.put(state);
         }
     }
 
     protected void dequeuePackets() {
-        PacketData packet = null;
+        StateValue state = null;
         try {
-            packet = this.dispatchQueue.take();
-            if (packet != null) {
-                doProcessStateData(packet);
+            state = this.dispatchQueue.take();
+            if (state != null) {
+                doProcessState(state);
             }
         } catch (InterruptedException e1) {
             // we've stopped
         } catch (Exception e) {
-            LOG.error("Caught an exception processing a packet: " + packet, e);
+            LOG.error("Caught an exception processing a packet: " + state, e);
             stopInternal();
         }
     }
 
-    private void doProcessStateData(PacketData data) throws Exception {
-        Buffer payload = data.getPayload();
-        StateDataBuffer stateData = StateDataBuffer.parseUnframed(payload);
-        String correlationId = "";
-        if (data.hasMessageId()) {
-            correlationId = data.getMessageId().toStringUtf8();
-        }
-        if (!stateData.getError()) {
-            if (stateData.getMapUpdate()) {
-                processMapUpdate(stateData);
-            } else if (stateData.getLockUpdate()) {
-                processLockUpdate(stateData, correlationId);
-            } else if (stateData.getLockWrite()) {
-                processLockWrite(stateData, correlationId);
-            } else if (stateData.getMapWrite()) {
-                processMapOperations(stateData, correlationId);
+    private void doProcessState(StateValue state) throws Exception {
+        String correlationId = state.getId();
+        if (!state.isError()) {
+            if (state.isMapUpdate()) {
+                processMapUpdate(state);
+            } else if (state.isLockUpdate()) {
+                processLockUpdate(state, correlationId);
+            } else if (state.isLockWrite()) {
+                processLockWrite(state, correlationId);
+            } else if (state.isMapWrite()) {
+                processMapOperations(state, correlationId);
             } else {
-                LOG.error("Don't know how to process " + stateData);
+                LOG.error("Don't know how to process " + state);
             }
         }
     }
 
-    protected void processLockWrite(StateDataBuffer buffer, String correlationId) throws Exception {
+    protected void processLockWrite(StateValue stateValue, String correlationId) throws Exception {
         // reset values for when we broadcast an update
-        StateDataBean stateData = buffer.copy();
+        boolean newLock = stateValue.getKey().isLocked();
+        stateValue = stateValue.clone();
         if (this.channel.waitForElection(0)) {
-            stateData.setLockUpdate(true);
-            stateData.setLockWrite(false);
-            StateValue stateValue = new StateValue(stateData);
-            boolean newLock = stateValue.getKey().isLocked();
-            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember().freeze());
+            stateValue.setLockUpdate(true);
+            stateValue.setLockWrite(false);
+            MemberImpl newOwner = stateValue.getKey().getOwner().clone();
             long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
             if (this.channel.isMaster()) {
                 StateKey originalKey = getKey(stateValue.getKey().getKey());
                 if (originalKey != null) {
                     if (originalKey.isLocked()) {
                         if (!originalKey.getOwner().equals(stateValue.getKey().getOwner())) {
-                            StateValue stateReply = stateValue.copy();
+                            StateValue stateReply = stateValue.clone();
                             Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
-                            stateReply.getData().setError(true);
-                            stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA, stateReply.getData().freeze(), correlationId);
+                            stateReply.setError(true);
+                            stateReply.setValue(reply);
+                            this.channel.sendReply(stateReply.getKey().getOwner(), stateReply,
+                                    correlationId);
                         } else {
                             originalKey.setLocked(newLock);
                             originalKey.setOwner(newOwner);
                             originalKey.setLockExpiration(newLockExpiration);
-                            broadcastStateUpdate(stateData.freeze(), correlationId);
+                            broadcastStateUpdate(stateValue, correlationId);
                         }
                     } else {
                         originalKey.setLocked(newLock);
                         originalKey.setOwner(newOwner);
                         originalKey.setLockExpiration(newLockExpiration);
-                        broadcastStateUpdate(stateData.freeze(), correlationId);
+                        broadcastStateUpdate(stateValue, correlationId);
                     }
                 }
             } else {
-                StateValue stateReply = stateValue.copy();
-                stateReply.getData().clearStateType();
+                StateValue stateReply = stateValue.clone();
+                stateReply.setType(StateValue.StateType.NOT_SET);
                 Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
-                stateReply.getData().setError(true);
-                stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA,
-                        stateReply.getData().freeze(), correlationId);
+                stateReply.setError(true);
+                stateReply.setValue(reply);
+                this.channel.sendReply(stateReply.getKey().getOwner(), stateReply, correlationId);
             }
         }
     }
 
-    protected void processLockUpdate(StateData stateData, String correlationId) throws Exception {
+    protected void processLockUpdate(StateValue state, String correlationId) throws Exception {
         if (this.channel.waitForElection(0)) {
-            StateValue stateValue = new StateValue(stateData.copy());
+            StateValue stateValue = state.clone();
             boolean newLock = stateValue.getKey().isLocked();
-            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember().freeze());
+            MemberImpl newOwner = stateValue.getKey().getOwner().clone();
             long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
             if (!this.channel.isMaster()) {
                 StateKey originalKey = getKey(stateValue.getKey().getKey());
@@ -707,12 +645,13 @@ public class ClusterState extends BaseSe
         }
     }
 
-    protected void processMapOperations(StateData data, String correlationId) throws Exception {
-        StateValue stateValue = new StateValue(data.copy());
+    protected void processMapOperations(StateValue state, String correlationId) throws Exception {
+        StateValue stateValue = state.clone();
         StateKey key = stateValue.getKey();
-        StateType stateType = stateValue.getData().getStateType();
+        StateValue.StateType stateType = stateValue.getType();
         if (stateType != null) {
-            boolean insert = stateType.equals(StateType.INSERT) || stateType.equals(StateType.SYNC);
+            boolean insert = stateType.equals(StateValue.StateType.INSERT)
+                    || stateType.equals(StateValue.StateType.SYNC);
             boolean containsKey = this.localMap.containsKey(key.getKey());
             if (this.channel.waitForElection(0)) {
                 if (this.channel.isMaster()) {
@@ -725,36 +664,35 @@ public class ClusterState extends BaseSe
                             } else {
                                 old = this.localMap.remove(key.getKey());
                             }
-                            StateDataBean newStateData = data.copy();
-                            newStateData.clearOldvalue();
-                            if (old != null && old.getData().getValue() != null) {
-                                newStateData.setOldvalue(old.getData().getValue());
+                            StateValue newStateData = state.clone();
+                            newStateData.setOldValue(null);
+                            if (old != null && old.getValue() != null) {
+                                newStateData.setOldValue(old.getValue());
                             }
                             newStateData.setMapWrite(false);
                             newStateData.setMapUpdate(true);
-                            StateDataBuffer buffer = newStateData.freeze();
-                            StateDataBuffer t = StateDataBuffer.parseUnframed(buffer.toUnframedBuffer());
-                            
-                            broadcastStateUpdate(buffer, correlationId);
-                            fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), false);
+                            broadcastStateUpdate(newStateData, correlationId);
+                            fireMapChanged(key.getOwner(), key.getKey(), old == null ? null : old.getValue(),
+                                    stateValue.getValue(), false);
                         } else {
-                            StateValue stateReply = stateValue.copy();
+                            StateValue stateReply = stateValue.clone();
                             Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
-                            stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                            stateReply.getData().setError(true);
-                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(),
-                                    MessageType.STATE_DATA, stateReply.getData().freeze(), correlationId);
+                            stateReply.setValue(reply);
+                            stateReply.setError(true);
+                            this.channel.sendReply(stateReply.getKey().getOwner(), stateReply,
+                                    correlationId);
                         }
                     } else {
                         if (insert) {
                             this.localMap.put(key.getKey(), stateValue);
-                            StateDataBean newStateData = data.copy();
+                            StateValue newStateData = state.clone();
                             newStateData.setMapWrite(false);
                             newStateData.setMapUpdate(true);
-                            broadcastStateUpdate(newStateData.freeze(), correlationId);
+                            broadcastStateUpdate(newStateData, correlationId);
                             fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
                         } else {
-                            // this shouldn't happen - as we are trying to remove
+                            // this shouldn't happen - as we are trying to
+                            // remove
                             // a non-existent key
                             LOG
                                     .warn("Cluster State in inconsistent state - master trying to remove a non-existent key: "
@@ -762,26 +700,25 @@ public class ClusterState extends BaseSe
                         }
                     }
                 } else {
-                    StateValue stateReply = stateValue.copy();
-                    stateReply.getData().clearStateType();
+                    StateValue stateReply = stateValue.clone();
+                    stateReply.setType(StateValue.StateType.NOT_SET);
                     Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
-                    stateReply.getData().setError(true);
-                    stateReply.getData().setValue(IOUtils.getBuffer(reply));
-                    this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), MessageType.STATE_DATA,
-                            stateReply.getData().freeze(), correlationId);
+                    stateReply.setError(true);
+                    stateReply.setValue(reply);
+                    this.channel.sendReply(stateReply.getKey().getOwner(), stateReply, correlationId);
                 }
             }
         }
     }
 
-    protected void processMapUpdate(StateDataBuffer data) throws Exception {
-        StateKeyData skd = data.getKeyData();
-        StateKey key = new StateKey(skd.copy());
-        StateValue stateValue = new StateValue(data.copy());
+    protected void processMapUpdate(StateValue state) throws Exception {
+        StateKey key = state.getKey().clone();
+        StateValue stateValue = state.clone();
         boolean containsKey = this.localMap.containsKey(key.getKey());
         if (this.channel.waitForElection(0)) {
-            boolean insert = data.getStateType().equals(StateType.SYNC) || data.getStateType().equals(StateType.INSERT);
-            if (!this.channel.isMaster() || data.getStateType().equals(StateType.SYNC)) {
+            boolean insert = state.getType().equals(StateValue.StateType.SYNC)
+                    || state.getType().equals(StateValue.StateType.INSERT);
+            if (!this.channel.isMaster() || state.getType().equals(StateValue.StateType.SYNC)) {
                 if (containsKey) {
                     if (key.isLockExpired()) {
                         StateValue old = this.localMap.get(key.getKey());
@@ -794,13 +731,13 @@ public class ClusterState extends BaseSe
                             old = this.localMap.put(key.getKey(), stateValue);
                         } else {
                             old = this.localMap.remove(key.getKey());
-                            StateDataBean copy = data.copy();
-                            copy.clearValue();
-                            copy.clearOldvalue();
-                            stateValue = new StateValue(copy);
+                            StateValue copy = state.clone();
+                            copy.setValue(null);
+                            copy.setOldValue(null);
+                            stateValue = copy;
                         }
-                        fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), data
-                                .getExpired());
+                        fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), stateValue
+                                .isExpired());
                     }
                 } else {
                     if (insert) {
@@ -819,7 +756,10 @@ public class ClusterState extends BaseSe
         boolean result = false;
         BlazeClusterGroupConfiguration config = this.channel.getConfiguration();
         try {
-            result = this.channel.waitForElection(config.getAwaitGroupTimeout());
+            // We need to wait longer than the ElectionService (which calls an
+            // election)
+            // to determine if the election has finished
+            result = this.channel.waitForElection(0);
         } catch (Exception e) {
             if (e instanceof RuntimeException) {
                 throw (RuntimeException) e;
@@ -830,6 +770,7 @@ public class ClusterState extends BaseSe
             int memberCount = 0;
             try {
                 memberCount = this.channel.getMembers().size();
+                System.err.println("Member count = " + memberCount + " are we master ? " + this.channel.isMaster());
             } catch (Exception e) {
             }
             throw new BlazeRuntimeException("Cluster not established - need " + config.getMinimumGroupSize()
@@ -892,14 +833,13 @@ public class ClusterState extends BaseSe
             for (String k : list) {
                 StateValue old = this.localMap.remove(k);
                 if (old != null) {
-                    StateValue value = old.copy();
-                    value.getData().setStateType(StateType.DELETE);
-                    value.getData().setExpired(true);
-                    value.getData().clearMapWrite();
-                    value.getData().setMapUpdate(true);
-                    broadcastStateUpdate(value.getData().freeze(), "");
-                    fireMapChanged(new MemberImpl(value.getData().getKeyData().getMember().freeze()), k, old.getValue(), null,
-                            true);
+                    StateValue value = old.clone();
+                    value.setType(StateValue.StateType.DELETE);
+                    value.setExpired(true);
+                    value.setMapWrite(false);
+                    value.setMapUpdate(true);
+                    broadcastStateUpdate(value, "");
+                    fireMapChanged(value.getKey().getOwner().clone(), k, old.getValue(), null, true);
                 }
             }
         }
@@ -913,16 +853,16 @@ public class ClusterState extends BaseSe
     protected void doLockExpiration(List<StateValue> list) throws Exception {
         if (isStarted() && this.channel.isElectionFinished() && this.channel.isMaster()) {
             for (StateValue value : list) {
-                StateValue copy = value.copy();
-                copy.getData().setStateType(StateType.DELETE);
-                copy.getData().setLockExpired(true);
-                broadcastStateUpdate(copy.getData().freeze(), "");
+                StateValue copy = value.clone();
+                copy.setType(StateValue.StateType.DELETE);
+                copy.setLockExpired(true);
+                broadcastStateUpdate(copy, "");
             }
         }
     }
 
     protected void fireMapChanged(final Member owner, final String key, final Object oldValue, final Object newValue,
-            final boolean expired) {
+                                  final boolean expired) {
         if (isStarted() && this.stateChangedExecutor != null && !this.stateChangedExecutor.isShutdown()) {
             this.stateChangedExecutor.execute(new Runnable() {
                 public void run() {
@@ -946,10 +886,11 @@ public class ClusterState extends BaseSe
         }
     }
 
-    protected void broadcastStateUpdate(StateDataBuffer value, String correlationId) {
+    protected void broadcastStateUpdate(StateValue value, String correlationId) {
         if (isStarted()) {
             try {
-                this.channel.broadcastMessage(MessageType.STATE_DATA, value, correlationId);
+                value.setMapUpdate(true);
+                this.channel.broadcastManagementMessage(value, correlationId);
             } catch (Exception e) {
                 if (isStarted()) {
                     LOG.error("Failed to send StateData " + value, e);
@@ -958,27 +899,25 @@ public class ClusterState extends BaseSe
         }
     }
 
-    protected Object sendMasterRequest(StateDataBuffer stateData) throws Exception {
+    protected StateValue sendMasterRequest(StateValue state) throws Exception {
         int retryCount = 0;
+        StateValue result = null;
         MemberImpl master = null;
         while (retryCount < 5) {
             this.channel.waitForElection(0);
             master = (MemberImpl) this.channel.getMaster();
-            StateData resultData = (StateData) this.channel.sendRequest(master, MessageType.STATE_DATA, stateData,
-                    getRequestTimeout());
+            result = (StateValue) this.channel.sendRequest(master, state, getRequestTimeout());
             retryCount++;
-            if (resultData != null) {
-                Object resultValue = IOUtils.getObject(resultData.getValue());
-                if (resultValue instanceof ClusterNotMasterException) {
+            if (result != null) {
+                if (result.getValue() instanceof ClusterNotMasterException) {
                     LOG.warn(this.getLocal().getName() + " Request sent to an old master " + master
                             + "  - resending to new master: " + this.channel.getMaster());
                     Thread.sleep(1000);
                     continue;
                 }
-                if (resultValue instanceof ClusterUpdateException) {
-                    throw (ClusterUpdateException) resultValue;
+                if (result.getValue() instanceof ClusterUpdateException) {
+                    throw (ClusterUpdateException) result.getValue();
                 }
-                Object result = IOUtils.getObject(resultData.getOldvalue());
                 return result;
             }
         }
@@ -988,4 +927,8 @@ public class ClusterState extends BaseSe
     protected MemberImpl getLocal() {
         return this.channel.getLocalMember();
     }
+
+    protected String getName() {
+        return getLocal().getName();
+    }
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java Fri Jul 19 18:44:21 2013
@@ -19,36 +19,23 @@ package org.apache.activeblaze.cluster;
 import org.apache.activeblaze.group.Member;
 
 
-
 /**
- *Get notifications about changes to the state of the map
- *
+ * Get notifications about changes to the state of the map
  */
 public interface ClusterStateChangedListener {
-    
+
     /**
      * Called when a key/value pair is inserted into the map
-     * @param owner 
-     * @param key
-     * @param value 
      */
-    void mapInsert(Member owner,String key, Object value);
-    
+    void mapInsert(Member owner, String key, Object value);
+
     /**
      * Called when a key value is updated in the map
-     * @param owner
-     * @param Key
-     * @param oldValue
-     * @param newValue
      */
-    void mapUpdate(Member owner,String Key,Object oldValue,Object newValue);
-    
+    void mapUpdate(Member owner, String Key, Object oldValue, Object newValue);
+
     /**
      * Called when a key value is removed from the map
-     * @param owner
-     * @param key
-     * @param value
-     * @param expired
      */
-    void mapRemove(Member owner,String key, Object value,boolean expired);
+    void mapRemove(Member owner, String key, Object value, boolean expired);
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java Fri Jul 19 18:44:21 2013
@@ -20,10 +20,9 @@ import org.apache.activeblaze.BlazeRunti
 
 /**
  * Exception raised when updating Cluster State
- *
  */
-public class ClusterUpdateException extends BlazeRuntimeException{
-   
+public class ClusterUpdateException extends BlazeRuntimeException {
+
     private static final long serialVersionUID = 8778617962968095062L;
 
     /**
@@ -32,7 +31,7 @@ public class ClusterUpdateException exte
      * call to {@link #initCause}.
      */
     public ClusterUpdateException() {
-    super();
+        super();
     }
 
     /**
@@ -40,11 +39,11 @@ public class ClusterUpdateException exte
      * cause is not initialized, and may subsequently be initialized by
      * a call to {@link #initCause}.
      *
-     * @param   message   the detail message. The detail message is saved for 
-     *          later retrieval by the {@link #getMessage()} method.
+     * @param message the detail message. The detail message is saved for
+     *                later retrieval by the {@link #getMessage()} method.
      */
     public ClusterUpdateException(String message) {
-    super(message);
+        super(message);
     }
 
     /**
@@ -53,12 +52,12 @@ public class ClusterUpdateException exte
      * <code>cause</code> is <i>not</i> automatically incorporated in
      * this exception's detail message.
      *
-     * @param  message the detail message (which is saved for later retrieval
-     *         by the {@link #getMessage()} method).
-     * @param  cause the cause (which is saved for later retrieval by the
-     *         {@link #getCause()} method).  (A <tt>null</tt> value is
-     *         permitted, and indicates that the cause is nonexistent or
-     *         unknown.)
+     * @param message the detail message (which is saved for later retrieval
+     *                by the {@link #getMessage()} method).
+     * @param cause   the cause (which is saved for later retrieval by the
+     *                {@link #getCause()} method).  (A <tt>null</tt> value is
+     *                permitted, and indicates that the cause is nonexistent or
+     *                unknown.)
      */
     public ClusterUpdateException(String message, Throwable cause) {
         super(message, cause);
@@ -72,10 +71,10 @@ public class ClusterUpdateException exte
      * wrappers for other throwables (for example, {@link
      * java.security.PrivilegedActionException}).
      *
-     * @param  cause the cause (which is saved for later retrieval by the
-     *         {@link #getCause()} method).  (A <tt>null</tt> value is
-     *         permitted, and indicates that the cause is nonexistent or
-     *         unknown.)
+     * @param cause the cause (which is saved for later retrieval by the
+     *              {@link #getCause()} method).  (A <tt>null</tt> value is
+     *              permitted, and indicates that the cause is nonexistent or
+     *              unknown.)
      */
     public ClusterUpdateException(Throwable cause) {
         super(cause);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java Fri Jul 19 18:44:21 2013
@@ -20,13 +20,9 @@ import org.apache.activeblaze.group.Memb
 
 /**
  * Default implementation of ClusterStateListener
- * 
  */
 public class DefaultClusterStateListener implements ClusterStateChangedListener {
     /**
-     * @param owner
-     * @param key
-     * @param value
      * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapInsert(org.apache.activeblaze.group.Member,
      *      java.lang.String, java.lang.Object)
      */
@@ -34,10 +30,6 @@ public class DefaultClusterStateListener
     }
 
     /**
-     * @param owner
-     * @param key
-     * @param value
-     * @param expired
      * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapRemove(org.apache.activeblaze.group.Member,
      *      java.lang.String, java.lang.Object, boolean)
      */
@@ -45,10 +37,6 @@ public class DefaultClusterStateListener
     }
 
     /**
-     * @param owner
-     * @param key
-     * @param oldValue
-     * @param newValue
      * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapUpdate(org.apache.activeblaze.group.Member,
      *      java.lang.String, java.lang.Object, java.lang.Object)
      */

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java Fri Jul 19 18:44:21 2013
@@ -21,17 +21,15 @@ import java.util.List;
 
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.group.Member;
-import org.apache.activeblaze.group.MemberImpl;
 import org.apache.activeblaze.util.AsyncGroupRequest;
-import org.apache.activeblaze.wire.ElectionType;
-import org.apache.activeblaze.wire.MessageType;
-import org.apache.activeblaze.wire.ElectionMessage.ElectionMessageBean;
+import org.apache.activeblaze.wire.Election;
+import org.apache.activeblaze.wire.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Responsible for calling an election amongst the members and deciding a coordinator
- * 
+ * Responsible for calling an election amongst the members and deciding a
+ * coordinator
  */
 class ElectionService extends BaseService implements Runnable {
     private static final Log LOG = LogFactory.getLog(ElectionService.class);
@@ -61,7 +59,7 @@ class ElectionService extends BaseServic
             if (this.group.isStarted() && isStarted()) {
                 this.group.setMaster(selectCordinator(members));
                 if (this.group.isMasterMatch()) {
-                    this.group.broadcastElectionType(ElectionType.MASTER);
+                    this.group.broadcastElectionType(Election.ElectionType.MASTER);
                 }
                 if (!this.group.isElectionFinished() && isStarted()) {
                     // lets just wait for more members to show
@@ -80,9 +78,10 @@ class ElectionService extends BaseServic
                         this.group.setMaster(this.group.getLocalMember());
                         this.group.setElectionFinished(true);
                         LOG.debug(this.group.getLocalMember() + " I am the Master ...");
-                        this.group.broadcastElectionType(ElectionType.MASTER);
+                        this.group.broadcastElectionType(Election.ElectionType.MASTER);
                     } else {
-                        LOG.warn(this.group.getLocalMember() +" Do not have a minimum group (" + minimumGroupSize+ ")  only " + this.group.getMembersCount() + " members available");
+                        LOG.warn(this.group.getLocalMember() + " Do not have a minimum group (" + minimumGroupSize
+                                + ")  only " + this.group.getMembersCount() + " members available");
                     }
                 }
             }
@@ -95,14 +94,14 @@ class ElectionService extends BaseServic
             List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
             AsyncGroupRequest request = new AsyncGroupRequest();
             boolean doCall = false;
-            for (MemberImpl member : sorted) {
-                if (this.group.channel.getId().equals(member.getId())) {
+            for (MemberImpl m : sorted) {
+                if (this.group.channel.getId().equals(m.getId())) {
                     doCall = true;
                 } else if (doCall) {
-                    ElectionMessageBean msg = new ElectionMessageBean();
-                    msg.setMember(this.group.getLocalMember().getData());
-                    msg.setElectionType(ElectionType.ELECTION);
-                    this.group.channel.sendMessage(request, member, MessageType.ELECTION_MESSAGE, msg.freeze());
+                    Election msg = new Election();
+                    msg.setMemberId(this.group.getLocalMember().getId());
+                    msg.setElectionType(Election.ElectionType.ELECTION);
+                    this.group.channel.sendGroupRequest(request, m, msg);
                 }
             }
             boolean result = request.isSuccess(this.group.getConfiguration().getAwaitGroupTimeout());
@@ -118,42 +117,34 @@ class ElectionService extends BaseServic
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doInit()
      */
     @Override
     protected void doInit() throws Exception {
         // TODO Auto-generated method stub
-        
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doShutDown()
      */
     @Override
     protected void doShutDown() throws Exception {
         // TODO Auto-generated method stub
-        
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doStart()
      */
     @Override
     protected void doStart() throws Exception {
         // TODO Auto-generated method stub
-        
     }
 
     /**
-     * @throws Exception
      * @see org.apache.activeblaze.BaseService#doStop()
      */
     @Override
     protected void doStop() throws Exception {
         // TODO Auto-generated method stub
-        
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java Fri Jul 19 18:44:21 2013
@@ -20,11 +20,11 @@ import org.apache.activeblaze.group.Memb
 
 /**
  * A listener for Master changes to a group
- *
  */
-public interface MasterChangedListener  {
+public interface MasterChangedListener {
     /**
      * Fired when a master changes in the group
+     *
      * @param master the new master of the cluster
      */
     void masterChanged(Member master);

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/package.html?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/package.html (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/package.html Fri Jul 19 18:44:21 2013
@@ -1,18 +1,18 @@
 !--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 -->
 <html>
 <head>

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Fri Jul 19 18:44:21 2013
@@ -22,14 +22,12 @@ import java.util.Set;
 import org.apache.activeblaze.BlazeChannel;
 import org.apache.activeblaze.BlazeMessage;
 import org.apache.activeblaze.BlazeMessageListener;
-import org.apache.activeblaze.BlazeMessageProcessor;
 import org.apache.activeblaze.Destination;
 import org.apache.activeblaze.Subscription;
 
 /**
- * <P>
+ * <p/>
  * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
- * 
  */
 public interface BlazeGroupChannel extends BlazeChannel {
     /**
@@ -44,112 +42,77 @@ public interface BlazeGroupChannel exten
 
     /**
      * Send a message to an individual member
-     * 
-     * @param member
-     * @param message
-     * @throws Exception
      */
     public void send(Member member, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to an individual member and wait for a response
-     * 
-     * @param member
-     * @param message
+     *
      * @return the response
-     * @throws Exception
      */
     public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to an individual member and wait for a response
-     * 
-     * @param member
-     * @param message
-     * @param timeout
-     *            time in milliseconds to wait for a response
+     *
+     * @param timeout time in milliseconds to wait for a response
      * @return a response of null if timed out
-     * @throws Exception
      */
     public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception;
 
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion
-     * 
-     * @param destination
-     * @param message
-     * @throws Exception
      */
     public void send(String destination, BlazeMessage message) throws Exception;
-    
+
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion
-     * 
-     * @param destination
-     * @param message
-     * @throws Exception
      */
     public void send(Destination destination, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
      * for a response
-     * 
-     * @param destination
-     * @param message
+     *
      * @return a response
-     * @throws Exception
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception;
-    
+
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
      * for a response
-     * 
-     * @param destination
-     * @param message
+     *
      * @return a response
-     * @throws Exception
      */
     public BlazeMessage sendRequest(Destination destination, BlazeMessage message) throws Exception;
 
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
      * for a response
-     * 
-     * @param destination
-     * @param message
+     *
      * @param timeout -
-     *            time in milliseconds to wait for a response
+     *                time in milliseconds to wait for a response
      * @return a response of null if timed out
-     * @throws Exception
      */
     public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception;
 
-    
+
     /**
      * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
      * for a response
-     * 
-     * @param destination
-     * @param message
+     *
      * @param timeout -
-     *            time in milliseconds to wait for a response
+     *                time in milliseconds to wait for a response
      * @return a response of null if timed out
-     * @throws Exception
      */
     public BlazeMessage sendRequest(Destination destination, BlazeMessage message, int timeout) throws Exception;
 
     /**
      * Send a response message to an original message - for request/response
-     * 
-     * @param to
-     *            the Member to send a response to
-     * @param response
-     *            the message to send in a response
-     * @param correlationId
-     *            the associated id from the original message
-     * @throws Exception
+     *
+     * @param to            the Member to send a response to
+     * @param response      the message to send in a response
+     * @param correlationId the associated id from the original message
      */
     public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception;
 
@@ -159,8 +122,7 @@ public interface BlazeGroupChannel exten
     public BlazeMessageListener getInboxListener();
 
     /**
-     * @param inboxListener
-     *            the inboxListener to set
+     * @param inboxListener the inboxListener to set
      */
     public void setInboxListener(BlazeMessageListener inboxListener);
 
@@ -171,36 +133,27 @@ public interface BlazeGroupChannel exten
 
     /**
      * @return a set of the members
-     * @throws Exception
      */
     public Set<Member> getMembers() throws Exception;
 
     /**
      * Get a member by its unique id
-     * 
-     * @param id
-     * @return
-     * @throws Exception
+     *
+     * @return the Member
      */
     public Member getMemberById(String id) throws Exception;
 
     /**
      * Return a member of the Group with the matching name
-     * 
-     * @param name
-     * @return
-     * @throws Exception
+     *
+     * @return the Member
      */
     public Member getMemberByName(String name) throws Exception;
 
     /**
      * Will wait for a member to advertise itself if not available
-     * 
-     * @param name
-     * @param timeout
+     *
      * @return the member or null
-     * @throws InterruptedException
-     * @throws Exception
      */
     public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException, Exception;
 
@@ -211,88 +164,50 @@ public interface BlazeGroupChannel exten
 
     /**
      * Add a listener for membership changes
-     * 
-     * @param l
-     * @throws Exception
      */
     public void addMemberChangedListener(MemberChangedListener l) throws Exception;
 
     /**
      * Remove a listener for membership changes
-     * 
-     * @param l
-     * @throws Exception
      */
     public void removeMemberChangedListener(MemberChangedListener l) throws Exception;
 
     /**
      * Add a listener for messages
-     * 
-     * @param destination
-     * @param l
-     * @throws Exception
      */
     public void addBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception;
 
     /**
      * Add a listener for messages
-     * 
-     * @param subscription
-     * @param l
-     * @throws Exception
      */
     public void addBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception;
 
     /**
      * Remove a listener for messages
-     * 
-     * @param destination
-     * @param l 
-     * @throws Exception
      */
     public void removeBlazeQueueMessageListener(String destination, BlazeMessageListener l) throws Exception;
 
     /**
      * Remove a listener for messages
-     * 
-     * @param subscription
-     * @param l 
-     * 
-     * @throws Exception
      */
     public void removeBlazeQueueMessageListener(Subscription subscription, BlazeMessageListener l) throws Exception;
 
     /**
      * Add member to a group
-     * 
-     * @param groupName
-     * @throws Exception
      */
     public void addToGroup(String groupName) throws Exception;
 
     /**
      * remove member from a group
-     * 
-     * @param groupName
-     * @throws Exception
      */
     public void removeFromGroup(String groupName) throws Exception;
 
     /**
      * Get an array of groups
-     * 
+     *
      * @return an array of groups
-     * @throws Exception
      */
     public List<String> getGroups() throws Exception;
 
-    /**
-     * @param processor
-     */
-    public void setBlazeMessageProcessor(BlazeMessageProcessor processor);
-    
-    /**
-     * @return BlazeMessageProcessor
-     */
-    public BlazeMessageProcessor getBlazeMessageProcessor();
+
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java?rev=1504961&r1=1504960&r2=1504961&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java Fri Jul 19 18:44:21 2013
@@ -31,8 +31,6 @@ public class BlazeGroupChannelFactory ex
 
     /**
      * Construct a factory to use the passed Configuration
-     * 
-     * @param config
      */
     public BlazeGroupChannelFactory(BlazeGroupConfiguration config) {
         super(config);
@@ -40,10 +38,8 @@ public class BlazeGroupChannelFactory ex
 
     /**
      * Create a GroupChannel
-     * 
-     * @param name
+     *
      * @return the Channel
-     * @throws Exception 
      */
     public BlazeGroupChannel createGroupChannel(String name) throws Exception {
         BlazeGroupChannelImpl result = new BlazeGroupChannelImpl(name);



Mime
View raw message