activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r685612 [2/3] - in /activemq/sandbox/groupmq: ./ eclipse-classes/ eclipse-classes/META-INF/ eclipse-classes/org/ eclipse-classes/org/apache/ eclipse-classes/org/apache/group/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/or...
Date Wed, 13 Aug 2008 17:05:11 GMT
Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Group.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Group.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Group.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Group.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,1830 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.Service;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <P>
+ * A <CODE>Group</CODE> is a distributed collaboration implementation that is
+ * used to share state and process messages amongst a distributed group of
+ * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * automatically using discovery.
+ * <P>
+ * The underlying transport is JMS and there are some optimizations that occur
+ * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
+ * with any JMS implementation.
+ * 
+ * <P>
+ * Updates to the group shared map are controlled by a coordinator. The
+ * coordinator is elected by the member with the lowest lexicographical id -
+ * based on the bully algorithm [Silberschatz et al. 1993]
+ * <P>
+ * The {@link #selectCordinator(Collection<Member> members)} method may be
+ * overridden to implement a custom mechanism for choosing how the coordinator
+ * is elected for the map.
+ * <P>
+ * New <CODE>Group</CODE> instances have their state updated by the
+ * coordinator, and coordinator failure is handled automatically within the
+ * group.
+ * <P>
+ * All map updates are totally ordered through the coordinator, whilst read
+ * operations happen locally.
+ * <P>
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write
+ * locks), shared updates, entry expiration times and removal on owner exit -
+ * all of which are optional. In addition, you can grab and release locks for
+ * values in the map, independently of who created them.
+ * <P>
+ * In addition, members of a group can broadcast messages and implement
+ * request/response with other <CODE>Group</CODE> instances.
+ * 
+ * <P>
+ * 
+ * @param <K>
+ *            the key type
+ * @param <V>
+ *            the value type
+ * 
+ */
+public class Group<K, V> implements Map<K, V>, Service {
+    /**
+     * default interval within which to detect a member failure; it is the
+     * initial value of heartBeatInterval, which start() passes to
+     * java.util.Timer scheduling, so it is in milliseconds
+     */
+    public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000;
+    private static final long EXPIRATION_SWEEP_INTERVAL = 500; // delay and period (ms) of the expiration sweep timer task
+    private static final Log LOG = LogFactory.getLog(Group.class);
+    private static final String STATE_PREFIX = "STATE." + Group.class.getName()
+            + "."; // start() appends the group name to build the state topic name
+    private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."
+            + Group.class.getName() + "."; // start() appends the group name to build the message topic/queue name
+    private static final String STATE_TYPE = "state";
+    private static final String MESSAGE_TYPE = "message"; // JMSType set on broadcast group messages
+    private static final String MEMBER_ID_PROPERTY = "memberId"; // message property carrying the sender's member id
+    protected Member local; // the member that represents this instance
+    private final Object mapMutex = new Object(); // lock taken around localMap access
+    private Map<K, EntryValue<V>> localMap; // supplied via setLocalMap() or created as a HashMap in start()
+    Map<String, Member> members = new ConcurrentHashMap<String, Member>();
+    private Map<String, MapRequest> stateRequests = new HashMap<String, MapRequest>();
+    private Map<String, MapRequest> messageRequests = new HashMap<String, MapRequest>();
+    private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+    private List<GroupStateChangedListener> mapChangedListeners = new CopyOnWriteArrayList<GroupStateChangedListener>();
+    private List<GroupMessageListener> groupMessageListeners = new CopyOnWriteArrayList<GroupMessageListener>();
+    private Member coordinator; // initialised to the local member by the constructor
+    private String groupName;
+    private boolean alwaysLock;
+    private Connection connection;
+    private Session stateSession;
+    private Session messageSession;
+    private Topic stateTopic;
+    private Topic heartBeatTopic;
+    private Topic inboxTopic; // temporary topic created in start(); also the local member's inbox destination
+    private Topic messageTopic;
+    private Queue messageQueue;
+    private MessageProducer stateProducer;
+    private MessageProducer messageProducer;
+    private ConsumerEventSource consumerEvents;
+    private AtomicBoolean started = new AtomicBoolean();
+    private SchedulerTimerTask heartBeatTask;
+    private SchedulerTimerTask checkMembershipTask;
+    private SchedulerTimerTask expirationTask;
+    private Timer timer;
+    private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
+    private IdGenerator idGenerator = new IdGenerator();
+    private boolean removeOwnedObjectsOnExit;
+    private boolean releaseLockOnExit = true;
+    private int timeToLive;
+    private int lockTimeToLive;
+    private int minimumGroupSize = 1;
+    private int coordinatorWeight = 0;
+    private final AtomicBoolean electionFinished = new AtomicBoolean(true); // start() also uses this object as the monitor it waits on
+    private ExecutorService stateExecutor;
+    private ExecutorService messageExecutor;
+    private ThreadPoolExecutor electionExecutor;
+    private final Object memberMutex = new Object();
+
+    /**
+     * Create a member of the group named "default"
+     * 
+     * @param connection
+     *            the JMS connection used for all group communication
+     * @param name
+     *            the name of the local member
+     */
+    public Group(Connection connection, String name) {
+        this(connection, "default", name);
+    }
+
+    /**
+     * Create a member of the named group. Until an election says otherwise
+     * the local member is its own coordinator.
+     * 
+     * @param connection
+     *            the JMS connection used for all group communication
+     * @param groupName
+     *            the name of the group to join
+     * @param name
+     *            the name of the local member
+     */
+    public Group(Connection connection, String groupName, String name) {
+        Member self = new Member(name);
+        this.connection = connection;
+        this.groupName = groupName;
+        this.local = self;
+        this.coordinator = self;
+    }
+
+    /**
+     * Set the local map implementation to be used. By default it is a HashMap
+     * (created by start() when no map has been set) - but you could use a
+     * Cache for example. The assignment is made while holding the map lock.
+     * 
+     * @param map
+     *            the map that will hold the local copy of the group state
+     */
+    public void setLocalMap(Map map) {
+        synchronized (this.mapMutex) {
+            this.localMap = map;
+        }
+    }
+
+    /**
+     * Start membership to the group. Does nothing if already started.
+     * Otherwise this starts the connection, creates the JMS sessions,
+     * destinations, producers and consumers, creates the executors, schedules
+     * the heart beat, membership check and expiration timer tasks, and then
+     * waits - for at most heartBeatInterval * minimumGroupSize milliseconds -
+     * until at least minimumGroupSize members are known and the election has
+     * finished.
+     * 
+     * @throws Exception
+     *             if the JMS set up fails or the final wait is interrupted
+     * 
+     */
+    public void start() throws Exception {
+        if (this.started.compareAndSet(false, true)) { // only the caller that flips the flag performs the start up
+            synchronized (this.mapMutex) {
+                if (this.localMap == null) { // no map was supplied via setLocalMap()
+                    this.localMap = new HashMap<K, EntryValue<V>>();
+                }
+            }
+            this.connection.start();
+            this.stateSession = this.connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            this.messageSession = this.connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            this.stateProducer = this.stateSession.createProducer(null);
+            this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            this.inboxTopic = this.stateSession.createTemporaryTopic(); // becomes the local member's inbox destination below
+            String stateTopicName = STATE_PREFIX + this.groupName;
+            this.stateTopic = this.stateSession.createTopic(stateTopicName);
+            this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
+                    + ".heartbeat");
+            String messageDestinationName = GROUP_MESSAGE_PREFIX
+                    + this.groupName;
+            this.messageTopic = this.messageSession
+                    .createTopic(messageDestinationName);
+            this.messageQueue = this.messageSession
+                    .createQueue(messageDestinationName);
+            MessageConsumer privateInbox = this.messageSession
+                    .createConsumer(this.inboxTopic);
+            MessageConsumer memberChangeConsumer = this.stateSession
+                    .createConsumer(this.stateTopic);
+            String memberId = null;
+            if (memberChangeConsumer instanceof ActiveMQMessageConsumer) { // with ActiveMQ the consumer id is used as the member id
+                memberId = ((ActiveMQMessageConsumer) memberChangeConsumer)
+                        .getConsumerId().toString();
+            } else { // any other JMS provider: generate an id locally
+                memberId = this.idGenerator.generateId();
+            }
+            this.local.setId(memberId);
+            this.local.setInBoxDestination(this.inboxTopic);
+            this.local.setCoordinatorWeight(getCoordinatorWeight());
+            privateInbox.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            memberChangeConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            this.messageProducer = this.messageSession.createProducer(null);
+            this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            MessageConsumer topicMessageConsumer = this.messageSession
+                    .createConsumer(this.messageTopic);
+            topicMessageConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            MessageConsumer queueMessageConsumer = this.messageSession
+                    .createConsumer(this.messageQueue);
+            queueMessageConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    processJMSMessage(message);
+                }
+            });
+            MessageConsumer heartBeatConsumer = this.stateSession
+                    .createConsumer(this.heartBeatTopic);
+            heartBeatConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    handleHeartbeats(message);
+                }
+            });
+            this.consumerEvents = new ConsumerEventSource(this.connection,
+                    this.stateTopic);
+            this.consumerEvents.setConsumerListener(new ConsumerListener() {
+                public void onConsumerEvent(ConsumerEvent event) {
+                    handleConsumerEvents(event);
+                }
+            });
+            this.consumerEvents.start();
+            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
+                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
+                    new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "Election{"
+                                    + Group.this.local + "}");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+            this.stateExecutor = Executors
+                    .newSingleThreadExecutor(new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "Group State{"
+                                    + Group.this.local + "}");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+            this.messageExecutor = Executors
+                    .newSingleThreadExecutor(new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable,
+                                    "Group Messages{" + Group.this.local + "}");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+            sendHeartBeat(); // one heart beat is sent immediately, before the timer is running
+            this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
+                public void run() {
+                    sendHeartBeat();
+                }
+            });
+            this.checkMembershipTask = new SchedulerTimerTask(new Runnable() {
+                public void run() {
+                    checkMembership();
+                }
+            });
+            this.expirationTask = new SchedulerTimerTask(new Runnable() {
+                public void run() {
+                    expirationSweep();
+                }
+            });
+            this.timer = new Timer("Distributed heart beat", true); // true: the timer thread is a daemon
+            this.timer.scheduleAtFixedRate(this.heartBeatTask,
+                    getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); // heart beats go out twice per heart beat interval
+            this.timer.scheduleAtFixedRate(this.checkMembershipTask,
+                    getHeartBeatInterval(), getHeartBeatInterval());
+            this.timer.scheduleAtFixedRate(this.expirationTask,
+                    EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
+            // await for members to join
+            long timeout = (long) (this.heartBeatInterval
+                    * this.minimumGroupSize );
+            long deadline = System.currentTimeMillis() + timeout;
+            while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
+                    .get())
+                    && timeout > 0) {
+                synchronized (this.electionFinished) { // wait() requires holding the monitor
+                    this.electionFinished.wait(timeout);
+                }
+                timeout = Math.max(deadline - System.currentTimeMillis(), 0); // remaining time; 0 ends the loop
+            }
+        }
+    }
+
+    /**
+     * Stop membership to the group. Does nothing unless the group is started.
+     * Cancels the timer and its tasks, shuts down the executors and closes
+     * the JMS resources. Failures while closing are logged at debug level and
+     * never thrown, and the connection is closed even if closing the consumer
+     * event source or a session fails.
+     */
+    public void stop() {
+        if (this.started.compareAndSet(true, false)) {
+            // the tasks and timer are still null if start() failed part way
+            if (this.expirationTask != null) {
+                this.expirationTask.cancel();
+            }
+            if (this.checkMembershipTask != null) {
+                this.checkMembershipTask.cancel();
+            }
+            if (this.heartBeatTask != null) {
+                this.heartBeatTask.cancel();
+            }
+            if (this.timer != null) {
+                // cancel() terminates the timer thread; purge() would only
+                // drop the cancelled tasks and leave the thread running
+                this.timer.cancel();
+            }
+            if (this.electionExecutor != null) {
+                this.electionExecutor.shutdownNow();
+            }
+            if (this.stateExecutor != null) {
+                this.stateExecutor.shutdownNow();
+            }
+            if (this.messageExecutor != null) {
+                this.messageExecutor.shutdownNow();
+            }
+            try {
+                if (this.consumerEvents != null) {
+                    this.consumerEvents.stop();
+                }
+                if (this.stateSession != null) {
+                    this.stateSession.close();
+                }
+                if (this.messageSession != null) {
+                    this.messageSession.close();
+                }
+            } catch (Exception e) {
+                LOG.debug("Caught exception stopping", e);
+            } finally {
+                // always attempt to close the connection, whatever happened
+                // above
+                try {
+                    if (this.connection != null) {
+                        this.connection.close();
+                    }
+                } catch (Exception e) {
+                    LOG.debug("Caught exception stopping", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * @return true if start() has been called and stop() has not been called
+     *         since; the flag is set at the very beginning of start()
+     */
+    public boolean isStarted() {
+        return this.started.get();
+    }
+
+    /**
+     * @return true if the election has finished (the initial value is true)
+     */
+    public boolean isElectionFinished() {
+        return this.electionFinished.get();
+    }
+
+    void setElectionFinished(boolean flag) { // package-private; only sets the flag, it does not notify threads waiting on electionFinished
+        this.electionFinished.set(flag);
+    }
+
+    /**
+     * @return the name of the group, as given to the constructor ("default"
+     *         when the two argument constructor was used)
+     */
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    /**
+     * @return the name of the local member of this group
+     */
+    public String getName() {
+        return this.local.getName();
+    }
+
+    /**
+     * @return true if by default always lock objects (default is false); this
+     *         is the lock flag used by put(key, value) and putAll(map)
+     */
+    public boolean isAlwaysLock() {
+        return this.alwaysLock;
+    }
+
+    /**
+     * @param alwaysLock
+     *            set true if objects inserted with put(key, value) or
+     *            putAll(map) will always be locked (default is false)
+     */
+    public void setAlwaysLock(boolean alwaysLock) {
+        this.alwaysLock = alwaysLock;
+    }
+
+    /**
+     * @return the heartBeatInterval in milliseconds (default is
+     *         DEFAULT_HEART_BEAT_INTERVAL)
+     */
+    public long getHeartBeatInterval() {
+        return this.heartBeatInterval;
+    }
+
+    /**
+     * @param heartBeatInterval
+     *            the heartBeatInterval to set, in milliseconds; start() reads
+     *            it when it schedules the heart beat and membership check
+     *            tasks
+     */
+    public void setHeartBeatInterval(long heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+
+    /**
+     * Add a listener for membership changes
+     * 
+     * @param l
+     *            the listener to add to the membership listener list
+     */
+    public void addMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.add(l);
+    }
+
+    /**
+     * Remove a listener for membership changes
+     * 
+     * @param l
+     *            the listener to remove from the membership listener list
+     */
+    public void removeMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.remove(l);
+    }
+
+    /**
+     * Add a listener for map changes
+     * 
+     * @param l
+     *            the listener to add to the map changed listener list
+     */
+    public void addMapChangedListener(GroupStateChangedListener l) {
+        this.mapChangedListeners.add(l);
+    }
+
+    /**
+     * Remove a listener for map changes
+     * 
+     * @param l
+     *            the listener to remove from the map changed listener list
+     */
+    public void removeMapChangedListener(GroupStateChangedListener l) {
+        this.mapChangedListeners.remove(l);
+    }
+
+    /**
+     * Add a listener for group messages
+     * 
+     * @param l
+     *            the listener to add to the group message listener list
+     */
+    public void addGroupMessageListener(GroupMessageListener l) {
+        this.groupMessageListeners.add(l);
+    }
+
+    /**
+     * Remove a listener for group messages
+     * 
+     * @param l
+     *            the listener to remove from the group message listener list
+     */
+    public void removeGroupMessageListener(GroupMessageListener l) {
+        this.groupMessageListeners.remove(l);
+    }
+
+    /**
+     * @return the timeToLive that put(key, value) and putAll(map) apply to
+     *         the entries they insert
+     */
+    public int getTimeToLive() {
+        return this.timeToLive;
+    }
+
+    /**
+     * @param timeToLive
+     *            the timeToLive to set; used by put(key, value) and
+     *            putAll(map) for the entries they insert
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    /**
+     * @return the removeOwnedObjectsOnExit flag that put(key, value) and
+     *         putAll(map) apply to the entries they insert
+     */
+    public boolean isRemoveOwnedObjectsOnExit() {
+        return this.removeOwnedObjectsOnExit;
+    }
+
+    /**
+     * Sets the policy for owned objects in the group. If set to true, when
+     * this <code>Group</code> stops, any objects it owns will be removed from
+     * the group map
+     * 
+     * @param removeOwnedObjectsOnExit
+     *            the removeOwnedObjectsOnExit to set
+     */
+    public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
+        this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
+    }
+
+    /**
+     * @return releaseLockOnExit - true by default; the flag that
+     *         put(key, value) and putAll(map) apply to the entries they insert
+     */
+    public boolean isReleaseLockOnExit() {
+        return releaseLockOnExit;
+    }
+
+    /**
+     * Set release lock on exit - true by default
+     * 
+     * @param releaseLockOnExit
+     *            the releaseLockOnExit to set
+     */
+    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.releaseLockOnExit = releaseLockOnExit;
+    }
+
+    /**
+     * @return the lockTimeToLive - the lock lease time used by lock(key),
+     *         put(key, value) and putAll(map)
+     */
+    public int getLockTimeToLive() {
+        return lockTimeToLive;
+    }
+
+    /**
+     * @param lockTimeToLive
+     *            the lockTimeToLive to set - the lock lease time used by
+     *            lock(key), put(key, value) and putAll(map)
+     */
+    public void setLockTimeToLive(int lockTimeToLive) {
+        this.lockTimeToLive = lockTimeToLive;
+    }
+
+    /**
+     * @return the minimumGroupSize - the number of members start() waits for
+     *         (default is 1)
+     */
+    public int getMinimumGroupSize() {
+        return this.minimumGroupSize;
+    }
+
+    /**
+     * @param minimumGroupSize
+     *            the minimumGroupSize to set; start() waits for this many
+     *            members, for at most heartBeatInterval * minimumGroupSize
+     *            milliseconds
+     */
+    public void setMinimumGroupSize(int minimumGroupSize) {
+        this.minimumGroupSize = minimumGroupSize;
+    }
+
+    /**
+     * @return the coordinatorWeight (default is 0); start() copies it to the
+     *         local member
+     */
+    public int getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+
+    /**
+     * @param coordinatorWeight
+     *            the coordinatorWeight to set; start() copies it to the local
+     *            member
+     */
+    public void setCoordinatorWeight(int coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
+
+    /**
+     * Clear entries from the Map. Every key currently held locally is removed
+     * through {@link #remove(Object)}, then the local map itself is cleared.
+     * The local map is only touched while holding the map lock; the removals
+     * themselves run without the lock held.
+     * 
+     * @throws IllegalStateException
+     *             presumably raised by checkStatus() when the group is not
+     *             started - confirm against checkStatus()
+     */
+    public void clear() throws IllegalStateException {
+        checkStatus();
+        Set<K> keys;
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                return;
+            }
+            // snapshot, so removals cannot disturb the iteration
+            keys = new HashSet<K>(this.localMap.keySet());
+        }
+        for (K key : keys) {
+            remove(key);
+        }
+        synchronized (this.mapMutex) {
+            // re-check: the map may have been replaced in the meantime
+            if (this.localMap != null) {
+                this.localMap.clear();
+            }
+        }
+    }
+
+    /**
+     * @param key
+     *            the key to look for
+     * @return true if the local map exists and contains the key
+     */
+    public boolean containsKey(Object key) {
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                return false;
+            }
+            return this.localMap.containsKey(key);
+        }
+    }
+
+    /**
+     * Looks the value up in the local map by wrapping it in an EntryValue
+     * with a null key.
+     * 
+     * @param value
+     *            the value to look for
+     * @return true if the local map exists and reports that it contains the
+     *         wrapped value
+     */
+    public boolean containsValue(Object value) {
+        EntryValue probe = new EntryValue(null, value);
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                return false;
+            }
+            return this.localMap.containsValue(probe);
+        }
+    }
+
+    /**
+     * Returns a snapshot of the local map as key/value entries. The snapshot
+     * is a copy: changing it does not change the group map.
+     * 
+     * @return the entries of the local map, keyed by the map's own keys; empty
+     *         if there is no local map yet
+     */
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        Map<K, V> result = new HashMap<K, V>();
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                // use the map's key: EntryValue.getKey() is the EntryKey
+                // wrapper (owner, lock state ...), not the user's key
+                for (Map.Entry<K, EntryValue<V>> entry : this.localMap
+                        .entrySet()) {
+                    result.put(entry.getKey(), entry.getValue().getValue());
+                }
+            }
+        }
+        return result.entrySet();
+    }
+
+    /**
+     * Reads a value from the local map.
+     * 
+     * @param key
+     *            the key to look up
+     * @return the value held locally for the key, or null if there is no
+     *         local map or no entry for the key
+     */
+    public V get(Object key) {
+        EntryValue<V> found;
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                found = null;
+            } else {
+                found = this.localMap.get(key);
+            }
+        }
+        if (found == null) {
+            return null;
+        }
+        return found.getValue();
+    }
+
+    /**
+     * @return true if there is no local map or the local map is empty
+     */
+    public boolean isEmpty() {
+        synchronized (this.mapMutex) {
+            return this.localMap == null || this.localMap.isEmpty();
+        }
+    }
+
+    /**
+     * Returns a snapshot of the keys held in the local map. The snapshot is a
+     * copy: changing it does not change the group map.
+     * 
+     * @return a copy of the local key set; empty if there is no local map yet
+     */
+    public Set<K> keySet() {
+        synchronized (this.mapMutex) {
+            // like the other read accessors, tolerate being called before
+            // start() has created the local map
+            if (this.localMap == null) {
+                return new HashSet<K>();
+            }
+            return new HashSet<K>(this.localMap.keySet());
+        }
+    }
+
+    /**
+     * Puts a value into the map associated with the key, using this group's
+     * current settings for alwaysLock, removeOwnedObjectsOnExit,
+     * releaseLockOnExit, timeToLive and lockTimeToLive
+     * 
+     * @param key
+     * @param value
+     * @return the old value or null
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     * 
+     */
+    public V put(K key, V value) throws GroupUpdateException,
+            IllegalStateException {
+        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
+                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+    }
+
+    /**
+     * Puts a value into the map associated with the key. An INSERT request
+     * describing the entry is sent to the current coordinator as a state
+     * request.
+     * 
+     * @param key
+     *            the key of the entry
+     * @param value
+     *            the value of the entry
+     * @param lock
+     *            the locked flag for the entry
+     * @param removeOnExit
+     *            the remove on exit flag for the entry
+     * @param releaseLockOnExit
+     *            the release lock on exit flag for the entry
+     * @param timeToLive
+     *            the time to live for the entry
+     * @param leaseTime
+     *            the lock lease time for the entry
+     * @return the result of the state request, cast to the value type - the
+     *         old value or null
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     * 
+     */
+    public V put(K key, V value, boolean lock, boolean removeOnExit,
+            boolean releaseLockOnExit, long timeToLive, long leaseTime)
+            throws GroupUpdateException, IllegalStateException {
+        checkStatus();
+        // describe the entry: owner is the local member
+        EntryKey<K> ownedKey = new EntryKey<K>(this.local, key);
+        ownedKey.setLocked(lock);
+        ownedKey.setRemoveOnExit(removeOnExit);
+        ownedKey.setReleaseLockOnExit(releaseLockOnExit);
+        ownedKey.setTimeToLive(timeToLive);
+        ownedKey.setLockLeaseTime(leaseTime);
+        // wrap it in an insert request for the coordinator
+        EntryMessage insert = new EntryMessage();
+        insert.setKey(ownedKey);
+        insert.setValue(value);
+        insert.setType(EntryMessage.MessageType.INSERT);
+        Object previous = sendStateRequest(getCoordinator(), insert);
+        return (V) previous;
+    }
+
+    /**
+     * Remove a lock on a key. A lock update request with the locked flag
+     * cleared is sent to the current coordinator as a state request.
+     * 
+     * @param key
+     *            the key to unlock
+     * @throws GroupUpdateException
+     */
+    public void unlock(K key) throws GroupUpdateException {
+        EntryKey<K> unlockedKey = new EntryKey<K>(this.local, key);
+        unlockedKey.setLocked(false);
+        EntryMessage request = new EntryMessage();
+        request.setKey(unlockedKey);
+        request.setLockUpdate(true);
+        Member target = getCoordinator();
+        sendStateRequest(target, request);
+    }
+
+    /**
+     * Lock a key in the distributed map, using this group's lockTimeToLive as
+     * the lease time
+     * 
+     * @param key
+     * @throws GroupUpdateException
+     */
+    public void lock(K key) throws GroupUpdateException {
+        lock(key, getLockTimeToLive());
+    }
+
+    /**
+     * Lock a key in the distributed map. A lock update request with the
+     * locked flag set is sent to the current coordinator as a state request.
+     * 
+     * @param key
+     *            the key to lock
+     * @param leaseTime
+     *            the lock lease time for the key
+     * @throws GroupUpdateException
+     */
+    public void lock(K key, long leaseTime) throws GroupUpdateException {
+        EntryKey<K> lockedKey = new EntryKey<K>(this.local, key);
+        lockedKey.setLocked(true);
+        lockedKey.setLockLeaseTime(leaseTime);
+        EntryMessage request = new EntryMessage();
+        request.setKey(lockedKey);
+        request.setLockUpdate(true);
+        Member target = getCoordinator();
+        sendStateRequest(target, request);
+    }
+
+    /**
+     * Add the Map to the distribution, using this group's current settings
+     * for alwaysLock, removeOwnedObjectsOnExit, releaseLockOnExit, timeToLive
+     * and lockTimeToLive
+     * 
+     * @param t
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     */
+    public void putAll(Map<? extends K, ? extends V> t)
+            throws GroupUpdateException, IllegalStateException {
+        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
+                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+    }
+
+    /**
+     * Add the Map to the distribution. Each entry is inserted with its own
+     * put call, so an exception part way through leaves the earlier entries
+     * inserted.
+     * 
+     * @param t
+     *            the entries to insert
+     * @param lock
+     *            the locked flag for every entry
+     * @param removeOnExit
+     *            the remove on exit flag for every entry
+     * @param releaseLockOnExit
+     *            the release lock on exit flag for every entry
+     * @param timeToLive
+     *            the time to live for every entry
+     * @param lockTimeToLive
+     *            the lock lease time for every entry
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     */
+    public void putAll(Map<? extends K, ? extends V> t, boolean lock,
+            boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
+            long lockTimeToLive) throws GroupUpdateException,
+            IllegalStateException {
+        for (Map.Entry<? extends K, ? extends V> each : t.entrySet()) {
+            K eachKey = each.getKey();
+            V eachValue = each.getValue();
+            put(eachKey, eachValue, lock, removeOnExit, releaseLockOnExit,
+                    timeToLive, lockTimeToLive);
+        }
+    }
+
+    /**
+     * Remove the value associated with the key from the map. The key is
+     * wrapped in an EntryKey owned by the local member and handed to
+     * doRemove.
+     * 
+     * @param key
+     *            the key to remove
+     * @return the Value or null
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     * 
+     */
+    public V remove(Object key) throws GroupUpdateException,
+            IllegalStateException {
+        return doRemove(new EntryKey<K>(this.local, (K) key));
+    }
+
+    /**
+     * Sends a DELETE request for the key to the current coordinator as a
+     * state request.
+     * 
+     * @param key
+     *            the entry key to delete
+     * @return the result of the state request, cast to the value type
+     * @throws GroupUpdateException
+     * @throws IllegalStateException
+     */
+    V doRemove(EntryKey<K> key) throws GroupUpdateException,
+            IllegalStateException {
+        checkStatus();
+        EntryMessage delete = new EntryMessage();
+        delete.setKey(key);
+        delete.setType(EntryMessage.MessageType.DELETE);
+        Object removed = sendStateRequest(getCoordinator(), delete);
+        return (V) removed;
+    }
+
+    /**
+     * @return the number of entries in the local map, or 0 if there is no
+     *         local map
+     */
+    public int size() {
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                return 0;
+            }
+            return this.localMap.size();
+        }
+    }
+
+    /**
+     * Returns a snapshot of the values held in the local map. The snapshot is
+     * a copy: changing it does not change the group map.
+     * 
+     * @return a list of the local values; empty if there is no local map
+     */
+    public Collection<V> values() {
+        List<V> snapshot = new ArrayList<V>();
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                return snapshot;
+            }
+            for (EntryValue<V> held : this.localMap.values()) {
+                snapshot.add(held.getValue());
+            }
+        }
+        return snapshot;
+    }
+
+    /**
+     * @return a new set holding the currently known members; changing it does
+     *         not change the group's membership
+     */
+    public Set<Member> getMembers() {
+        return new HashSet<Member>(this.members.values());
+    }
+
+    /**
+     * Get a member by its unique id
+     * 
+     * @param id
+     *            the member id to look up
+     * @return the member with that id, or null if no such member is known
+     */
+    public Member getMemberById(String id) {
+        return this.members.get(id);
+    }
+
+    /**
+     * Return a member of the Group with the matching name
+     * 
+     * @param name
+     *            the member name to look for
+     * @return the first known member whose name equals the given name, or
+     *         null if the name is null or no member matches
+     */
+    public Member getMemberByName(String name) {
+        if (name == null) {
+            return null;
+        }
+        for (Member candidate : this.members.values()) {
+            if (candidate.getName().equals(name)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the local member that represents this <CODE>Group</CODE>
+     *         instance; it is created by the constructor from the given name
+     */
+    public Member getLocalMember() {
+        return this.local;
+    }
+
+    /**
+     * @param key
+     *            the key to check
+     * @return true if the local map holds an entry for the key whose owner id
+     *         equals the local member's id
+     */
+    public boolean isOwner(K key) {
+        EntryValue<V> held;
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                held = null;
+            } else {
+                held = this.localMap.get(key);
+            }
+        }
+        if (held == null) {
+            return false;
+        }
+        return held.getKey().getOwner().getId().equals(this.local.getId());
+    }
+
+    /**
+     * Get the EntryKey stored with the local entry for a key
+     * 
+     * @param key
+     *            the key to look up
+     * @return the EntryKey of the entry - or null if there is no local map or
+     *         the key doesn't exist
+     */
+    EntryKey getKey(Object key) {
+        EntryValue<V> held;
+        synchronized (this.mapMutex) {
+            if (this.localMap == null) {
+                held = null;
+            } else {
+                held = this.localMap.get(key);
+            }
+        }
+        if (held == null) {
+            return null;
+        }
+        return held.getKey();
+    }
+
+    /**
+     * @return true if the local member is the coordinator and the election
+     *         has finished
+     */
+    protected boolean isCoordinator() {
+        if (!isCoordinatorMatch()) {
+            return false;
+        }
+        return this.electionFinished.get();
+    }
+
+    /**
+     * @return true if the coordinator for the map
+     */
+    protected boolean isCoordinatorMatch() {
+        return this.local.equals(this.coordinator);
+    }
+
+    /**
+     * @return the member currently recorded as the coordinator
+     */
+    public Member getCoordinator() {
+        final Member current = this.coordinator;
+        return current;
+    }
+
+    void setCoordinator(Member member) {
+        this.coordinator = member;
+    }
+
+    /**
+     * Broadcast a message to the group by publishing it on the message
+     * topic, tagged with a fresh correlation id and the local member id.
+     *
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @throws JMSException
+     */
+    public void broadcastMessage(Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage outbound = this.messageSession
+                .createObjectMessage((Serializable) message);
+        String correlationId = this.idGenerator.generateId();
+        outbound.setJMSCorrelationID(correlationId);
+        outbound.setJMSType(MESSAGE_TYPE);
+        String senderId = this.local.getId();
+        outbound.setStringProperty(MEMBER_ID_PROPERTY, senderId);
+        this.messageProducer.send(this.messageTopic, outbound);
+    }
+
+    /**
+     * Ask the group for a response - one will be selected from the group.
+     * The request is sent to the message queue with the local inbox as the
+     * reply destination, and the call then waits on the pending request.
+     *
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @param timeout
+     *            in milliseconds - a value of 0 means wait until complete
+     * @return the value delivered to the pending request, cast to
+     *         <CODE>Serializable</CODE>
+     * @throws JMSException
+     */
+    public Serializable broadcastMessageRequest(Object message, long timeout)
+            throws JMSException {
+        checkStatus();
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(id, request);
+        }
+        try {
+            ObjectMessage objMsg = this.stateSession
+                    .createObjectMessage((Serializable) message);
+            objMsg.setJMSReplyTo(this.inboxTopic);
+            objMsg.setJMSCorrelationID(id);
+            objMsg.setJMSType(MESSAGE_TYPE);
+            objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+            this.messageProducer.send(this.messageQueue, objMsg);
+            return (Serializable) request.get(timeout);
+        } finally {
+            // A reply already removes the entry; this also cleans up after
+            // a send failure or a wait that ended without a reply, so that
+            // abandoned requests do not accumulate.
+            synchronized (this.messageRequests) {
+                this.messageRequests.remove(id);
+            }
+        }
+    }
+
+    /**
+     * Send a message to the group via the shared message queue, so that a
+     * single member processes it (documented as the least loaded member;
+     * that selection is not made in this method).
+     *
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @throws JMSException
+     */
+    public void sendMessage(Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage outbound = this.messageSession
+                .createObjectMessage((Serializable) message);
+        String correlationId = this.idGenerator.generateId();
+        outbound.setJMSCorrelationID(correlationId);
+        outbound.setJMSType(MESSAGE_TYPE);
+        String senderId = this.local.getId();
+        outbound.setStringProperty(MEMBER_ID_PROPERTY, senderId);
+        this.messageProducer.send(this.messageQueue, outbound);
+    }
+
+    /**
+     * Send a message to an individual member by publishing it to that
+     * member's inbox destination.
+     *
+     * @param member
+     *            the recipient
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @throws JMSException
+     */
+    public void sendMessage(Member member, Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage outbound = this.messageSession
+                .createObjectMessage((Serializable) message);
+        String correlationId = this.idGenerator.generateId();
+        outbound.setJMSCorrelationID(correlationId);
+        outbound.setJMSType(MESSAGE_TYPE);
+        String senderId = this.local.getId();
+        outbound.setStringProperty(MEMBER_ID_PROPERTY, senderId);
+        this.messageProducer.send(member.getInBoxDestination(), outbound);
+    }
+
+    /**
+     * Send a request to a member and wait for the response. The request is
+     * sent to the member's inbox with the local inbox as the reply
+     * destination.
+     *
+     * @param member
+     *            the recipient
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @param timeout
+     *            in milliseconds - a value of 0 means wait until complete
+     * @return the response delivered to the pending request, or null
+     * @throws JMSException
+     */
+    public Object sendMessageRequest(Member member, Object message, long timeout)
+            throws JMSException {
+        checkStatus();
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(id, request);
+        }
+        try {
+            ObjectMessage objMsg = this.stateSession
+                    .createObjectMessage((Serializable) message);
+            objMsg.setJMSReplyTo(this.inboxTopic);
+            objMsg.setJMSCorrelationID(id);
+            objMsg.setJMSType(MESSAGE_TYPE);
+            objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
+            this.messageProducer.send(member.getInBoxDestination(), objMsg);
+            return request.get(timeout);
+        } finally {
+            // A reply already removes the entry; this also cleans up after
+            // a send failure or a wait that ended without a reply, so that
+            // abandoned requests do not accumulate.
+            synchronized (this.messageRequests) {
+                this.messageRequests.remove(id);
+            }
+        }
+    }
+
+    /**
+     * Send a response to a message. The response goes to the member's
+     * inbox destination and carries the supplied reply id as its
+     * correlation id.
+     *
+     * @param member
+     *            the member to respond to
+     * @param replyId
+     *            the correlation id of the request being answered
+     * @param message
+     *            the payload; it is cast to <CODE>Serializable</CODE>
+     * @throws JMSException
+     */
+    public void sendMessageResponse(Member member, String replyId,
+            Object message) throws JMSException {
+        checkStatus();
+        ObjectMessage response = this.messageSession
+                .createObjectMessage((Serializable) message);
+        response.setJMSCorrelationID(replyId);
+        response.setJMSType(MESSAGE_TYPE);
+        String senderId = this.local.getId();
+        response.setStringProperty(MEMBER_ID_PROPERTY, senderId);
+        this.messageProducer.send(member.getInBoxDestination(), response);
+    }
+
+    /**
+     * Select a coordinator - coordinator weighting is used - or if everything
+     * is equal - a comparison of member ids. The list is sorted in place
+     * and its last element is chosen.
+     *
+     * @param list
+     *            the candidate members (reordered by this call)
+     * @return the last member after sorting, or the local member if the
+     *         list is empty
+     */
+    protected Member selectCordinator(List<Member> list) {
+        List<Member> ordered = sortMemberList(list);
+        if (ordered.isEmpty()) {
+            return this.local;
+        }
+        int lastIndex = list.size() - 1;
+        return ordered.get(lastIndex);
+    }
+
+    /**
+     * Sort the given list in place by ascending coordinator weight, with
+     * ties broken by ascending member id.
+     *
+     * @param list
+     *            the members to sort (modified by this call)
+     * @return the same list instance, now sorted
+     */
+    protected List<Member> sortMemberList(List<Member> list) {
+        Collections.sort(list, new Comparator<Member>() {
+            public int compare(Member m1, Member m2) {
+                int w1 = m1.getCoordinatorWeight();
+                int w2 = m2.getCoordinatorWeight();
+                // Compare explicitly instead of subtracting: w1 - w2 can
+                // overflow for extreme weights and flip the sign.
+                if (w1 != w2) {
+                    return w1 < w2 ? -1 : 1;
+                }
+                return m1.getId().compareTo(m2.getId());
+            }
+        });
+        return list;
+    }
+
+    /**
+     * Send a state request to a member's inbox and wait up to one
+     * heartbeat interval for the reply.
+     *
+     * @param member
+     *            the member to ask
+     * @param payload
+     *            the request body
+     * @return the reply; if the reply is an <CODE>EntryMessage</CODE> its
+     *         old value is returned instead; null if nothing was received
+     * @throws GroupUpdateException
+     *             if the reply itself is a GroupUpdateException
+     */
+    Object sendStateRequest(Member member, Serializable payload) {
+        Object result = null;
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        synchronized (this.stateRequests) {
+            this.stateRequests.put(id, request);
+        }
+        try {
+            ObjectMessage objMsg = this.stateSession
+                    .createObjectMessage(payload);
+            objMsg.setJMSReplyTo(this.inboxTopic);
+            objMsg.setJMSCorrelationID(id);
+            objMsg.setJMSType(STATE_TYPE);
+            this.stateProducer.send(member.getInBoxDestination(), objMsg);
+            result = request.get(getHeartBeatInterval());
+        } catch (JMSException e) {
+            // Failures while stopping are expected, so only log when started.
+            if (this.started.get()) {
+                LOG.error("Failed to send request " + payload, e);
+            }
+        } finally {
+            // processRequest removes the entry when a reply arrives; this
+            // covers send failures and waits that ended without a reply so
+            // that abandoned requests do not accumulate.
+            synchronized (this.stateRequests) {
+                this.stateRequests.remove(id);
+            }
+        }
+        if (result instanceof GroupUpdateException) {
+            throw (GroupUpdateException) result;
+        }
+        if (result instanceof EntryMessage) {
+            EntryMessage entryMsg = (EntryMessage) result;
+            result = entryMsg.getOldValue();
+        }
+        return result;
+    }
+
+    /**
+     * Send a state request to a member's inbox without waiting for the
+     * reply. The pending request is registered with both the supplied
+     * <CODE>AsyncMapRequest</CODE> and the state-request table before
+     * sending.
+     *
+     * @param asyncRequest
+     *            the aggregate that tracks the pending request
+     * @param member
+     *            the member to ask
+     * @param payload
+     *            the request body
+     */
+    void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
+            Serializable payload) {
+        MapRequest pending = new MapRequest();
+        String correlationId = this.idGenerator.generateId();
+        asyncRequest.add(correlationId, pending);
+        synchronized (this.stateRequests) {
+            this.stateRequests.put(correlationId, pending);
+        }
+        try {
+            ObjectMessage outbound = this.stateSession
+                    .createObjectMessage(payload);
+            outbound.setJMSReplyTo(this.inboxTopic);
+            outbound.setJMSCorrelationID(correlationId);
+            outbound.setJMSType(STATE_TYPE);
+            Destination inbox = member.getInBoxDestination();
+            this.stateProducer.send(inbox, outbound);
+        } catch (JMSException e) {
+            // only report failures while the group is running
+            if (this.started.get()) {
+                LOG.error("Failed to send async request " + payload, e);
+            }
+        }
+    }
+
+    /**
+     * Deliver a reply for a state request. Does nothing unless started. A
+     * reply addressed to the local inbox is handed straight to
+     * <CODE>processRequest</CODE>; any other reply is sent as a state
+     * message to the reply destination.
+     *
+     * @param reply
+     *            the reply body; it is cast to <CODE>Serializable</CODE>
+     *            when sent
+     * @param replyTo
+     *            where to send the reply; null is logged as an error
+     * @param id
+     *            the correlation id of the request being answered
+     */
+    void sendReply(Object reply, Destination replyTo, String id) {
+        if (!this.started.get()) {
+            return;
+        }
+        if (replyTo == null) {
+            LOG.error("NULL replyTo destination");
+            return;
+        }
+        if (replyTo.equals(this.local.getInBoxDestination())) {
+            // the requester is this instance - no need to go through JMS
+            processRequest(id, reply);
+            return;
+        }
+        try {
+            ObjectMessage outbound = this.stateSession
+                    .createObjectMessage((Serializable) reply);
+            outbound.setJMSCorrelationID(id);
+            outbound.setJMSType(STATE_TYPE);
+            this.stateProducer.send(replyTo, outbound);
+        } catch (JMSException e) {
+            LOG.error("Couldn't send reply from co-ordinator", e);
+        }
+    }
+
+    /**
+     * Publish a copy of the entry message, flagged as a map update, on the
+     * state topic. Does nothing unless started.
+     *
+     * @param entry
+     *            the entry message to copy and broadcast
+     * @param correlationId
+     *            the correlation id to set on the broadcast message
+     */
+    void broadcastMapUpdate(EntryMessage entry, String correlationId) {
+        if (!this.started.get()) {
+            return;
+        }
+        try {
+            EntryMessage update = entry.copy();
+            update.setMapUpdate(true);
+            ObjectMessage outbound = this.stateSession
+                    .createObjectMessage(update);
+            outbound.setJMSCorrelationID(correlationId);
+            outbound.setJMSType(STATE_TYPE);
+            this.stateProducer.send(this.stateTopic, outbound);
+        } catch (JMSException e) {
+            // only report failures while the group is still running
+            if (this.started.get()) {
+                LOG.error("Failed to send EntryMessage " + entry, e);
+            }
+        }
+    }
+
+    /**
+     * Dispatch an incoming JMS message. Only <CODE>ObjectMessage</CODE>s
+     * are handled. State messages are routed by payload type (heartbeat,
+     * entry message, election message); group messages go to
+     * <CODE>processGroupMessage</CODE>. For any non-null message type the
+     * payload is finally offered to <CODE>processRequest</CODE>.
+     *
+     * @param message
+     *            the received message
+     */
+    void processJMSMessage(Message message) {
+        if (!(message instanceof ObjectMessage)) {
+            return;
+        }
+        ObjectMessage objMsg = (ObjectMessage) message;
+        try {
+            String messageType = objMsg.getJMSType();
+            String id = objMsg.getJMSCorrelationID();
+            String memberId = objMsg.getStringProperty(MEMBER_ID_PROPERTY);
+            Destination replyTo = objMsg.getJMSReplyTo();
+            Object payload = objMsg.getObject();
+            if (messageType == null) {
+                LOG.error("Can't process a message type of null");
+                return;
+            }
+            if (messageType.equals(STATE_TYPE)) {
+                if (payload instanceof Member) {
+                    handleHeartbeats((Member) payload);
+                } else if (payload instanceof EntryMessage) {
+                    // work on a copy of the received entry message
+                    EntryMessage entryMsg = ((EntryMessage) payload).copy();
+                    if (entryMsg.isLockUpdate()) {
+                        processLockUpdate(entryMsg, replyTo, id);
+                    } else if (entryMsg.isMapUpdate()) {
+                        processMapUpdate(entryMsg);
+                    } else {
+                        processEntryMessage(entryMsg, replyTo, id);
+                    }
+                } else if (payload instanceof ElectionMessage) {
+                    ElectionMessage electionMsg = ((ElectionMessage) payload)
+                            .copy();
+                    processElectionMessage(electionMsg, replyTo, id);
+                }
+            } else if (messageType.equals(MESSAGE_TYPE)) {
+                processGroupMessage(memberId, id, replyTo, payload);
+            } else {
+                LOG.error("Unknown message type: " + messageType);
+            }
+            // complete any pending state request waiting on this id
+            processRequest(id, payload);
+        } catch (JMSException e) {
+            LOG.warn("Failed to process message: " + message, e);
+        }
+    }
+
+    /**
+     * Complete the pending state request registered under the given id, if
+     * there is one: the request is removed from the table and handed the
+     * value.
+     *
+     * @param id
+     *            the correlation id; null is ignored
+     * @param value
+     *            the value to deliver to the pending request
+     */
+    void processRequest(String id, Object value) {
+        if (id == null) {
+            return;
+        }
+        MapRequest pending;
+        synchronized (this.stateRequests) {
+            pending = this.stateRequests.remove(id);
+        }
+        if (pending != null) {
+            pending.put(id, value);
+        }
+    }
+
+    /**
+     * Apply a lock update to the locally stored key. When this instance is
+     * the coordinator and the message is not itself a map update, the
+     * request is arbitrated: a key locked by a different owner is refused
+     * with a <CODE>GroupUpdateException</CODE> reply, otherwise the change
+     * is applied and broadcast. In every other case the change is simply
+     * applied. Unknown keys are ignored.
+     *
+     * @param entryMsg
+     *            the message carrying the new lock state
+     * @param replyTo
+     *            where a refusal is sent
+     * @param correlationId
+     *            the correlation id of the request
+     */
+    void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
+            String correlationId) {
+        waitForElection();
+        synchronized (this.mapMutex) {
+            boolean newLock = entryMsg.getKey().isLocked();
+            Member newOwner = entryMsg.getKey().getOwner();
+            long newLockExpiration = newLock ? entryMsg.getKey()
+                    .getLockExpiration() : 0L;
+            boolean arbitrate = isCoordinator() && !entryMsg.isMapUpdate();
+            EntryKey originalKey = getKey(entryMsg.getKey().getKey());
+            if (originalKey == null) {
+                return;
+            }
+            if (arbitrate
+                    && originalKey.isLocked()
+                    && !originalKey.getOwner().equals(
+                            entryMsg.getKey().getOwner())) {
+                Serializable reply = new GroupUpdateException("Owned by "
+                        + originalKey.getOwner());
+                sendReply(reply, replyTo, correlationId);
+                return;
+            }
+            originalKey.setLocked(newLock);
+            originalKey.setOwner(newOwner);
+            originalKey.setLockExpiration(newLockExpiration);
+            if (arbitrate) {
+                broadcastMapUpdate(entryMsg, correlationId);
+            }
+        }
+    }
+
+    /**
+     * Handle an insert or remove request for the map. Only the coordinator
+     * acts on it. An existing entry may be changed if the requester owns it
+     * or it is not locked; otherwise a <CODE>GroupUpdateException</CODE> is
+     * sent back. Successful changes are broadcast as map updates and
+     * reported to the map listeners. A remove of a missing key is answered
+     * with a null reply.
+     *
+     * @param entryMsg
+     *            the request
+     * @param replyTo
+     *            where replies are sent
+     * @param correlationId
+     *            the correlation id of the request
+     */
+    void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
+            String correlationId) {
+        waitForElection();
+        if (!isCoordinator()) {
+            return;
+        }
+        EntryKey<K> key = entryMsg.getKey();
+        EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg.getValue());
+        boolean insert = entryMsg.isInsert();
+        // A single lookup decides whether the key exists, so there is no
+        // separate containsKey/get pair that could disagree.
+        EntryKey originalKey = getKey(key.getKey());
+        if (originalKey != null) {
+            Member originalOwner = originalKey.getOwner();
+            // Compare owner with owner; the previous code compared the
+            // EntryKey itself with the owner, which could never match.
+            boolean ownedByRequester = originalOwner != null
+                    && originalOwner.equals(key.getOwner());
+            if (ownedByRequester || !originalKey.isLocked()) {
+                EntryValue<V> old = null;
+                synchronized (this.mapMutex) {
+                    if (insert) {
+                        old = this.localMap.put(key.getKey(), value);
+                    } else {
+                        old = this.localMap.remove(key.getKey());
+                    }
+                }
+                if (old == null && !insert) {
+                    // the entry vanished between the lookup and the removal;
+                    // answer as for a remove of a missing key
+                    sendReply(null, replyTo, correlationId);
+                    return;
+                }
+                V oldValue = old != null ? old.getValue() : null;
+                // a removal is reported with a null new value, matching
+                // processMapUpdate
+                V newValue = insert ? value.getValue() : null;
+                entryMsg.setOldValue(oldValue);
+                broadcastMapUpdate(entryMsg, correlationId);
+                fireMapChanged(key.getOwner(), key.getKey(), oldValue,
+                        newValue, false);
+            } else {
+                Serializable reply = new GroupUpdateException("Owned by "
+                        + originalKey.getOwner());
+                sendReply(reply, replyTo, correlationId);
+            }
+            return;
+        }
+        if (insert) {
+            synchronized (this.mapMutex) {
+                this.localMap.put(key.getKey(), value);
+            }
+            broadcastMapUpdate(entryMsg, correlationId);
+            fireMapChanged(key.getOwner(), key.getKey(), null, value
+                    .getValue(), false);
+        } else {
+            sendReply(null, replyTo, correlationId);
+        }
+    }
+
+    /**
+     * Apply a map update to the local map. The update is applied when this
+     * instance is not the coordinator, or when the message is a sync
+     * message. A sync message is treated as an insert. If the key is
+     * already present and the message key reports an expired lock, only the
+     * stored key is unlocked; otherwise the entry is put or removed and the
+     * map listeners are notified.
+     *
+     * @param entryMsg
+     *            the update to apply
+     */
+    void processMapUpdate(EntryMessage entryMsg) {
+        EntryKey<K> key = entryMsg.getKey();
+        EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg.getValue());
+        boolean insert = entryMsg.isInsert() || entryMsg.isSync();
+        boolean containsKey = false;
+        synchronized (this.mapMutex) {
+            containsKey = this.localMap.containsKey(key.getKey());
+        }
+        waitForElection();
+        if (isCoordinator() && !entryMsg.isSync()) {
+            return;
+        }
+        if (containsKey) {
+            if (key.isLockExpired()) {
+                // read the map under the same mutex as every other access
+                synchronized (this.mapMutex) {
+                    EntryValue<V> old = this.localMap.get(key.getKey());
+                    if (old != null) {
+                        old.getKey().setLocked(false);
+                    }
+                }
+            } else {
+                EntryValue<V> old = null;
+                synchronized (this.mapMutex) {
+                    if (insert) {
+                        old = this.localMap.put(key.getKey(), value);
+                    } else {
+                        old = this.localMap.remove(key.getKey());
+                        value.setValue(null);
+                    }
+                }
+                // The entry may have disappeared while waiting for the
+                // election: a put is then reported as an insert, and a
+                // remove of nothing is not reported at all.
+                if (old != null || insert) {
+                    V oldValue = old != null ? old.getValue() : null;
+                    fireMapChanged(key.getOwner(), key.getKey(), oldValue,
+                            value.getValue(), entryMsg.isExpired());
+                }
+            }
+        } else if (insert) {
+            synchronized (this.mapMutex) {
+                this.localMap.put(key.getKey(), value);
+            }
+            fireMapChanged(key.getOwner(), key.getKey(), null, value
+                    .getValue(), false);
+        }
+    }
+
+    /**
+     * Handle a group (application) message. If the sender is a known
+     * member the message listeners are notified; independently, any
+     * pending message request registered under the reply id is completed
+     * with the payload.
+     *
+     * @param memberId
+     *            id of the sending member
+     * @param replyId
+     *            the correlation id of the message; may be null
+     * @param replyTo
+     *            the reply destination of the message (not used here)
+     * @param payload
+     *            the message body
+     */
+    void processGroupMessage(String memberId, String replyId,
+            Destination replyTo, Object payload) {
+        Member sender = this.members.get(memberId);
+        if (sender != null) {
+            fireMemberMessage(sender, replyId, payload);
+        }
+        if (replyId == null) {
+            return;
+        }
+        MapRequest pending;
+        synchronized (this.messageRequests) {
+            pending = this.messageRequests.remove(replyId);
+        }
+        if (pending != null) {
+            pending.put(replyId, payload);
+        }
+    }
+
+    /**
+     * Unwrap a heartbeat received as a JMS message and pass the contained
+     * member on. Anything other than an <CODE>ObjectMessage</CODE> is
+     * logged and ignored.
+     *
+     * @param message
+     *            the received heartbeat message
+     */
+    void handleHeartbeats(Message message) {
+        try {
+            if (!(message instanceof ObjectMessage)) {
+                LOG.warn("Unexpected message: " + message);
+                return;
+            }
+            Object body = ((ObjectMessage) message).getObject();
+            handleHeartbeats((Member) body);
+        } catch (JMSException e) {
+            LOG.warn("Failed to handle heart beat", e);
+        }
+    }
+
+    /**
+     * Record a heartbeat from a member. The member is stamped with the
+     * current time and stored in the member table. If it was not in the
+     * table before, listeners are told it started, a heartbeat is sent to
+     * its inbox (unless it is the local member), an election is triggered
+     * and threads waiting on the member mutex are woken.
+     *
+     * @param member
+     *            the member that sent the heartbeat
+     */
+    void handleHeartbeats(Member member) {
+        member.setTimeStamp(System.currentTimeMillis());
+        Member previous = this.members.put(member.getId(), member);
+        if (previous != null) {
+            // already known - nothing more to do
+            return;
+        }
+        fireMemberStarted(member);
+        if (!member.equals(this.local)) {
+            sendHeartBeat(member.getInBoxDestination());
+        }
+        election(member, true);
+        synchronized (this.memberMutex) {
+            this.memberMutex.notifyAll();
+        }
+    }
+
+    /**
+     * React to a consumer event. When the event is not a start event, the
+     * member stored under the consumer id (as a string) is removed; if one
+     * was present, listeners are told it stopped and an election is
+     * triggered.
+     *
+     * @param event
+     *            the consumer event
+     */
+    void handleConsumerEvents(ConsumerEvent event) {
+        if (event.isStarted()) {
+            return;
+        }
+        String memberId = event.getConsumerId().toString();
+        Member departed = this.members.remove(memberId);
+        if (departed == null) {
+            return;
+        }
+        fireMemberStopped(departed);
+        election(departed, false);
+    }
+
+    /**
+     * Remove members whose last heartbeat is older than one heartbeat
+     * interval. Runs only while started and with no election in progress.
+     * Each removed member is reported to the listeners, and a single
+     * election is triggered if any member was removed.
+     */
+    void checkMembership() {
+        if (this.started.get() && this.electionFinished.get()) {
+            long checkTime = System.currentTimeMillis()
+                    - getHeartBeatInterval();
+            boolean doElection = false;
+            // Iterate over a snapshot: removing from the member table while
+            // iterating its own values() view is unsafe for a plain map.
+            List<Member> snapshot = new ArrayList<Member>(this.members
+                    .values());
+            for (Member member : snapshot) {
+                if (member.getTimeStamp() < checkTime) {
+                    LOG.info("Member timestamp expired " + member);
+                    this.members.remove(member.getId());
+                    fireMemberStopped(member);
+                    doElection = true;
+                }
+            }
+            if (doElection) {
+                election(null, false);
+            }
+        }
+    }
+
+    /**
+     * Scan the local map for expired entries and expired locks. Only the
+     * coordinator does this, while started and with the election finished.
+     * Expired locks are cleared immediately under the map mutex; the
+     * removal of expired entries and the lock-expiry notifications are
+     * handed to the state executor.
+     */
+    void expirationSweep() {
+        waitForElection();
+        if (isCoordinator() && this.started.get()
+                && this.electionFinished.get()) {
+            List<EntryKey> expiredMessages = null;
+            List<EntryKey> expiredLocks = null;
+            synchronized (this.mapMutex) {
+                Map<K, EntryValue<V>> map = this.localMap;
+                if (map != null) {
+                    long currentTime = System.currentTimeMillis();
+                    for (EntryValue value : map.values()) {
+                        EntryKey k = value.getKey();
+                        if (k.isExpired(currentTime)) {
+                            // create the list lazily, but add EVERY expired
+                            // key, not just the first one
+                            if (expiredMessages == null) {
+                                expiredMessages = new ArrayList<EntryKey>();
+                            }
+                            expiredMessages.add(k);
+                        } else if (k.isLockExpired(currentTime)) {
+                            k.setLocked(false);
+                            // add each key exactly once (the first one used
+                            // to be added twice)
+                            if (expiredLocks == null) {
+                                expiredLocks = new ArrayList<EntryKey>();
+                            }
+                            expiredLocks.add(k);
+                        }
+                    }
+                }
+            }
+            // do the actual removal of entries in a separate thread
+            if (expiredMessages != null) {
+                final List<EntryKey> expire = expiredMessages;
+                this.stateExecutor.execute(new Runnable() {
+                    public void run() {
+                        doMessageExpiration(expire);
+                    }
+                });
+            }
+            if (expiredLocks != null) {
+                final List<EntryKey> expire = expiredLocks;
+                this.stateExecutor.execute(new Runnable() {
+                    public void run() {
+                        doLockExpiration(expire);
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Remove the given expired keys from the local map. Runs only while
+     * started, with the election finished, and as coordinator. For every
+     * key that was actually present, an expired DELETE is broadcast and the
+     * map listeners are notified.
+     *
+     * @param list
+     *            the keys to expire
+     */
+    void doMessageExpiration(List<EntryKey> list) {
+        if (!this.started.get() || !this.electionFinished.get()
+                || !isCoordinator()) {
+            return;
+        }
+        for (EntryKey expiredKey : list) {
+            EntryValue<V> removed;
+            synchronized (this.mapMutex) {
+                removed = this.localMap.remove(expiredKey.getKey());
+            }
+            if (removed == null) {
+                continue;
+            }
+            EntryMessage notice = new EntryMessage();
+            notice.setType(EntryMessage.MessageType.DELETE);
+            notice.setExpired(true);
+            notice.setKey(expiredKey);
+            notice.setValue(removed.getValue());
+            broadcastMapUpdate(notice, "");
+            fireMapChanged(expiredKey.getOwner(), expiredKey.getKey(),
+                    removed.getValue(), null, true);
+        }
+    }
+
+    /**
+     * Broadcast a lock-expired message for each of the given keys. Runs
+     * only while started, with the election finished, and as coordinator.
+     *
+     * @param list
+     *            the keys whose locks have expired
+     */
+    void doLockExpiration(List<EntryKey> list) {
+        if (!this.started.get() || !this.electionFinished.get()
+                || !isCoordinator()) {
+            return;
+        }
+        for (EntryKey expiredKey : list) {
+            EntryMessage notice = new EntryMessage();
+            notice.setType(EntryMessage.MessageType.DELETE);
+            notice.setLockExpired(true);
+            notice.setKey(expiredKey);
+            broadcastMapUpdate(notice, "");
+        }
+    }
+
+    void sendHeartBeat() {
+        sendHeartBeat(this.heartBeatTopic);
+    }
+
+    /**
+     * Send the local member as a state message to the given destination.
+     * Does nothing unless started; failures are logged only while still
+     * started.
+     *
+     * @param destination
+     *            where to send the heartbeat
+     */
+    void sendHeartBeat(Destination destination) {
+        if (!this.started.get()) {
+            return;
+        }
+        try {
+            ObjectMessage heartbeat = this.stateSession
+                    .createObjectMessage(this.local);
+            heartbeat.setJMSType(STATE_TYPE);
+            this.stateProducer.send(destination, heartbeat);
+        } catch (javax.jms.IllegalStateException e) {
+            // ignore - as we are probably stopping
+        } catch (Throwable e) {
+            if (this.started.get()) {
+                LOG.warn("Failed to send heart beat", e);
+            }
+        }
+    }
+
+    /**
+     * Send the contents of the local map to a member as SYNC map updates,
+     * one message per entry, addressed to that member's inbox. Entries
+     * owned by the member itself are not sent.
+     *
+     * @param member
+     *            the member to bring up to date
+     */
+    void updateNewMemberMap(Member member) {
+        List<Map.Entry<K, EntryValue<V>>> snapshot = new ArrayList<Map.Entry<K, EntryValue<V>>>();
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                snapshot.addAll(this.localMap.entrySet());
+            }
+        }
+        try {
+            for (Map.Entry<K, EntryValue<V>> entry : snapshot) {
+                EntryValue<V> stored = entry.getValue();
+                EntryMessage syncMsg = new EntryMessage();
+                syncMsg.setKey(stored.getKey());
+                syncMsg.setValue(stored.getValue());
+                syncMsg.setType(EntryMessage.MessageType.SYNC);
+                syncMsg.setMapUpdate(true);
+                ObjectMessage outbound = this.stateSession
+                        .createObjectMessage(syncMsg);
+                if (member.equals(stored.getKey().getOwner())) {
+                    // the member's own entries are not sent back to it
+                    continue;
+                }
+                outbound.setJMSType(STATE_TYPE);
+                this.stateProducer.send(member.getInBoxDestination(),
+                        outbound);
+            }
+        } catch (javax.jms.IllegalStateException e) {
+            // ignore - as closing
+        } catch (JMSException e) {
+            if (started.get()) {
+                LOG.warn("Failed to update new member ", e);
+            }
+        }
+    }
+
+    /**
+     * Log that a member started and tell every membership listener.
+     *
+     * @param member
+     *            the member that started
+     */
+    void fireMemberStarted(Member member) {
+        String localName = this.local.getName();
+        LOG.info(localName + " Member started " + member);
+        for (MemberChangedListener listener : this.membershipListeners) {
+            listener.memberStarted(member);
+        }
+    }
+
+    /**
+     * Log that a member stopped, tell every membership listener, and clean
+     * up the entries that member owned: entries flagged remove-on-exit are
+     * removed from the local map (with a map-changed notification), and
+     * entries flagged release-lock-on-exit are unlocked.
+     *
+     * @param member
+     *            the member that stopped
+     */
+    void fireMemberStopped(Member member) {
+        LOG.info(this.local.getName() + " Member stopped " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStopped(member);
+        }
+        // remove all entries owned by the stopped member
+        List<EntryKey<K>> tmpList = new ArrayList<EntryKey<K>>();
+        boolean mapExists = false;
+        synchronized (this.mapMutex) {
+            mapExists = this.localMap != null;
+            if (mapExists) {
+                for (EntryValue value : this.localMap.values()) {
+                    EntryKey entryKey = value.getKey();
+                    if (entryKey.getOwner().equals(member)) {
+                        if (entryKey.isRemoveOnExit()) {
+                            tmpList.add(entryKey);
+                        }
+                        if (entryKey.isReleaseLockOnExit()) {
+                            entryKey.setLocked(false);
+                        }
+                    }
+                }
+            }
+        }
+        if (mapExists) {
+            for (EntryKey<K> entryKey : tmpList) {
+                EntryValue<V> value = null;
+                synchronized (this.mapMutex) {
+                    // The map is keyed by the user key, not by the EntryKey
+                    // wrapper, so remove by entryKey.getKey().
+                    value = this.localMap.remove(entryKey.getKey());
+                }
+                // the entry may already have been removed by another thread
+                if (value != null) {
+                    fireMapChanged(member, entryKey.getKey(), value
+                            .getValue(), null, false);
+                }
+            }
+        }
+    }
+
+    /**
+     * Hand a received group message to the message executor, which
+     * notifies the group message listeners. Nothing happens unless the
+     * group is started and the message executor exists and is not shut
+     * down.
+     *
+     * @param member
+     *            the sender
+     * @param replyId
+     *            the correlation id of the message
+     * @param message
+     *            the message body
+     */
+    void fireMemberMessage(final Member member, final String replyId,
+            final Object message) {
+        // Guard the executor that is actually used; the earlier check
+        // tested stateExecutor for null while calling messageExecutor.
+        if (this.started.get() && this.messageExecutor != null
+                && !this.messageExecutor.isShutdown()) {
+            this.messageExecutor.execute(new Runnable() {
+                public void run() {
+                    doFireMemberMessage(member, replyId, message);
+                }
+            });
+        }
+    }
+
+    /**
+     * Deliver a group message to every group message listener, provided
+     * the group is still started.
+     *
+     * @param sender
+     *            the member that sent the message
+     * @param replyId
+     *            the correlation id of the message
+     * @param message
+     *            the message body
+     */
+    void doFireMemberMessage(Member sender, String replyId, Object message) {
+        if (!this.started.get()) {
+            return;
+        }
+        for (GroupMessageListener listener : this.groupMessageListeners) {
+            listener.messageDelivered(sender, replyId, message);
+        }
+    }
+
+    /**
+     * Schedule a map-changed notification on the state executor. Nothing
+     * happens unless the group is started and the state executor exists
+     * and is not shut down.
+     *
+     * @param owner
+     *            owner of the changed entry
+     * @param key
+     *            key of the changed entry
+     * @param oldValue
+     *            the previous value, or null
+     * @param newValue
+     *            the new value, or null
+     * @param expired
+     *            whether the change is due to expiration
+     */
+    void fireMapChanged(final Member owner, final Object key,
+            final Object oldValue, final Object newValue, final boolean expired) {
+        if (!this.started.get()) {
+            return;
+        }
+        if (this.stateExecutor == null || this.stateExecutor.isShutdown()) {
+            return;
+        }
+        Runnable notification = new Runnable() {
+            public void run() {
+                doFireMapChanged(owner, key, oldValue, newValue, expired);
+            }
+        };
+        this.stateExecutor.execute(notification);
+    }
+
+    /**
+     * Notify every map listener of a change, provided the group is still
+     * started. A null old value is reported as an insert, otherwise a null
+     * new value as a remove, otherwise as an update.
+     *
+     * @param owner
+     *            owner of the changed entry
+     * @param key
+     *            key of the changed entry
+     * @param oldValue
+     *            the previous value, or null
+     * @param newValue
+     *            the new value, or null
+     * @param expired
+     *            passed through to the listener on a remove
+     */
+    void doFireMapChanged(Member owner, Object key, Object oldValue,
+            Object newValue, boolean expired) {
+        if (!this.started.get()) {
+            return;
+        }
+        for (GroupStateChangedListener listener : this.mapChangedListeners) {
+            if (oldValue == null) {
+                listener.mapInsert(owner, key, newValue);
+                continue;
+            }
+            if (newValue == null) {
+                listener.mapRemove(owner, key, oldValue, expired);
+                continue;
+            }
+            listener.mapUpdate(owner, key, oldValue, newValue);
+        }
+    }
+
+    /**
+     * Verify that the group is usable: it must be started, and the call
+     * then waits for any election to finish.
+     *
+     * @throws IllegalStateException
+     *             if the group has not been started
+     */
+    void checkStatus() throws IllegalStateException {
+        if (started.get()) {
+            waitForElection();
+            return;
+        }
+        throw new IllegalStateException("GroupMap " + this.local.getName()
+                + " not started");
+    }
+
+    public String toString() {
+        return "Group:" + getName() + "{id=" + this.local.getId()
+                + ",coordinator=" + isCoordinator() + ",inbox="
+                + this.local.getInBoxDestination() + "}";
+    }
+
+    /**
+     * Start a new election. The election is marked as unfinished, any
+     * election tasks still waiting in the election executor's queue are
+     * stopped and removed, and a new <CODE>ElectionService</CODE> is
+     * submitted. Nothing happens unless the group is started and the
+     * election executor exists and is not shut down.
+     *
+     * @param member
+     *            the member whose arrival or departure triggered the
+     *            election; may be null
+     * @param memberStarted
+     *            true if the member started, false otherwise
+     */
+    void election(final Member member, final boolean memberStarted) {
+        // Guard the executor that is actually used; the earlier check
+        // tested stateExecutor for null while calling electionExecutor.
+        if (this.started.get() && this.electionExecutor != null
+                && !this.electionExecutor.isShutdown()) {
+            synchronized (this.electionFinished) {
+                this.electionFinished.set(false);
+            }
+            synchronized (this.electionExecutor) {
+                // remove any queued election tasks
+                List<Runnable> list = new ArrayList<Runnable>(
+                        this.electionExecutor.getQueue());
+                for (Runnable r : list) {
+                    ElectionService es = (ElectionService) r;
+                    es.stop();
+                    this.electionExecutor.remove(es);
+                }
+            }
+            ElectionService es = new ElectionService(member, memberStarted);
+            es.start();
+            this.electionExecutor.execute(es);
+        }
+    }
+
+    /**
+     * Send an ELECTION message to every member that sorts after the local
+     * member, then wait up to one heartbeat interval on the collected
+     * requests.
+     *
+     * @return the result of <CODE>AsyncMapRequest.isSuccess</CODE> for the
+     *         requests that were sent
+     */
+    boolean callElection() {
+        List<Member> candidates = sortMemberList(new ArrayList<Member>(
+                this.members.values()));
+        AsyncMapRequest pending = new AsyncMapRequest();
+        boolean pastLocal = false;
+        for (Member candidate : candidates) {
+            if (this.local.equals(candidate)) {
+                // everything after this point sorts higher than us
+                pastLocal = true;
+                continue;
+            }
+            if (!pastLocal) {
+                continue;
+            }
+            ElectionMessage challenge = new ElectionMessage();
+            challenge.setMember(this.local);
+            challenge.setType(ElectionMessage.MessageType.ELECTION);
+            sendAsyncStateRequest(pending, candidate, challenge);
+        }
+        return pending.isSuccess(getHeartBeatInterval());
+    }
+
+    /**
+     * Handles an incoming election message.
+     * <p>
+     * An election message is answered with an ANSWER naming the local member
+     * and triggers a local election. A coordinator message records the
+     * announced coordinator, marks the election as finished and wakes every
+     * thread waiting on the election. Other messages are ignored.
+     *
+     * @param msg the received message; it is re-used as the reply
+     * @param replyTo the destination the reply is sent to
+     * @param correlationId the correlation id used for the reply
+     */
+    void processElectionMessage(ElectionMessage msg, Destination replyTo,
+            String correlationId) {
+        if (msg.isElection()) {
+            msg.setType(ElectionMessage.MessageType.ANSWER);
+            msg.setMember(this.local);
+            sendReply(msg, replyTo, correlationId);
+            election(null, false);
+            return;
+        }
+        if (!msg.isCoordinator()) {
+            return;
+        }
+        synchronized (this.electionFinished) {
+            this.coordinator = msg.getMember();
+            this.electionFinished.set(true);
+            this.electionFinished.notifyAll();
+        }
+    }
+
+    /**
+     * Publishes an election message of the given type, naming the local
+     * member, on the state topic. Does nothing when the group is not started.
+     *
+     * @param type the type of election message to broadcast
+     */
+    void broadcastElectionType(ElectionMessage.MessageType type) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            ElectionMessage payload = new ElectionMessage();
+            payload.setMember(this.local);
+            payload.setType(type);
+            ObjectMessage envelope = this.stateSession
+                    .createObjectMessage(payload);
+            envelope.setJMSType(STATE_TYPE);
+            this.stateProducer.send(this.stateTopic, envelope);
+        } catch (javax.jms.IllegalStateException e) {
+            // ignore - we are stopping
+        } catch (JMSException e) {
+            // only worth reporting while the group is still running
+            if (this.started.get()) {
+                LOG.error("Failed to broadcast election message: " + type,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Blocks the caller while the group is started and the election has not
+     * been flagged as finished, re-checking at least every 200 milliseconds.
+     * <p>
+     * If the waiting thread is interrupted the group is stopped and the
+     * thread's interrupt status is restored.
+     */
+    void waitForElection() {
+        synchronized (this.electionFinished) {
+            while (started.get() && !this.electionFinished.get()) {
+                try {
+                    this.electionFinished.wait(200);
+                } catch (InterruptedException e) {
+                    // NOTE(review): the loop only ends after this if stop()
+                    // clears 'started' - confirm against stop()
+                    stop();
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+    /**
+     * Task that runs a single election on the election executor.
+     * <p>
+     * The task carries its own <code>started</code> flag, toggled through
+     * {@link #start()} and {@link #stop()}; the election checks it at several
+     * points and abandons its work once the flag is cleared.
+     */
+    class ElectionService implements Runnable {
+        private AtomicBoolean started = new AtomicBoolean();
+        private Member member;
+        private boolean memberStarted;
+
+        /**
+         * @param member the member associated with this election; may be
+         *            <code>null</code>
+         * @param memberStarted when <code>true</code> and a member is given,
+         *            the member may be sent the map once a coordinator is
+         *            chosen
+         */
+        ElectionService(Member member, boolean memberStarted) {
+            this.member = member;
+            this.memberStarted = memberStarted;
+        }
+
+        /** Marks this task as active. */
+        void start() {
+            this.started.set(true);
+        }
+
+        /** Marks this task as cancelled. */
+        void stop() {
+            this.started.set(false);
+        }
+
+        public void run() {
+            doElection();
+        }
+
+        /**
+         * Runs the election: calls elections until one succeeds, selects a
+         * coordinator from the current members and, if no coordinator
+         * announcement finishes the election in time, declares the local
+         * member coordinator.
+         */
+        void doElection() {
+            // proceed when no member was given, when the member is not the
+            // local one, or when the group size equals the minimum group size
+            if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members
+                    .size() == getMinimumGroupSize()))) {
+                // captured before the election changes the coordinator
+                boolean wasCoordinator = isCoordinatorMatch() && !isEmpty();
+                // call an election, repeating until one succeeds or either
+                // the group or this task has been stopped
+                while (!callElection() && isStarted() && this.started.get())
+                    ;
+                if (isStarted() && this.started.get()) {
+                    List<Member> members = new ArrayList<Member>(
+                            Group.this.members.values());
+                    Group.this.coordinator = selectCordinator(members);
+                    if (isCoordinatorMatch()) {
+                        broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+                    }
+                    if (this.memberStarted && this.member != null) {
+                        // NOTE(review): '&&' binds tighter than '||', so this
+                        // reads wasCoordinator || (isCoordinator() && started)
+                        // - confirm that is the intended grouping
+                        if (wasCoordinator || isCoordinator()
+                                && this.started.get()) {
+                            updateNewMemberMap(this.member);
+                        }
+                    }
+                    if (!isElectionFinished() && this.started.get()) {
+                        try {
+                            // wait up to two heartbeat intervals for the
+                            // election to be flagged as finished
+                            synchronized (Group.this.electionFinished) {
+                                Group.this.electionFinished
+                                        .wait(Group.this.heartBeatInterval * 2);
+                            }
+                        } catch (InterruptedException e) {
+                            // NOTE(review): the interrupt is swallowed and the
+                            // thread's interrupt status is not restored
+                        }
+                    }
+                    if (!isElectionFinished() && this.started.get()) {
+                        // we must be the coordinator
+                        setCoordinator(getLocalMember());
+                        setElectionFinished(true);
+                        broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Group.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupMessageListener.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupMessageListener.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupMessageListener.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.groupmq;
+
+
+/**
+ * A listener that is notified of messages delivered to the
+ * <CODE>Group</CODE> from other members.
+ */
+public interface GroupMessageListener {
+    
+    /**
+     * Called when a message is delivered to the <CODE>Group</CODE> from another member
+     * @param sender the member who sent the message
+     * @param replyId the id to use to respond to a message
+     * @param message the message object
+     */
+    void messageDelivered(Member sender, String replyId, Object message);
+}
\ No newline at end of file

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupStateChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupStateChangedListener.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupStateChangedListener.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupStateChangedListener.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+/**
+ * Get notifications about changes to the state of the map.
+ */
+public interface GroupStateChangedListener {
+
+    /**
+     * Called when a key/value pair is inserted into the map
+     * @param owner the member reported as owner of the entry
+     * @param key the key that was inserted
+     * @param value the value now stored for the key
+     */
+    void mapInsert(Member owner, Object key, Object value);
+
+    /**
+     * Called when a key value is updated in the map
+     * @param owner the member reported as owner of the entry
+     * @param key the key whose value changed
+     * @param oldValue the value that was replaced
+     * @param newValue the value now stored for the key
+     */
+    void mapUpdate(Member owner, Object key, Object oldValue, Object newValue);
+
+    /**
+     * Called when a key value is removed from the map
+     * @param owner the member reported as owner of the entry
+     * @param key the key that was removed
+     * @param value the value that was stored for the key
+     * @param expired the expired flag reported with the removal
+     */
+    void mapRemove(Member owner, Object key, Object value, boolean expired);
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupStateChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupUpdateException.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupUpdateException.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupUpdateException.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupUpdateException.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+/**
+ * Thrown when the local client tries to update a map key that it does not
+ * own.
+ */
+public class GroupUpdateException extends RuntimeException {
+    private static final long serialVersionUID = -7584658017201604560L;
+    
+    /**
+     * @param message the detail message passed on to {@link RuntimeException}
+     */
+    public GroupUpdateException(String message) {
+        super(message);
+    }
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/GroupUpdateException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MapRequest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MapRequest.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MapRequest.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MapRequest.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Return information about map update
+ * 
+ */
+public class MapRequest {
+    private final AtomicBoolean done = new AtomicBoolean();
+    private Object response;
+    private RequestCallback callback;
+
+    Object get(long timeout) {
+        synchronized (this.done) {
+            if (this.done.get() == false && this.response == null) {
+                try {
+                    this.done.wait(timeout);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return this.response;
+    }
+
+    void put(String id,Object response) {
+        this.response = response;
+        cancel();
+        RequestCallback callback = this.callback;
+        if (callback != null) {
+            callback.finished(id);
+        }
+    }
+
+    void cancel() {
+        this.done.set(true);
+        synchronized (this.done) {
+            this.done.notifyAll();
+        }
+    }
+    
+    void setCallback(RequestCallback callback) {
+        this.callback=callback;
+    }
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MapRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Member.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Member.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Member.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Member.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.jms.Destination;
+import org.apache.activemq.util.IdGenerator;
+
+/**
+ * A <CODE>Member</CODE> holds information about a member of the group.
+ * <P>
+ * Two members are considered equal when they have the same id.
+ */
+public class Member implements Externalizable {
+    private String name;
+    private String id;
+    private String hostname;
+    private long startTime;
+    private int coordinatorWeight;
+    private Destination inBoxDestination;
+    private transient long timeStamp;
+
+    /**
+     * Default constructor - only used by serialization
+     */
+    public Member() {
+    }
+
+    /**
+     * Creates a member with the given name, the local host name and the
+     * current time as start time. The id stays unset until it is assigned.
+     *
+     * @param name the name of the member
+     */
+    public Member(String name) {
+        this.name = name;
+        this.hostname = IdGenerator.getHostName();
+        this.startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return the id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the hostname
+     */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /**
+     * @return the startTime
+     */
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    /**
+     * @return the inbox destination
+     */
+    public Destination getInBoxDestination() {
+        return this.inBoxDestination;
+    }
+
+    void setInBoxDestination(Destination dest) {
+        this.inBoxDestination = dest;
+    }
+
+    /**
+     * @return the timeStamp
+     */
+    long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    /**
+     * @param timeStamp the timeStamp to set
+     */
+    void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    /**
+     * @return the coordinatorWeight
+     */
+    public int getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+
+    /**
+     * @param coordinatorWeight the coordinatorWeight to set
+     */
+    public void setCoordinatorWeight(int coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
+
+    public String toString() {
+        return this.name + "[" + this.id + "]@" + this.hostname;
+    }
+
+    /**
+     * Restores the state written by {@link #writeExternal(ObjectOutput)}.
+     * Strings that were <code>null</code> when written come back as empty
+     * strings.
+     */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.coordinatorWeight = in.readInt();
+        this.name = in.readUTF();
+        this.id = in.readUTF();
+        this.hostname = in.readUTF();
+        this.startTime = in.readLong();
+        this.inBoxDestination = (Destination) in.readObject();
+        // writeExternal emits the weight a second time at the end of the
+        // record; it is still consumed so the stream layout stays unchanged
+        this.coordinatorWeight = in.readInt();
+    }
+
+    /**
+     * Writes the weight, name, id, hostname, start time, inbox destination
+     * and the weight again. <code>null</code> strings are written as empty
+     * strings because <code>writeUTF</code> does not accept
+     * <code>null</code>.
+     */
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(this.coordinatorWeight);
+        out.writeUTF(this.name != null ? this.name : "");
+        out.writeUTF(this.id != null ? this.id : "");
+        out.writeUTF(this.hostname != null ? this.hostname : "");
+        out.writeLong(this.startTime);
+        out.writeObject(this.inBoxDestination);
+        // kept for compatibility with the existing stream layout
+        out.writeInt(this.coordinatorWeight);
+    }
+
+    /**
+     * @return the hash code of the id, or <code>0</code> while no id has
+     *         been assigned
+     */
+    public int hashCode() {
+        // the id is null until setId is called, so guard against an NPE
+        return this.id != null ? this.id.hashCode() : 0;
+    }
+
+    /**
+     * @param obj the object to compare with
+     * @return <code>true</code> if <code>obj</code> is this instance or a
+     *         <code>Member</code> with the same non-null id
+     */
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        boolean result = false;
+        if (obj instanceof Member) {
+            Member other = (Member) obj;
+            // a member without an id only equals itself
+            result = this.id != null && this.id.equals(other.id);
+        }
+        return result;
+    }
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/Member.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MemberChangedListener.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MemberChangedListener.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MemberChangedListener.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.groupmq;
+
+/**
+ * A listener for membership changes to a group
+ *
+ */
+public interface MemberChangedListener {
+    
+    /**
+     * Notification a member has started
+     * @param member the member that has started
+     */
+    void memberStarted(Member member);
+    
+    /**
+     * Notification a member has stopped
+     * @param member the member that has stopped
+     */
+    void memberStopped(Member member);
+}
\ No newline at end of file

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/MemberChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/RequestCallback.java?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/RequestCallback.java (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/RequestCallback.java Wed Aug 13 10:05:07 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+
+/**
+ * Callback that is notified when a request has finished
+ * 
+ */
+public interface RequestCallback{
+    /**
+     * Optionally called when a request is finished
+     * @param id the id supplied when the request was completed
+     */
+    void finished(String id);
+}

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/RequestCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/package.html?rev=685612&view=auto
==============================================================================
--- activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/package.html (added)
+++ activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/package.html Wed Aug 13 10:05:07 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Shared state, messaging and membership information between members of a distributed group.
+
+</body>
+</html>

Propchange: activemq/sandbox/groupmq/src/main/java/org/apache/groupmq/package.html
------------------------------------------------------------------------------
    svn:executable = 



Mime
View raw message