activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r684047 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/
Date Fri, 08 Aug 2008 18:48:07 GMT
Author: rajdavies
Date: Fri Aug  8 11:48:06 2008
New Revision: 684047

URL: http://svn.apache.org/viewvc?rev=684047&view=rev
Log:
more refinements for Group

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Group.java
      - copied, changed from r683259, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java
      - copied, changed from r683259, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java Fri Aug  8 11:48:06 2008
@@ -22,12 +22,12 @@
  */
 public class DefaultMapChangedListener implements MapChangedListener{
 
-    public void mapInsert(Member owner, Object key, Object value) {        
+    public void mapInsert(Member owner, Object key, Object value) {   
     }
 
-    public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+    public void mapRemove(Member owner, Object key, Object value,boolean expired) {  
     }
 
-    public void mapUpdate(Member owner, Object Key, Object oldValue,Object newValue) {        
+    public void mapUpdate(Member owner, Object key, Object oldValue,Object newValue) {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Fri Aug  8 11:48:06 2008
@@ -28,9 +28,11 @@
 class EntryKey<K> implements Externalizable {
     private Member owner;
     private K key;
-    private boolean share;
+    private boolean locked;
     private boolean removeOnExit;
+    private boolean releaseLockOnExit;
     private long expiration;
+    private long lockExpiration;
 
     /**
      * Default constructor - for serialization
@@ -53,6 +55,10 @@
     public Member getOwner() {
         return this.owner;
     }
+    
+    public void setOwner(Member member) {
+        this.owner=member;
+    }
 
     /**
      * @return the key
@@ -64,15 +70,15 @@
     /**
      * @return the share
      */
-    public boolean isShare() {
-        return this.share;
+    public boolean isLocked() {
+        return this.locked;
     }
 
     /**
      * @param share the share to set
      */
-    public void setShare(boolean share) {
-        this.share = share;
+    public void setLocked(boolean locked) {
+        this.locked = locked;
     }
 
     /**
@@ -104,6 +110,34 @@
         this.expiration = expiration;
     }
     
+    /**
+     * @return the lockExpiration
+     */
+    public long getLockExpiration() {
+        return lockExpiration;
+    }
+
+    /**
+     * @param lockExpiration the lockExpiration to set
+     */
+    public void setLockExpiration(long lockExpiration) {
+        this.lockExpiration = lockExpiration;
+    }
+
+    /**
+     * @return the releaseLockOnExit
+     */
+    public boolean isReleaseLockOnExit() {
+        return releaseLockOnExit;
+    }
+
+    /**
+     * @param releaseLockOnExit the releaseLockOnExit to set
+     */
+    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.releaseLockOnExit = releaseLockOnExit;
+    }
+    
     void setTimeToLive(long ttl) {
         if (ttl > 0 ) {
             this.expiration=ttl+System.currentTimeMillis();
@@ -112,6 +146,14 @@
         }
     }
     
+    void setLockTimeToLive(long ttl) {
+        if(ttl > 0) {
+            this.lockExpiration=ttl+System.currentTimeMillis();
+        }else {
+            this.lockExpiration=0l;
+        }
+    }
+    
     boolean isExpired() {
         return isExpired(System.currentTimeMillis());
     }
@@ -120,6 +162,14 @@
         return this.expiration > 0 && this.expiration < currentTime;
     }
     
+    boolean isLockExpired() {
+        return isLockExpired(System.currentTimeMillis());
+    }
+    
+    boolean isLockExpired(long currentTime) {
+        return this.lockExpiration > 0 && this.lockExpiration < currentTime;
+    }
+    
    
 
     public boolean equals(Object obj) {
@@ -134,21 +184,27 @@
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(this.owner);
         out.writeObject(this.key);
-        out.writeBoolean(isShare());
+        out.writeBoolean(isLocked());
         out.writeBoolean(isRemoveOnExit());
+        out.writeBoolean(isReleaseLockOnExit());
         out.writeLong(getExpiration());
+        out.writeLong(getLockExpiration());
     }
 
     public void readExternal(ObjectInput in) throws IOException,
             ClassNotFoundException {
         this.owner = (Member) in.readObject();
         this.key = (K) in.readObject();
-        this.share = in.readBoolean();
+        this.locked = in.readBoolean();
         this.removeOnExit=in.readBoolean();
+        this.releaseLockOnExit=in.readBoolean();
         this.expiration=in.readLong();
+        this.lockExpiration=in.readLong();
     }
     
     public String toString() {
         return "key:"+this.key;
     }
+
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Fri Aug  8 11:48:06 2008
@@ -29,9 +29,12 @@
     static enum MessageType{INSERT,DELETE,SYNC};
     private EntryKey key;
     private Object value;
+    private Object oldValue;
     private MessageType type;
     private boolean mapUpdate;
     private boolean expired;
+    private boolean lockExpired;
+    private boolean lockUpdate;
     
     /**
      * @return the owner
@@ -59,6 +62,19 @@
     }
     
     /**
+     * @return the oldValue
+     */
+    public Object getOldValue() {
+        return this.oldValue;
+    }
+    /**
+     * @param oldValue the oldValue to set
+     */
+    public void setOldValue(Object oldValue) {
+        this.oldValue = oldValue;
+    }
+    
+    /**
      * @return the type
      */
     public MessageType getType() {
@@ -98,6 +114,32 @@
     }
     
     /**
+     * @return the lockExpired
+     */
+    public boolean isLockExpired() {
+        return lockExpired;
+    }
+    /**
+     * @param lockExpired the lockExpired to set
+     */
+    public void setLockExpired(boolean lockExpired) {
+        this.lockExpired = lockExpired;
+    }
+    
+    /**
+     * @return the lockUpdate
+     */
+    public boolean isLockUpdate() {
+        return lockUpdate;
+    }
+    /**
+     * @param lockUpdate the lockUpdate to set
+     */
+    public void setLockUpdate(boolean lockUpdate) {
+        this.lockUpdate = lockUpdate;
+    }
+    
+    /**
      * @return if insert message
      */
     public boolean isInsert() {
@@ -115,13 +157,17 @@
         return this.type != null && this.type.equals(MessageType.SYNC);
     }
     
+    
     public EntryMessage copy() {
         EntryMessage result = new EntryMessage();
         result.key=this.key;
         result.value=this.value;
+        result.oldValue=this.oldValue;
         result.type=this.type;
         result.mapUpdate=this.mapUpdate;
         result.expired=this.expired;
+        result.lockExpired=this.lockExpired;
+        result.lockUpdate=this.lockUpdate;
         return result;
     }
     
@@ -131,21 +177,28 @@
             ClassNotFoundException {
         this.key=(EntryKey) in.readObject();
         this.value=in.readObject();
+        this.oldValue=in.readObject();
         this.type=(MessageType) in.readObject();  
         this.mapUpdate=in.readBoolean();
         this.expired=in.readBoolean();
+        this.lockExpired=in.readBoolean();
+        this.lockUpdate=in.readBoolean();
     }
     
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(this.key);
         out.writeObject(this.value);
+        out.writeObject(this.oldValue);
         out.writeObject(this.type);
         out.writeBoolean(this.mapUpdate);
         out.writeBoolean(this.expired);
+        out.writeBoolean(this.lockExpired);
+        out.writeBoolean(this.lockUpdate);
     }
     
     public String toString() {
         return "EntryMessage: "+this.type + "[" + this.key + "," + this.value +
             "]{update=" + this.mapUpdate + "}";
     }
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Fri Aug  8 11:48:06 2008
@@ -22,20 +22,20 @@
  *
  */
 class EntryValue<V> {
-    private Member owner;
+    private EntryKey key;
     private V value;
     
     
-    EntryValue(Member owner, V value){
-        this.owner=owner;
+    EntryValue(EntryKey key, V value){
+        this.key=key;
         this.value=value;
     }
     
     /**
      * @return the owner
      */
-    public Member getOwner() {
-        return this.owner;
+    public EntryKey getKey() {
+        return this.key;
     }
 
     /**

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Group.java (from r683259, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Group.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Group.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java&r1=683259&r2=684047&rev=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Group.java Fri Aug  8 11:48:06 2008
@@ -19,6 +19,8 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,7 +31,10 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -40,6 +45,7 @@
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
 import org.apache.activemq.ActiveMQMessageConsumer;
@@ -54,12 +60,13 @@
 
 /**
  * <P>
- * A <CODE>GroupMap</CODE> is a Map implementation that is used to shared state 
- * amongst a distributed group of other <CODE>GroupMap</CODE> instances. 
+ * A <CODE>Group</CODE> is a distributed collaboration implementation that is 
+ * used to shared state and process messages amongst a distributed group of 
+ * other <CODE>Group</CODE> instances. 
  * Membership of a group is handled automatically using discovery.
  * <P>
  * The underlying transport is JMS and there are some optimizations that occur
- * for membership if used with ActiveMQ - but <CODE>GroupMap</CODE> can be used
+ * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
  * with any JMS implementation.
  * 
  * <P>
@@ -71,15 +78,19 @@
  * overridden to implement a custom mechanism for choosing  how the coordinator
  * is elected for the map.
  * <P>
- * New <CODE>GroupMap</CODE> instances have their state updated by the coordinator,
+ * New <CODE>Group</CODE> instances have their state updated by the coordinator,
  * and coordinator failure is handled automatically within the group.
  * <P>
- * All updates are totally ordered through the coordinator, whilst read operations 
+ * All map updates are totally ordered through the coordinator, whilst read operations 
  * happen locally. 
  * <P>
- * A <CODE>GroupMap</CODE>supports the concept of owner only updates(write locks), 
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write locks), 
  * shared updates, entry expiration times and removal on owner exit - 
- * all of which are optional.
+ * all of which are optional. In addition, you can grab and release locks for 
+ * values in the map, independently of who created them.
+ * <P>
+ * In addition, members of a group can broadcast messages and implement request/response
+ * with other <CODE>Group</CODE> instances.
  * 
  * <P>
  * 
@@ -87,31 +98,41 @@
  * @param <V> the value type
  * 
  */
-public class GroupMap<K, V> implements Map<K, V>, Service {
+public class Group<K, V> implements Map<K, V>, Service {
     /**
      * default interval within which to detect a member failure
      */
     public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000;
-    private static final long EXPIRATION_SWEEP_INTERVAL = 1000;
-    private static final Log LOG = LogFactory.getLog(GroupMap.class);
-    private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()
-            + ".";
+    private static final long EXPIRATION_SWEEP_INTERVAL = 500;
+    private static final Log LOG = LogFactory.getLog(Group.class);
+    private static final String STATE_PREFIX = "STATE."+Group.class.getName()+ ".";
+    private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."+Group.class.getName()+ ".";
+    private static final String STATE_TYPE = "state";
+    private static final String MESSAGE_TYPE = "message";
+    private static final String MEMBER_ID_PROPERTY="memberId";
+    protected Member local;
     private final Object mapMutex = new Object();
-    private Map<EntryKey<K>, EntryValue<V>> localMap;
+    private Map<K, EntryValue<V>> localMap;
     private Map<String, Member> members = new ConcurrentHashMap<String, Member>();
-    private Map<String, MapRequest> requests = new HashMap<String, MapRequest>();
+    private Map<String, MapRequest> stateRequests = new HashMap<String, MapRequest>();
+    private Map<String, MapRequest> messageRequests = new HashMap<String, MapRequest>();
     private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
     private List<MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
-    Member local;
+    private List<GroupMessageListener> groupMessageListeners = new CopyOnWriteArrayList<GroupMessageListener>();
+    
     private Member coordinator;
     private String groupName;
-    private boolean sharedWrites;
+    private boolean alwaysLock;
     private Connection connection;
-    private Session session;
-    private Topic topic;
+    private Session stateSession;
+    private Session messageSession;
+    private Topic stateTopic;
     private Topic heartBeatTopic;
     private Topic inboxTopic;
-    private MessageProducer producer;
+    private Topic messageTopic;
+    private Queue messageQueue;
+    private MessageProducer stateProducer;
+    private MessageProducer messageProducer;
     private ConsumerEventSource consumerEvents;
     private AtomicBoolean started = new AtomicBoolean();
     private SchedulerTimerTask heartBeatTask;
@@ -121,17 +142,22 @@
     private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
     private IdGenerator idGenerator = new IdGenerator();
     private boolean removeOwnedObjectsOnExit;
+    private boolean releaseLockOnExit=true;
     private int timeToLive;
+    private int lockTimeToLive;
     private int minimumGroupSize = 1;
+    private int coordinatorWeight=0;
     private final AtomicBoolean electionFinished = new AtomicBoolean(true);
-    private ExecutorService executor;
+    private ExecutorService stateExecutor;
+    private ExecutorService messageExecutor;
+    private ThreadPoolExecutor electionExecutor;
     private final Object memberMutex = new Object();
 
     /**
      * @param connection
      * @param name
      */
-    public GroupMap(Connection connection, String name) {
+    public Group(Connection connection, String name) {
         this(connection, "default", name);
     }
 
@@ -140,7 +166,7 @@
      * @param groupName
      * @param name
      */
-    public GroupMap(Connection connection, String groupName, String name) {
+    public Group(Connection connection, String groupName, String name) {
         this.connection = connection;
         this.local = new Member(name);
         this.coordinator = this.local;
@@ -169,34 +195,72 @@
         if (this.started.compareAndSet(false, true)) {
             synchronized (this.mapMutex) {
                 if (this.localMap == null) {
-                    this.localMap = new HashMap<EntryKey<K>, EntryValue<V>>();
+                    this.localMap = new HashMap<K, EntryValue<V>>();
                 }
             }
             this.connection.start();
-            this.session = this.connection.createSession(false,
+            this.stateSession = this.connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            this.messageSession = this.connection.createSession(false,
                     Session.AUTO_ACKNOWLEDGE);
-            this.producer = this.session.createProducer(null);
-            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            this.inboxTopic = this.session.createTemporaryTopic();
-            String topicName = STATE_TOPIC_PREFIX + this.groupName;
-            this.topic = this.session.createTopic(topicName);
-            this.heartBeatTopic = this.session.createTopic(topicName
+            this.stateProducer = this.stateSession.createProducer(null);
+            this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            this.inboxTopic = this.stateSession.createTemporaryTopic();
+            
+            String stateTopicName = STATE_PREFIX + this.groupName;
+            this.stateTopic = this.stateSession.createTopic(stateTopicName);
+            this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
                     + ".heartbeat");
-            MessageConsumer privateInbox = this.session
+            
+            String messageDestinationName = GROUP_MESSAGE_PREFIX+ this.groupName;
+            this.messageTopic = this.messageSession.createTopic(messageDestinationName);
+            this.messageQueue = this.messageSession.createQueue(messageDestinationName);
+            MessageConsumer privateInbox = this.messageSession
                     .createConsumer(this.inboxTopic);
+            
+         
+            
             privateInbox.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
-                    processMessage(message);
+                    processJMSMessage(message);
                 }
             });
-            MessageConsumer mapChangeConsumer = this.session
-                    .createConsumer(this.topic);
+            MessageConsumer mapChangeConsumer = this.stateSession
+                    .createConsumer(this.stateTopic);
+            String memberId = null;
+            if (mapChangeConsumer instanceof ActiveMQMessageConsumer) {
+                memberId = ((ActiveMQMessageConsumer) mapChangeConsumer)
+                        .getConsumerId().toString();
+            } else {
+                memberId = this.idGenerator.generateId();
+            }
+            this.local.setId(memberId);
+            this.local.setInBoxDestination(this.inboxTopic);
+            this.local.setCoordinatorWeight(getCoordinatorWeight());
             mapChangeConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
-                    processMessage(message);
+                    processJMSMessage(message);
                 }
             });
-            MessageConsumer heartBeatConsumer = this.session
+            
+            this.messageProducer = this.messageSession.createProducer(null);
+            this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic);
+            topicMessageConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            
+            MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue);
+            queueMessageConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            
+            
+            MessageConsumer heartBeatConsumer = this.stateSession
                     .createConsumer(this.heartBeatTopic);
             heartBeatConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
@@ -204,32 +268,45 @@
                 }
             });
             this.consumerEvents = new ConsumerEventSource(this.connection,
-                    this.topic);
+                    this.stateTopic);
             this.consumerEvents.setConsumerListener(new ConsumerListener() {
                 public void onConsumerEvent(ConsumerEvent event) {
                     handleConsumerEvents(event);
                 }
             });
             this.consumerEvents.start();
-            String memberId = null;
-            if (mapChangeConsumer instanceof ActiveMQMessageConsumer) {
-                memberId = ((ActiveMQMessageConsumer) mapChangeConsumer)
-                        .getConsumerId().toString();
-            } else {
-                memberId = this.idGenerator.generateId();
-            }
-            this.local.setId(memberId);
-            this.local.setInBoxDestination(this.inboxTopic);
-            this.executor = Executors
+                        
+            this.electionExecutor = new ThreadPoolExecutor(1, 1,
+                    0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(),
+                    new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "Election{"
+                            + Group.this.local + "}");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            
+            this.stateExecutor = Executors
                     .newSingleThreadExecutor(new ThreadFactory() {
                         public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable, "Election{"
-                                    + GroupMap.this.local + "}");
+                            Thread thread = new Thread(runnable, "Group State{"
+                                    + Group.this.local + "}");
                             thread.setDaemon(true);
-                            thread.setPriority(Thread.NORM_PRIORITY);
                             return thread;
                         }
                     });
