Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 24177 invoked from network); 19 Jun 2008 16:36:07 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Jun 2008 16:36:07 -0000 Received: (qmail 38498 invoked by uid 500); 19 Jun 2008 16:36:09 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 38473 invoked by uid 500); 19 Jun 2008 16:36:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 38464 invoked by uid 99); 19 Jun 2008 16:36:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jun 2008 09:36:09 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jun 2008 16:35:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9A5DA23889C2; Thu, 19 Jun 2008 09:35:45 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r669545 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/ Date: Thu, 19 Jun 2008 16:35:45 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080619163545.9A5DA23889C2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Thu Jun 19 09:35:44 2008 New Revision: 669545 URL: http://svn.apache.org/viewvc?rev=669545&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1812 Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (with props) Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Holds information about an EntryKey + * + */ +class EntryKey implements Externalizable { + private Member owner; + private K key; + private boolean share; + private boolean removeOnExit; + + /** + * Default constructor - for serialization + */ + public EntryKey() { + } + + EntryKey(Member owner, K key) { + this.owner = owner; + this.key = key; + } + + public int hashCode() { + return this.key != null ? this.key.hashCode() : super.hashCode(); + } + + /** + * @return the owner + */ + public Member getOwner() { + return this.owner; + } + + /** + * @return the key + */ + public K getKey() { + return this.key; + } + + /** + * @return the share + */ + public boolean isShare() { + return this.share; + } + + /** + * @param share the share to set + */ + public void setShare(boolean share) { + this.share = share; + } + + /** + * @return the removeOnExit + */ + public boolean isRemoveOnExit() { + return this.removeOnExit; + } + + /** + * @param removeOnExit + * the removeOnExit to set + */ + public void setRemoveOnExit(boolean removeOnExit) { + this.removeOnExit = removeOnExit; + } + + public boolean equals(Object obj) { + boolean result = false; + if (obj instanceof EntryKey) { + EntryKey other = (EntryKey) obj; + result = other.key.equals(this.key); + } + return result; + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(this.owner); + out.writeObject(this.key); + out.writeBoolean(isShare()); + out.writeBoolean(isRemoveOnExit()); + } + + public void readExternal(ObjectInput in) throws IOException, + ClassNotFoundException { + this.owner = (Member) in.readObject(); + this.key = (K) in.readObject(); + this.share = in.readBoolean(); + this.removeOnExit=in.readBoolean(); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Used to pass information around + * + */ +public class EntryMessage implements Externalizable{ + static enum MessageType{INSERT,DELETE}; + private EntryKey key; + private Object value; + private MessageType type; + + /** + * @return the owner + */ + public EntryKey getKey() { + return key; + } + /** + * @param owner the owner to set + */ + public void setKey(EntryKey key) { + this.key = key; + } + /** + * @return the value + */ + public Object getValue() { + return value; + } + /** + * @param value the value to set + */ + public void setValue(Object value) { + this.value = value; + } + + /** + * @return the type + */ + public MessageType getType() { + return type; + } + /** + * @param type the type to set + */ + public void setType(MessageType type) { + this.type = type; + } + + + + public void readExternal(ObjectInput in) throws IOException, + ClassNotFoundException { + this.key=(EntryKey) in.readObject(); + this.value=in.readObject(); + this.type=(MessageType) in.readObject(); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(this.key); + out.writeObject(this.value); + out.writeObject(this.type); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY VIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + + +/** + * Holds information about the Value in the Map + * + */ +class EntryValue { + private Member owner; + private V value; + + + EntryValue(Member owner, V value){ + this.owner=owner; + this.value=value; + } + + /** + * @return the owner + */ + public Member getOwner() { + return this.owner; + } + + /** + * @return the key + */ + public V getValue() { + return this.value; + } + + public int hashCode() { + return this.value != null ? this.value.hashCode() : super.hashCode(); + } + + public boolean equals(Object obj) { + boolean result = false; + if (obj instanceof EntryValue) { + EntryValue other = (EntryValue)obj; + result = (this.value==null && other.value==null) || + (this.value != null && other.value != null && this.value.equals(other.value)); + } + return result; + } +} + Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,864 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.Service; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.thread.SchedulerTimerTask; +import org.apache.activemq.util.IdGenerator; +import org.apache.activemq.util.LRUSet; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + + +/** + *

