activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r722095 [2/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/apache...
Date Mon, 01 Dec 2008 15:32:05 GMT
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java Mon Dec  1 07:32:04 2008
@@ -21,52 +21,52 @@
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.util.AsyncGroupRequest;
+import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.ElectionType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Responsible for calling an election amongst the members and deciding a 
- * coordinator
- *
+ * Responsible for calling an election amongst the members and deciding a coordinator
+ * 
  */
 class ElectionService extends BaseService implements Runnable {
     private static final Log LOG = LogFactory.getLog(ElectionService.class);
     private final ClusterGroup group;
     private Member member;
-    ElectionService(ClusterGroup group,Member member, boolean memberStarted) {
-        this.group=group;
+
+    ElectionService(ClusterGroup group, Member member, boolean memberStarted) {
+        this.group = group;
         this.member = member;
     }
 
-    
     public void run() {
         try {
             doElection();
         } catch (Exception e) {
-           LOG.error("Failed to run election",e);
+            LOG.error("Failed to run election", e);
         }
     }
 
     void doElection() throws Exception {
         List<MemberImpl> members = new ArrayList<MemberImpl>(this.group.getMembersImpl());
-        if ((this.member == null || (!this.member.getId().equals(this.group.getId()) || members.size() == this.group.getConfiguration().getMinimumGroupSize()))) {
-            
+        if ((this.member == null || (!this.member.getId().equals(this.group.getId()) || members.size() == this.group
+                .getConfiguration().getMinimumGroupSize()))) {
             // call an election
-            while (!this.group.callElection() && this.group.isStarted() && isStarted())
+            while (!callElection() && this.group.isStarted() && isStarted())
                 ;
             if (this.group.isStarted() && isStarted()) {
-                
                 this.group.setMaster(selectCordinator(members));
                 if (this.group.isMasterMatch()) {
                     this.group.broadcastElectionType(ElectionType.MASTER);
                 }
                 if (!this.group.isElectionFinished() && isStarted()) {
-                    //ok - lets just wait for more members to show
-                    //we could be the coordinator now - but best to check
+                    // lets just wait for more members to show
+                    // we could be the coordinator now - but best to check
                     try {
                         synchronized (this.group.electionFinished) {
-                            this.group.electionFinished.wait(this.group.getConfiguration().getHeartBeatInterval() * 2);
+                            this.group.electionFinished.wait(this.group.getConfiguration().getAwaitGroupTimeout());
                         }
                     } catch (InterruptedException e) {
                     }
@@ -75,18 +75,38 @@
                     // we must be the coordinator
                     this.group.setMaster(this.group.getLocalMember());
                     this.group.setElectionFinished(true);
-                    LOG.debug(this.group.getLocalMember()+" We are the Coordinator ");
+                    LOG.debug(this.group.getLocalMember() + " I am the Master ...");
                     this.group.broadcastElectionType(ElectionType.MASTER);
                 }
             }
         }
     }
-    
+
+    boolean callElection() throws Exception {
+        if (isStarted()) {
+            List<MemberImpl> members = new ArrayList<MemberImpl>(this.group.getMembersImpl());
+            List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
+            AsyncGroupRequest request = new AsyncGroupRequest();
+            boolean doCall = false;
+            for (MemberImpl member : sorted) {
+                if (this.group.channel.getId().equals(member.getId())) {
+                    doCall = true;
+                } else if (doCall) {
+                    ElectionMessage msg = new ElectionMessage();
+                    msg.setMember(this.group.getLocalMember().getData());
+                    msg.setElectionType(ElectionType.ELECTION);
+                    this.group.channel.sendMessage(request, member, msg.type(), msg);
+                }
+            }
+            boolean result = request.isSuccess(this.group.getConfiguration().getAwaitGroupTimeout());
+            return result;
+        }
+        return true;
+    }
+
     protected MemberImpl selectCordinator(List<MemberImpl> list) throws Exception {
         List<MemberImpl> sorted = ClusterGroup.sortMemberList(list);
-        MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted
-                .get(list.size() - 1);
+        MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted.get(list.size() - 1);
         return result;
     }
-
 }

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java (from r720544, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java&r1=720544&r2=722095&rev=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java Mon Dec  1 07:32:04 2008
@@ -19,13 +19,13 @@
 import org.apache.activeblaze.group.Member;
 
 /**
- * A listener for coordinator changes to a group
+ * A listener for Master changes to a group
  *
  */
-public interface ClusterChangedListener  {
+public interface MasterChangedListener  {
     /**
      * Fired when a master changes in the group
      * @param master the new master of the cluster
      */
-    void ClusterChanged(Member master);
+    void masterChanged(Member master);
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.StateKeyData;
+
+/**
+ * Holds information about a StateKey
+ * 
+ */
+class StateKey {
+    private MemberImpl owner;
+    private StateKeyData keyData;
+
+    /**
+     * Constructor
+     * 
+     * @param owner
+     * @param key
+     */
+     StateKey(MemberImpl owner, String key) {
+        this.owner = owner;
+        this.keyData = new StateKeyData();
+        this.keyData.setKey(key);
+        this.keyData.setMember(owner.getData());
+    }
+     
+     StateKey(StateKeyData keyData) throws Exception{
+         this.keyData=keyData;
+         MemberData memberData = keyData.getMember();
+         this.owner=new MemberImpl(memberData);
+     }
+     
+     StateKey copy() throws Exception {
+         return new StateKey(this.keyData.clone());
+     }
+
+    /**
+     * @return the owner
+     */
+     Member getOwner() {
+        return this.owner;
+    }
+
+    /**
+     * @param member
+     */
+     void setOwner(MemberImpl member) {
+        this.owner = member;
+        this.keyData.setMember(member.getData());
+    }
+
+    /**
+     * @return the key
+     */
+     String getKey() {
+        return this.keyData.getKey();
+    }
+
+    /**
+     * @return true if this key is locked
+     */
+     boolean isLocked() {
+        return this.keyData.getLocked();
+    }
+
+    /**
+     * @param locked
+     *            the locked state to set
+     */
+     void setLocked(boolean locked) {
+        this.keyData.setLocked(locked);
+    }
+
+    /**
+     * @return the removeOnExit
+     */
+     boolean isRemoveOnExit() {
+        return this.keyData.getRemoveOnExit();
+    }
+
+    /**
+     * @param removeOnExit
+     *            the removeOnExit to set
+     */
+     void setRemoveOnExit(boolean removeOnExit) {
+        this.keyData.setRemoveOnExit(removeOnExit);
+    }
+
+    /**
+     * @return the expiration
+     */
+     long getExpiration() {
+        return this.keyData.getExpiration();
+    }
+
+    /**
+     * @param expiration
+     *            the expiration to set
+     */
+     void setExpiration(long expiration) {
+        this.keyData.setExpiration(expiration);
+    }
+
+    /**
+     * @return the lockExpiration
+     */
+     long getLockExpiration() {
+        return this.keyData.getLockExpiration();
+    }
+
+    /**
+     * @param lockExpiration
+     *            the lockExpiration to set
+     */
+     void setLockExpiration(long lockExpiration) {
+        this.keyData.setLockExpiration(lockExpiration);
+    }
+
+    /**
+     * @return the releaseLockOnExit
+     */
+     boolean isReleaseLockOnExit() {
+        return this.keyData.getReleaseLockOnExit();
+    }
+
+    /**
+     * @param releaseLockOnExit
+     *            the releaseLockOnExit to set
+     */
+     void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.keyData.setReleaseLockOnExit(releaseLockOnExit);
+    }
+
+    /**
+     * set time to live
+     * 
+     * @param ttl
+     */
+     void setTimeToLive(long ttl) {
+        if (ttl > 0) {
+            setExpiration(ttl + System.currentTimeMillis());
+        } else {
+            setExpiration(0);
+        }
+    }
+
+    /**
+     * set lease time to live
+     * 
+     * @param ttl
+     */
+     void setLockLeaseTime(long ttl) {
+        if (ttl > 0) {
+            setLockExpiration(ttl + System.currentTimeMillis());
+        } else {
+            setLockExpiration(0);
+        }
+    }
+
+    /**
+     * @return true if expired
+     */
+     boolean isExpired() {
+        return isExpired(System.currentTimeMillis());
+    }
+
+    /**
+     * @param currentTime
+     * @return true if expired
+     */
+     boolean isExpired(long currentTime) {
+        return this.keyData.getExpiration() > 0 && this.keyData.getExpiration() < currentTime;
+    }
+
+    /**
+     * @return true if lock expired
+     */
+     boolean isLockExpired() {
+        return isLockExpired(System.currentTimeMillis());
+    }
+
+    /**
+     * @param currentTime
+     * @return true if lock expired
+     */
+     boolean isLockExpired(long currentTime) {
+        return this.keyData.getLockExpiration() > 0 && this.keyData.getLockExpiration() < currentTime;
+    }
+     
+     StateKeyData getKeyData() {
+         return this.keyData;
+     }
+
+    public int hashCode() {
+        return this.keyData.getKey() != null ? this.keyData.getKey().hashCode() : super.hashCode();
+    }
+
+     public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof StateKey) {
+            StateKey other = (StateKey) obj;
+            result = other.keyData.getKey().equals(this.keyData.getKey());
+        }
+        return result;
+    }
+
+    public  String toString() {
+        return "key:" + this.keyData.getKey();
+    }
+    
+    
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.wire.StateData;
+
+/**
+ * Holds information about the Value in the Map
+ * 
+ */
+class StateValue {
+    private final StateKey key;
+    private final StateData data;
+    private Object value;
+
+    /**
+     * Constructor
+     * 
+     * @param key
+     * @param value
+     */
+    StateValue(StateKey key, Object value, StateData data) {
+        this.key = key;
+        this.value = value;
+        this.data = data;
+    }
+
+    StateValue(StateData data) throws Exception {
+        this.key = new StateKey(data.getKeyData());
+        this.data = data;
+    }
+
+    StateValue copy() throws Exception {
+        return new StateValue(this.data.clone());
+    }
+
+    /**
+     * @return the key associated with this value
+     */
+    StateKey getKey() {
+        return this.key;
+    }
+
+    /**
+     * @return the value, loaded lazily from the state data when not already set
+     * @throws Exception
+     */
+    Object getValue() throws Exception {
+        if (this.value == null && this.data != null) {
+            if (this.data.getValue() != null) {
+                this.value = IOUtils.getObject(this.data.getValue());
+            }
+        }
+        return this.value;
+    }
+
+    /**
+     * @return the data
+     */
+    StateData getData() {
+        return this.data;
+    }
+
+    public int hashCode() {
+        return this.value != null ? this.value.hashCode() : super.hashCode();
+    }
+
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof StateValue) {
+            StateValue other = (StateValue) obj;
+            result = (this.value == null && other.value == null)
+                    || (this.value != null && other.value != null && this.value.equals(other.value));
+        }
+        return result;
+    }
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Mon Dec  1 07:32:04 2008
@@ -126,24 +126,27 @@
 
     /**
      * @return a set of the members
+     * @throws Exception 
      */
-    public Set<Member> getMembers();
+    public Set<Member> getMembers() throws Exception;
 
     /**
      * Get a member by its unique id
      * 
      * @param id
      * @return
+     * @throws Exception 
      */
-    public Member getMemberById(String id);
+    public Member getMemberById(String id) throws Exception;
 
     /**
      * Return a member of the Group with the matching name
      * 
      * @param name
      * @return
+     * @throws Exception 
      */
-    public Member getMemberByName(String name);
+    public Member getMemberByName(String name) throws Exception;
     
     /**
      * Will wait for a member to advertise itself if not available
@@ -151,8 +154,9 @@
      * @param timeout
      * @return the member or null
      * @throws InterruptedException 
+     * @throws Exception 
      */
-    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException;
+    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException, Exception;
 
     /**
      * @return the local member that represents this <CODE>Group</CODE> instance
@@ -163,15 +167,17 @@
      * Add a listener for membership changes
      * 
      * @param l
+     * @throws Exception 
      */
-    public void addMemberChangedListener(MemberChangedListener l);
+    public void addMemberChangedListener(MemberChangedListener l) throws Exception;
 
     /**
      * Remove a listener for membership changes
      * 
      * @param l
+     * @throws Exception 
      */
-    public void removeMemberChangedListener(MemberChangedListener l);
+    public void removeMemberChangedListener(MemberChangedListener l) throws Exception;
 
     /**
      * Add a listener for messages

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Mon Dec  1 07:32:04 2008
@@ -18,7 +18,6 @@
 
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,16 +54,16 @@
 public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
     private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
     private final String name;
-    private Processor unicast;
+    protected Processor unicast;
     private BaseTransport groupManagementTransport;
     private InetSocketAddress toManagementAddress;
     private MemberImpl local;
     private BlazeQueueListener inboxListener;
-    private Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(1000);
+    protected Map<Buffer, SendRequest> messageRequests = new LRUCache<Buffer, SendRequest>(10000);
     private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
     private Group group;
-    private Buffer inboxURI;
-    private final Object localMutex = new Object();
+    protected Buffer inboxURI;
+    protected final Object localMutex = new Object();
 
     /**
      * Constructor
@@ -198,43 +197,57 @@
      * @return the member for this channel
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
      */
-    public final MemberImpl getLocalMember() {
+    public MemberImpl getLocalMember() {
         synchronized (this.localMutex) {
             return this.local;
         }
     }
 
+    protected void setLocalMember(MemberImpl local) {
+        synchronized (this.localMutex) {
+            this.local = local;
+        }
+    }
+
     /**
      * @param l
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void addMemberChangedListener(MemberChangedListener l) {
+    public void addMemberChangedListener(MemberChangedListener l) throws Exception {
+        init();
         this.group.addMemberChangedListener(l);
     }
 
     /**
      * @param l
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
      */
-    public void removeMemberChangedListener(MemberChangedListener l) {
+    public void removeMemberChangedListener(MemberChangedListener l) throws Exception {
+        init();
         this.group.removeMemberChangedListener(l);
     }
 
     /**
      * @param id
      * @return
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
      */
-    public Member getMemberById(String id) {
+    public Member getMemberById(String id) throws Exception {
+        init();
         return this.group.getMemberById(id);
     }
 
     /**
      * @param name
      * @return
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
      */
-    public Member getMemberByName(String name) {
+    public Member getMemberByName(String name) throws Exception {
+        init();
         return this.group.getMemberByName(name);
     }
 
@@ -244,17 +257,20 @@
      * @param name
      * @param timeout
      * @return the member or null
-     * @throws InterruptedException
+     * @throws Exception
      */
-    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
+    public Member getAndWaitForMemberByName(String name, int timeout) throws Exception {
+        init();
         return this.group.getAndWaitForMemberByName(name, timeout);
     }
 
     /**
      * @return
+     * @throws Exception
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
      */
-    public Set<Member> getMembers() {
+    public Set<Member> getMembers() throws Exception {
+        init();
         return this.group.getMembers();
     }
 
@@ -361,15 +377,25 @@
                         waitTime = (int) Math.max(deadline - System.currentTimeMillis(), 0);
                     }
                 }
-            }else {
+            } else {
                 this.group.waitForNewMember((int) waitTime);
             }
         }
         return null;
     }
 
-    protected synchronized BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message,
-            int timeout) throws Exception {
+    /**
+     * send Request
+     * 
+     * @param member
+     * @param destination
+     * @param message
+     * @param timeout
+     * @return
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message, int timeout)
+            throws Exception {
         BlazeMessage result = null;
         if (member != null) {
             SendRequest request = new SendRequest();
@@ -399,7 +425,7 @@
      * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
      *      org.apache.activeblaze.BlazeMessage, java.lang.String)
      */
-    public synchronized void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
         response.storeContent();
         BlazeData blazeData = response.getContent();
         blazeData.setTopic(false);
@@ -418,7 +444,7 @@
         send(member, destination, blazeData);
     }
 
-    protected synchronized void send(MemberImpl member, Buffer destination, BlazeData blazeData) throws Exception {
+    protected void send(MemberImpl member, Buffer destination, BlazeData blazeData) throws Exception {
         blazeData.setTopic(false);
         blazeData.setDestination(destination);
         PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
@@ -499,7 +525,7 @@
         }
     }
 
-    boolean processRequest(Buffer correlationId, Message<?> value) {
+    protected boolean processRequest(Buffer correlationId, Message<?> value) {
         boolean result = false;
         if (correlationId != null) {
             SendRequest request = null;
@@ -556,7 +582,7 @@
      * @param message
      * @throws Exception
      */
-    public synchronized void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
+    public void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         data.setFromAddress(this.inboxURI);
@@ -566,13 +592,15 @@
     }
 
     /**
+     * broadcast a general message
+     * 
      * @param asyncRequest
      * @param messageType
      * @param message
      * @throws Exception
      */
-    public synchronized void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType messageType,
-            Message<?> message) throws Exception {
+    public void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType messageType, Message<?> message)
+            throws Exception {
         SendRequest request = new SendRequest();
         PacketData data = getPacketData(messageType, message);
         asyncRequest.add(data.getMessageId(), request);
@@ -587,13 +615,53 @@
     }
 
     /**
-     * @param to
+     * send a message
+     * @param asyncRequest
+     * @param member
      * @param messageType
      * @param message
      * @throws Exception
      */
-    public synchronized void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message)
+    public void sendMessage(AsyncGroupRequest asyncRequest,MemberImpl member, MessageType messageType, Message<?> message)
             throws Exception {
+        SendRequest request = new SendRequest();
+        PacketData data = getPacketData(messageType, message);
+        asyncRequest.add(data.getMessageId(), request);
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(data.getMessageId(), request);
+        }
+        data.setReliable(false);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(member.getAddress());
+        this.unicast.downStream(packet);
+    }
+
+    /**
+     * broadcast a general message
+     * 
+     * @param messageType
+     * @param message
+     * @param correlationId
+     * @throws Exception
+     */
+    public void broadcastMessage(MessageType messageType, Message<?> message, String correlationId) throws Exception {
+        PacketData data = getPacketData(messageType, message);
+        data.setCorrelationId(new Buffer(correlationId));
+        data.setReliable(true);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(this.toManagementAddress);
+        this.groupManagementTransport.downStream(packet);
+    }
+
+    /**
+     * @param to
+     * @param messageType
+     * @param message
+     * @throws Exception
+     */
+    public void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message) throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setReliable(false);
         data.setFromAddress(this.inboxURI);
@@ -609,7 +677,7 @@
      * @param correlationId
      * @throws Exception
      */
-    public synchronized void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
+    public void sendReply(MemberImpl to, MessageType messageType, Message<?> message, String correlationId)
             throws Exception {
         PacketData data = getPacketData(messageType, message);
         data.setCorrelationId(new Buffer(correlationId));

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Mon Dec  1 07:32:04 2008
@@ -25,7 +25,7 @@
 public class BlazeGroupConfiguration extends BlazeConfiguration {
     private String groupManagementURI = "mcast://224.2.2.2:8888";
     
-    private int heartBeatInterval = 250;
+    private int heartBeatInterval = 800;
    
     /**
      * @return the groupManagementUTI

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+/**
+ * A default listener for membership changes to a group; both callbacks are no-ops
+ * 
+ */
+public class DefaultMemberChangedListener implements MemberChangedListener {
+    /**
+     * @param member
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+     */
+    public void memberStarted(Member member) {
+    }
+
+    /**
+     * @param member
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+     */
+    public void memberStopped(Member member) {
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Mon Dec  1 07:32:04 2008
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -49,6 +50,14 @@
     private final Map<Buffer, List<MemberImpl>> queueMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
     private final Map<Buffer, List<MemberImpl>> topicMap = new ConcurrentHashMap<Buffer, List<MemberImpl>>();
     private final Object memberMutex = new Object();
+    protected ExecutorService listenerService;
+    protected final ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            Thread thread = new Thread(r);
+            thread.setDaemon(true);
+            return thread;
+        }
+    };
 
     /**
      * Constructor
@@ -67,7 +76,7 @@
      * @return the Member of the Channel
      * @throws Exception
      */
-    public MemberImpl getLocalMember(){
+    public MemberImpl getLocalMember() {
         return this.channel.getLocalMember();
     }
 
@@ -183,6 +192,7 @@
     public boolean init() throws Exception {
         boolean result = super.init();
         if (result) {
+            this.listenerService = Executors.newCachedThreadPool(this.threadFactory);
             this.members.put(this.channel.getId(), this.channel.getLocalMember());
         }
         return result;
@@ -195,7 +205,9 @@
      */
     public boolean shutDown() throws Exception {
         boolean result = super.shutDown();
-        if (result) {
+        this.members.clear();
+        if (this.listenerService != null) {
+            this.listenerService.shutdownNow();
         }
         return result;
     }
@@ -208,13 +220,7 @@
     public boolean start() throws Exception {
         boolean result = super.start();
         if (result) {
-            this.heartBeatService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-                public Thread newThread(Runnable r) {
-                    Thread thread = new Thread(r);
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
+            this.heartBeatService = Executors.newScheduledThreadPool(1, this.threadFactory);
             Runnable heartbeat = new Runnable() {
                 public void run() {
                     try {
@@ -225,15 +231,9 @@
                 }
             };
             heartbeat.run();
-            int interval = this.configuration.getHeartBeatInterval();
+            int interval = this.configuration.getHeartBeatInterval() / 4;
             this.heartBeatService.scheduleAtFixedRate(heartbeat, interval, interval, TimeUnit.MILLISECONDS);
-            this.checkMembershipService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-                public Thread newThread(Runnable r) {
-                    Thread thread = new Thread(r);
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
+            this.checkMembershipService = Executors.newScheduledThreadPool(1, this.threadFactory);
             Runnable checkMembership = new Runnable() {
                 public void run() {
                     if (isStarted()) {
@@ -260,17 +260,17 @@
         boolean result = super.stop();
         if (result) {
             if (this.heartBeatService != null) {
-                this.heartBeatService.shutdown();
+                this.heartBeatService.shutdownNow();
             }
             if (this.checkMembershipService != null) {
-                this.checkMembershipService.shutdown();
+                this.checkMembershipService.shutdownNow();
             }
         }
         return result;
     }
-    
+
     public String toString() {
-        return "Group "+getLocalMember().getName();
+        return "Group " + getLocalMember().getName();
     }
 
     /**
@@ -302,23 +302,31 @@
         return result;
     }
 
-    private void fireMemberStarted(Member member) {
+    private void fireMemberStarted(final Member member) {
         synchronized (this.memberMutex) {
             this.memberMutex.notifyAll();
         }
         LOG.debug(this.channel.getName() + " Member started " + member);
-        for (MemberChangedListener l : this.membershipListeners) {
-            l.memberStarted(member);
+        for (final MemberChangedListener l : this.membershipListeners) {
+            this.listenerService.execute(new Runnable() {
+                public void run() {
+                    l.memberStarted(member);
+                }
+            });
         }
     }
 
-    private void fireMemberStopped(Member member) {
+    private void fireMemberStopped(final Member member) {
         synchronized (this.memberMutex) {
             this.memberMutex.notifyAll();
         }
         LOG.debug(this.channel.getName() + " Member stopped " + member);
-        for (MemberChangedListener l : this.membershipListeners) {
-            l.memberStopped(member);
+        for (final MemberChangedListener l : this.membershipListeners) {
+            this.listenerService.execute(new Runnable() {
+                public void run() {
+                    l.memberStopped(member);
+                }
+            });
         }
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java Mon Dec  1 07:32:04 2008
@@ -56,8 +56,10 @@
     
     
     /**
-     * @return the coordinatorWeight
+     * This weight can be used to help select a master
+     * in the cluster - the highest weight becomes the master
+     * @return the masterWeight
      */
-    public long getCoordinatorWeight();
+    public int getMasterWeight();
     
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java Mon Dec  1 07:32:04 2008
@@ -36,18 +36,18 @@
      * Default constructor
      * @param id 
      * @param name 
-     * @param coordinatorWeight 
+     * @param masterWeight 
      * @param localURI 
      * @throws Exception 
      */
-    public MemberImpl(String id,String name,long coordinatorWeight,URI localURI) throws Exception {
+    public MemberImpl(String id,String name,int masterWeight,URI localURI) throws Exception {
         InetAddress addr = InetAddress.getByName(localURI.getHost());
         this.socketAddress = new InetSocketAddress(addr,localURI.getPort());
         this.socketAddressAsBuffer=new Buffer(this.socketAddress.toString());
         this.data = new MemberData();
         this.data.setId(id);
         this.data.setName(name);
-        this.data.setCoordinatorWeight(coordinatorWeight);
+        this.data.setMasterWeight(masterWeight);
         this.data.setStartTime(System.currentTimeMillis());
         this.data.setInetAddress(new Buffer(addr.getHostAddress()));
         this.data.setPort(localURI.getPort());
@@ -134,13 +134,13 @@
     /**
      * @return the coordinatorWeight
      */
-    public long getCoordinatorWeight() {
-        return this.data.getCoordinatorWeight();
+    public int getMasterWeight() {
+        return this.data.getMasterWeight();
     }
        
     
     public String toString() {
-        return this.data.getName()+"["+this.data.getId()+"]";
+        return getName()+"["+getId()+"]w="+getMasterWeight();
     }
     
         

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java Mon Dec  1 07:32:04 2008
@@ -45,8 +45,9 @@
 
     /**
      * Set Next at the end of the chain
+     * 
      * @param next
-     *            
+     * 
      */
     public void setEnd(Processor next) {
         ChainedProcessor target = this;
@@ -58,19 +59,20 @@
                 n = cn.getNext();
             }
         }
-        if(next instanceof ChainedProcessor) {
+        if (next instanceof ChainedProcessor) {
             target.setNextChain((ChainedProcessor) next);
-        }else {
-        target.next=next;
+        } else {
+            target.next = next;
         }
     }
-    
+
     /**
      * Set the next
+     * 
      * @param next
      */
     public void setNext(Processor next) {
-        this.next=next;
+        this.next = next;
     }
 
     /**
@@ -95,7 +97,7 @@
                 n = cn.getNext();
             }
         }
-        target.next=p;
+        target.next = p;
         p.setPrev(target);
         if (this.exceptionListener != null && p.exceptionListener == null) {
             p.exceptionListener = this.exceptionListener;
@@ -113,7 +115,7 @@
     public boolean init() throws Exception {
         boolean result = super.init();
         if (result && this.next != null) {
-            result = this.next.init();
+            this.next.init();
         }
         return result;
     }
@@ -121,7 +123,7 @@
     public boolean shutDown() throws Exception {
         boolean result = super.shutDown();
         if (result && this.next != null) {
-            result = this.next.shutDown();
+            this.next.shutDown();
         }
         return result;
     }
@@ -129,7 +131,7 @@
     public boolean start() throws Exception {
         boolean result = super.start();
         if (result && this.next != null) {
-            result = this.next.start();
+            this.next.start();
         }
         return result;
     }
@@ -137,7 +139,7 @@
     public boolean stop() throws Exception {
         boolean result = super.stop();
         if (result && this.next != null) {
-            result = this.next.stop();
+            this.next.stop();
         }
         return result;
     }
@@ -178,7 +180,7 @@
             LOG.error("No exception listener - caught exception ", e);
         }
     }
-    
+
     /**
      * calls stop - but catches exceptions
      */
@@ -186,7 +188,7 @@
         try {
             stop();
         } catch (Throwable e) {
-           LOG.error("Caught an exception stopping",e);
+            LOG.error("Caught an exception stopping", e);
         }
     }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java Mon Dec  1 07:32:04 2008
@@ -29,6 +29,7 @@
  * Fragments a packet
  */
 
+@SuppressWarnings("serial")
 public class FragmentationProcessor extends ChainedProcessor {
     private static final Log LOG = LogFactory.getLog(FragmentationProcessor.class);
     private int maxPacketSize = BlazeConfiguration.DEFAULT_MAX_PACKET_SIZE;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java Mon Dec  1 07:32:04 2008
@@ -50,10 +50,7 @@
      * @param data
      */
     public Packet(PacketData data) {
-        this.packetData = data;
-        if (data.hasMessageId()) {
-            this.id = this.packetData.getMessageId().toString();
-        }
+        this.packetData = data;  
         this.from = null;
         this.to = null;
     }
@@ -67,9 +64,6 @@
     public Packet(SocketAddress from, PacketData data) {
         this.from = from;
         this.packetData = data;
-        if (data.hasMessageId()) {
-            this.id = this.packetData.getMessageId().toString();
-        }
         this.to = null;
     }
 
@@ -83,7 +77,6 @@
     public Packet(InetAddress toAddress, int toPort, PacketData data) {
         this.to = new InetSocketAddress(toAddress, toPort);
         this.packetData = data;
-        this.id = this.packetData.getMessageId().toString();
         this.from = null;
     }
 
@@ -97,7 +90,6 @@
     public Packet(String toAddress, int toPort, PacketData data) {
         this.to = new InetSocketAddress(toAddress, toPort);
         this.packetData = data;
-        this.id = this.packetData.getMessageId().toString();
         this.from = null;
     }
     
@@ -114,6 +106,11 @@
      * @return the id
      */
     public String getId() {
+        if (this.id==null && this.packetData!=null) {
+            if (this.packetData.hasMessageId()) {
+                this.id = this.packetData.getMessageId().toStringUtf8();
+            }
+        }
         return this.id;
     }
 

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java Mon Dec  1 07:32:04 2008
@@ -41,6 +41,7 @@
         return result;
     }
 
+    @SuppressWarnings("serial")
     public boolean start() throws Exception {
         boolean result = super.start();
         if (result) {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java Mon Dec  1 07:32:04 2008
@@ -19,7 +19,7 @@
 import org.apache.activeblaze.wire.MessageType;
 
 /**
- * @author rajdavies
+ * utility interface
  *
  */
 public interface PacketMessageType {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java Mon Dec  1 07:32:04 2008
@@ -19,8 +19,6 @@
 import java.net.URI;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.activeblaze.BlazeConfiguration;
-import org.apache.activeblaze.BlazeMessage;
-import org.apache.activeblaze.impl.processor.ChainedProcessor;
 import org.apache.activeblaze.impl.processor.Packet;
 import org.apache.activeblaze.impl.processor.PacketAudit;
 import org.apache.activemq.protobuf.Buffer;
@@ -43,7 +41,7 @@
     private int timeToLive = 1;
     private boolean loopBack = false;
     protected final PacketAudit audit = new PacketAudit();
-    private boolean broadcast = false;
+    private boolean broadcast = true;
     private boolean enableAudit = false;
     private int maxDispatchQueueSize = 10000;
     private LinkedBlockingQueue<Packet> dispatchQueue;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java Mon Dec  1 07:32:04 2008
@@ -88,7 +88,7 @@
         }
     }
 
-    public void downStream(Packet packet) throws Exception {
+    public synchronized void downStream(Packet packet) throws Exception {
         if (isInitialized()) {
             if (isEnableAudit()) {
                 // add to audit

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java Mon Dec  1 07:32:04 2008
@@ -94,7 +94,7 @@
         ByteBuffer buffer = this.inBuffer;
         if (isInitialized()) {
             buffer.flip();
-            if (buffer.remaining() > 0) {
+            while (buffer.remaining() > 0) {
                 InputStream stream = IOUtils.getByteBufferInputStream(buffer);
                 PacketData data = PacketData.parseFramed(stream);
                 stream.close();
@@ -143,18 +143,20 @@
                     this.messageRequests.put(packet.getPacketData().getMessageId(), request);
                 }
             }
-            buffer.clear();
-            OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
-            if (isEnableAudit()) {
-                // add to audit
-                this.audit.isDuplicate(packet);
+            synchronized (buffer) {
+                buffer.clear();
+                OutputStream stream = IOUtils.getByteBufferOutputStream(buffer);
+                if (isEnableAudit()) {
+                    // add to audit
+                    this.audit.isDuplicate(packet);
+                }
+                packet.getPacketData().writeFramed(stream);
+                stream.close();
+                buffer.flip();
+                this.channel.send(buffer, packet.getTo());
             }
-            packet.getPacketData().writeFramed(stream);
-            stream.close();
-            buffer.flip();
-            this.channel.send(buffer, packet.getTo());
             if (request != null) {
-                if (request.get(0) == null) {
+                if (request.get(getSoTimeout()) == null) {
                     throw new BlazeNoRouteException("No response in " + getSoTimeout() + " ms from " + packet.getTo());
                 }
             }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java Mon Dec  1 07:32:04 2008
@@ -29,6 +29,11 @@
     
     private Set<Buffer> requests = new HashSet<Buffer>();
 
+    /**
+     * Add a request
+     * @param id
+     * @param request
+     */
     public void add(Buffer id, SendRequest request) {
         request.setCallback(this);
         this.requests.add(id);

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.activemq.protobuf.Buffer;
+
+
+/**
+ * Very similar to the java.io.ByteArrayOutputStream but this version 
+ * is not thread safe and the resulting data is returned in a Buffer
+ * to avoid an extra byte[] allocation.  
+ */
+final public class BufferOutputStream extends OutputStream {
+
+    byte buffer[];
+    int offset;
+    int limit;
+    int pos;
+
+    public BufferOutputStream(int size) {
+        this(new byte[size]);
+    }   
+    
+    public BufferOutputStream(byte[] buffer) {
+        this.buffer = buffer;
+        this.limit = buffer.length;
+    }   
+    
+    public BufferOutputStream(Buffer data) {
+        this.buffer = data.data;
+        this.pos = this.offset = data.offset;
+        this.limit = data.offset+data.length;
+    }
+    
+    
+    public void write(int b) throws IOException {
+        int newPos = pos + 1;
+        checkCapacity(newPos);
+        buffer[pos] = (byte) b;
+        pos = newPos;
+    }
+
+    public void write(byte b[], int off, int len) throws IOException {
+        int newPos = pos + len;
+        checkCapacity(newPos);
+        System.arraycopy(b, off, buffer, pos, len);
+        pos = newPos;
+    }
+    
+    public Buffer getNextBuffer(int len) throws IOException {
+        int newPos = pos + len;
+        checkCapacity(newPos);
+        return new Buffer(buffer, pos, len);
+    }
+    
+    /**
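+     * Ensures the backing array can hold at least minimumCapacity bytes, growing it if necessary.
+     * @param minimumCapacity the capacity the backing array must be able to hold
+     * @throws IOException declared by the signature; this implementation grows the array instead of throwing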
+     */
+    private void checkCapacity(int minimumCapacity) throws IOException {
+        if (minimumCapacity > buffer.length) {
+            byte b[] = new byte[Math.max(buffer.length << 1, minimumCapacity)];
+            System.arraycopy(buffer, 0, b, 0, buffer.length);
+            buffer = b;
+        }
+    }
+
+    public void reset() {
+        pos = offset;
+    }
+
+    public Buffer toBuffer() {
+        return new Buffer(buffer, offset, pos);
+    }
+    
+    public byte[] toByteArray() {
+        return toBuffer().toByteArray();
+    }
+    
+    public int size() {
+        return offset-pos;
+    }
+    
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java Mon Dec  1 07:32:04 2008
@@ -16,10 +16,16 @@
  */
 package org.apache.activeblaze.util;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.BufferInputStream;
+
 
 /**
  * Utilities for ByteBuffers
@@ -66,4 +72,42 @@
             }
         };
     }
+
+    /**
+     * Create a Buffer from an Object
+     * 
+     * @param object
+     * @return
+     * @throws Exception
+     */
+    public static Buffer getBuffer(Object object) throws Exception {
+        if (object != null) {
+            BufferOutputStream bufferOut = new BufferOutputStream(512);
+            DataOutputStream dataOut = new DataOutputStream(bufferOut);
+            ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+            objOut.writeObject(object);
+            objOut.flush();
+            objOut.reset();
+            objOut.close();
+            return bufferOut.toBuffer();
+        }
+        return null;
+    }
+
+    /**
+     * Create an Object from a Buffer
+     * 
+     * @param buffer
+     * @return
+     * @throws Exception
+     */
+    public static Object getObject(Buffer buffer) throws Exception {
+        if (buffer != null) {
+            InputStream is = new BufferInputStream(buffer);
+            DataInputStream dataIn = new DataInputStream(is);
+            ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+            return objIn.readObject();
+        }
+        return null;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java Mon Dec  1 07:32:04 2008
@@ -65,6 +65,7 @@
 
     /**
      * Construct an IdGenerator
+     * @param prefix 
      */
     public IdGenerator(String prefix) {
         synchronized (UNIQUE_STUB) {
@@ -72,6 +73,9 @@
         }
     }
 
+    /**
+     * Constructor
+     */
     public IdGenerator() {
         this("ID:" + hostName);
     }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java Mon Dec  1 07:32:04 2008
@@ -69,7 +69,7 @@
      * @return Returns the maxCacheSize.
      */
     public int getMaxCacheSize() {
-        return maxCacheSize;
+        return this.maxCacheSize;
     }
 
     /**
@@ -80,7 +80,7 @@
     }
 
     protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
-        return size() > maxCacheSize;
+        return size() > this.maxCacheSize;
     }
 }
 

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Mon Dec  1 07:32:04 2008
@@ -31,6 +31,7 @@
   MEMBER_DATA = 1;
   ELECTION_MESSAGE = 2;
   ACK_DATA = 3;
+  STATE_DATA = 4;
 }
     message PacketData {   
       optional bool responseRequired = 1;
@@ -57,6 +58,8 @@
        optional int64 messageSequence = 5;
     }
     
+     
+    
     message DestinationData {
       required bool topic = 1;
       required bytes destination = 2;
@@ -70,11 +73,42 @@
        optional int64 timeStamp = 4;
        optional bytes inetAddress = 5;
        optional int32 port = 6;
-       optional int64 coordinatorWeight = 7;
+       optional int32 masterWeight = 7;
        optional bool  destinationsChanged = 8;
        repeated DestinationData  destination = 9; 
     }
     
+    message StateKeyData {
+      optional MemberData member =1;
+      optional string key = 2;
+      optional bool locked = 3;
+      optional bool removeOnExit = 4;
+      optional bool releaseLockOnExit = 5;
+      optional int64 expiration = 6;
+      optional int64 lockExpiration = 7;
+    }
+    enum StateType {
+      INSERT = 1;
+      DELETE = 2;
+      SYNC = 3;
+    }
+    message StateData {
+     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
+       //| option java_type_method = "MessageType";
+       optional StateKeyData keyData = 1;
+       optional bytes value =2;
+       optional bytes oldvalue =3;
+       optional bool mapUpdate = 4;
+       optional bool mapWrite = 5;
+       optional bool expired = 6;
+       optional bool lockExpired = 7;
+       optional bool lockUpdate = 8;
+       optional bool lockWrite = 9;
+       optional bool error = 10;
+       optional StateType stateType = 11;
+       
+    }
+    
     enum ElectionType {
     ELECTION = 0;
     ANSWER = 1;

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Mon Dec  1 07:32:04 2008
@@ -30,6 +30,17 @@
  */
 public class BlazeClusterGroupChannelTest extends TestCase {
     
+    public void testOneChannel() throws Exception {
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        BlazeClusterGroupChannel channel = factory.createChannel("test");
+        assertEquals(1, channel.getMembers().size());
+        channel.start();
+        boolean electionFinished = channel.waitForElection((int) (channel.getConfiguration().getAwaitGroupTimeout()+500));
+        assertTrue(electionFinished);
+        assertEquals(1, channel.getMembers().size());
+        channel.shutDown();
+    }
+
     public void testGroup() throws Exception {
         final int number = 3;
         List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
@@ -103,33 +114,76 @@
         }
     }
 
+    public void testChangedWeightedGroup() throws Exception {
+        final int number = 4;
+        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        BlazeClusterGroupChannel weightedMaster = null;
+        for (int i = 0; i < number; i++) {
+            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
+            channel.getConfiguration().setMinimumGroupSize(number);
+            if (i == number / 2) {
+                channel.getConfiguration().setMasterWeight(10);
+                weightedMaster = channel;
+            } else {
+                channel.getConfiguration().setMasterWeight(0);
+            }
+            channel.start();
+            channels.add(channel);
+        }
+        channels.get(number - 1).waitForElection(5000);
+        int masterNumber = 0;
+        BlazeClusterGroupChannel master = null;
+        for (BlazeClusterGroupChannel channel : channels) {
+            if (channel.isMaster()) {
+                masterNumber++;
+                master = channel;
+            }
+        }
+        assertNotNull(master);
+        assertTrue(master == weightedMaster);
+        assertEquals(1, masterNumber);
+        channels.get(0).getConfiguration().setMasterWeight(2000);
+        Thread.sleep(2000);
+        masterNumber = 0;
+        master = null;
+        for (BlazeClusterGroupChannel channel : channels) {
+            if (channel.isMaster()) {
+                masterNumber++;
+                master = channel;
+            }
+        }
+        assertNotNull(master);
+        assertTrue(master != weightedMaster);
+        assertEquals(1, masterNumber);
+        for (BlazeClusterGroupChannel channel : channels) {
+            channel.shutDown();
+        }
+    }
+
     public void testClusterChangedListener() throws Exception {
         final AtomicBoolean result = new AtomicBoolean();
         BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
         BlazeClusterGroupChannel master = factory.createChannel("master");
         master.getConfiguration().setMasterWeight(10);
         master.start();
-        
         BlazeClusterGroupChannel channel = factory.createChannel("test1");
-        channel.addClusterChangedListener(new ClusterChangedListener() {
-            public void ClusterChanged(Member master) {
-               synchronized(result) {
-                   result.set(true);
-                   result.notifyAll();
-               }
-                
+        channel.addMasterChangedListener(new MasterChangedListener() {
+            public void masterChanged(Member master) {
+                synchronized (result) {
+                    result.set(true);
+                    result.notifyAll();
+                }
             }
         });
         channel.start();
-        
-        synchronized(result) {
+        synchronized (result) {
             if (!result.get()) {
-               result.wait(3000); 
+                result.wait(3000);
             }
         }
         assertTrue(result.get());
         channel.shutDown();
         master.shutDown();
-        
     }
 }

Added: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (added)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.TestCase;
+import org.apache.activeblaze.group.Member;
+
+/**
+ * Tests for ClusterState
+ * 
+ */
+public class ClusterStateTest extends TestCase {
+    protected BlazeClusterGroupChannel channel1;
+    protected BlazeClusterGroupChannel channel2;
+
+    public void testAddAndRemove() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final int number = 1000;
+        final CountDownLatch addLatch = new CountDownLatch(number);
+        final CountDownLatch removeLatch = new CountDownLatch(number);
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                addLatch.countDown();
+            }
+
+            public void mapRemove(Member owner, String key, Object value, boolean expired) {
+                removeLatch.countDown();
+            }
+        });
+        this.channel1.start();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        for (int i = 0; i < number; i++) {
+            state2.put("" + i, "test" + i);
+        }
+        addLatch.await(5, TimeUnit.SECONDS);
+        assertEquals(0, addLatch.getCount());
+        for (int i = 0; i < number; i++) {
+            state2.remove("" + i);
+        }
+        removeLatch.await(5, TimeUnit.SECONDS);
+        assertEquals(0, removeLatch.getCount());
+        assertTrue(state1.isEmpty());
+    }
+
+    public void testAddClusterStateChangedListener() throws Exception {
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        ClusterState state1 = this.channel1.getState();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called1) {
+                    called1.set(true);
+                    called1.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called2) {
+                    called2.set(true);
+                    called2.notifyAll();
+                }
+            }
+        });
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state1.put("test", "blob");
+        synchronized (called1) {
+            if (!called1.get()) {
+                called1.wait(5000);
+            }
+        }
+        synchronized (called2) {
+            if (!called2.get()) {
+                called2.wait(5000);
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+    }
+
+    public void testGetImplicitWriteLock() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.setAlwaysLock(true);
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.waitForElection(5000);
+        this.channel2.start();
+        state2.put("test", "foo");
+        try {
+            state1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (ClusterUpdateException e) {
+        }
+    }
+
+    public void testExpireImplicitWriteLock() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.setAlwaysLock(true);
+        state2.setLockTimeToLive(1000);
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        try {
+            state1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (ClusterUpdateException e) {
+        }
+        Thread.sleep(2000);
+        state1.put("test", "bah");
+    }
+
+    public void testExpireImplicitLockOnExit() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.setAlwaysLock(true);
+        state2.setLockTimeToLive(1000);
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        try {
+            state1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (ClusterUpdateException e) {
+        }
+        channel2.shutDown();
+        Thread.sleep(1000);
+        state1.put("test", "bah");
+    }
+
+    public void testGetExplicitWriteLock() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        state1.setAlwaysLock(true);
+        final AtomicBoolean called = new AtomicBoolean();
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.setAlwaysLock(true);
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        state2.lock("test");
+        try {
+            state1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (ClusterUpdateException e) {
+        }
+        state2.unlock("test");
+        state1.lock("test");
+        try {
+            state2.lock("test");
+            fail("Should have thrown an exception!");
+        } catch (ClusterUpdateException e) {
+        }
+    }
+
+    public void testClear() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+
+            public void mapRemove(Member owner, String key, Object value, boolean expired) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(state1.isEmpty() == false);
+        state2.clear();
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(state1.isEmpty());
+    }
+
+    public void testMapUpdatedOnStart() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        this.channel1.start();
+        state1.put("test", "foo");
+        ClusterState state2 = this.channel2.getState();
+        state2.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(state2.containsKey("test"));
+        assertTrue(state2.containsValue("foo"));
+    }
+
+    public void testContainsKey() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.start();
+        state2.put("test", "foo");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(state1.containsKey("test"));
+    }
+
+    public void testContainsValue() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(state1.containsValue("foo"));
+    }
+
+    public void testGet() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        assertTrue(state1.get("test").equals("foo"));
+    }
+
+    public void testPut() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        Object value = state1.put("foo", "blob");
+        assertNull(value);
+        value = state1.put("foo", "blah");
+        assertEquals(value, "blob");
+    }
+
+    public void testRemove() throws Exception {
+        ClusterState state1 = this.channel1.getState();
+        final AtomicBoolean called = new AtomicBoolean();
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapInsert(Member owner, String Key, Object Value) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+
+            public void mapRemove(Member owner, String key, Object value, boolean expired) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state2.put("test", "foo");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        called.set(false);
+        assertTrue(state1.isEmpty() == false);
+        state2.remove("test");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(state1.isEmpty());
+    }
+
+    public void testExpire() throws Exception {
+        final AtomicBoolean called1 = new AtomicBoolean();
+        final AtomicBoolean called2 = new AtomicBoolean();
+        ClusterState state1 = this.channel1.getState();
+        state1.setTimeToLive(1000);
+        state1.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapRemove(Member owner, String key, Object value, boolean expired) {
+                synchronized (called1) {
+                    called1.set(expired);
+                    called1.notifyAll();
+                }
+            }
+        });
+        this.channel1.start();
+        ClusterState state2 = this.channel2.getState();
+        state2.addClusterStateChangedListener(new DefaultClusterStateListener() {
+            public void mapRemove(Member owner, String key, Object value, boolean expired) {
+                synchronized (called2) {
+                    called2.set(expired);
+                    called2.notifyAll();
+                }
+            }
+        });
+        this.channel2.getConfiguration().setMinimumGroupSize(2);
+        this.channel2.start();
+        this.channel2.waitForElection(5000);
+        state1.put("test", "blob");
+        synchronized (called1) {
+            if (!called1.get()) {
+                called1.wait(5000);
+            }
+        }
+        synchronized (called2) {
+            if (!called2.get()) {
+                called2.wait(5000);
+            }
+        }
+        assertTrue(called1.get());
+        assertTrue(called2.get());
+    }
+
+    protected void setUp() throws Exception {
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        this.channel1 = factory.createChannel("channel1");
+        this.channel2 = factory.createChannel("channel2");
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        this.channel1.shutDown();
+        this.channel2.shutDown();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java Mon Dec  1 07:32:04 2008
@@ -185,8 +185,7 @@
             });
         }
         Thread.sleep(2000);
-        BlazeMessage msg = new BlazeMessage();
-        msg.setString("test", "hello");
+        BlazeMessage msg = new BlazeMessage("hello");
         channels.get(0).send(destination, msg);
         synchronized (count) {
             if (count.get() == 0) {



Mime
View raw message