+            
+            this.messageExecutor = Executors
+            .newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "Group Messages{"
+                            + Group.this.local + "}");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
             sendHeartBeat();
             this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
                 public void run() {
@@ -257,7 +334,9 @@
             // await for members to join
             long timeout = this.heartBeatInterval * this.minimumGroupSize;
             long deadline = System.currentTimeMillis() + timeout;
-            while (this.members.size() < this.minimumGroupSize && timeout > 0) {
+            while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
+                    .get())
+                    && timeout > 0) {
                 synchronized (this.memberMutex) {
                     this.memberMutex.wait(timeout);
                 }
@@ -278,12 +357,19 @@
             this.heartBeatTask.cancel();
             this.expirationTask.cancel();
             this.timer.purge();
-            if (this.executor != null) {
-                this.executor.shutdownNow();
+            if (this.electionExecutor  != null) {
+                this.electionExecutor.shutdownNow();
+            }
+            if (this.stateExecutor != null) {
+                this.stateExecutor.shutdownNow();
+            }
+            if(this.messageExecutor != null) {
+                this.messageExecutor.shutdownNow();
             }
             try {
                 this.consumerEvents.stop();
-                this.session.close();
+                this.stateSession.close();
+                this.messageSession.close();
             } catch (Exception e) {
                 LOG.debug("Caught exception stopping", e);
             }
@@ -312,18 +398,18 @@
     }
 
     /**
-     * @return the sharedWrites
+     * @return true if by default always lock objects (default is false)
      */
-    public boolean isSharedWrites() {
-        return this.sharedWrites;
+    public boolean isAlwaysLock() {
+        return this.alwaysLock;
     }
 
     /**
-     * @param sharedWrites
-     *            the sharedWrites to set
+     * @param alwaysLock - set true if objects inserted will always
+     * be locked (default is false)
      */
-    public void setSharedWrites(boolean sharedWrites) {
-        this.sharedWrites = sharedWrites;
+    public void setAlwaysLock(boolean alwaysLock) {
+        this.alwaysLock = alwaysLock;
     }
 
     /**
@@ -342,6 +428,7 @@
     }
 
     /**
+     * Add a listener for membership changes
      * @param l
      */
     public void addMemberChangedListener(MemberChangedListener l) {
@@ -349,6 +436,7 @@
     }
 
     /**
+     * Remove a listener for membership changes
      * @param l
      */
     public void removeMemberChangedListener(MemberChangedListener l) {
@@ -356,6 +444,7 @@
     }
 
     /**
+     * Add a listener for map changes
      * @param l
      */
     public void addMapChangedListener(MapChangedListener l) {
@@ -363,11 +452,30 @@
     }
 
     /**
+     * Remove a listener for map changes
      * @param l
      */
     public void removeMapChangedListener(MapChangedListener l) {
         this.mapChangedListeners.remove(l);
     }
+    
+    /**
+     * Add a listener for group messages
+     * @param l
+     */
+    public void addGroupMessageListener(GroupMessageListener l) {
+        this.groupMessageListeners.add(l);
+    }
+    
+    
+    /**
+     * remove a listener for group messages
+     * @param l
+     */
+    public void removeGroupMessageListener(GroupMessageListener l) {
+        this.groupMessageListeners.remove(l);
+    }
+
 
     /**
      * @return the timeToLive
@@ -400,6 +508,35 @@
     public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
         this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
     }
+    
+    /**
+     * @return releaseLockOnExit - true by default
+     */
+    public boolean isReleaseLockOnExit() {
+        return releaseLockOnExit;
+    }
+
+    /**
+     * set release lock on exit - true by default
+     * @param releaseLockOnExit the releaseLockOnExit to set
+     */
+    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.releaseLockOnExit = releaseLockOnExit;
+    }
+
+    /**
+     * @return the lockTimeToLive
+     */
+    public int getLockTimeToLive() {
+        return lockTimeToLive;
+    }
+
+    /**
+     * @param lockTimeToLive the lockTimeToLive to set
+     */
+    public void setLockTimeToLive(int lockTimeToLive) {
+        this.lockTimeToLive = lockTimeToLive;
+    }
 
     /**
      * @return the minimumGroupSize
@@ -415,6 +552,19 @@
     public void setMinimumGroupSize(int minimumGroupSize) {
         this.minimumGroupSize = minimumGroupSize;
     }
+    
+    /**
+     * @return the coordinatorWeight
+     */
+    public int getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+    /**
+     * @param coordinatorWeight the coordinatorWeight to set
+     */
+    public void setCoordinatorWeight(int coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
 
     /**
      * clear entries from the Map
@@ -424,11 +574,11 @@
     public void clear() throws IllegalStateException {
         checkStatus();
         if (this.localMap != null && !this.localMap.isEmpty()) {
-            Set<EntryKey<K>> keys = null;
+            Set<K> keys = null;
             synchronized (this.mapMutex) {
-                keys = new HashSet<EntryKey<K>>(this.localMap.keySet());
+                keys = new HashSet<K>(this.localMap.keySet());
             }
-            for (EntryKey<K> key : keys) {
+            for (K key : keys) {
                 remove(key);
             }
         }
@@ -436,15 +586,14 @@
     }
 
     public boolean containsKey(Object key) {
-        EntryKey stateKey = new EntryKey(this.local, key);
         synchronized (this.mapMutex) {
-            return this.localMap != null ? this.localMap.containsKey(stateKey)
+            return this.localMap != null ? this.localMap.containsKey(key)
                     : false;
         }
     }
 
     public boolean containsValue(Object value) {
-        EntryValue entryValue = new EntryValue(this.local, value);
+        EntryValue entryValue = new EntryValue(null, value);
         synchronized (this.mapMutex) {
             return this.localMap != null ? this.localMap
                     .containsValue(entryValue) : false;
@@ -455,10 +604,8 @@
         Map<K, V> result = new HashMap<K, V>();
         synchronized (this.mapMutex) {
             if (this.localMap != null) {
-                for (java.util.Map.Entry<EntryKey<K>, EntryValue<V>> entry : this.localMap
-                        .entrySet()) {
-                    result.put(entry.getKey().getKey(), entry.getValue()
-                            .getValue());
+                for (EntryValue<V> entry : this.localMap.values()) {
+                    result.put((K) entry.getKey(), entry.getValue());
                 }
             }
         }
@@ -466,10 +613,9 @@
     }
 
     public V get(Object key) {
-        EntryKey<K> stateKey = new EntryKey<K>(this.local, (K) key);
         EntryValue<V> value = null;
         synchronized (this.mapMutex) {
-            value = this.localMap != null ? this.localMap.get(stateKey) : null;
+            value = this.localMap != null ? this.localMap.get(key) : null;
         }
         return value != null ? value.getValue() : null;
     }
@@ -481,13 +627,9 @@
     }
 
     public Set<K> keySet() {
-        Set<K> result = new HashSet<K>();
-        synchronized (this.mapMutex) {
-            if (this.localMap != null) {
-                for (EntryKey<K> key : this.localMap.keySet()) {
-                    result.add(key.getKey());
-                }
-            }
+        Set<K> result = null;
+        synchronized(this.mapMutex) {
+            result = new HashSet<K>(this.localMap.keySet());
         }
         return result;
     }
@@ -504,8 +646,8 @@
      */
     public V put(K key, V value) throws GroupMapUpdateException,
             IllegalStateException {
-        return put(key, value, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
-                getTimeToLive());
+        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(),
+                getTimeToLive(),getLockTimeToLive());
     }
 
     /**
@@ -513,28 +655,70 @@
      * 
      * @param key
      * @param value
-     * @param sharedWrites
+     * @param lock
      * @param removeOnExit
+     * @param releaseLockOnExit 
      * @param timeToLive
+     * @param lockTimeToLive 
      * @return the old value or null
      * @throws GroupMapUpdateException
      * @throws IllegalStateException
      * 
      */
-    public V put(K key, V value, boolean sharedWrites, boolean removeOnExit,
-            long timeToLive) throws GroupMapUpdateException,
+    public V put(K key, V value, boolean lock, boolean removeOnExit,boolean releaseLockOnExit,
+            long timeToLive,long lockTimeToLive) throws GroupMapUpdateException,
             IllegalStateException {
         checkStatus();
         EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
-        EntryValue<V> stateValue = new EntryValue<V>(this.local, value);
-        entryKey.setShare(sharedWrites);
+        entryKey.setLocked(lock);
         entryKey.setRemoveOnExit(removeOnExit);
+        entryKey.setReleaseLockOnExit(releaseLockOnExit);
         entryKey.setTimeToLive(timeToLive);
+        entryKey.setLockTimeToLive(lockTimeToLive);
         EntryMessage entryMsg = new EntryMessage();
         entryMsg.setKey(entryKey);
         entryMsg.setValue(value);
         entryMsg.setType(EntryMessage.MessageType.INSERT);
-        return (V) sendRequest(getCoordinator(), entryMsg);
+        return (V) sendStateRequest(getCoordinator(), entryMsg);
+    }
+    
+    /**
+     * Remove a lock on a key 
+     * @param key
+     * @throws GroupMapUpdateException
+     */
+    public void unlock(K key) throws GroupMapUpdateException{
+        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
+        entryKey.setLocked(false);
+        EntryMessage entryMsg = new EntryMessage();
+        entryMsg.setKey(entryKey);
+        entryMsg.setLockUpdate(true);
+        sendStateRequest(getCoordinator(), entryMsg);
+    }
+    
+    /**
+     * Lock a key in the distributed map
+     * @param key
+     * @throws GroupMapUpdateException
+     */
+    public void lock(K key) throws GroupMapUpdateException{
+        lock(key,getLockTimeToLive());
+    }
+    
+    /**
+     * Lock a key in the distributed map
+     * @param key
+     * @param lockTimeToLive 
+     * @throws GroupMapUpdateException
+     */
+    public void lock(K key,long lockTimeToLive) throws GroupMapUpdateException{
+        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
+        entryKey.setLocked(true);
+        entryKey.setLockTimeToLive(lockTimeToLive);
+        EntryMessage entryMsg = new EntryMessage();
+        entryMsg.setKey(entryKey);
+        entryMsg.setLockUpdate(true);
+        sendStateRequest(getCoordinator(), entryMsg);
     }
 
     /**
@@ -546,26 +730,28 @@
      */
     public void putAll(Map<? extends K, ? extends V> t)
             throws GroupMapUpdateException, IllegalStateException {
-        putAll(t, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
-                getTimeToLive());
+        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),isReleaseLockOnExit(),
+                getTimeToLive(),getLockTimeToLive());
     }
 
     /**
      * Add the Map to the distribution
      * 
      * @param t
-     * @param sharedWrites
-     * @param removeOnExit
-     * @param timeToLive
-     * @throws GroupMapUpdateException
-     * @throws IllegalStateException
+     * @param lock 
+     * @param removeOnExit 
+     * @param releaseLockOnExit 
+     * @param timeToLive 
+     * @param lockTimeToLive 
+     * @throws GroupMapUpdateException 
+     * @throws IllegalStateException 
      */