+ * A GroupMap is used to shared state amongst a distributed + * group. You can restrict ownership of objects inserted into the map, + * by allowing only the map that inserted the objects to update or remove them + *

+ * Updates to the group shared map are controlled by a co-ordinator. + * The co-ordinator is chosen by the member with the lowest lexicographical id . + *

The {@link #selectCordinator(Collection members)} method may be overridden to + * implement a custom mechanism for choosing the co-ordinator + * are added to the map. + *

+ * @param the key type + * @param the value type + * + */ +public class GroupMap implements Map,Service{ + private static final Log LOG = LogFactory.getLog(GroupMap.class); + private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()+"."; + private static final int HEART_BEAT_INTERVAL = 15000; + private final Object mapMutex = new Object(); + private Map,EntryValue> localMap; + private Map members = new ConcurrentHashMap(); + private Map requests = new HashMap(); + private List membershipListeners = new CopyOnWriteArrayList(); + private List mapChangedListeners = new CopyOnWriteArrayList(); + private LRUSetmapUpdateReplies = new LRUSet(); + private Member local; + private Member coordinator; + private String groupName; + private boolean sharedWrites; + private Connection connection; + private Session session; + private Topic topic; + private Topic heartBeatTopic; + private Topic inboxTopic; + private MessageProducer producer; + private ConsumerEventSource consumerEvents; + private AtomicBoolean started = new AtomicBoolean(); + private SchedulerTimerTask heartBeatTask; + private SchedulerTimerTask checkMembershipTask; + private Timer heartBeatTimer; + private int heartBeatInterval = HEART_BEAT_INTERVAL; + private IdGenerator requestGenerator = new IdGenerator(); + private boolean removeOwnedObjectsOnExit; + + /** + * @param connection + * @param name + */ + public GroupMap(Connection connection,String name) { + this(connection,"default",name); + } + + /** + * @param connection + * @param groupName + * @param name + */ + public GroupMap(Connection connection,String groupName,String name) { + this.connection = connection; + this.local = new Member(name); + this.coordinator=this.local; + this.groupName=groupName; + } + + /** + * Set the local map implementation to be used + * By default its a HashMap - but you could use a Cache for example + * @param map + */ + public void setLocalMap(Map map) { + synchronized(this.mapMutex) { + this.localMap=map; + } + } + + /** + * Start membership to the group + * @throws Exception + * + */ + public void start() throws Exception { + if(this.started.compareAndSet(false, true)) { + synchronized(this.mapMutex) { + if (this.localMap==null) { + this.localMap= new HashMap, EntryValue>(); + } + } + this.connection.start(); + this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.producer = this.session.createProducer(null); + this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + this.inboxTopic = this.session.createTemporaryTopic(); + String topicName = STATE_TOPIC_PREFIX+this.groupName; + this.topic = this.session.createTopic(topicName); + this.heartBeatTopic = this.session.createTopic(topicName+".heartbeat"); + MessageConsumer privateInbox = this.session.createConsumer(this.inboxTopic); + privateInbox.setMessageListener(new MessageListener(){ + public void onMessage(Message message) { + handleResponses(message); + } + }); + ActiveMQMessageConsumer mapChangeConsumer = (ActiveMQMessageConsumer) this.session.createConsumer(this.topic); + mapChangeConsumer.setMessageListener(new MessageListener(){ + public void onMessage(Message message) { + handleMapUpdates(message); + } + }); + + MessageConsumer heartBeatConsumer = this.session.createConsumer(this.heartBeatTopic); + heartBeatConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message message) { + handleHeartbeats(message); + } + }); + + this.consumerEvents = new ConsumerEventSource(this.connection, this.topic); + this.consumerEvents.setConsumerListener(new ConsumerListener() { + public void onConsumerEvent(ConsumerEvent event) { + handleConsumerEvents(event); + } + }); + this.consumerEvents.start(); + this.local.setId(mapChangeConsumer.getConsumerId().toString()); + this.local.setInBoxDestination(this.inboxTopic); + sendHeartBeat(); + this.heartBeatTask = new SchedulerTimerTask (new Runnable() { + public void run() { + sendHeartBeat(); + } + }); + this.checkMembershipTask = new SchedulerTimerTask(new Runnable() { + public void run() { + checkMembership(); + } + }); + this.heartBeatTimer = new Timer("Distributed heart beat",true); + this.heartBeatTimer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval()/3, getHeartBeatInterval()/2); + + } + } + + /** + * stop membership to the group + * @throws Exception + */ + public void stop() throws Exception { + if (this.started.compareAndSet(true, false)) { + this.checkMembershipTask.cancel(); + this.heartBeatTask.cancel(); + this.heartBeatTimer.purge(); + this.consumerEvents.stop(); + this.session.close(); + } + } + + /** + * @return the partitionName + */ + public String getGroupName() { + return this.groupName; + } + + /** + * @return the sharedWrites + */ + public boolean isSharedWrites() { + return this.sharedWrites; + } + + /** + * @param sharedWrites the sharedWrites to set + */ + public void setSharedWrites(boolean sharedWrites) { + this.sharedWrites = sharedWrites; + } + + /** + * @return the heartBeatInterval + */ + public int getHeartBeatInterval() { + return this.heartBeatInterval; + } + + /** + * @param heartBeatInterval the heartBeatInterval to set + */ + public void setHeartBeatInterval(int heartBeatInterval) { + this.heartBeatInterval = heartBeatInterval; + } + + + /** + * @param l + */ + public void addMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.add(l); + } + + /** + * @param l + */ + public void removeMemberChangedListener(MemberChangedListener l) { + this.membershipListeners.remove(l); + } + + /** + * @param l + */ + public void addMapChangedListener(MapChangedListener l) { + this.mapChangedListeners.add(l); + } + + /** + * @param l + */ + public void removeMapChangedListener(MapChangedListener l) { + this.mapChangedListeners.remove(l); + } + + /** + * @return the removeOwnedObjectsOnExit + */ + public boolean isRemoveOwnedObjectsOnExit() { + return removeOwnedObjectsOnExit; + } + + /** + * Sets the policy for owned objects in the group + * If set to true, when this GroupMap stops, + * any objects it owns will be removed from the group map + * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set + */ + public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) { + this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit; + } + + /** + * clear entries from the Map + * @throws IllegalStateException + */ + public void clear() throws IllegalStateException { + checkStarted(); + if(this.localMap != null && !this.localMap.isEmpty()) { + Set> keys = null; + synchronized(this.mapMutex) { + keys = new HashSet>(this.localMap.keySet()); + this.localMap.clear(); + } + + for(EntryKeykey:keys) { + remove(key); + } + } + } + + + public boolean containsKey(Object key) { + EntryKey stateKey = new EntryKey(this.local,key); + synchronized(this.mapMutex) { + return this.localMap != null ? this.localMap.containsKey(stateKey):false; + } + } + + public boolean containsValue(Object value) { + EntryValue entryValue = new EntryValue(this.local,value); + synchronized(this.mapMutex) { + return this.localMap != null ? this.localMap.containsValue(entryValue):false; + } + } + + public Set> entrySet() { + Mapresult = new HashMap(); + synchronized(this.mapMutex) { + if(this.localMap!=null) { + for(java.util.Map.Entry,EntryValue>entry:this.localMap.entrySet()) { + result.put(entry.getKey().getKey(),entry.getValue().getValue()); + } + } + } + return result.entrySet(); + } + + public V get(Object key) { + EntryKey stateKey = new EntryKey(this.local,(K) key); + EntryValue value = null; + synchronized(this.mapMutex) { + value = this.localMap != null ?this.localMap.get(stateKey):null; + } + return value != null ? value.getValue() : null; + } + + public boolean isEmpty() { + synchronized(this.mapMutex) { + return this.localMap != null ? this.localMap.isEmpty():true; + } + } + + public Set keySet() { + Set result = new HashSet(); + synchronized(this.mapMutex) { + if(this.localMap!=null) { + for (EntryKey key:this.localMap.keySet()) { + result.add(key.getKey()); + } + } + } + return result; + } + /** + * Puts an value into the map associated with the key + * @param key + * @param value + * @return the old value or null + * @throws IllegalAccessException + * @throws IllegalStateException + * + */ + public V put(K key, V value) throws IllegalAccessException,IllegalStateException{ + return put(key,value,isSharedWrites(),isRemoveOwnedObjectsOnExit()); + } + + /** + * Puts an value into the map associated with the key + * @param key + * @param value + * @param sharedWrites + * @param removeOnExit + * @return the old value or null + * @throws IllegalAccessException + * @throws IllegalStateException + * + */ + public V put(K key, V value,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException, IllegalStateException{ + checkStarted(); + EntryKeyentryKey = new EntryKey(this.local,key); + EntryValuestateValue = new EntryValue(this.local,value); + entryKey.setShare(sharedWrites); + entryKey.setRemoveOnExit(removeOnExit); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entryKey); + entryMsg.setValue(value); + entryMsg.setType(EntryMessage.MessageType.INSERT); + return sendEntryMessage(entryMsg); + } + + /** + * Add the Map to the distribution + * @param t + * @throws IllegalAccessException + * @throws IllegalStateException + */ + public void putAll(Map t) throws IllegalAccessException,IllegalStateException { + putAll(t,isSharedWrites(),isRemoveOwnedObjectsOnExit()); + } + + /** + * Add the Map to the distribution + * @param t + * @param sharedWrites + * @param removeOnExit + * @throws IllegalAccessException + * @throws IllegalStateException + */ + public void putAll(Map t,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException,IllegalStateException { + for(java.util.Map.Entryentry:t.entrySet()) { + put(entry.getKey(),entry.getValue(),sharedWrites,removeOnExit); + } + } + + /** + * remove a value from the map associated with the key + * @param key + * @return the Value or null + * @throws IllegalAccessException + * @throws IllegalStateException + * + */ + public V remove(Object key) throws IllegalAccessException,IllegalStateException{ + EntryKey entryKey = new EntryKey(this.local,(K) key); + return remove(entryKey); + } + + V remove(EntryKey key) throws IllegalAccessException,IllegalStateException{ + checkStarted(); + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(key); + entryMsg.setType(EntryMessage.MessageType.DELETE); + return sendEntryMessage(entryMsg); + } + + + public int size() { + synchronized(this.mapMutex) { + return this.localMap != null ? this.localMap.size():0; + } + } + + public Collection values() { + List result = new ArrayList(); + synchronized(this.mapMutex) { + if(this.localMap!=null) { + for (EntryValue value:this.localMap.values()) { + result.add(value.getValue()); + } + } + } + return result; + } + + /** + * @return a set of the members + */ + public Set members(){ + Setresult = new HashSet(); + result.addAll(this.members.values()); + return result; + } + + /** + * @param key + * @return true if this is the owner of the key + */ + public boolean isOwner(K key) { + EntryKey stateKey = new EntryKey(this.local,key); + EntryValue entryValue = null; + synchronized(this.mapMutex) { + entryValue = this.localMap != null ? this.localMap.get(stateKey):null; + } + boolean result = false; + if (entryValue != null) { + result = entryValue.getOwner().getId().equals(this.local.getId()); + } + return result; + } + + /** + * Get the owner of a key + * @param key + * @return the owner - or null if the key doesn't exist + */ + public Member getOwner(K key) { + EntryKey stateKey = new EntryKey(this.local,key); + EntryValue entryValue = null; + synchronized(this.mapMutex) { + entryValue = this.localMap != null ? this.localMap.get(stateKey):null; + } + return entryValue != null ? entryValue.getOwner():null; + } + + /** + * @return true if the coordinator for the map + */ + public boolean isCoordinator() { + return this.local.equals(this.coordinator); + } + + /** + * Select a coordinator - by default, its the member with + * the lowest lexicographical id + * @param members + * @return + */ + protected Member selectCordinator(Collectionmembers) { + Member result = this.local; + for (Member member:members) { + if (result.getId().compareTo(member.getId()) < 0) { + result = member; + } + } + return result; + } + + V sendEntryMessage(EntryMessage entry) { + Object result = null; + MapRequest request = new MapRequest(); + String id = this.requestGenerator.generateId(); + synchronized(this.requests) { + this.requests.put(id,request); + } + try { + ObjectMessage objMsg = this.session.createObjectMessage(entry); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + this.producer.send(this.topic, objMsg); + result = request.get(getHeartBeatInterval()*2); + }catch(JMSException e) { + if(this.started.get()) { + LOG.error("Failed to send EntryMessage " + entry,e); + } + } + if (result instanceof IllegalAccessException) { + throw (IllegalAccessException)result; + } + return (V) result; + } + + void handleResponses(Message message) { + if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + try { + Object payload = objMsg.getObject(); + if (payload instanceof Member) { + handleHeartbeats((Member)payload); + } else if(payload instanceof EntryMessage) { + EntryMessage entryMsg = (EntryMessage) payload; + EntryKeykey=entryMsg.getKey(); + EntryValue value = new EntryValue(key.getOwner(),(V) entryMsg.getValue()); + + if(this.localMap !=null) { + boolean fireUpdate = false; + synchronized(this.mapMutex) { + if(!this.localMap.containsKey(key)) { + this.localMap.put(key,value); + fireUpdate=true; + } + } + if(fireUpdate) { + fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue()); + } + } + + + }else { + String id = objMsg.getJMSCorrelationID(); + MapRequest result = null; + synchronized (this.requests) { + result = this.requests.remove(id); + } + if (result != null) { + result.put(objMsg.getObject()); + } + } + } catch (JMSException e) { + LOG.warn("Failed to process reply: " + message, e); + } + } + } + + void handleMapUpdates(Message message) { + Object reply = null; + if (message instanceof ObjectMessage) { + try { + ObjectMessage objMsg = (ObjectMessage) message; + EntryMessage entryMsg = (EntryMessage) objMsg.getObject(); + EntryKey key = entryMsg.getKey(); + EntryValue value = new EntryValue(key.getOwner(),(V) entryMsg.getValue()); + boolean containsKey=false; + boolean mapExists = false; + synchronized(this.mapMutex) { + mapExists = this.localMap!=null; + if(mapExists) { + containsKey=this.localMap.containsKey(key); + } + } + if(mapExists) { + if (containsKey) { + Member owner = getOwner((K) key.getKey()); + if (owner.equals(key.getOwner()) && !key.isShare()) { + EntryValue old = null; + if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) { + synchronized(this.mapMutex) { + old = this.localMap.put(key, value); + } + }else { + synchronized(this.mapMutex) { + old = this.localMap.remove(key); + } + } + fireMapChanged(owner, key.getKey(), old.getValue(), value.getValue()); + }else { + reply = new IllegalAccessException("Owned by "+ owner); + } + }else { + if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) { + synchronized(this.mapMutex) { + this.localMap.put(key, value); + } + fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue()); + } + } + } + } catch (JMSException e) { + LOG.warn("Failed to process map update",e); + reply = e; + } + + try { + Destination replyTo = message.getJMSReplyTo(); + String correlationId = message.getJMSCorrelationID(); + ObjectMessage replyMsg = this.session + .createObjectMessage((Serializable) reply); + replyMsg.setJMSCorrelationID(correlationId); + // reuse timestamp - this will be cleared by the producer on + // send + replyMsg.setJMSTimestamp(System.currentTimeMillis()); + if (isCoordinator()) { + this.producer.send(replyTo, replyMsg); + }else { + synchronized(mapUpdateReplies) { + this.mapUpdateReplies.add(replyMsg); + } + } + } catch (JMSException e) { + if(this.started.get()) { + LOG.error("Failed to send response to a map update ", e); + } + } + + + }else { + LOG.warn("Unexpected map update message " + message); + } + } + + void handleHeartbeats(Message message) { + try { + if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + Member member = (Member) objMsg.getObject(); + handleHeartbeats(member); + } else { + LOG.warn("Unexpected message: " + message); + } + } catch (JMSException e) { + LOG.warn("Failed to handle heart beat", e); + } + } + + void handleHeartbeats(Member member) { + member.setTimeStamp(System.currentTimeMillis()); + if (this.members.put(member.getId(), member) == null) { + fireMemberStarted(member); + if (!member.equals(this.local)) { + //send the new member our details + sendHeartBeat(member.getInBoxDestination()); + if(isCoordinator()) { + updateNewMemberMap(member); + } + } + } + } + + void handleConsumerEvents(ConsumerEvent event) { + if (!event.isStarted()) { + Member member = this.members.remove(event.getConsumerId().toString()); + if(member!=null) { + fireMemberStopped(member); + doElection(); + } + } + } + + void checkMembership() { + if (this.started.get()) { + long checkTime = System.currentTimeMillis()-getHeartBeatInterval(); + boolean doElection = false; + for (Member member : this.members.values()) { + if (member.getTimeStamp() tmpList = new ArrayList(); + synchronized(this.mapUpdateReplies) { + try { + for(Message msg:this.mapUpdateReplies) { + if (msg.getJMSTimestamp() < checkTime) { + tmpList.add(msg); + } + } + for(Message msg:tmpList) { + this.mapUpdateReplies.remove(msg); + } + }catch(JMSException e) { + LOG.warn("Failed to clear down mapUpdateReplies",e); + } + } + } + + void sendHeartBeat() { + sendHeartBeat(this.heartBeatTopic); + } + + void sendHeartBeat(Destination destination) { + if (this.started.get()) { + try { + ObjectMessage msg = this.session + .createObjectMessage(this.local); + this.producer.send(destination, msg); + } catch (Throwable e) { + if(this.started.get()) { + LOG.warn("Failed to send heart beat", e); + } + } + } + } + + void updateNewMemberMap(Member member) { + List, EntryValue>> list = new ArrayList, EntryValue>>(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (Map.Entry, EntryValue> entry : this.localMap + .entrySet()) { + list.add(entry); + } + } + } + try { + for (Map.Entry, EntryValue> entry : list) { + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setKey(entry.getKey()); + entryMsg.setValue(entry.getValue().getValue()); + entryMsg.setType(EntryMessage.MessageType.INSERT); + ObjectMessage objMsg = this.session + .createObjectMessage(entryMsg); + if(!member.equals(entry.getKey().getOwner())) { + this.producer.send(member.getInBoxDestination(), objMsg); + } + } + } catch (JMSException e) { + if (started.get()) { + LOG.warn("Failed to update new member ", e); + } + } + } + + void fireMemberStarted(Member member) { + LOG.info(this.local.getName()+" Member started " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStarted(member); + } + } + + void fireMemberStopped(Member member) { + LOG.info(this.local.getName()+" Member stopped " + member); + for (MemberChangedListener l : this.membershipListeners) { + l.memberStopped(member); + } + //remove all entries owned by the stopped member + List>tmpList = new ArrayList>(); + boolean mapExists = false; + synchronized(this.mapMutex) { + mapExists=this.localMap!=null; + if(mapExists) { + for (EntryKey entryKey:this.localMap.keySet()) { + if (entryKey.getOwner().equals(member)) { + if (entryKey.isRemoveOnExit()) { + tmpList.add(entryKey); + } + } + } + } + } + if(mapExists) { + for (EntryKey entryKey:tmpList) { + EntryValue value = null; + synchronized(this.mapMutex) { + value = this.localMap.remove(entryKey); + } + fireMapChanged(member, entryKey.getKey(), value.getValue(), null); + } + } + } + + void fireMapChanged(Member owner,Object key, Object oldValue, Object newValue) { + for (MapChangedListener l:this.mapChangedListeners) { + l.mapChanged(owner, key, oldValue, newValue); + } + } + + void doElection() { + this.coordinator=selectCordinator(this.members.values()); + if (isCoordinator() && this.started.get()) { + //send any inflight requests + Listlist = new ArrayList(); + synchronized(this.mapUpdateReplies) { + list.addAll(this.mapUpdateReplies); + this.mapUpdateReplies.clear(); + } + try { + for(Message msg:list) { + if (this.started.get()) { + this.producer.send(msg.getJMSReplyTo(), msg); + } + } + }catch(JMSException e) { + if(this.started.get()) { + LOG.error("Failed to resend replies",e); + } + } + } + } + + void checkStarted() throws IllegalStateException{ + if (!started.get()) { + throw new IllegalStateException("GroupMap " + this.local.getName() + " not started"); + } + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +/** + * thrown when updating a key to map that the local client doesn't own + * + */ +public class IllegalAccessException extends java.lang.IllegalStateException { + private static final long serialVersionUID = -7584658017201604560L; + + /** + * @param message + */ + public IllegalAccessException(String message) { + super(message); + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +/** + *Get notifications about changes to the state of the map + * + */ +public interface MapChangedListener { + + /** + * Called when the map has changed + * @param owner + * @param key + * @param oldValue + * @param newValue + */ + void mapChanged(Member owner,Object key, Object oldValue, Object newValue); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Return information about map update + * + */ +public class MapRequest { + private final AtomicBoolean done = new AtomicBoolean(); + private Object response; + + Object get(int timeout) { + synchronized (done) { + if (done.get() == false && response == null) { + try { + done.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + return this.response; + } + + void put(Object response) { + this.response = response; + cancel(); + } + + void cancel() { + done.set(true); + synchronized (done) { + done.notifyAll(); + } + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import javax.jms.Destination; +import org.apache.activemq.util.IdGenerator; + +/** + *

+ * A Member holds information about a member of the group + * + */ +public class Member implements Externalizable { + private String name; + private String id; + private String hostname; + private long timeStamp; + private long startTime; + private Destination inBoxDestination; + + + /** + * Default constructor - only used by serialization + */ + public Member() { + } + /** + * @param name + */ + public Member(String name) { + this.name = name; + this.hostname = IdGenerator.getHostName(); + this.startTime=System.currentTimeMillis(); + } + + /** + * @return the name + */ + public String getName() { + return this.name; + } + + /** + * @return the id + */ + public String getId() { + return this.id; + } + + void setId(String id) { + this.id=id; + } + + /** + * @return the hostname + */ + public String getHostname() { + return this.hostname; + } + + /** + * @return the startTime + */ + public long getStartTime() { + return this.startTime; + } + + /** + * @return the inbox destination + */ + public Destination getInBoxDestination() { + return this.inBoxDestination; + } + + void setInBoxDestination(Destination dest) { + this.inBoxDestination=dest; + } + + public String toString() { + return this.name+"["+this.id+"]@"+this.hostname; + } + + + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + this.name = in.readUTF(); + this.id = in.readUTF(); + this.hostname = in.readUTF(); + this.startTime=in.readLong(); + this.inBoxDestination=(Destination) in.readObject(); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(this.name != null ? this.name : ""); + out.writeUTF(this.id != null ? this.id : ""); + out.writeUTF(this.hostname != null ? this.hostname : ""); + out.writeLong(this.startTime); + out.writeObject(this.inBoxDestination); + } + + public int hashCode() { + return this.id.hashCode(); + } + + public boolean equals(Object obj) { + boolean result = false; + if (obj instanceof Member) { + Member other = (Member)obj; + result = this.id.equals(other.id); + } + return result; + } + + /** + * @return the timeStamp + */ + long getTimeStamp() { + return this.timeStamp; + } + + /** + * @param timeStamp the timeStamp to set + */ + void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/Member.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.group; + +/** + * @author rajdavies + * + */ +public interface MemberChangedListener { + + /** + * Notification a member has started + * @param member + */ + void memberStarted(Member member); + + /** + * Notification a member has stopped + * @param member + */ + void memberStopped(Member member); +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MemberChangedListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html Thu Jun 19 09:35:44 2008 @@ -0,0 +1,25 @@ + + + + + + +Shared state and membership information between members of a remote group + + + Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/package.html ------------------------------------------------------------------------------ svn:executable = * Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java?rev=669545&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Thu Jun 19 09:35:44 2008 @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + + +public class GroupMapTest extends TestCase { + protected BrokerService broker; + protected Connection connection; + protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. + * @throws Exception + */ + public void testAddMemberChangedListener() throws Exception { + final AtomicInteger counter = new AtomicInteger(); + GroupMap map1 = new GroupMap(connection,"map1"); + map1.addMemberChangedListener(new MemberChangedListener(){ + + public void memberStarted(Member member) { + synchronized(counter) { + counter.incrementAndGet(); + counter.notifyAll(); + } + + } + + public void memberStopped(Member member) { + synchronized(counter) { + counter.decrementAndGet(); + counter.notifyAll(); + } + } + + }); + map1.start(); + synchronized(counter) { + if (counter.get()<1) { + counter.wait(5000); + } + } + assertEquals(1, counter.get()); + ConnectionFactory factory = createConnectionFactory(); + Connection connection2 = factory.createConnection(); + connection2.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + synchronized(counter) { + if (counter.get()<2) { + counter.wait(5000); + } + } + assertEquals(2, counter.get()); + map2.stop(); + synchronized(counter) { + if (counter.get()>=2) { + counter.wait(5000); + } + } + assertEquals(1, counter.get()); + map1.stop(); + connection2.close(); + } + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. + * @throws Exception + */ + public void testAddMapChangedListener() throws Exception { + GroupMap map = new GroupMap(connection,"map"); + final AtomicBoolean called = new AtomicBoolean(); + map.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map.start(); + map.put("test", "blob"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + map.stop(); + } + + /** + * Test method for {@link org.apache.activemq.group.GroupMap#clear()}. + * @throws Exception + */ + public void testClear() throws Exception { + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map1.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + map2.put("test","foo"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + called.set(false); + assertTrue(map1.isEmpty()==false); + map2.clear(); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(map1.isEmpty()); + map1.stop(); + map2.stop(); + } + + /** + * Test a new map is populated for existing values + */ + public void testMapUpdatedOnStart() throws Exception { + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + + map1.start(); + map1.put("test", "foo"); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + }); + map2.start(); + + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + called.set(false); + assertTrue(map2.containsKey("test")); + assertTrue(map2.containsValue("foo")); + map1.stop(); + map2.stop(); + } + + public void testContainsKey() throws Exception { + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map1.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + map2.put("test","foo"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + called.set(false); + assertTrue(map1.containsKey("test")); + map1.stop(); + map2.stop(); + } + + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#containsValue(java.lang.Object)}. + * @throws Exception + */ + public void testContainsValue() throws Exception { + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map1.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + map2.put("test","foo"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + called.set(false); + assertTrue(map1.containsValue("foo")); + map1.stop(); + map2.stop(); + } + + /** + * Test method for {@link org.apache.activemq.group.GroupMap#entrySet()}. + * @throws Exception + */ + + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#get(java.lang.Object)}. + * @throws Exception + */ + public void testGet() throws Exception { + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map1.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + map2.put("test","foo"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + assertTrue(map1.get("test").equals("foo")); + map1.stop(); + map2.stop(); + } + + + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}. + */ + public void testRemove() throws Exception{ + GroupMap map1 = new GroupMap(connection,"map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.addMapChangedListener(new MapChangedListener(){ + public void mapChanged(Member owner, Object key, Object oldValue, + Object newValue) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } + + }); + map1.start(); + GroupMap map2 = new GroupMap(connection,"map2"); + map2.start(); + map2.put("test","foo"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + called.set(false); + assertTrue(map1.isEmpty()==false); + map2.remove("test"); + synchronized(called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(map1.isEmpty()); + map1.stop(); + map2.stop(); + } + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + ConnectionFactory factory = createConnectionFactory(); + connection = factory.createConnection(); + connection.start(); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + connection.close(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory()throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java ------------------------------------------------------------------------------ svn:eol-style = native