activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r669545 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/
Date Thu, 19 Jun 2008 16:35:45 GMT
Author: rajdavies
Date: Thu Jun 19 09:35:44 2008
New Revision: 669545

URL: http://svn.apache.org/viewvc?rev=669545&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1812

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java   (with props)

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Holds information about an EntryKey
+ * 
+ */
+class EntryKey<K> implements Externalizable {
+    private Member owner;
+    private K key;
+    private boolean share;
+    private boolean removeOnExit;
+
+    /**
+     * Default constructor - for serialization
+     */
+    public EntryKey() {
+    }
+
+    EntryKey(Member owner, K key) {
+        this.owner = owner;
+        this.key = key;
+    }
+
+    public int hashCode() {
+        return this.key != null ? this.key.hashCode() : super.hashCode();
+    }
+
+    /**
+     * @return the owner
+     */
+    public Member getOwner() {
+        return this.owner;
+    }
+
+    /**
+     * @return the key
+     */
+    public K getKey() {
+        return this.key;
+    }
+    
+    /**
+     * @return the share
+     */
+    public boolean isShare() {
+        return this.share;
+    }
+
+    /**
+     * @param share the share to set
+     */
+    public void setShare(boolean share) {
+        this.share = share;
+    }
+
+    /**
+     * @return the removeOnExit
+     */
+    public boolean isRemoveOnExit() {
+        return this.removeOnExit;
+    }
+
+    /**
+     * @param removeOnExit
+     *            the removeOnExit to set
+     */
+    public void setRemoveOnExit(boolean removeOnExit) {
+        this.removeOnExit = removeOnExit;
+    }
+
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        // a null key hashes by identity, so it may only equal itself; this also avoids the NPE
+        return obj instanceof EntryKey && this.key != null
+                && this.key.equals(((EntryKey) obj).key);
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(this.owner);
+        out.writeObject(this.key);
+        out.writeBoolean(isShare());
+        out.writeBoolean(isRemoveOnExit());
+    }
+
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        this.owner = (Member) in.readObject();
+        this.key = (K) in.readObject();
+        this.share = in.readBoolean();
+        this.removeOnExit=in.readBoolean();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Used to pass information around
+ *
+ */
+public class EntryMessage implements Externalizable{
+    static enum MessageType{INSERT,DELETE};
+    private EntryKey key;
+    private Object value;
+    private MessageType type;
+    
+    /**
+     * @return the key
+     */
+    public EntryKey getKey() {
+        return key;
+    }
+    /**
+     * @param key the key to set
+     */
+    public void setKey(EntryKey key) {
+        this.key = key;
+    }
+    /**
+     * @return the value
+     */
+    public Object getValue() {
+        return value;
+    }
+    /**
+     * @param value the value to set
+     */
+    public void setValue(Object value) {
+        this.value = value;
+    }
+    
+    /**
+     * @return the type
+     */
+    public MessageType getType() {
+        return type;
+    }
+    /**
+     * @param type the type to set
+     */
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+    
+    
+    
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        this.key=(EntryKey) in.readObject();
+        this.value=in.readObject();
+        this.type=(MessageType) in.readObject();        
+    }
+    
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(this.key);
+        out.writeObject(this.value);
+        out.writeObject(this.type);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+
+/**
+ * Holds information about the Value in the Map
+ *
+ */
+class EntryValue<V> {
+    private Member owner;
+    private V value;
+    
+    
+    EntryValue(Member owner, V value){
+        this.owner=owner;
+        this.value=value;
+    }
+    
+    /**
+     * @return the owner
+     */
+    public Member getOwner() {
+        return this.owner;
+    }
+
+    /**
+     * @return the value
+     */
+    public V getValue() {
+        return this.value;
+    }
+    
+    public int hashCode() {
+        return this.value != null ? this.value.hashCode() : 0; // equals() treats null values as equal, so they must share a hash
+    }
+    
+    public boolean equals(Object obj) {
+        boolean result = false;
+        if (obj instanceof EntryValue) {
+            EntryValue other = (EntryValue)obj;
+            result = (this.value==null && other.value==null) ||
+                (this.value != null && other.value != null && this.value.equals(other.value));
+        }
+        return result;
+    }
+}
+

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,864 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.Service;
+import org.apache.activemq.advisory.ConsumerEvent;
+import org.apache.activemq.advisory.ConsumerEventSource;
+import org.apache.activemq.advisory.ConsumerListener;
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+
+/**
+ * <P>
+ * A <CODE>GroupMap</CODE> is used to share state amongst a distributed
+ * group. You can restrict ownership of objects inserted into the map,
+ * by allowing only the map that inserted the objects to update or remove them
+ * <P>
+ * Updates to the group shared map are controlled by a co-ordinator.
+ * The co-ordinator is the member with the lowest lexicographical id.
+ * <P>The {@link #selectCordinator(Collection)} method may be overridden to
+ * implement a custom mechanism for choosing the co-ordinator that controls
+ * how entries are added to the map.
+ * <P>
+ * @param <K> the key type
+ * @param <V> the value type
+ *
+ */
+public class GroupMap<K, V> implements Map<K, V>,Service{
+    private static final Log LOG = LogFactory.getLog(GroupMap.class);
+    private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()+".";
+    private static final int HEART_BEAT_INTERVAL = 15000; 
+    private final Object mapMutex = new Object();
+    private Map<EntryKey<K>,EntryValue<V>> localMap;
+    private Map<String,Member> members = new ConcurrentHashMap<String,Member>();
+    private Map<String,MapRequest> requests = new HashMap<String,MapRequest>();
+    private List <MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+    private List <MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
+    private LRUSet<Message>mapUpdateReplies = new LRUSet<Message>();
+    private Member local;
+    private Member coordinator;
+    private String groupName;
+    private boolean sharedWrites;
+    private Connection connection;
+    private Session session;
+    private Topic topic;
+    private Topic heartBeatTopic;
+    private Topic inboxTopic;
+    private MessageProducer producer;
+    private ConsumerEventSource consumerEvents;
+    private AtomicBoolean started = new AtomicBoolean();
+    private SchedulerTimerTask heartBeatTask;
+    private SchedulerTimerTask checkMembershipTask;
+    private Timer heartBeatTimer;
+    private int heartBeatInterval = HEART_BEAT_INTERVAL;
+    private IdGenerator requestGenerator = new IdGenerator();
+    private boolean removeOwnedObjectsOnExit;
+    
+    /**
+     * @param connection
+     * @param name
+     */
+    public GroupMap(Connection connection,String name) {
+        this(connection,"default",name);
+    }
+    
+    /**
+     * @param connection
+     * @param groupName
+     * @param name
+     */
+    public GroupMap(Connection connection,String groupName,String name) {
+        this.connection = connection;
+        this.local = new Member(name);
+        this.coordinator=this.local;
+        this.groupName=groupName;
+    }
+    
+    /**
+     * Set the local map implementation to be used.
+     * By default it's a HashMap - but you could use a Cache for example
+     * @param map
+     */
+    public void setLocalMap(Map map) {
+        synchronized(this.mapMutex) {
+            this.localMap=map;
+        }
+    }
+    
+    /**
+     * Start membership to the group
+     * @throws Exception 
+     * 
+     */
+    public void start() throws Exception {
+        if(this.started.compareAndSet(false, true)) {
+            synchronized(this.mapMutex) {
+                if (this.localMap==null) {
+                    this.localMap= new HashMap<EntryKey<K>, EntryValue<V>>();
+                }
+            }
+            this.connection.start();
+            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.producer = this.session.createProducer(null);
+            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            this.inboxTopic = this.session.createTemporaryTopic();
+            String topicName = STATE_TOPIC_PREFIX+this.groupName;
+            this.topic = this.session.createTopic(topicName);
+            this.heartBeatTopic = this.session.createTopic(topicName+".heartbeat");
+            MessageConsumer privateInbox = this.session.createConsumer(this.inboxTopic);
+            privateInbox.setMessageListener(new MessageListener(){
+                public void onMessage(Message message) {
+                    handleResponses(message);
+                }
+            });
+            ActiveMQMessageConsumer mapChangeConsumer = (ActiveMQMessageConsumer) this.session.createConsumer(this.topic);
+            mapChangeConsumer.setMessageListener(new MessageListener(){
+                public void onMessage(Message message) {
+                   handleMapUpdates(message);
+                }
+            });
+            
+            MessageConsumer heartBeatConsumer = this.session.createConsumer(this.heartBeatTopic);
+            heartBeatConsumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    handleHeartbeats(message);
+                 }
+            });
+           
+            this.consumerEvents = new ConsumerEventSource(this.connection, this.topic);
+            this.consumerEvents.setConsumerListener(new ConsumerListener() {
+                public void onConsumerEvent(ConsumerEvent event) {
+                    handleConsumerEvents(event);
+                }  
+            });
+            this.consumerEvents.start();
+            this.local.setId(mapChangeConsumer.getConsumerId().toString());
+            this.local.setInBoxDestination(this.inboxTopic);
+            sendHeartBeat();
+            this.heartBeatTask = new SchedulerTimerTask (new Runnable() {
+                public void run() {
+                    sendHeartBeat();
+                }
+            });
+            this.checkMembershipTask = new SchedulerTimerTask(new Runnable() {
+                public void run() {
+                    checkMembership();
+                }
+            });
+            this.heartBeatTimer = new Timer("Distributed heart beat",true);
+            this.heartBeatTimer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval()/3, getHeartBeatInterval()/2);
+            
+        }
+    }
+
+    /**
+     * stop membership to the group and release the heart beat timer thread
+     * @throws Exception 
+     */
+    public void stop() throws Exception {
+        if (this.started.compareAndSet(true, false)) {
+            this.checkMembershipTask.cancel();
+            this.heartBeatTask.cancel();
+            this.heartBeatTimer.cancel(); // purge() would leave the timer thread running
+            this.consumerEvents.stop();
+            this.session.close();
+        }
+    }
+    
+    /**
+     * @return the groupName
+     */
+    public String getGroupName() {
+        return this.groupName;
+    }
+    
+    /**
+     * @return the sharedWrites
+     */
+    public boolean isSharedWrites() {
+        return this.sharedWrites;
+    }
+
+    /**
+     * @param sharedWrites the sharedWrites to set
+     */
+    public void setSharedWrites(boolean sharedWrites) {
+        this.sharedWrites = sharedWrites;
+    }
+    
+    /**
+     * @return the heartBeatInterval
+     */
+    public int getHeartBeatInterval() {
+        return this.heartBeatInterval;
+    }
+
+    /**
+     * @param heartBeatInterval the heartBeatInterval to set
+     */
+    public void setHeartBeatInterval(int heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+    
+    
+    /**
+     * @param l
+     */
+    public void addMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.add(l);
+    }
+    
+    /**
+     * @param l
+     */
+    public void removeMemberChangedListener(MemberChangedListener l) {
+        this.membershipListeners.remove(l);
+    }
+    
+    /**
+     * @param l
+     */
+    public void addMapChangedListener(MapChangedListener l) {
+        this.mapChangedListeners.add(l);
+    }
+    
+    /**
+     * @param l
+     */
+    public void removeMapChangedListener(MapChangedListener l) {
+        this.mapChangedListeners.remove(l);
+    }
+    
+    /**
+     * @return the removeOwnedObjectsOnExit
+     */
+    public boolean isRemoveOwnedObjectsOnExit() {
+        return removeOwnedObjectsOnExit;
+    }
+
+    /**
+     * Sets the policy for owned objects in the group
+     * If set to true, when this <code>GroupMap</code> stops,
+     * any objects it owns will be removed from the group map
+     * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
+     */
+    public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
+        this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
+    }
+   
+    /**
+     * clear entries from the Map
+     * @throws IllegalStateException 
+     */
+    public void clear() throws IllegalStateException {
+        checkStarted();
+        if(this.localMap != null && !this.localMap.isEmpty()) {
+            Set<EntryKey<K>> keys = null;
+            synchronized(this.mapMutex) {
+                keys = new HashSet<EntryKey<K>>(this.localMap.keySet());
+                this.localMap.clear();
+            }
+            
+            for(EntryKey<K>key:keys) {
+                remove(key);
+            }
+        }        
+    }
+    
+    
+    public boolean containsKey(Object key) {
+        EntryKey stateKey = new EntryKey(this.local,key);
+        synchronized(this.mapMutex) {
+            return this.localMap != null ? this.localMap.containsKey(stateKey):false;
+        }
+    }
+
+    public boolean containsValue(Object value) {
+        EntryValue entryValue = new EntryValue(this.local,value);
+        synchronized(this.mapMutex) {
+            return this.localMap != null ? this.localMap.containsValue(entryValue):false;
+        }
+    }
+
+    public Set<java.util.Map.Entry<K, V>> entrySet() {
+        Map<K,V>result = new HashMap<K,V>();
+        synchronized(this.mapMutex) {
+            if(this.localMap!=null) {
+                for(java.util.Map.Entry<EntryKey<K>,EntryValue<V>>entry:this.localMap.entrySet()) {
+                    result.put(entry.getKey().getKey(),entry.getValue().getValue());
+                }
+            }
+        }
+        return result.entrySet();
+    }
+
+    public V get(Object key) {
+       EntryKey<K> stateKey = new EntryKey<K>(this.local,(K) key);
+       EntryValue<V> value = null;
+       synchronized(this.mapMutex) {
+           value = this.localMap != null ?this.localMap.get(stateKey):null;
+       }
+       return value != null ? value.getValue() : null;
+    }
+
+    public boolean isEmpty() {
+        synchronized(this.mapMutex) {
+            return this.localMap != null ? this.localMap.isEmpty():true;
+        }
+    }
+
+    public Set<K> keySet() {
+        Set <K>result = new HashSet<K>();
+        synchronized(this.mapMutex) {
+            if(this.localMap!=null) {
+                for (EntryKey<K> key:this.localMap.keySet()) {
+                    result.add(key.getKey());
+                }
+            }
+        }
+        return result;
+    }
+    /**
+     * Puts a value into the map associated with the key
+     * @param key 
+     * @param value 
+     * @return the old value or null
+     * @throws IllegalAccessException 
+     * @throws IllegalStateException 
+     * 
+     */
+    public V put(K key, V value) throws IllegalAccessException,IllegalStateException{
+        return put(key,value,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+    }
+    
+    /**
+     * Puts a value into the map associated with the key
+     * @param key 
+     * @param value 
+     * @param sharedWrites 
+     * @param removeOnExit 
+     * @return the old value or null
+     * @throws IllegalAccessException 
+     * @throws IllegalStateException 
+     * 
+     */
+    public V put(K key, V value,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException, IllegalStateException{
+        checkStarted();
+        EntryKey<K>entryKey = new EntryKey<K>(this.local,key);
+        EntryValue<V>stateValue = new EntryValue<V>(this.local,value);
+        entryKey.setShare(sharedWrites);
+        entryKey.setRemoveOnExit(removeOnExit);
+        EntryMessage entryMsg = new EntryMessage();
+        entryMsg.setKey(entryKey);
+        entryMsg.setValue(value);
+        entryMsg.setType(EntryMessage.MessageType.INSERT);
+        return sendEntryMessage(entryMsg);
+    }
+    
+    /**
+     * Add the Map to the distribution
+     * @param t
+     * @throws IllegalAccessException
+     * @throws IllegalStateException
+     */
+    public void putAll(Map<? extends K, ? extends V> t) throws IllegalAccessException,IllegalStateException {
+        putAll(t,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+    }
+
+    /**
+     * Add the Map to the distribution
+     * @param t
+     * @param sharedWrites
+     * @param removeOnExit
+     * @throws IllegalAccessException
+     * @throws IllegalStateException
+     */
+    public void putAll(Map<? extends K, ? extends V> t,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException,IllegalStateException {
+        for(java.util.Map.Entry<? extends K, ? extends V>entry:t.entrySet()) {
+            put(entry.getKey(),entry.getValue(),sharedWrites,removeOnExit);
+        }        
+    }
+
+    /**
+     * remove a value from the map associated with the key
+     * @param key 
+     * @return the Value or null
+     * @throws IllegalAccessException 
+     * @throws IllegalStateException 
+     * 
+     */
+    public V remove(Object key) throws IllegalAccessException,IllegalStateException{
+        EntryKey<K> entryKey = new EntryKey<K>(this.local,(K) key);
+        return remove(entryKey);
+    }
+    
+    V remove(EntryKey<K> key) throws IllegalAccessException,IllegalStateException{
+        checkStarted();
+        EntryMessage entryMsg = new EntryMessage();
+        entryMsg.setKey(key);
+        entryMsg.setType(EntryMessage.MessageType.DELETE);
+        return sendEntryMessage(entryMsg);
+    }
+    
+       
+    public int size() {
+        synchronized(this.mapMutex) {
+            return this.localMap != null ? this.localMap.size():0;
+        }
+    }
+
+    public Collection<V> values() {
+        List<V> result = new ArrayList<V>();
+        synchronized(this.mapMutex) {
+            if(this.localMap!=null) {
+                for (EntryValue<V> value:this.localMap.values()) {
+                    result.add(value.getValue());
+                }
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * @return a set of the members 
+     */
+    public Set<Member> members(){
+        Set<Member>result = new HashSet<Member>();
+        result.addAll(this.members.values());
+        return result;
+    }
+    
+    /**
+     * @param key
+     * @return true if this is the owner of the key
+     */
+    public boolean isOwner(K key) {
+        EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+        EntryValue<V> entryValue = null;
+        synchronized(this.mapMutex) {
+            entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+        }
+        boolean result = false;
+        if (entryValue != null) {
+            result = entryValue.getOwner().getId().equals(this.local.getId());
+        }
+        return result;
+    }
+    
+    /**
+     * Get the owner of a key
+     * @param key
+     * @return the owner - or null if the key doesn't exist
+     */
+    public Member getOwner(K key) {
+        EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+        EntryValue<V> entryValue = null;
+        synchronized(this.mapMutex) {
+            entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+        }
+        return entryValue != null ? entryValue.getOwner():null;
+    }
+    
+    /**
+     * @return true if the coordinator for the map
+     */
+    public boolean isCoordinator() {
+        return this.local.equals(this.coordinator);
+    }
+    
+    /**
+     * Select a coordinator - by default, the member with the lowest lexicographical id
+     * @param members the candidates, compared against the local member
+     * @return the local member or the candidate with the lowest id
+     */
+    protected Member selectCordinator(Collection<Member>members) {
+        Member result = this.local;
+        for (Member member:members) {
+            // compareTo > 0: result sorts after member, so member is the lower id
+            if (result.getId().compareTo(member.getId()) > 0) {
+                result = member;
+            }
+        }
+        return result;   
+    }
+    
+    V sendEntryMessage(EntryMessage entry) {
+        Object result = null;
+        MapRequest request = new MapRequest();
+        String id = this.requestGenerator.generateId();
+        synchronized(this.requests) {
+            this.requests.put(id,request);
+        }
+        try {
+            ObjectMessage objMsg = this.session.createObjectMessage(entry);
+            objMsg.setJMSReplyTo(this.inboxTopic);
+            objMsg.setJMSCorrelationID(id);
+            this.producer.send(this.topic, objMsg);
+            result = request.get(getHeartBeatInterval()*2);
+        }catch(JMSException e) {
+            if(this.started.get()) {
+                LOG.error("Failed to send EntryMessage " + entry,e);
+            }
+        }
+        if (result instanceof IllegalAccessException) {
+            throw (IllegalAccessException)result;
+        }
+        return (V) result;
+    }
+    
+    /**
+     * Dispatches a message received on this member's inbox. Anything that is
+     * not an ObjectMessage is ignored. The payload decides the handling:
+     * a Member is treated as a heartbeat, an EntryMessage is added to the
+     * local map if its key is not already present, and any other payload is
+     * handed to the pending request with the same correlation id.
+     *
+     * @param message the inbox message
+     */
+    void handleResponses(Message message) {
+        if (!(message instanceof ObjectMessage)) {
+            return;
+        }
+        ObjectMessage objMsg = (ObjectMessage) message;
+        try {
+            Object payload = objMsg.getObject();
+            if (payload instanceof Member) {
+                handleHeartbeats((Member)payload);
+                return;
+            }
+            if (!(payload instanceof EntryMessage)) {
+                // a reply to one of our own requests
+                String correlationId = objMsg.getJMSCorrelationID();
+                MapRequest pending = null;
+                synchronized (this.requests) {
+                    pending = this.requests.remove(correlationId);
+                }
+                if (pending != null) {
+                    pending.put(objMsg.getObject());
+                }
+                return;
+            }
+            // an entry pushed to us directly
+            EntryMessage entryMsg = (EntryMessage) payload;
+            EntryKey<K> key = entryMsg.getKey();
+            EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
+            if (this.localMap == null) {
+                return;
+            }
+            boolean added = false;
+            synchronized(this.mapMutex) {
+                // never overwrite an entry we already hold
+                if (!this.localMap.containsKey(key)) {
+                    this.localMap.put(key,value);
+                    added = true;
+                }
+            }
+            // listeners are notified outside the map lock
+            if (added) {
+                fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
+            }
+        } catch (JMSException e) {
+            LOG.warn("Failed to process reply: " + message, e);
+        }
+    }
+    
+    /**
+     * Applies a map update published on the group topic to the local map and
+     * produces a reply for the requester.
+     * <p>
+     * An existing key may only be changed when the key's owner matches the
+     * current owner and the key is not shared; otherwise the reply carries an
+     * IllegalAccessException. A key that is not present is added only for
+     * INSERT messages. The reply is sent straight away when this member is
+     * the coordinator, and is otherwise cached in mapUpdateReplies.
+     *
+     * @param message the update message; expected to be an ObjectMessage
+     */
+    void handleMapUpdates(Message message) {
+        Object reply = null;
+        if (message instanceof ObjectMessage) {
+            try {
+                ObjectMessage objMsg = (ObjectMessage) message;
+                EntryMessage entryMsg = (EntryMessage) objMsg.getObject();
+                EntryKey<K> key = entryMsg.getKey();
+                EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
+                boolean containsKey=false;
+                boolean mapExists = false;
+                synchronized(this.mapMutex) {
+                    mapExists = this.localMap!=null;
+                    if(mapExists) {
+                        containsKey=this.localMap.containsKey(key);
+                    }
+                }
+                if(mapExists) {
+                    if (containsKey) {
+                        Member owner = getOwner((K) key.getKey());
+                        if (owner.equals(key.getOwner()) && !key.isShare()) {
+                            boolean insert = entryMsg.getType().equals(EntryMessage.MessageType.INSERT);
+                            EntryValue<V> old = null;
+                            synchronized(this.mapMutex) {
+                                if (insert) {
+                                    old = this.localMap.put(key, value);
+                                }else {
+                                    old = this.localMap.remove(key);
+                                }
+                            }
+                            // The lock is released between the containsKey
+                            // check and this mutation, so the entry may have
+                            // gone in the meantime and old can be null.
+                            Object oldValue = old != null ? old.getValue() : null;
+                            // A removal leaves no new value, matching what is
+                            // reported when a stopped member's entries are
+                            // removed.
+                            Object newValue = insert ? value.getValue() : null;
+                            fireMapChanged(owner, key.getKey(), oldValue, newValue);
+                        }else {
+                            reply = new IllegalAccessException("Owned by "+ owner);
+                        }
+                    }else {
+                        if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) {
+                            synchronized(this.mapMutex) {
+                                this.localMap.put(key, value);
+                            }
+                            fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
+                        }
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.warn("Failed to process map update",e);
+                reply = e;
+            }
+
+            try {
+                Destination replyTo = message.getJMSReplyTo();
+                String correlationId = message.getJMSCorrelationID();
+                ObjectMessage replyMsg = this.session
+                        .createObjectMessage((Serializable) reply);
+                replyMsg.setJMSCorrelationID(correlationId);
+                // reuse timestamp - this will be cleared by the producer on
+                // send
+                replyMsg.setJMSTimestamp(System.currentTimeMillis());
+                if (isCoordinator()) {
+                    this.producer.send(replyTo, replyMsg);
+                }else {
+                    synchronized(mapUpdateReplies) {
+                        this.mapUpdateReplies.add(replyMsg);
+                    }
+                }
+            } catch (JMSException e) {
+                if(this.started.get()) {
+                    LOG.error("Failed to send response to a map update ", e);
+                }
+            }
+
+        }else {
+            LOG.warn("Unexpected map update message " + message);
+        }
+    }
+    
+    /**
+     * Unwraps a heartbeat message and processes the Member it carries.
+     * Messages that are not ObjectMessages are logged and dropped.
+     *
+     * @param message the heartbeat message
+     */
+    void handleHeartbeats(Message message) {
+        try {
+            if (!(message instanceof ObjectMessage)) {
+                LOG.warn("Unexpected message: " + message);
+                return;
+            }
+            Member sender = (Member) ((ObjectMessage) message).getObject();
+            handleHeartbeats(sender);
+        } catch (JMSException e) {
+            LOG.warn("Failed to handle heart beat", e);
+        }
+    }
+
+    /**
+     * Records a heartbeat from a member. The member is stamped with the
+     * current time and stored under its id. If no member was stored under
+     * that id before, listeners are told it started; if it is also not the
+     * local member, it is sent our own heartbeat and - when we are the
+     * coordinator - the current map contents.
+     *
+     * @param member the member the heartbeat came from
+     */
+    void handleHeartbeats(Member member) {
+        member.setTimeStamp(System.currentTimeMillis());
+        boolean alreadyKnown = this.members.put(member.getId(), member) != null;
+        if (alreadyKnown) {
+            return;
+        }
+        fireMemberStarted(member);
+        if (member.equals(this.local)) {
+            return;
+        }
+        //send the new member our details
+        sendHeartBeat(member.getInBoxDestination());
+        if (isCoordinator()) {
+            updateNewMemberMap(member);
+        }
+    }
+    
+    /**
+     * Reacts to a consumer event. Only events that are not "started" are
+     * acted on: the member stored under the consumer id is removed and, if
+     * there was one, listeners are notified and an election is run.
+     *
+     * @param event the consumer event
+     */
+    void handleConsumerEvents(ConsumerEvent event) {
+        if (event.isStarted()) {
+            return;
+        }
+        Member departed = this.members.remove(event.getConsumerId().toString());
+        if (departed == null) {
+            return;
+        }
+        fireMemberStopped(departed);
+        doElection();
+    }
+    
+    /**
+     * Periodic housekeeping. While started, drops every member whose last
+     * heartbeat timestamp is older than one heartbeat interval and runs an
+     * election if any member was dropped. Regardless of the started state,
+     * it then discards cached replies whose JMS timestamp is older than two
+     * heartbeat intervals.
+     */
+    void checkMembership() {
+        if (this.started.get()) {
+            long checkTime = System.currentTimeMillis()-getHeartBeatInterval();
+            boolean doElection = false;
+            // NOTE(review): members is modified while its values() view is
+            // being iterated - this is only safe if members is a concurrent
+            // map; confirm against its declaration.
+            for (Member member : this.members.values()) {
+                if (member.getTimeStamp()<checkTime) {
+                    LOG.info("Member timestamp expired " + member);
+                    this.members.remove(member.getId());
+                    fireMemberStopped(member);
+                    doElection=true;
+                    
+                }
+            }
+            // a single election after the sweep, however many members expired
+            if (doElection) {
+                doElection();
+            }
+        }
+        //clear down cached reply messages
+        long checkTime = System.currentTimeMillis()-(getHeartBeatInterval()*2);
+        List<Message> tmpList = new ArrayList<Message>();
+        synchronized(this.mapUpdateReplies) {
+            try {
+                // collect first, remove afterwards, so the list is not
+                // modified while it is being iterated
+                for(Message msg:this.mapUpdateReplies) {
+                    if (msg.getJMSTimestamp() < checkTime) {
+                        tmpList.add(msg);
+                    }
+                }
+                for(Message msg:tmpList) {
+                    this.mapUpdateReplies.remove(msg);
+                }
+            }catch(JMSException e) {
+                // a failure while collecting means nothing is removed this
+                // time round
+                LOG.warn("Failed to clear down mapUpdateReplies",e);
+            }
+        }
+    }
+    
+    /**
+     * Sends this member's heartbeat to the heartbeat topic.
+     */
+    void sendHeartBeat() {
+        sendHeartBeat(this.heartBeatTopic);
+    }
+
+    /**
+     * Sends the local Member as a heartbeat to the given destination.
+     * Nothing is sent unless the map is started, and a failure is only
+     * logged if the map is still started when it happens.
+     *
+     * @param destination where to send the heartbeat
+     */
+    void sendHeartBeat(Destination destination) {
+        if (!this.started.get()) {
+            return;
+        }
+        try {
+            ObjectMessage heartbeat = this.session.createObjectMessage(this.local);
+            this.producer.send(destination, heartbeat);
+        } catch (Throwable e) {
+            // failures during shutdown are expected and not worth reporting
+            if (this.started.get()) {
+                LOG.warn("Failed to send heart beat", e);
+            }
+        }
+    }
+    
+    /**
+     * Sends the current contents of the local map to a member as a series of
+     * INSERT entry messages delivered to that member's inbox. Entries owned
+     * by the receiving member itself are not sent.
+     *
+     * @param member the member to bring up to date
+     */
+    void updateNewMemberMap(Member member) {
+        // copy the entries under the lock, send them outside of it
+        List<Map.Entry<EntryKey<K>, EntryValue<V>>> snapshot = new ArrayList<Map.Entry<EntryKey<K>, EntryValue<V>>>();
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                snapshot.addAll(this.localMap.entrySet());
+            }
+        }
+        try {
+            for (Map.Entry<EntryKey<K>, EntryValue<V>> current : snapshot) {
+                EntryMessage insert = new EntryMessage();
+                insert.setKey(current.getKey());
+                insert.setValue(current.getValue().getValue());
+                insert.setType(EntryMessage.MessageType.INSERT);
+                ObjectMessage wrapped = this.session.createObjectMessage(insert);
+                boolean ownedByReceiver = member.equals(current.getKey().getOwner());
+                if (!ownedByReceiver) {
+                    this.producer.send(member.getInBoxDestination(), wrapped);
+                }
+            }
+        } catch (JMSException e) {
+            if (started.get()) {
+                LOG.warn("Failed to update new member ", e);
+            }
+        }
+    }
+    
+    /**
+     * Logs that a member has started and notifies every registered
+     * membership listener.
+     *
+     * @param member the member that started
+     */
+    void fireMemberStarted(Member member) {
+        LOG.info(this.local.getName()+" Member started " + member);
+        for (MemberChangedListener listener : this.membershipListeners) {
+            listener.memberStarted(member);
+        }
+    }
+    
+    /**
+     * Logs that a member has stopped and notifies every registered
+     * membership listener. Afterwards, every local map entry that is owned
+     * by the stopped member and whose key is flagged remove-on-exit is
+     * removed, with a map-changed event (new value <code>null</code>) fired
+     * for each entry actually removed.
+     *
+     * @param member the member that stopped
+     */
+    void fireMemberStopped(Member member) {
+        LOG.info(this.local.getName()+" Member stopped " + member);
+        for (MemberChangedListener l : this.membershipListeners) {
+            l.memberStopped(member);
+        }
+        //remove all entries owned by the stopped member
+        List<EntryKey<K>>tmpList = new ArrayList<EntryKey<K>>();
+        boolean mapExists = false;
+        synchronized(this.mapMutex) {
+            mapExists=this.localMap!=null;
+            if(mapExists) {
+                for (EntryKey<K> entryKey:this.localMap.keySet()) {
+                    if (entryKey.getOwner().equals(member) && entryKey.isRemoveOnExit()) {
+                        tmpList.add(entryKey);
+                    }
+                }
+            }
+        }
+        if(mapExists) {
+            for (EntryKey<K> entryKey:tmpList) {
+                EntryValue<V> value = null;
+                synchronized(this.mapMutex) {
+                    value = this.localMap.remove(entryKey);
+                }
+                // The lock is released between collecting the keys and
+                // removing them, so another thread may already have removed
+                // the entry; only report removals that happened here.
+                if (value != null) {
+                    fireMapChanged(member, entryKey.getKey(), value.getValue(), null);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Passes a map change on to every registered map listener.
+     *
+     * @param owner the owner of the changed entry
+     * @param key the key of the changed entry
+     * @param oldValue the value before the change
+     * @param newValue the value after the change
+     */
+    void fireMapChanged(Member owner,Object key, Object oldValue, Object newValue) {
+        for (MapChangedListener listener : this.mapChangedListeners) {
+            listener.mapChanged(owner, key, oldValue, newValue);
+        }
+    }
+    
+    /**
+     * Re-selects the coordinator from the current members. If this member is
+     * the coordinator and the map is started, all cached map-update replies
+     * are drained and sent to their reply-to destinations.
+     */
+    void doElection() {
+        this.coordinator=selectCordinator(this.members.values());
+        if (!isCoordinator() || !this.started.get()) {
+            return;
+        }
+        //send any inflight requests
+        List<Message> pendingReplies = new ArrayList<Message>();
+        synchronized(this.mapUpdateReplies) {
+            pendingReplies.addAll(this.mapUpdateReplies);
+            this.mapUpdateReplies.clear();
+        }
+        try {
+            for (Message cachedReply : pendingReplies) {
+                if (this.started.get()) {
+                    this.producer.send(cachedReply.getJMSReplyTo(), cachedReply);
+                }
+            }
+        } catch (JMSException e) {
+            if (this.started.get()) {
+                LOG.error("Failed to resend replies",e);
+            }
+        }
+    }
+    
+    /**
+     * Guard for operations that require a started map.
+     *
+     * @throws IllegalStateException if the map has not been started
+     */
+    void checkStarted() throws IllegalStateException{
+        if (started.get()) {
+            return;
+        }
+        throw new IllegalStateException("GroupMap " + this.local.getName() + " not started");
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+/**
+ * Thrown when a member tries to update a map key that it does not own.
+ * <p>
+ * Note that this class shares its simple name with
+ * <code>java.lang.IllegalAccessException</code> but, unlike that class, is
+ * unchecked: it extends <code>IllegalStateException</code>.
+ */
+public class IllegalAccessException extends java.lang.IllegalStateException {
+    private static final long serialVersionUID = -7584658017201604560L;
+    
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public IllegalAccessException(String message) {
+        super(message);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+/**
+ * Receives notifications about changes to the state of the map.
+ */
+public interface MapChangedListener {
+    
+    /**
+     * Called when the map has changed.
+     *
+     * @param owner the member that owns the changed entry
+     * @param key the key of the changed entry
+     * @param oldValue the value before the change; <code>null</code> when an
+     *        entry was added
+     * @param newValue the value after the change; <code>null</code> when an
+     *        entry was removed
+     */
+    void mapChanged(Member owner,Object key, Object oldValue, Object newValue);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Return information about map update
+ * 
+ */
+public class MapRequest {
+    private final AtomicBoolean done = new AtomicBoolean();
+    private Object response;
+
+    Object get(int timeout) {
+        synchronized (done) {
+            if (done.get() == false && response == null) {
+                try {
+                    done.wait(timeout);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        return this.response;
+    }
+
+    void put(Object response) {
+        this.response = response;
+        cancel();
+    }
+
+    void cancel() {
+        done.set(true);
+        synchronized (done) {
+            done.notifyAll();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.jms.Destination;
+import org.apache.activemq.util.IdGenerator;
+
+/**
+ *<P>
+ * A <CODE>Member</CODE> holds information about a member of the group.
+ * <P>
+ * Two members are equal when their ids are equal. The heartbeat time stamp
+ * is local bookkeeping only and is not part of the serialized form.
+ */
+public class Member implements Externalizable {
+    private String name;
+    private String id;
+    private String hostname;
+    // time of the last heartbeat seen from this member; not serialized
+    private long timeStamp;
+    private long startTime;
+    private Destination inBoxDestination;
+
+
+    /**
+     * Default constructor - only used by serialization
+     */
+    public Member() {
+    }
+    /**
+     * Creates a member with the given name, recording the local host name
+     * and the current time as its start time.
+     *
+     * @param name the name of the member
+     */
+    public Member(String name) {
+        this.name = name;
+        this.hostname = IdGenerator.getHostName();
+        this.startTime=System.currentTimeMillis();
+    }
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return the id
+     */
+    public String getId() {
+        return this.id;
+    }
+
+    void setId(String id) {
+        this.id=id;
+    }
+
+    /**
+     * @return the hostname
+     */
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    /**
+     * @return the startTime
+     */
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    /**
+     * @return the inbox destination
+     */
+    public Destination getInBoxDestination() {
+        return this.inBoxDestination;
+    }
+
+    void setInBoxDestination(Destination dest) {
+        this.inBoxDestination=dest;
+    }
+
+    /**
+     * @return a description of the form <code>name[id]@hostname</code>
+     */
+    public String toString() {
+        return this.name+"["+this.id+"]@"+this.hostname;
+    }
+
+
+    /**
+     * Restores the fields in the order written by
+     * {@link #writeExternal(ObjectOutput)}.
+     */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        this.name = in.readUTF();
+        this.id = in.readUTF();
+        this.hostname = in.readUTF();
+        this.startTime=in.readLong();
+        this.inBoxDestination=(Destination) in.readObject();
+    }
+
+    /**
+     * Writes name, id, hostname, start time and inbox destination. Null
+     * strings are written as empty strings.
+     */
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(this.name != null ? this.name : "");
+        out.writeUTF(this.id != null ? this.id : "");
+        out.writeUTF(this.hostname != null ? this.hostname : "");
+        out.writeLong(this.startTime);
+        out.writeObject(this.inBoxDestination);
+    }
+
+    /**
+     * @return the hash code of the id, or 0 while no id has been assigned
+     */
+    public int hashCode() {
+        // the id is null until setId is called
+        return this.id != null ? this.id.hashCode() : 0;
+    }
+
+    /**
+     * @return true if obj is a Member with the same id
+     */
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof Member)) {
+            return false;
+        }
+        Member other = (Member)obj;
+        // null-safe: the id is null until setId is called
+        return this.id == null ? other.id == null : this.id.equals(other.id);
+    }
+
+    /**
+     * @return the timeStamp
+     */
+    long getTimeStamp() {
+        return this.timeStamp;
+    }
+
+    /**
+     * @param timeStamp the timeStamp to set
+     */
+    void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.group;
+
+/**
+ * Receives notifications when members join or leave the group.
+ *
+ * @author rajdavies
+ *
+ */
+public interface MemberChangedListener {
+    
+    /**
+     * Notification that a member has started.
+     *
+     * @param member the member that started
+     */
+    void memberStarted(Member member);
+    
+    /**
+     * Notification that a member has stopped.
+     *
+     * @param member the member that stopped
+     */
+    void memberStopped(Member member);
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html Thu Jun 19 09:35:44 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Shared state and membership information between members of a remote group
+
+</body>
+</html>

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java?rev=669545&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Thu Jun 19 09:35:44 2008
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+/**
+ * Tests {@link GroupMap} against a non-persistent broker that is started in
+ * {@link #setUp()}: membership notifications and the propagation of map
+ * changes between two {@link GroupMap} instances.
+ */
+public class GroupMapTest extends TestCase {
+    /** Longest time, in milliseconds, a test waits for an asynchronous notification. */
+    private static final long WAIT_TIMEOUT_MILLIS = 5000;
+
+    protected BrokerService broker;
+    protected Connection connection;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * The listener counts members up on start and down on stop; the count is
+     * expected to go 1, 2, 1 as map1 starts, map2 starts and map2 stops.
+     * @throws Exception 
+     */
+    public void testAddMemberChangedListener() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+        GroupMap map1 = new GroupMap(connection, "map1");
+        map1.addMemberChangedListener(new MemberChangedListener() {
+
+            public void memberStarted(Member member) {
+                synchronized (counter) {
+                    counter.incrementAndGet();
+                    counter.notifyAll();
+                }
+            }
+
+            public void memberStopped(Member member) {
+                synchronized (counter) {
+                    counter.decrementAndGet();
+                    counter.notifyAll();
+                }
+            }
+        });
+        map1.start();
+        awaitCount(counter, 1);
+        assertEquals(1, counter.get());
+
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection2 = factory.createConnection();
+        try {
+            connection2.start();
+            // The second member must use the second connection; it was
+            // previously created on the shared one, leaving connection2 idle.
+            GroupMap map2 = new GroupMap(connection2, "map2");
+            map2.start();
+            awaitCount(counter, 2);
+            assertEquals(2, counter.get());
+            map2.stop();
+            awaitCount(counter, 1);
+            assertEquals(1, counter.get());
+            map1.stop();
+        } finally {
+            // tearDown() only closes the shared connection, so release this
+            // one here even when an assertion above fails.
+            connection2.close();
+        }
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
+     * @throws Exception 
+     */
+    public void testAddMapChangedListener() throws Exception {
+        GroupMap map = new GroupMap(connection, "map");
+        AtomicBoolean called = new AtomicBoolean();
+        map.addMapChangedListener(flagOnChange(called));
+        map.start();
+        map.put("test", "blob");
+        awaitFlag(called);
+        assertTrue(called.get());
+        map.stop();
+    }
+
+    /**
+     * Test method for {@link org.apache.activemq.group.GroupMap#clear()}.
+     * @throws Exception 
+     */
+    public void testClear() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(flagOnChange(called));
+        map1.start();
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.start();
+        map2.put("test", "foo");
+        awaitFlag(called);
+        assertTrue(called.get());
+        // Re-arm the flag so the next wait observes the change caused by clear().
+        called.set(false);
+        assertFalse(map1.isEmpty());
+        map2.clear();
+        awaitFlag(called);
+        assertTrue(map1.isEmpty());
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test a new map is populated for existing values
+     * @throws Exception 
+     */
+    public void testMapUpdatedOnStart() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+
+        map1.start();
+        map1.put("test", "foo");
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.addMapChangedListener(flagOnChange(called));
+        map2.start();
+
+        awaitFlag(called);
+        assertTrue(called.get());
+        assertTrue(map2.containsKey("test"));
+        assertTrue(map2.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#containsKey(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testContainsKey() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(flagOnChange(called));
+        map1.start();
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.start();
+        map2.put("test", "foo");
+        awaitFlag(called);
+        assertTrue(called.get());
+        assertTrue(map1.containsKey("test"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#containsValue(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testContainsValue() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(flagOnChange(called));
+        map1.start();
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.start();
+        map2.put("test", "foo");
+        awaitFlag(called);
+        assertTrue(called.get());
+        assertTrue(map1.containsValue("foo"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /*
+     * TODO: no test exists yet for org.apache.activemq.group.GroupMap#entrySet();
+     * the Javadoc stub that used to sit here documented no method.
+     */
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#get(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testGet() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(flagOnChange(called));
+        map1.start();
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.start();
+        map2.put("test", "foo");
+        awaitFlag(called);
+        assertTrue(called.get());
+        // assertEquals reports the actual value on failure and turns a
+        // missing entry into an assertion failure instead of an NPE.
+        assertEquals("foo", map1.get("test"));
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}.
+     * @throws Exception 
+     */
+    public void testRemove() throws Exception {
+        GroupMap map1 = new GroupMap(connection, "map1");
+        AtomicBoolean called = new AtomicBoolean();
+        map1.addMapChangedListener(flagOnChange(called));
+        map1.start();
+        GroupMap map2 = new GroupMap(connection, "map2");
+        map2.start();
+        map2.put("test", "foo");
+        awaitFlag(called);
+        assertTrue(called.get());
+        // Re-arm the flag so the next wait observes the change caused by remove().
+        called.set(false);
+        assertFalse(map1.isEmpty());
+        map2.remove("test");
+        awaitFlag(called);
+        assertTrue(map1.isEmpty());
+        map1.stop();
+        map2.stop();
+    }
+
+    /**
+     * Starts the broker if this instance has none yet, then opens and starts
+     * the shared connection used by the tests.
+     */
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        ConnectionFactory factory = createConnectionFactory();
+        connection = factory.createConnection();
+        connection.start();
+        super.setUp();
+    }
+
+    /**
+     * Closes the shared connection and stops the broker. Each step runs in a
+     * finally block so that a failure in one does not leave the others
+     * (in particular a running broker) behind for the next test.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            try {
+                if (connection != null) {
+                    connection.close();
+                    connection = null;
+                }
+            } finally {
+                if (broker != null) {
+                    broker.stop();
+                    broker = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a connection factory for {@link ActiveMQConnection#DEFAULT_BROKER_URL}.
+     * NOTE(review): presumably this URL reaches the connector added from
+     * {@link #bindAddress} - confirm if either default changes.
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
+    }
+
+    /**
+     * Creates a broker, applies {@link #configureBroker(BrokerService)} to it
+     * and starts it.
+     *
+     * @return the started broker
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Configures the broker as non-persistent, adds a connector on
+     * {@link #bindAddress} and asks for all messages to be deleted on startup.
+     *
+     * @param answer the broker to configure; it has not been started yet
+     *               when called from {@link #createBroker()}
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+
+    /**
+     * Creates a listener that sets {@code flag} and wakes every thread
+     * waiting on it each time the observed map reports a change.
+     */
+    private static MapChangedListener flagOnChange(final AtomicBoolean flag) {
+        return new MapChangedListener() {
+            public void mapChanged(Member owner, Object key, Object oldValue,
+                    Object newValue) {
+                synchronized (flag) {
+                    flag.set(true);
+                    flag.notifyAll();
+                }
+            }
+        };
+    }
+
+    /**
+     * Blocks until {@code flag} is set or {@link #WAIT_TIMEOUT_MILLIS} has
+     * elapsed, whichever comes first. The condition is re-checked in a loop
+     * because {@link Object#wait(long)} may return spuriously.
+     */
+    private static void awaitFlag(AtomicBoolean flag) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MILLIS;
+        synchronized (flag) {
+            long remaining = WAIT_TIMEOUT_MILLIS;
+            while (!flag.get() && remaining > 0) {
+                flag.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Blocks until {@code counter} equals {@code expected} or
+     * {@link #WAIT_TIMEOUT_MILLIS} has elapsed, whichever comes first. The
+     * condition is re-checked in a loop because {@link Object#wait(long)}
+     * may return spuriously.
+     */
+    private static void awaitCount(AtomicInteger counter, int expected) throws InterruptedException {
+        long deadline = System.currentTimeMillis() + WAIT_TIMEOUT_MILLIS;
+        synchronized (counter) {
+            long remaining = WAIT_TIMEOUT_MILLIS;
+            while (counter.get() != expected && remaining > 0) {
+                counter.wait(remaining);
+                remaining = deadline - System.currentTimeMillis();
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message