-    public void putAll(Map<? extends K, ? extends V> t, boolean sharedWrites,
-            boolean removeOnExit, long timeToLive)
+    public void putAll(Map<? extends K, ? extends V> t, boolean lock, boolean removeOnExit,boolean releaseLockOnExit,
+            long timeToLive,long lockTimeToLive)
             throws GroupMapUpdateException, IllegalStateException {
         for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
-            put(entry.getKey(), entry.getValue(), sharedWrites, removeOnExit,
-                    timeToLive);
+            put(entry.getKey(), entry.getValue(), lock, removeOnExit,
+                    releaseLockOnExit, timeToLive, lockTimeToLive);
         }
     }
 
@@ -581,16 +767,16 @@
     public V remove(Object key) throws GroupMapUpdateException,
             IllegalStateException {
         EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
-        return remove(entryKey);
+        return doRemove(entryKey);
     }
 
-    V remove(EntryKey<K> key) throws GroupMapUpdateException,
+    V doRemove(EntryKey<K> key) throws GroupMapUpdateException,
             IllegalStateException {
         checkStatus();
         EntryMessage entryMsg = new EntryMessage();
         entryMsg.setKey(key);
         entryMsg.setType(EntryMessage.MessageType.DELETE);
-        return (V) sendRequest(getCoordinator(), entryMsg);
+        return (V) sendStateRequest(getCoordinator(), entryMsg);
     }
 
     public int size() {
@@ -619,21 +805,52 @@
         result.addAll(this.members.values());
         return result;
     }
+    
+    /**
+     * Get a member by its unique id
+     * @param id
+     * @return
+     */
+    public Member getMemberById(String id) {
+        return this.members.get(id);
+    }
+    
+    /**
+     * Return a member of the Group with the matching name
+     * @param name
+     * @return
+     */
+    public Member getMemberByName(String name) {
+        if (name != null) {
+            for (Member member :this.members.values()) {
+                if(member.getName().equals(name)) {
+                    return member;
+                }
+            }
+        }
+        return null;
+    }
+    
+    /**
+     * @return the local member that represents this <CODE>Group</CODE> instance
+     */
+    public Member getLocalMember() {
+        return this.local;
+    }
 
     /**
      * @param key
      * @return true if this is the owner of the key
      */
     public boolean isOwner(K key) {
-        EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
         EntryValue<V> entryValue = null;
         synchronized (this.mapMutex) {
-            entryValue = this.localMap != null ? this.localMap.get(stateKey)
+            entryValue = this.localMap != null ? this.localMap.get(key)
                     : null;
         }
         boolean result = false;
         if (entryValue != null) {
-            result = entryValue.getOwner().getId().equals(this.local.getId());
+            result = entryValue.getKey().getOwner().getId().equals(this.local.getId());
         }
         return result;
     }
@@ -644,14 +861,13 @@
      * @param key
      * @return the owner - or null if the key doesn't exist
      */
-    public Member getOwner(K key) {
-        EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
+    EntryKey getKey(Object key) {
         EntryValue<V> entryValue = null;
         synchronized (this.mapMutex) {
-            entryValue = this.localMap != null ? this.localMap.get(stateKey)
+            entryValue = this.localMap != null ? this.localMap.get(key)
                     : null;
         }
-        return entryValue != null ? entryValue.getOwner() : null;
+        return entryValue != null ? entryValue.getKey() : null;
     }
 
     /**
@@ -667,37 +883,157 @@
     public Member getCoordinator() {
         return this.coordinator;
     }
+    
+    /**
+     * Broadcast a message to the group
+     * @param message
+     * @throws JMSException 
+     */
+    public void broadcastMessage(Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
+        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(this.messageTopic, objMsg);
+    }
+    
+    /**
+     * As the group for a response - one will be selected from the group
+     * @param member
+     * @param message
+     * @param timeout in milliseconds - a value if 0 means wait until complete
+     * @return
+     * @throws JMSException 
+     */
+    public Serializable broadcastMessageRequest(Object message, long timeout)
+            throws JMSException {
+        checkStatus();
+        Object result = null;
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(id, request);
+        }
+        ObjectMessage objMsg = this.stateSession
+                .createObjectMessage((Serializable) message);
+        objMsg.setJMSReplyTo(this.inboxTopic);
+        objMsg.setJMSCorrelationID(id);
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(this.messageQueue, objMsg);
+        result = request.get(timeout);
+        return (Serializable) result;
+    }
+    
+    /**
+     * Send a message to the group - but only the least loaded member will process it
+     * @param message
+     * @throws JMSException
+     */
+    public void sendMessage(Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
+        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(this.messageQueue, objMsg);
+    }
+    
+    /**
+     * Send a message to an individual member
+     * @param member
+     * @param message
+     * @throws JMSException 
+     */
+    public void sendMessage(Member member, Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
+        objMsg.setJMSCorrelationID(this.idGenerator.generateId());
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(member.getInBoxDestination(), objMsg);
+    }
+    
+    /**
+     * Send a request to a member
+     * @param member
+     * @param message
+     * @param timeout in milliseconds - a value if 0 means wait until complete
+     * @return the request or null
+     * @throws JMSException 
+     */
+    public Object sendMessageRequest(Member member, Object message,long timeout) throws JMSException {
+        checkStatus();
+        Object result = null;
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(id, request);
+        }
+        ObjectMessage objMsg = this.stateSession
+                .createObjectMessage((Serializable) message);
+        objMsg.setJMSReplyTo(this.inboxTopic);
+        objMsg.setJMSCorrelationID(id);
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(member.getInBoxDestination(), objMsg);
+        result = request.get(timeout);
+        return result;
+    }
+    
+    /**
+     * send a response to a message
+     * @param member 
+     * @param replyId 
+     * @param message 
+     * @throws JMSException 
+     */
+    public void sendMessageResponse(Member member,String replyId, Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
+        objMsg.setJMSCorrelationID(replyId);
+        objMsg.setJMSType(MESSAGE_TYPE);
+        objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+        this.messageProducer.send(member.getInBoxDestination(), objMsg);
+    }
 
     /**
-     * Select a coordinator - by default, its the member with the lowest
-     * lexicographical id
+     * Select a coordinator - coordinator weighting is used - or if everything
+     * is equal - a comparison of member ids.
      * 
      * @param members
      * @return
      */
-    protected Member selectCordinator(Collection<Member> members) {
-        Member result = this.local;
-        for (Member member : members) {
-            if (result.getId().compareTo(member.getId()) < 0) {
-                result = member;
+    protected Member selectCordinator(List<Member> list) {
+        Collections.sort(list, new Comparator<Member>() {
+            public int compare(Member m1, Member m2) {
+                int result = m1.getCoordinatorWeight()
+                        - m2.getCoordinatorWeight();
+                if (result == 0) {
+                    result = m1.getId().compareTo(m2.getId());
+                }
+                return result;
             }
-        }
+        });
+        Member result = list.isEmpty() ? this.local : list.get(list.size()-1);
         return result;
     }
 
-    Object sendRequest(Member member, Serializable payload) {
+    Object sendStateRequest(Member member, Serializable payload) {
         Object result = null;
         MapRequest request = new MapRequest();
         String id = this.idGenerator.generateId();
-        synchronized (this.requests) {
-            this.requests.put(id, request);
+        synchronized (this.stateRequests) {
+            this.stateRequests.put(id, request);
         }
         try {
-            ObjectMessage objMsg = this.session.createObjectMessage(payload);
+            ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
             objMsg.setJMSReplyTo(this.inboxTopic);
             objMsg.setJMSCorrelationID(id);
-            this.producer.send(member.getInBoxDestination(), objMsg);
-            result = request.get(getHeartBeatInterval() * 200000);
+            objMsg.setJMSType(STATE_TYPE);
+            this.stateProducer.send(member.getInBoxDestination(), objMsg);
+            result = request.get(getHeartBeatInterval());
         } catch (JMSException e) {
             if (this.started.get()) {
                 LOG.error("Failed to send request " + payload, e);
@@ -706,22 +1042,27 @@
         if (result instanceof GroupMapUpdateException) {
             throw (GroupMapUpdateException) result;
         }
+        if (result instanceof EntryMessage) {
+            EntryMessage entryMsg = (EntryMessage) result;
+            result = entryMsg.getOldValue();
+        }
         return result;
     }
 
-    void sendAsyncRequest(AsyncMapRequest asyncRequest, Member member,
+    void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
             Serializable payload) {
         MapRequest request = new MapRequest();
         String id = this.idGenerator.generateId();
         asyncRequest.add(id, request);
-        synchronized (this.requests) {
-            this.requests.put(id, request);
+        synchronized (this.stateRequests) {
+            this.stateRequests.put(id, request);
         }
         try {
-            ObjectMessage objMsg = this.session.createObjectMessage(payload);
+            ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
             objMsg.setJMSReplyTo(this.inboxTopic);
             objMsg.setJMSCorrelationID(id);
-            this.producer.send(member.getInBoxDestination(), objMsg);
+            objMsg.setJMSType(STATE_TYPE);
+            this.stateProducer.send(member.getInBoxDestination(), objMsg);
         } catch (JMSException e) {
             if (this.started.get()) {
                 LOG.error("Failed to send async request " + payload, e);
@@ -730,63 +1071,145 @@
     }
 
     void sendReply(Object reply, Destination replyTo, String id) {
-        try {
-            ObjectMessage replyMsg = this.session
-                    .createObjectMessage((Serializable) reply);
-            replyMsg.setJMSCorrelationID(id);
-            this.producer.send(replyTo, replyMsg);
-        } catch (JMSException e) {
-            LOG.error("Couldn't send reply from co-ordinator", e);
+        if (this.started.get()) {
+            if (replyTo != null) {
+                if (replyTo.equals(this.local.getInBoxDestination())) {
+                    processRequest(id, reply);
+                } else {
+                    try {
+                        ObjectMessage replyMsg = this.stateSession
+                                .createObjectMessage((Serializable) reply);
+                        replyMsg.setJMSCorrelationID(id);
+                        replyMsg.setJMSType(STATE_TYPE);
+                        this.stateProducer.send(replyTo, replyMsg);
+                    } catch (JMSException e) {
+                        LOG.error("Couldn't send reply from co-ordinator", e);
+                    }
+                }
+            } else {
+                LOG.error("NULL replyTo destination");
+            }
         }
     }
 
     void broadcastMapUpdate(EntryMessage entry, String correlationId) {
-        try {
-            EntryMessage copy = entry.copy();
-            copy.setMapUpdate(true);
-            ObjectMessage objMsg = this.session.createObjectMessage(copy);
-            objMsg.setJMSCorrelationID(correlationId);
-            this.producer.send(this.topic, objMsg);
-        } catch (JMSException e) {
-            if (this.started.get()) {
-                LOG.error("Failed to send EntryMessage " + entry, e);
+        if(this.started.get()) {
+            try {
+                EntryMessage copy = entry.copy();
+                copy.setMapUpdate(true);
+                ObjectMessage objMsg = this.stateSession.createObjectMessage(copy);
+                objMsg.setJMSCorrelationID(correlationId);
+                objMsg.setJMSType(STATE_TYPE);
+                this.stateProducer.send(this.stateTopic, objMsg);
+            } catch (JMSException e) {
+                if (this.started.get()) {
+                    LOG.error("Failed to send EntryMessage " + entry, e);
+                }
             }
         }
     }
+    
+    
 
-    void processMessage(Message message) {
+    void processJMSMessage(Message message) {
         if (message instanceof ObjectMessage) {
             ObjectMessage objMsg = (ObjectMessage) message;
+            
             try {
+                String messageType = objMsg.getJMSType();
                 String id = objMsg.getJMSCorrelationID();
+                String memberId = objMsg.getStringProperty(MEMBER_ID_PROPERTY);
                 Destination replyTo = objMsg.getJMSReplyTo();
                 Object payload = objMsg.getObject();
-                if (payload instanceof Member) {
-                    handleHeartbeats((Member) payload);
-                } else if (payload instanceof EntryMessage) {
-                    EntryMessage entryMsg = (EntryMessage) payload;
-                    entryMsg = entryMsg.copy();
-                    if (entryMsg.isMapUpdate()) {
-                        processMapUpdate(entryMsg);
-                    } else {
-                        processEntryMessage(entryMsg, replyTo, id);
-                    }
-                } else if (payload instanceof ElectionMessage) {
-                    ElectionMessage electionMsg = (ElectionMessage) payload;
-                    electionMsg = electionMsg.copy();
-                    processElectionMessage(electionMsg, replyTo, id);
-                }
-                if (id != null) {
-                    MapRequest result = null;
-                    synchronized (this.requests) {
-                        result = this.requests.remove(id);
-                    }
-                    if (result != null) {
-                        result.put(id, objMsg.getObject());
+                if (messageType != null) {
+                    if (messageType.equals(STATE_TYPE)) {
+                        if (payload instanceof Member) {
+                            handleHeartbeats((Member) payload);
+                        } else if (payload instanceof EntryMessage) {
+                            EntryMessage entryMsg = (EntryMessage) payload;
+                            entryMsg = entryMsg.copy();
+                            if(entryMsg.isLockUpdate()) {
+                                processLockUpdate(entryMsg, replyTo, id);
+                            }
+                            else if (entryMsg.isMapUpdate()) {
+                                processMapUpdate(entryMsg);
+                            } else {
+                                processEntryMessage(entryMsg, replyTo, id);
+                            }
+                        } else if (payload instanceof ElectionMessage) {
+                            ElectionMessage electionMsg = (ElectionMessage) payload;
+                            electionMsg = electionMsg.copy();
+                            processElectionMessage(electionMsg, replyTo, id);
+                        }
+                        
+                    }else if (messageType.equals(MESSAGE_TYPE)) {
+                        processGroupMessage(memberId,id, replyTo, payload);
+                    }else {
+                        LOG.error("Unknown message type: " + messageType);
                     }
+                    processRequest(id, payload);
+                }else {
+                    LOG.error("Can't process a message type of null");
                 }
             } catch (JMSException e) {
-                LOG.warn("Failed to process reply: " + message, e);
+                LOG.warn("Failed to process message: " + message, e);
+            }
+        }
+    }
+    
+    void processRequest(String id, Object value) {
+        if (id != null) {
+            MapRequest result = null;
+            synchronized (this.stateRequests) {
+                result = this.stateRequests.remove(id);
+            }
+            if (result != null) {
+                result.put(id, value);
+            }
+        }
+    }
+    
+    void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
+            String correlationId) {
+        synchronized (this.mapMutex) {
+            boolean newLock = entryMsg.getKey().isLocked();
+            Member newOwner = entryMsg.getKey().getOwner();
+            long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration():0l;
+            System.err.println(getName()+" PROC LOC = " + newOwner.getName()+ " LOCK = "+newLock);
+            
+            if (isCoordinator() && !entryMsg.isMapUpdate()) {
+               
+                EntryKey originalKey = getKey(entryMsg.getKey().getKey());
+                if (originalKey != null) {
+                    if (originalKey.isLocked()) {
+                        if (!originalKey.getOwner().equals(
+                                entryMsg.getKey().getOwner())) {
+                            Serializable reply = new GroupMapUpdateException(
+                                    "Owned by " + originalKey.getOwner());
+                            sendReply(reply, replyTo, correlationId);
+                            System.err.println(getName()+" PROC LOC SEND EXCEPTION TO " + newOwner.getName());
+                        } else {
+                            originalKey.setLocked(newLock);
+                            originalKey.setOwner(newOwner);
+                            originalKey.setLockExpiration(newLockExpiration);
+                            broadcastMapUpdate(entryMsg, correlationId);
+                            
+                        }
+                    } else {
+                        originalKey.setLocked(newLock);
+                        originalKey.setOwner(newOwner);
+                        originalKey.setLockExpiration(newLockExpiration);
+                        broadcastMapUpdate(entryMsg, correlationId);
+                    }
+                
+                }
+            } else {
+                EntryKey originalKey = getKey(entryMsg.getKey().getKey());
+                if (originalKey != null) {
+                    originalKey.setLocked(newLock);
+                    originalKey.setOwner(newOwner);
+                    originalKey.setLockExpiration(newLockExpiration);
+                }
             }
         }
     }
@@ -795,38 +1218,39 @@
             String correlationId) {
         if (isCoordinator()) {
             EntryKey<K> key = entryMsg.getKey();
-            EntryValue<V> value = new EntryValue<V>(key.getOwner(),
+            EntryValue<V> value = new EntryValue<V>(key,
                     (V) entryMsg.getValue());
             boolean insert = entryMsg.isInsert();
             boolean containsKey = false;
             synchronized (this.mapMutex) {
-                containsKey = this.localMap.containsKey(key);
+                containsKey = this.localMap.containsKey(key.getKey());
             }
             if (containsKey) {
-                Member owner = getOwner((K) key.getKey());
-                if (owner.equals(key.getOwner()) || key.isShare()) {
+                EntryKey originalKey = getKey(key.getKey());
+                if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) {
                     EntryValue<V> old = null;
                     if (insert) {
                         synchronized (this.mapMutex) {
-                            old = this.localMap.put(key, value);
+                            old = this.localMap.put(key.getKey(), value);
                         }
                     } else {
                         synchronized (this.mapMutex) {
-                            old = this.localMap.remove(key);
+                            old = this.localMap.remove(key.getKey());
                         }
                     }
+                    entryMsg.setOldValue(old.getValue());
                     broadcastMapUpdate(entryMsg, correlationId);
-                    fireMapChanged(owner, key.getKey(), old.getValue(), value
+                    fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value
                             .getValue(), false);
                 } else {
                     Serializable reply = new GroupMapUpdateException(
-                            "Owned by " + owner);
+                            "Owned by " + originalKey.getOwner());
                     sendReply(reply, replyTo, correlationId);
                 }
             } else {
                 if (insert) {
                     synchronized (this.mapMutex) {
-                        this.localMap.put(key, value);
+                        this.localMap.put(key.getKey(), value);
                     }
                     broadcastMapUpdate(entryMsg, correlationId);
                     fireMapChanged(key.getOwner(), key.getKey(), null, value
@@ -841,33 +1265,40 @@
     void processMapUpdate(EntryMessage entryMsg) {
         boolean containsKey = false;
         EntryKey<K> key = entryMsg.getKey();
-        EntryValue<V> value = new EntryValue<V>(key.getOwner(), (V) entryMsg
+        EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg
                 .getValue());
         boolean insert = entryMsg.isInsert()||entryMsg.isSync();
         synchronized (this.mapMutex) {
-            containsKey = this.localMap.containsKey(key);
+            containsKey = this.localMap.containsKey(key.getKey());
         }
-       
-        if (!isCoordinator()||entryMsg.isSync()) {
+        if (!isCoordinator() || entryMsg.isSync()) {
             if (containsKey) {
-                Member owner = getOwner((K) key.getKey());
-                EntryValue<V> old = null;
-                if (insert) {
-                    synchronized (this.mapMutex) {
-                        old = this.localMap.put(key, value);
+                if (key.isLockExpired()) {
+                    EntryValue old = this.localMap.get(key.getKey());
+                    if (old != null) {
+                        old.getKey().setLocked(false);
                     }
                 } else {
-                    synchronized (this.mapMutex) {
-                        old = this.localMap.remove(key);
-                        value.setValue(null);
+                    EntryValue<V> old = null;
+                    if (insert) {
+                        synchronized (this.mapMutex) {
+                            old = this.localMap.put(key.getKey(), value);
+                        }
+                    } else {
+                        synchronized (this.mapMutex) {
+                            old = this.localMap.remove(key.getKey());
+                            value.setValue(null);
+                        }
                     }
+                    
+                    fireMapChanged(key.getOwner(), key.getKey(),
+                            old.getValue(), value.getValue(), entryMsg
+                                    .isExpired());
                 }
-                fireMapChanged(owner, key.getKey(), old.getValue(), value
-                        .getValue(), entryMsg.isExpired());
             } else {
                 if (insert) {
                     synchronized (this.mapMutex) {
-                        this.localMap.put(key, value);
+                        this.localMap.put(key.getKey(), value);
                     }
                     fireMapChanged(key.getOwner(), key.getKey(), null, value
                             .getValue(), false);
@@ -875,6 +1306,22 @@
             }
         }
     }
+    
+    void processGroupMessage(String memberId,String replyId,Destination replyTo, Object payload) {
+        Member member = this.members.get(memberId);
+        if (member != null) {
+            fireMemberMessage(member, replyId, payload);
+        }
+        if (replyId != null) {
+            MapRequest result = null;
+            synchronized (this.messageRequests) {
+                result = this.messageRequests.remove(replyId);
+            }
+            if (result != null) {
+                result.put(replyId, payload);
+            }
+        }
+    }
 
     void handleHeartbeats(Message message) {
         try {
@@ -893,11 +1340,12 @@
     void handleHeartbeats(Member member) {
         member.setTimeStamp(System.currentTimeMillis());
         if (this.members.put(member.getId(), member) == null) {
-            election(member, true);
+            
             fireMemberStarted(member);
             if (!member.equals(this.local)) {
                 sendHeartBeat(member.getInBoxDestination());
             }
+            election(member, true);
             synchronized (this.memberMutex) {
                 this.memberMutex.notifyAll();
             }
@@ -936,27 +1384,44 @@
     
     void expirationSweep() {
         if (isCoordinator() && this.started.get() && this.electionFinished.get()) {
-            List<EntryKey> list = null;
+            List<EntryKey> expiredMessages = null;
+            List<EntryKey> expiredLocks = null;
             synchronized (this.mapMutex) {
-                Map<EntryKey<K>, EntryValue<V>> map = this.localMap;
+                Map<K, EntryValue<V>> map = this.localMap;
                 if (map != null) {
                     long currentTime = System.currentTimeMillis();
-                    for (EntryKey k : map.keySet()) {
+                    for (EntryValue value : map.values()) {
+                        EntryKey k = value.getKey();
                         if (k.isExpired(currentTime)) {
-                            if (list == null) {
-                                list = new ArrayList<EntryKey>();
-                                list.add(k);
+                            if (expiredMessages == null) {
+                                expiredMessages = new ArrayList<EntryKey>();
+                                expiredMessages.add(k);
+                            }
+                        }else if (k.isLockExpired(currentTime)) {
+                            k.setLocked(false);
+                            if (expiredLocks == null) {
+                                expiredLocks = new ArrayList<EntryKey>();
+                                expiredLocks.add(k);
                             }
+                            expiredLocks.add(k);
                         }
                     }
                 }
             }
             //do the actual removal of entries in a separate thread
-            if (list != null) {
-                final List<EntryKey> expire = list;
-                this.executor.execute(new Runnable() {
+            if (expiredMessages != null) {
+                final List<EntryKey> expire = expiredMessages;
+                this.stateExecutor.execute(new Runnable() {
                     public void run() {
-                        doExpiration(expire);
+                        doMessageExpiration(expire);
+                    }
+                });
+            }
+            if (expiredLocks != null) {
+                final List<EntryKey> expire = expiredLocks;
+                this.stateExecutor.execute(new Runnable() {
+                    public void run() {
+                        doLockExpiration(expire);
                     }
                 });
             }
@@ -964,13 +1429,13 @@
         
     }
     
-    void doExpiration(List<EntryKey> list) {
+    void doMessageExpiration(List<EntryKey> list) {
         if (this.started.get() && this.electionFinished.get()
                 && isCoordinator()) {
             for (EntryKey k : list) {
                 EntryValue<V> old = null;
                 synchronized (this.mapMutex) {
-                    old = this.localMap.remove(k);
+                    old = this.localMap.remove(k.getKey());
                 }
                 if (old != null) {
                     EntryMessage entryMsg = new EntryMessage();
@@ -985,6 +1450,21 @@
             }
         }
     }
+    
+    void doLockExpiration(List<EntryKey> list) {
+        if (this.started.get() && this.electionFinished.get()
+                && isCoordinator()) {
+            for (EntryKey k : list) {
+                
+                    EntryMessage entryMsg = new EntryMessage();
+                    entryMsg.setType(EntryMessage.MessageType.DELETE);
+                    entryMsg.setLockExpired(true);
+                    entryMsg.setKey(k);
+                    broadcastMapUpdate(entryMsg, "");
+                }
+           
+        }
+    }
 
     void sendHeartBeat() {
         sendHeartBeat(this.heartBeatTopic);
@@ -993,9 +1473,10 @@
     void sendHeartBeat(Destination destination) {
         if (this.started.get()) {
             try {
-                ObjectMessage msg = this.session
+                ObjectMessage msg = this.stateSession
                         .createObjectMessage(this.local);
-                this.producer.send(destination, msg);
+                msg.setJMSType(STATE_TYPE);
+                this.stateProducer.send(destination, msg);
             } catch (javax.jms.IllegalStateException e) {
                 // ignore - as we are probably stopping
             } catch (Throwable e) {
@@ -1007,29 +1488,32 @@
     }
 
     void updateNewMemberMap(Member member) {
-        List<Map.Entry<EntryKey<K>, EntryValue<V>>> list = new ArrayList<Map.Entry<EntryKey<K>, EntryValue<V>>>();
+        List<Map.Entry<K, EntryValue<V>>> list = new ArrayList<Map.Entry<K, EntryValue<V>>>();
         synchronized (this.mapMutex) {
             if (this.localMap != null) {
-                for (Map.Entry<EntryKey<K>, EntryValue<V>> entry : this.localMap
+                for (Map.Entry<K, EntryValue<V>> entry : this.localMap
                         .entrySet()) {
                     list.add(entry);
                 }
             }
         }
         try {
-            for (Map.Entry<EntryKey<K>, EntryValue<V>> entry : list) {
+            for (Map.Entry<K, EntryValue<V>> entry : list) {
                 EntryMessage entryMsg = new EntryMessage();
-                entryMsg.setKey(entry.getKey());
+                entryMsg.setKey(entry.getValue().getKey());
                 entryMsg.setValue(entry.getValue().getValue());
                 entryMsg.setType(EntryMessage.MessageType.SYNC);
                 entryMsg.setMapUpdate(true);
-                ObjectMessage objMsg = this.session
+                ObjectMessage objMsg = this.stateSession
                         .createObjectMessage(entryMsg);
-                if (!member.equals(entry.getKey().getOwner())) {
-                    this.producer.send(member.getInBoxDestination(), objMsg);
+                if (!member.equals(entry.getValue().getKey().getOwner())) {
+                    objMsg.setJMSType(STATE_TYPE);
+                    this.stateProducer.send(member.getInBoxDestination(), objMsg);
                 }
             }
-        } catch (JMSException e) {
+        } catch(javax.jms.IllegalStateException e) {
+            //ignore - as closing
+        }catch (JMSException e) {
             if (started.get()) {
                 LOG.warn("Failed to update new member ", e);
             }
@@ -1054,11 +1538,15 @@
         synchronized (this.mapMutex) {
             mapExists = this.localMap != null;
             if (mapExists) {
-                for (EntryKey<K> entryKey : this.localMap.keySet()) {
+                for (EntryValue value : this.localMap.values()) {
+                    EntryKey entryKey = value.getKey();
                     if (entryKey.getOwner().equals(member)) {
                         if (entryKey.isRemoveOnExit()) {
                             tmpList.add(entryKey);
                         }
+                        if (entryKey.isReleaseLockOnExit()) {
+                            entryKey.setLocked(false);
+                        }
                     }
                 }
             }
@@ -1074,12 +1562,31 @@
             }
         }
     }
+    
+    void fireMemberMessage(final Member member,final String replyId,final Object message) {
+        if (this.started.get() && this.stateExecutor != null
+                && !this.messageExecutor.isShutdown()) {
+            this.messageExecutor.execute(new Runnable() {
+                public void run() {
+                    doFireMemberMessage(member,replyId,message);
+                }
+            });
+        }
+    }
+    
+    void doFireMemberMessage(Member sender,String replyId,Object message) {
+        if(this.started.get()) {
+            for(GroupMessageListener l : this.groupMessageListeners) {
+                l.messageDelivered(sender,replyId, message);
+            }
+        }
+    }
 
     void fireMapChanged(final Member owner, final Object key,
             final Object oldValue, final Object newValue, final boolean expired) {
-        if (this.started.get() && this.executor != null
-                && !this.executor.isShutdown()) {
-            this.executor.execute(new Runnable() {
+        if (this.started.get() && this.stateExecutor != null
+                && !this.stateExecutor.isShutdown()) {
+            this.stateExecutor.execute(new Runnable() {
                 public void run() {
                     doFireMapChanged(owner, key, oldValue, newValue, expired);
                 }
@@ -1089,21 +1596,31 @@
 
     void doFireMapChanged(Member owner, Object key, Object oldValue,
             Object newValue, boolean expired) {
-        for (MapChangedListener l : this.mapChangedListeners) {
-            if (oldValue == null) {
-                l.mapInsert(owner, key, newValue);
-            } else if (newValue == null) {
-                l.mapRemove(owner, key, oldValue, expired);
-            } else {
-                l.mapUpdate(owner, key, oldValue, newValue);
+        if (this.started.get()) {
+            for (MapChangedListener l : this.mapChangedListeners) {
+                if (oldValue == null) {
+                    l.mapInsert(owner, key, newValue);
+                } else if (newValue == null) {
+                    l.mapRemove(owner, key, oldValue, expired);
+                } else {
+                    l.mapUpdate(owner, key, oldValue, newValue);
+                }
             }
         }
     }
 
     void election(final Member member, final boolean memberStarted) {
-        if (this.started.get() && this.executor != null
-                && !this.executor.isShutdown()) {
-            this.executor.execute(new Runnable() {
+        if (this.started.get() && this.stateExecutor != null
+                && !this.electionExecutor.isShutdown()) {
+            synchronized (this.electionExecutor) {
+                //remove any queued election tasks 
+                List<Runnable> list = new ArrayList<Runnable>(
+                        this.electionExecutor.getQueue());
+                for (Runnable r : list) {
+                    this.electionExecutor.remove(r);
+                }
+            }
+            this.electionExecutor.execute(new Runnable() {
                 public void run() {
                     doElection(member, memberStarted);
                 }
@@ -1113,34 +1630,36 @@
 
     void doElection(Member member, boolean memberStarted) {
         if ((member == null || !member.equals(this.local))
-                && this.electionFinished.compareAndSet(true, false)) {
+                && (this.electionFinished.compareAndSet(true, false)||true)) {
             boolean wasCoordinator = isCoordinator() && !isEmpty();
             // call an election
-            while (!callElection())
+            while (!callElection()&&this.started.get())
                 ;
-            List<Member> members = new ArrayList<Member>(this.members.values());
-            this.coordinator = selectCordinator(members);
-            if (isCoordinator()) {
-                broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
-            }
-            if (memberStarted && member != null) {
-                if (wasCoordinator || isCoordinator() && this.started.get()) {
-                    updateNewMemberMap(member);
+            if (this.started.get()) {
+                List<Member> members = new ArrayList<Member>(this.members.values());
+                this.coordinator = selectCordinator(members);
+                if (isCoordinator()) {
+                    broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+                }
+                if (memberStarted && member != null) {
+                    if (wasCoordinator || isCoordinator() && this.started.get()) {
+                        updateNewMemberMap(member);
+                    }
                 }
-            }
-            if (!this.electionFinished.get()) {
-                try {
-                    synchronized (this.electionFinished) {
-                        this.electionFinished.wait(this.heartBeatInterval * 2);
+                if (!this.electionFinished.get()) {
+                    try {
+                        synchronized (this.electionFinished) {
+                            this.electionFinished.wait(this.heartBeatInterval * 2);
+                        }
+                    } catch (InterruptedException e) {
                     }
-                } catch (InterruptedException e) {
                 }
-            }
-            if (!this.electionFinished.get()) {
-                // we must be the coordinator
-                this.coordinator = this.local;
-                this.electionFinished.set(true);
-                broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+                if (!this.electionFinished.get()) {
+                    // we must be the coordinator
+                    this.coordinator = this.local;
+                    this.electionFinished.set(true);
+                    broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+                }
             }
         }
     }
@@ -1153,7 +1672,7 @@
                 ElectionMessage msg = new ElectionMessage();
                 msg.setMember(this.local);
                 msg.setType(ElectionMessage.MessageType.ELECTION);
-                sendAsyncRequest(request, member, msg);
+                sendAsyncStateRequest(request, member, msg);
             }
         }
         return request.isSuccess(getHeartBeatInterval());
@@ -1180,8 +1699,9 @@
                 ElectionMessage msg = new ElectionMessage();
                 msg.setMember(this.local);
                 msg.setType(type);
-                ObjectMessage objMsg = this.session.createObjectMessage(msg);
-                this.producer.send(this.topic, objMsg);
+                ObjectMessage objMsg = this.stateSession.createObjectMessage(msg);
+                objMsg.setJMSType(STATE_TYPE);
+                this.stateProducer.send(this.stateTopic, objMsg);
             } catch (javax.jms.IllegalStateException e) {
                 // ignore - we are stopping
             } catch (JMSException e) {
@@ -1213,4 +1733,10 @@
             }
         }
     }
+    
+    public String toString() {
+        return "Group:" + getName() + "{id=" + this.local.getId()
+                + ",coordinator=" + isCoordinator() + ",inbox="
+                + this.local.getInBoxDestination() + "}";
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java?rev=684047&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java Fri Aug  8 11:48:06 2008
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.group;
+
+
+/**
+ * A listener for message communication
+ *
+ */
+public interface GroupMessageListener {
+    
+    /**
+     * Called when a message is delivered to the <CODE>Group</CODE> from another member
+     * @param sender the member who sent the message
+     * @param replyId the id to use to respond to a message
+     * @param message the message object
+     */
+    void messageDelivered(Member sender, String replyId, Object message);
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java Fri Aug  8 11:48:06 2008
@@ -22,6 +22,7 @@
 import java.io.ObjectOutput;
 import javax.jms.Destination;
 import org.apache.activemq.util.IdGenerator;
+import com.sun.jndi.url.corbaname.corbanameURLContextFactory;
 
 /**
  *<P>
@@ -32,9 +33,10 @@
     private String name;
     private String id;
     private String hostname;
-    private long timeStamp;
     private long startTime;
+    private int coordinatorWeight;
     private Destination inBoxDestination;
+    private transient long timeStamp;
     
 
     /**
@@ -94,12 +96,41 @@
         this.inBoxDestination=dest;
     }
     
+    /**
+     * @return the timeStamp
+     */
+    long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    /**
+     * @param timeStamp the timeStamp to set
+     */
+    void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+    /**
+     * @return the coordinatorWeight
+     */
+    public int getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+    /**
+     * @param coordinatorWeight the coordinatorWeight to set
+     */
+    public void setCoordinatorWeight(int coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
+    
+    
+    
     public String toString() {
         return this.name+"["+this.id+"]@"+this.hostname;
     }
     
      
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.coordinatorWeight=in.readInt();;
         this.name = in.readUTF();
         this.id = in.readUTF();
         this.hostname = in.readUTF();
@@ -108,6 +139,7 @@
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(this.coordinatorWeight);
         out.writeUTF(this.name != null ? this.name : "");
         out.writeUTF(this.id != null ? this.id : "");
         out.writeUTF(this.hostname != null ? this.hostname : "");
@@ -127,20 +159,4 @@
         }
         return result;
     }
-
-    /**
-     * @return the timeStamp
-     */
-    long getTimeStamp() {
-        return this.timeStamp;
-    }
-
-    /**
-     * @param timeStamp the timeStamp to set
-     */
-    void setTimeStamp(long timeStamp) {
-        this.timeStamp = timeStamp;
-    }
-    
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java Fri Aug  8 11:48:06 2008
@@ -18,7 +18,7 @@
 package org.apache.activemq.group;
 
 /**
- * @author rajdavies
+ * A listener for membership changes to a group
  *
  */
 public interface MemberChangedListener {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html Fri Aug  8 11:48:06 2008
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-Shared state and membership information between members of a remote group
+Shared state, messaging and membership information between members of a distributed  group
 
 </body>
 </html>



Mime
View raw message