Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 81780 invoked from network); 6 Aug 2008 13:26:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Aug 2008 13:26:31 -0000 Received: (qmail 7651 invoked by uid 500); 6 Aug 2008 13:26:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 7628 invoked by uid 500); 6 Aug 2008 13:26:30 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 7619 invoked by uid 99); 6 Aug 2008 13:26:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Aug 2008 06:26:30 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Aug 2008 13:25:33 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DEBEA2388986; Wed, 6 Aug 2008 06:25:30 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r683259 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/ Date: Wed, 06 Aug 2008 13:25:29 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080806132530.DEBEA2388986@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Wed Aug 6 06:25:27 2008 New Revision: 683259 URL: http://svn.apache.org/viewvc?rev=683259&view=rev Log: Ensure ordered access to the group and add expiration to state in the GroupMap Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java (with props) activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java - copied, changed from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (with props) Removed: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java?rev=683259&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java Wed Aug 
6 06:25:27 2008 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.HashSet; +import java.util.Set; + +/** + * Return information about map update + * + */ +public class AsyncMapRequest implements RequestCallback{ + private final Object mutex = new Object(); + + private Set requests = new HashSet(); + + public void add(String id, MapRequest request) { + request.setCallback(this); + this.requests.add(id); + } + + /** + * Wait for requests + * @param timeout + * @return + */ + public boolean isSuccess(long timeout) { + long deadline = System.currentTimeMillis() + timeout; + while (!this.requests.isEmpty()) { + synchronized (this.mutex) { + try { + this.mutex.wait(timeout); + } catch (InterruptedException e) { + break; + } + } + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + return this.requests.isEmpty(); + } + + + public void finished(String id) { + this.requests.remove(id); + + } +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java?rev=683259&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java Wed Aug 6 06:25:27 2008 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.group; + +/** + * Default implementation of a MapChangedListener + * + */ +public class DefaultMapChangedListener implements MapChangedListener{ + + public void mapInsert(Member owner, Object key, Object value) { + } + + public void mapRemove(Member owner, Object key, Object value,boolean expired) { + } + + public void mapUpdate(Member owner, Object Key, Object oldValue,Object newValue) { + } +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java?rev=683259&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java Wed Aug 6 06:25:27 2008 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + +/** + * Used to pass information around + * + */ +public class ElectionMessage implements Externalizable{ + static enum MessageType{ELECTION,ANSWER,COORDINATOR}; + private Member member; + private MessageType type; + + /** + * @return the member + */ + public Member getMember() { + return this.member; + } + + /** + * @param member the member to set + */ + public void setMember(Member member) { + this.member = member; + } + + /** + * @return the type + */ + public MessageType getType() { + return this.type; + } + + /** + * @param type the type to set + */ + public void setType(MessageType type) { + this.type = type; + } + + /** + * @return true if election message + */ + public boolean isElection() { + return this.type != null && this.type.equals(MessageType.ELECTION); + } + + /** + * @return true if answer message + */ + public boolean isAnswer() { + return this.type != null && this.type.equals(MessageType.ANSWER); + } + + /** + * @return true if coordinator message + */ + public boolean isCoordinator() { + return this.type != null && this.type.equals(MessageType.COORDINATOR); + } + + + public ElectionMessage copy() { + ElectionMessage result = new ElectionMessage(); + result.member=this.member; + result.type=this.type; + return result; + } + + + public void readExternal(ObjectInput in) throws IOException, + ClassNotFoundException { + this.member=(Member) in.readObject(); + this.type=(MessageType) in.readObject(); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(this.member); + out.writeObject(this.type); + } + + public String toString() { + return "ElectionMessage: "+ this.member + "{"+this.type+ "}"; + } +} Propchange: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Wed Aug 6 06:25:27 2008 @@ -30,6 +30,7 @@ private K key; private boolean share; private boolean removeOnExit; + private long expiration; /** * Default constructor - for serialization @@ -88,6 +89,38 @@ public void setRemoveOnExit(boolean removeOnExit) { this.removeOnExit = removeOnExit; } + + /** + * @return the expiration + */ + public long getExpiration() { + return expiration; + } + + /** + * @param expiration the expiration to set + */ + public void setExpiration(long expiration) { + this.expiration = expiration; + } + + void setTimeToLive(long ttl) { + if (ttl > 0 ) { + this.expiration=ttl+System.currentTimeMillis(); + }else { + this.expiration =0l; + } + } + + boolean isExpired() { + return isExpired(System.currentTimeMillis()); + } + + boolean isExpired(long currentTime) { + return this.expiration > 0 && this.expiration < currentTime; + } + + public boolean equals(Object obj) { boolean result = false; @@ -103,6 +136,7 @@ out.writeObject(this.key); out.writeBoolean(isShare()); out.writeBoolean(isRemoveOnExit()); + out.writeLong(getExpiration()); } public void readExternal(ObjectInput in) throws IOException, @@ -111,5 +145,10 @@ this.key = (K) in.readObject(); this.share = in.readBoolean(); this.removeOnExit=in.readBoolean(); + this.expiration=in.readLong(); + } + + 
public String toString() { + return "key:"+this.key; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Wed Aug 6 06:25:27 2008 @@ -26,19 +26,21 @@ * */ public class EntryMessage implements Externalizable{ - static enum MessageType{INSERT,DELETE}; + static enum MessageType{INSERT,DELETE,SYNC}; private EntryKey key; private Object value; private MessageType type; + private boolean mapUpdate; + private boolean expired; /** * @return the owner */ public EntryKey getKey() { - return key; + return this.key; } /** - * @param owner the owner to set + * @param key */ public void setKey(EntryKey key) { this.key = key; @@ -47,7 +49,7 @@ * @return the value */ public Object getValue() { - return value; + return this.value; } /** * @param value the value to set @@ -60,7 +62,7 @@ * @return the type */ public MessageType getType() { - return type; + return this.type; } /** * @param type the type to set @@ -69,18 +71,81 @@ this.type = type; } + /** + * @return the mapUpdate + */ + public boolean isMapUpdate() { + return this.mapUpdate; + } + /** + * @param mapUpdate the mapUpdate to set + */ + public void setMapUpdate(boolean mapUpdate) { + this.mapUpdate = mapUpdate; + } + + /** + * @return the expired + */ + public boolean isExpired() { + return expired; + } + /** + * @param expired the expired to set + */ + public void setExpired(boolean expired) { + this.expired = expired; + } + + /** + * @return if insert message + */ + public boolean isInsert() { + return this.type != null && 
this.type.equals(MessageType.INSERT); + } + + /** + * @return true if delete message + */ + public boolean isDelete() { + return this.type != null && this.type.equals(MessageType.DELETE); + } + + public boolean isSync() { + return this.type != null && this.type.equals(MessageType.SYNC); + } + + public EntryMessage copy() { + EntryMessage result = new EntryMessage(); + result.key=this.key; + result.value=this.value; + result.type=this.type; + result.mapUpdate=this.mapUpdate; + result.expired=this.expired; + return result; + } + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { this.key=(EntryKey) in.readObject(); this.value=in.readObject(); - this.type=(MessageType) in.readObject(); + this.type=(MessageType) in.readObject(); + this.mapUpdate=in.readBoolean(); + this.expired=in.readBoolean(); } public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.key); out.writeObject(this.value); out.writeObject(this.type); + out.writeBoolean(this.mapUpdate); + out.writeBoolean(this.expired); + } + + public String toString() { + return "EntryMessage: "+this.type + "[" + this.key + "," + this.value + + "]{update=" + this.mapUpdate + "}"; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Wed Aug 6 06:25:27 2008 @@ -45,6 +45,14 @@ return this.value; } + /** + * set the value + * @param value + */ + public void setValue(V value) { + this.value=value; + } + public int hashCode() { return this.value != null ? 
this.value.hashCode() : super.hashCode(); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java Wed Aug 6 06:25:27 2008 @@ -27,6 +27,9 @@ import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -46,40 +49,60 @@ import org.apache.activemq.advisory.ConsumerListener; import org.apache.activemq.thread.SchedulerTimerTask; import org.apache.activemq.util.IdGenerator; -import org.apache.activemq.util.LRUSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - - /** *

- * A GroupMap is used to shared state amongst a distributed - * group. You can restrict ownership of objects inserted into the map, - * by allowing only the map that inserted the objects to update or remove them + * A GroupMap is a Map implementation that is used to share state + * amongst a distributed group of other GroupMap instances. + * Membership of a group is handled automatically using discovery. + *

+ * The underlying transport is JMS and there are some optimizations that occur + * for membership if used with ActiveMQ - but GroupMap can be used + * with any JMS implementation. + * + *

+ * Updates to the group shared map are controlled by a coordinator. The + * coordinator is elected by the member with the lowest lexicographical id - + * based on the bully algorithm [Silberschatz et al. 1993] + *

+ * The {@link #selectCordinator(Collection members)} method may be + * overridden to implement a custom mechanism for choosing how the coordinator + * is elected for the map. *

- * Updates to the group shared map are controlled by a co-ordinator. - * The co-ordinator is chosen by the member with the lowest lexicographical id . - *

The {@link #selectCordinator(Collection members)} method may be overridden to - * implement a custom mechanism for choosing the co-ordinator - * are added to the map. + * New GroupMap instances have their state updated by the coordinator, + * and coordinator failure is handled automatically within the group. *

+ * All updates are totally ordered through the coordinator, whilst read operations + * happen locally. + *

+ * A GroupMap supports the concept of owner only updates (write locks), + * shared updates, entry expiration times and removal on owner exit - + * all of which are optional. + * + *

+ * * @param the key type * @param the value type - * + * */ -public class GroupMap implements Map,Service{ +public class GroupMap implements Map, Service { + /** + * default interval within which to detect a member failure + */ + public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000; + private static final long EXPIRATION_SWEEP_INTERVAL = 1000; private static final Log LOG = LogFactory.getLog(GroupMap.class); - private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()+"."; - private static final int HEART_BEAT_INTERVAL = 15000; + private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName() + + "."; private final Object mapMutex = new Object(); - private Map,EntryValue> localMap; - private Map members = new ConcurrentHashMap(); - private Map requests = new HashMap(); - private List membershipListeners = new CopyOnWriteArrayList(); - private List mapChangedListeners = new CopyOnWriteArrayList(); - private LRUSetmapUpdateReplies = new LRUSet(); - private Member local; + private Map, EntryValue> localMap; + private Map members = new ConcurrentHashMap(); + private Map requests = new HashMap(); + private List membershipListeners = new CopyOnWriteArrayList(); + private List mapChangedListeners = new CopyOnWriteArrayList(); + Member local; private Member coordinator; private String groupName; private boolean sharedWrites; @@ -93,93 +116,122 @@ private AtomicBoolean started = new AtomicBoolean(); private SchedulerTimerTask heartBeatTask; private SchedulerTimerTask checkMembershipTask; - private Timer heartBeatTimer; - private int heartBeatInterval = HEART_BEAT_INTERVAL; - private IdGenerator requestGenerator = new IdGenerator(); + private SchedulerTimerTask expirationTask; + private Timer timer; + private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL; + private IdGenerator idGenerator = new IdGenerator(); private boolean removeOwnedObjectsOnExit; - + private int timeToLive; + private int minimumGroupSize = 1; + private final 
AtomicBoolean electionFinished = new AtomicBoolean(true); + private ExecutorService executor; + private final Object memberMutex = new Object(); + /** * @param connection * @param name */ - public GroupMap(Connection connection,String name) { - this(connection,"default",name); + public GroupMap(Connection connection, String name) { + this(connection, "default", name); } - + /** * @param connection * @param groupName * @param name */ - public GroupMap(Connection connection,String groupName,String name) { + public GroupMap(Connection connection, String groupName, String name) { this.connection = connection; this.local = new Member(name); - this.coordinator=this.local; - this.groupName=groupName; + this.coordinator = this.local; + this.groupName = groupName; } - + /** - * Set the local map implementation to be used - * By default its a HashMap - but you could use a Cache for example + * Set the local map implementation to be used By default its a HashMap - + * but you could use a Cache for example + * * @param map */ public void setLocalMap(Map map) { - synchronized(this.mapMutex) { - this.localMap=map; + synchronized (this.mapMutex) { + this.localMap = map; } } - + /** * Start membership to the group - * @throws Exception + * + * @throws Exception * */ public void start() throws Exception { - if(this.started.compareAndSet(false, true)) { - synchronized(this.mapMutex) { - if (this.localMap==null) { - this.localMap= new HashMap, EntryValue>(); + if (this.started.compareAndSet(false, true)) { + synchronized (this.mapMutex) { + if (this.localMap == null) { + this.localMap = new HashMap, EntryValue>(); } } this.connection.start(); - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.session = this.connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); this.producer = this.session.createProducer(null); this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); this.inboxTopic = this.session.createTemporaryTopic(); - String 
topicName = STATE_TOPIC_PREFIX+this.groupName; + String topicName = STATE_TOPIC_PREFIX + this.groupName; this.topic = this.session.createTopic(topicName); - this.heartBeatTopic = this.session.createTopic(topicName+".heartbeat"); - MessageConsumer privateInbox = this.session.createConsumer(this.inboxTopic); - privateInbox.setMessageListener(new MessageListener(){ + this.heartBeatTopic = this.session.createTopic(topicName + + ".heartbeat"); + MessageConsumer privateInbox = this.session + .createConsumer(this.inboxTopic); + privateInbox.setMessageListener(new MessageListener() { public void onMessage(Message message) { - handleResponses(message); + processMessage(message); } }); - ActiveMQMessageConsumer mapChangeConsumer = (ActiveMQMessageConsumer) this.session.createConsumer(this.topic); - mapChangeConsumer.setMessageListener(new MessageListener(){ + MessageConsumer mapChangeConsumer = this.session + .createConsumer(this.topic); + mapChangeConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { - handleMapUpdates(message); + processMessage(message); } }); - - MessageConsumer heartBeatConsumer = this.session.createConsumer(this.heartBeatTopic); + MessageConsumer heartBeatConsumer = this.session + .createConsumer(this.heartBeatTopic); heartBeatConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { handleHeartbeats(message); - } + } }); - - this.consumerEvents = new ConsumerEventSource(this.connection, this.topic); + this.consumerEvents = new ConsumerEventSource(this.connection, + this.topic); this.consumerEvents.setConsumerListener(new ConsumerListener() { public void onConsumerEvent(ConsumerEvent event) { handleConsumerEvents(event); - } + } }); this.consumerEvents.start(); - this.local.setId(mapChangeConsumer.getConsumerId().toString()); + String memberId = null; + if (mapChangeConsumer instanceof ActiveMQMessageConsumer) { + memberId = ((ActiveMQMessageConsumer) mapChangeConsumer) + 
.getConsumerId().toString(); + } else { + memberId = this.idGenerator.generateId(); + } + this.local.setId(memberId); this.local.setInBoxDestination(this.inboxTopic); + this.executor = Executors + .newSingleThreadExecutor(new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "Election{" + + GroupMap.this.local + "}"); + thread.setDaemon(true); + thread.setPriority(Thread.NORM_PRIORITY); + return thread; + } + }); sendHeartBeat(); - this.heartBeatTask = new SchedulerTimerTask (new Runnable() { + this.heartBeatTask = new SchedulerTimerTask(new Runnable() { public void run() { sendHeartBeat(); } @@ -189,33 +241,76 @@ checkMembership(); } }); - this.heartBeatTimer = new Timer("Distributed heart beat",true); - this.heartBeatTimer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval()/3, getHeartBeatInterval()/2); + this.expirationTask = new SchedulerTimerTask(new Runnable() { + public void run() { + expirationSweep(); + } + }); + this.timer = new Timer("Distributed heart beat", true); + this.timer.scheduleAtFixedRate(this.heartBeatTask, + getHeartBeatInterval() / 3, getHeartBeatInterval() / 2); + this.timer.scheduleAtFixedRate(this.checkMembershipTask, + getHeartBeatInterval(), getHeartBeatInterval()); + this.timer.scheduleAtFixedRate(this.expirationTask, + EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL); + // await for members to join + long timeout = this.heartBeatInterval * this.minimumGroupSize; + long deadline = System.currentTimeMillis() + timeout; + while (this.members.size() < this.minimumGroupSize && timeout > 0) { + synchronized (this.memberMutex) { + this.memberMutex.wait(timeout); + } + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } } } /** * stop membership to the group - * @throws Exception + * + * @throws Exception */ - public void stop() throws Exception { + public void stop() { if (this.started.compareAndSet(true, false)) { + this.expirationTask.cancel(); 
this.checkMembershipTask.cancel(); this.heartBeatTask.cancel(); - this.heartBeatTimer.purge(); - this.consumerEvents.stop(); - this.session.close(); + this.expirationTask.cancel(); + this.timer.purge(); + if (this.executor != null) { + this.executor.shutdownNow(); + } + try { + this.consumerEvents.stop(); + this.session.close(); + } catch (Exception e) { + LOG.debug("Caught exception stopping", e); + } } } - + + /** + * @return true if there is elections have finished + */ + public boolean isElectionFinished() { + return this.electionFinished.get(); + } + /** * @return the partitionName */ public String getGroupName() { return this.groupName; } - + + /** + * @return the name ofthis map + */ + public String getName() { + return this.local.getName(); + } + /** * @return the sharedWrites */ @@ -224,112 +319,146 @@ } /** - * @param sharedWrites the sharedWrites to set + * @param sharedWrites + * the sharedWrites to set */ public void setSharedWrites(boolean sharedWrites) { this.sharedWrites = sharedWrites; } - + /** * @return the heartBeatInterval */ - public int getHeartBeatInterval() { + public long getHeartBeatInterval() { return this.heartBeatInterval; } /** - * @param heartBeatInterval the heartBeatInterval to set + * @param heartBeatInterval + * the heartBeatInterval to set */ - public void setHeartBeatInterval(int heartBeatInterval) { + public void setHeartBeatInterval(long heartBeatInterval) { this.heartBeatInterval = heartBeatInterval; } - - + /** * @param l */ public void addMemberChangedListener(MemberChangedListener l) { this.membershipListeners.add(l); } - + /** * @param l */ public void removeMemberChangedListener(MemberChangedListener l) { this.membershipListeners.remove(l); } - + /** * @param l */ public void addMapChangedListener(MapChangedListener l) { this.mapChangedListeners.add(l); } - + /** * @param l */ public void removeMapChangedListener(MapChangedListener l) { this.mapChangedListeners.remove(l); } - + + /** + * @return the timeToLive + */ + 
public int getTimeToLive() { + return this.timeToLive; + } + + /** + * @param timeToLive + * the timeToLive to set + */ + public void setTimeToLive(int timeToLive) { + this.timeToLive = timeToLive; + } + /** * @return the removeOwnedObjectsOnExit */ public boolean isRemoveOwnedObjectsOnExit() { - return removeOwnedObjectsOnExit; + return this.removeOwnedObjectsOnExit; } /** - * Sets the policy for owned objects in the group - * If set to true, when this GroupMap stops, + * Sets the policy for owned objects in the group If set to true, when this + * GroupMap stops, * any objects it owns will be removed from the group map * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set */ public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) { this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit; } - + + /** + * @return the minimumGroupSize + */ + public int getMinimumGroupSize() { + return this.minimumGroupSize; + } + + /** + * @param minimumGroupSize + * the minimumGroupSize to set + */ + public void setMinimumGroupSize(int minimumGroupSize) { + this.minimumGroupSize = minimumGroupSize; + } + /** * clear entries from the Map - * @throws IllegalStateException + * + * @throws IllegalStateException */ public void clear() throws IllegalStateException { - checkStarted(); - if(this.localMap != null && !this.localMap.isEmpty()) { + checkStatus(); + if (this.localMap != null && !this.localMap.isEmpty()) { Set> keys = null; - synchronized(this.mapMutex) { + synchronized (this.mapMutex) { keys = new HashSet>(this.localMap.keySet()); - this.localMap.clear(); } - - for(EntryKeykey:keys) { + for (EntryKey key : keys) { remove(key); } - } + } + this.localMap.clear(); } - - + public boolean containsKey(Object key) { - EntryKey stateKey = new EntryKey(this.local,key); - synchronized(this.mapMutex) { - return this.localMap != null ? 
this.localMap.containsKey(stateKey):false; + EntryKey stateKey = new EntryKey(this.local, key); + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap.containsKey(stateKey) + : false; } } public boolean containsValue(Object value) { - EntryValue entryValue = new EntryValue(this.local,value); - synchronized(this.mapMutex) { - return this.localMap != null ? this.localMap.containsValue(entryValue):false; + EntryValue entryValue = new EntryValue(this.local, value); + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap + .containsValue(entryValue) : false; } } public Set> entrySet() { - Mapresult = new HashMap(); - synchronized(this.mapMutex) { - if(this.localMap!=null) { - for(java.util.Map.Entry,EntryValue>entry:this.localMap.entrySet()) { - result.put(entry.getKey().getKey(),entry.getValue().getValue()); + Map result = new HashMap(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (java.util.Map.Entry, EntryValue> entry : this.localMap + .entrySet()) { + result.put(entry.getKey().getKey(), entry.getValue() + .getValue()); } } } @@ -337,150 +466,170 @@ } public V get(Object key) { - EntryKey stateKey = new EntryKey(this.local,(K) key); - EntryValue value = null; - synchronized(this.mapMutex) { - value = this.localMap != null ?this.localMap.get(stateKey):null; - } - return value != null ? value.getValue() : null; + EntryKey stateKey = new EntryKey(this.local, (K) key); + EntryValue value = null; + synchronized (this.mapMutex) { + value = this.localMap != null ? this.localMap.get(stateKey) : null; + } + return value != null ? value.getValue() : null; } public boolean isEmpty() { - synchronized(this.mapMutex) { - return this.localMap != null ? this.localMap.isEmpty():true; + synchronized (this.mapMutex) { + return this.localMap != null ? 
this.localMap.isEmpty() : true; } } public Set keySet() { - Set result = new HashSet(); - synchronized(this.mapMutex) { - if(this.localMap!=null) { - for (EntryKey key:this.localMap.keySet()) { + Set result = new HashSet(); + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (EntryKey key : this.localMap.keySet()) { result.add(key.getKey()); } } } return result; } + /** * Puts an value into the map associated with the key - * @param key - * @param value + * + * @param key + * @param value * @return the old value or null - * @throws IllegalAccessException - * @throws IllegalStateException + * @throws GroupMapUpdateException + * @throws IllegalStateException * */ - public V put(K key, V value) throws IllegalAccessException,IllegalStateException{ - return put(key,value,isSharedWrites(),isRemoveOwnedObjectsOnExit()); + public V put(K key, V value) throws GroupMapUpdateException, + IllegalStateException { + return put(key, value, isSharedWrites(), isRemoveOwnedObjectsOnExit(), + getTimeToLive()); } - + /** * Puts an value into the map associated with the key - * @param key - * @param value - * @param sharedWrites - * @param removeOnExit + * + * @param key + * @param value + * @param sharedWrites + * @param removeOnExit + * @param timeToLive * @return the old value or null - * @throws IllegalAccessException - * @throws IllegalStateException + * @throws GroupMapUpdateException + * @throws IllegalStateException * */ - public V put(K key, V value,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException, IllegalStateException{ - checkStarted(); - EntryKeyentryKey = new EntryKey(this.local,key); - EntryValuestateValue = new EntryValue(this.local,value); + public V put(K key, V value, boolean sharedWrites, boolean removeOnExit, + long timeToLive) throws GroupMapUpdateException, + IllegalStateException { + checkStatus(); + EntryKey entryKey = new EntryKey(this.local, key); + EntryValue stateValue = new EntryValue(this.local, value); 
entryKey.setShare(sharedWrites); entryKey.setRemoveOnExit(removeOnExit); + entryKey.setTimeToLive(timeToLive); EntryMessage entryMsg = new EntryMessage(); entryMsg.setKey(entryKey); entryMsg.setValue(value); entryMsg.setType(EntryMessage.MessageType.INSERT); - return sendEntryMessage(entryMsg); + return (V) sendRequest(getCoordinator(), entryMsg); } - + /** * Add the Map to the distribution + * * @param t - * @throws IllegalAccessException + * @throws GroupMapUpdateException * @throws IllegalStateException */ - public void putAll(Map t) throws IllegalAccessException,IllegalStateException { - putAll(t,isSharedWrites(),isRemoveOwnedObjectsOnExit()); + public void putAll(Map t) + throws GroupMapUpdateException, IllegalStateException { + putAll(t, isSharedWrites(), isRemoveOwnedObjectsOnExit(), + getTimeToLive()); } /** * Add the Map to the distribution + * * @param t * @param sharedWrites * @param removeOnExit - * @throws IllegalAccessException + * @param timeToLive + * @throws GroupMapUpdateException * @throws IllegalStateException */ - public void putAll(Map t,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException,IllegalStateException { - for(java.util.Map.Entryentry:t.entrySet()) { - put(entry.getKey(),entry.getValue(),sharedWrites,removeOnExit); - } + public void putAll(Map t, boolean sharedWrites, + boolean removeOnExit, long timeToLive) + throws GroupMapUpdateException, IllegalStateException { + for (java.util.Map.Entry entry : t.entrySet()) { + put(entry.getKey(), entry.getValue(), sharedWrites, removeOnExit, + timeToLive); + } } /** * remove a value from the map associated with the key - * @param key + * + * @param key * @return the Value or null - * @throws IllegalAccessException - * @throws IllegalStateException + * @throws GroupMapUpdateException + * @throws IllegalStateException * */ - public V remove(Object key) throws IllegalAccessException,IllegalStateException{ - EntryKey entryKey = new EntryKey(this.local,(K) key); + public V 
remove(Object key) throws GroupMapUpdateException, + IllegalStateException { + EntryKey entryKey = new EntryKey(this.local, (K) key); return remove(entryKey); } - - V remove(EntryKey key) throws IllegalAccessException,IllegalStateException{ - checkStarted(); + + V remove(EntryKey key) throws GroupMapUpdateException, + IllegalStateException { + checkStatus(); EntryMessage entryMsg = new EntryMessage(); entryMsg.setKey(key); entryMsg.setType(EntryMessage.MessageType.DELETE); - return sendEntryMessage(entryMsg); + return (V) sendRequest(getCoordinator(), entryMsg); } - - + public int size() { - synchronized(this.mapMutex) { - return this.localMap != null ? this.localMap.size():0; + synchronized (this.mapMutex) { + return this.localMap != null ? this.localMap.size() : 0; } } public Collection values() { List result = new ArrayList(); - synchronized(this.mapMutex) { - if(this.localMap!=null) { - for (EntryValue value:this.localMap.values()) { + synchronized (this.mapMutex) { + if (this.localMap != null) { + for (EntryValue value : this.localMap.values()) { result.add(value.getValue()); } } } return result; } - + /** - * @return a set of the members + * @return a set of the members */ - public Set members(){ - Setresult = new HashSet(); + public Set members() { + Set result = new HashSet(); result.addAll(this.members.values()); return result; } - + /** * @param key * @return true if this is the owner of the key */ public boolean isOwner(K key) { - EntryKey stateKey = new EntryKey(this.local,key); + EntryKey stateKey = new EntryKey(this.local, key); EntryValue entryValue = null; - synchronized(this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(stateKey):null; + synchronized (this.mapMutex) { + entryValue = this.localMap != null ? 
this.localMap.get(stateKey) + : null; } boolean result = false; if (entryValue != null) { @@ -488,102 +637,152 @@ } return result; } - + /** * Get the owner of a key + * * @param key * @return the owner - or null if the key doesn't exist */ public Member getOwner(K key) { - EntryKey stateKey = new EntryKey(this.local,key); + EntryKey stateKey = new EntryKey(this.local, key); EntryValue entryValue = null; - synchronized(this.mapMutex) { - entryValue = this.localMap != null ? this.localMap.get(stateKey):null; + synchronized (this.mapMutex) { + entryValue = this.localMap != null ? this.localMap.get(stateKey) + : null; } - return entryValue != null ? entryValue.getOwner():null; + return entryValue != null ? entryValue.getOwner() : null; } - + /** * @return true if the coordinator for the map */ public boolean isCoordinator() { return this.local.equals(this.coordinator); } - + + /** + * @return the coordinator + */ + public Member getCoordinator() { + return this.coordinator; + } + /** - * Select a coordinator - by default, its the member with - * the lowest lexicographical id + * Select a coordinator - by default, its the member with the lowest + * lexicographical id + * * @param members * @return */ - protected Member selectCordinator(Collectionmembers) { + protected Member selectCordinator(Collection members) { Member result = this.local; - for (Member member:members) { + for (Member member : members) { if (result.getId().compareTo(member.getId()) < 0) { result = member; } } - return result; + return result; } - - V sendEntryMessage(EntryMessage entry) { + + Object sendRequest(Member member, Serializable payload) { Object result = null; MapRequest request = new MapRequest(); - String id = this.requestGenerator.generateId(); - synchronized(this.requests) { - this.requests.put(id,request); + String id = this.idGenerator.generateId(); + synchronized (this.requests) { + this.requests.put(id, request); } try { - ObjectMessage objMsg = 
this.session.createObjectMessage(entry); + ObjectMessage objMsg = this.session.createObjectMessage(payload); objMsg.setJMSReplyTo(this.inboxTopic); objMsg.setJMSCorrelationID(id); - this.producer.send(this.topic, objMsg); - result = request.get(getHeartBeatInterval()*2); - }catch(JMSException e) { - if(this.started.get()) { - LOG.error("Failed to send EntryMessage " + entry,e); + this.producer.send(member.getInBoxDestination(), objMsg); + result = request.get(getHeartBeatInterval() * 200000); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send request " + payload, e); } } - if (result instanceof IllegalAccessException) { - throw (IllegalAccessException)result; + if (result instanceof GroupMapUpdateException) { + throw (GroupMapUpdateException) result; } - return (V) result; + return result; } - - void handleResponses(Message message) { + + void sendAsyncRequest(AsyncMapRequest asyncRequest, Member member, + Serializable payload) { + MapRequest request = new MapRequest(); + String id = this.idGenerator.generateId(); + asyncRequest.add(id, request); + synchronized (this.requests) { + this.requests.put(id, request); + } + try { + ObjectMessage objMsg = this.session.createObjectMessage(payload); + objMsg.setJMSReplyTo(this.inboxTopic); + objMsg.setJMSCorrelationID(id); + this.producer.send(member.getInBoxDestination(), objMsg); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send async request " + payload, e); + } + } + } + + void sendReply(Object reply, Destination replyTo, String id) { + try { + ObjectMessage replyMsg = this.session + .createObjectMessage((Serializable) reply); + replyMsg.setJMSCorrelationID(id); + this.producer.send(replyTo, replyMsg); + } catch (JMSException e) { + LOG.error("Couldn't send reply from co-ordinator", e); + } + } + + void broadcastMapUpdate(EntryMessage entry, String correlationId) { + try { + EntryMessage copy = entry.copy(); + copy.setMapUpdate(true); + ObjectMessage 
objMsg = this.session.createObjectMessage(copy); + objMsg.setJMSCorrelationID(correlationId); + this.producer.send(this.topic, objMsg); + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to send EntryMessage " + entry, e); + } + } + } + + void processMessage(Message message) { if (message instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage) message; try { + String id = objMsg.getJMSCorrelationID(); + Destination replyTo = objMsg.getJMSReplyTo(); Object payload = objMsg.getObject(); if (payload instanceof Member) { - handleHeartbeats((Member)payload); - } else if(payload instanceof EntryMessage) { + handleHeartbeats((Member) payload); + } else if (payload instanceof EntryMessage) { EntryMessage entryMsg = (EntryMessage) payload; - EntryKeykey=entryMsg.getKey(); - EntryValue value = new EntryValue(key.getOwner(),(V) entryMsg.getValue()); - - if(this.localMap !=null) { - boolean fireUpdate = false; - synchronized(this.mapMutex) { - if(!this.localMap.containsKey(key)) { - this.localMap.put(key,value); - fireUpdate=true; - } - } - if(fireUpdate) { - fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue()); - } + entryMsg = entryMsg.copy(); + if (entryMsg.isMapUpdate()) { + processMapUpdate(entryMsg); + } else { + processEntryMessage(entryMsg, replyTo, id); } - - - }else { - String id = objMsg.getJMSCorrelationID(); + } else if (payload instanceof ElectionMessage) { + ElectionMessage electionMsg = (ElectionMessage) payload; + electionMsg = electionMsg.copy(); + processElectionMessage(electionMsg, replyTo, id); + } + if (id != null) { MapRequest result = null; synchronized (this.requests) { result = this.requests.remove(id); } if (result != null) { - result.put(objMsg.getObject()); + result.put(id, objMsg.getObject()); } } } catch (JMSException e) { @@ -591,83 +790,92 @@ } } } - - void handleMapUpdates(Message message) { - Object reply = null; - if (message instanceof ObjectMessage) { - try { - ObjectMessage objMsg = 
(ObjectMessage) message; - EntryMessage entryMsg = (EntryMessage) objMsg.getObject(); - EntryKey key = entryMsg.getKey(); - EntryValue value = new EntryValue(key.getOwner(),(V) entryMsg.getValue()); - boolean containsKey=false; - boolean mapExists = false; - synchronized(this.mapMutex) { - mapExists = this.localMap!=null; - if(mapExists) { - containsKey=this.localMap.containsKey(key); - } - } - if(mapExists) { - if (containsKey) { - Member owner = getOwner((K) key.getKey()); - if (owner.equals(key.getOwner()) && !key.isShare()) { - EntryValue old = null; - if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) { - synchronized(this.mapMutex) { - old = this.localMap.put(key, value); - } - }else { - synchronized(this.mapMutex) { - old = this.localMap.remove(key); - } - } - fireMapChanged(owner, key.getKey(), old.getValue(), value.getValue()); - }else { - reply = new IllegalAccessException("Owned by "+ owner); + + void processEntryMessage(EntryMessage entryMsg, Destination replyTo, + String correlationId) { + if (isCoordinator()) { + EntryKey key = entryMsg.getKey(); + EntryValue value = new EntryValue(key.getOwner(), + (V) entryMsg.getValue()); + boolean insert = entryMsg.isInsert(); + boolean containsKey = false; + synchronized (this.mapMutex) { + containsKey = this.localMap.containsKey(key); + } + if (containsKey) { + Member owner = getOwner((K) key.getKey()); + if (owner.equals(key.getOwner()) || key.isShare()) { + EntryValue old = null; + if (insert) { + synchronized (this.mapMutex) { + old = this.localMap.put(key, value); } - }else { - if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) { - synchronized(this.mapMutex) { - this.localMap.put(key, value); - } - fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue()); + } else { + synchronized (this.mapMutex) { + old = this.localMap.remove(key); } } + broadcastMapUpdate(entryMsg, correlationId); + fireMapChanged(owner, key.getKey(), old.getValue(), value + .getValue(), false); + } 
else { + Serializable reply = new GroupMapUpdateException( + "Owned by " + owner); + sendReply(reply, replyTo, correlationId); + } + } else { + if (insert) { + synchronized (this.mapMutex) { + this.localMap.put(key, value); + } + broadcastMapUpdate(entryMsg, correlationId); + fireMapChanged(key.getOwner(), key.getKey(), null, value + .getValue(), false); + } else { + sendReply(null, replyTo, correlationId); } - } catch (JMSException e) { - LOG.warn("Failed to process map update",e); - reply = e; } - - try { - Destination replyTo = message.getJMSReplyTo(); - String correlationId = message.getJMSCorrelationID(); - ObjectMessage replyMsg = this.session - .createObjectMessage((Serializable) reply); - replyMsg.setJMSCorrelationID(correlationId); - // reuse timestamp - this will be cleared by the producer on - // send - replyMsg.setJMSTimestamp(System.currentTimeMillis()); - if (isCoordinator()) { - this.producer.send(replyTo, replyMsg); - }else { - synchronized(mapUpdateReplies) { - this.mapUpdateReplies.add(replyMsg); + } + } + + void processMapUpdate(EntryMessage entryMsg) { + boolean containsKey = false; + EntryKey key = entryMsg.getKey(); + EntryValue value = new EntryValue(key.getOwner(), (V) entryMsg + .getValue()); + boolean insert = entryMsg.isInsert()||entryMsg.isSync(); + synchronized (this.mapMutex) { + containsKey = this.localMap.containsKey(key); + } + + if (!isCoordinator()||entryMsg.isSync()) { + if (containsKey) { + Member owner = getOwner((K) key.getKey()); + EntryValue old = null; + if (insert) { + synchronized (this.mapMutex) { + old = this.localMap.put(key, value); + } + } else { + synchronized (this.mapMutex) { + old = this.localMap.remove(key); + value.setValue(null); } } - } catch (JMSException e) { - if(this.started.get()) { - LOG.error("Failed to send response to a map update ", e); + fireMapChanged(owner, key.getKey(), old.getValue(), value + .getValue(), entryMsg.isExpired()); + } else { + if (insert) { + synchronized (this.mapMutex) { + 
this.localMap.put(key, value); + } + fireMapChanged(key.getOwner(), key.getKey(), null, value + .getValue(), false); } } - - - }else { - LOG.warn("Unexpected map update message " + message); } } - + void handleHeartbeats(Message message) { try { if (message instanceof ObjectMessage) { @@ -685,63 +893,99 @@ void handleHeartbeats(Member member) { member.setTimeStamp(System.currentTimeMillis()); if (this.members.put(member.getId(), member) == null) { + election(member, true); fireMemberStarted(member); if (!member.equals(this.local)) { - //send the new member our details sendHeartBeat(member.getInBoxDestination()); - if(isCoordinator()) { - updateNewMemberMap(member); - } + } + synchronized (this.memberMutex) { + this.memberMutex.notifyAll(); } } } - + void handleConsumerEvents(ConsumerEvent event) { if (!event.isStarted()) { - Member member = this.members.remove(event.getConsumerId().toString()); - if(member!=null) { + Member member = this.members.remove(event.getConsumerId() + .toString()); + if (member != null) { fireMemberStopped(member); - doElection(); + election(member, false); } } } - + void checkMembership() { - if (this.started.get()) { - long checkTime = System.currentTimeMillis()-getHeartBeatInterval(); + if (this.started.get() && this.electionFinished.get()) { + long checkTime = System.currentTimeMillis() + - getHeartBeatInterval(); boolean doElection = false; for (Member member : this.members.values()) { - if (member.getTimeStamp() tmpList = new ArrayList(); - synchronized(this.mapUpdateReplies) { - try { - for(Message msg:this.mapUpdateReplies) { - if (msg.getJMSTimestamp() < checkTime) { - tmpList.add(msg); + } + + void expirationSweep() { + if (isCoordinator() && this.started.get() && this.electionFinished.get()) { + List list = null; + synchronized (this.mapMutex) { + Map, EntryValue> map = this.localMap; + if (map != null) { + long currentTime = System.currentTimeMillis(); + for (EntryKey k : map.keySet()) { + if (k.isExpired(currentTime)) { + if 
(list == null) { + list = new ArrayList(); + list.add(k); + } + } } } - for(Message msg:tmpList) { - this.mapUpdateReplies.remove(msg); - } - }catch(JMSException e) { - LOG.warn("Failed to clear down mapUpdateReplies",e); + } + //do the actual removal of entries in a separate thread + if (list != null) { + final List expire = list; + this.executor.execute(new Runnable() { + public void run() { + doExpiration(expire); + } + }); } } + } + void doExpiration(List list) { + if (this.started.get() && this.electionFinished.get() + && isCoordinator()) { + for (EntryKey k : list) { + EntryValue old = null; + synchronized (this.mapMutex) { + old = this.localMap.remove(k); + } + if (old != null) { + EntryMessage entryMsg = new EntryMessage(); + entryMsg.setType(EntryMessage.MessageType.DELETE); + entryMsg.setExpired(true); + entryMsg.setKey(k); + entryMsg.setValue(old.getValue()); + broadcastMapUpdate(entryMsg, ""); + fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), + null, true); + } + } + } + } + void sendHeartBeat() { sendHeartBeat(this.heartBeatTopic); } @@ -752,14 +996,16 @@ ObjectMessage msg = this.session .createObjectMessage(this.local); this.producer.send(destination, msg); + } catch (javax.jms.IllegalStateException e) { + // ignore - as we are probably stopping } catch (Throwable e) { - if(this.started.get()) { + if (this.started.get()) { LOG.warn("Failed to send heart beat", e); } } } } - + void updateNewMemberMap(Member member) { List, EntryValue>> list = new ArrayList, EntryValue>>(); synchronized (this.mapMutex) { @@ -775,10 +1021,11 @@ EntryMessage entryMsg = new EntryMessage(); entryMsg.setKey(entry.getKey()); entryMsg.setValue(entry.getValue().getValue()); - entryMsg.setType(EntryMessage.MessageType.INSERT); + entryMsg.setType(EntryMessage.MessageType.SYNC); + entryMsg.setMapUpdate(true); ObjectMessage objMsg = this.session .createObjectMessage(entryMsg); - if(!member.equals(entry.getKey().getOwner())) { + if 
(!member.equals(entry.getKey().getOwner())) { this.producer.send(member.getInBoxDestination(), objMsg); } } @@ -788,26 +1035,26 @@ } } } - + void fireMemberStarted(Member member) { - LOG.info(this.local.getName()+" Member started " + member); + LOG.info(this.local.getName() + " Member started " + member); for (MemberChangedListener l : this.membershipListeners) { l.memberStarted(member); } } - + void fireMemberStopped(Member member) { - LOG.info(this.local.getName()+" Member stopped " + member); + LOG.info(this.local.getName() + " Member stopped " + member); for (MemberChangedListener l : this.membershipListeners) { l.memberStopped(member); } - //remove all entries owned by the stopped member - List>tmpList = new ArrayList>(); + // remove all entries owned by the stopped member + List> tmpList = new ArrayList>(); boolean mapExists = false; - synchronized(this.mapMutex) { - mapExists=this.localMap!=null; - if(mapExists) { - for (EntryKey entryKey:this.localMap.keySet()) { + synchronized (this.mapMutex) { + mapExists = this.localMap != null; + if (mapExists) { + for (EntryKey entryKey : this.localMap.keySet()) { if (entryKey.getOwner().equals(member)) { if (entryKey.isRemoveOnExit()) { tmpList.add(entryKey); @@ -816,49 +1063,154 @@ } } } - if(mapExists) { - for (EntryKey entryKey:tmpList) { + if (mapExists) { + for (EntryKey entryKey : tmpList) { EntryValue value = null; - synchronized(this.mapMutex) { + synchronized (this.mapMutex) { value = this.localMap.remove(entryKey); } - fireMapChanged(member, entryKey.getKey(), value.getValue(), null); + fireMapChanged(member, entryKey.getKey(), value.getValue(), + null,false); } } } - - void fireMapChanged(Member owner,Object key, Object oldValue, Object newValue) { - for (MapChangedListener l:this.mapChangedListeners) { - l.mapChanged(owner, key, oldValue, newValue); + + void fireMapChanged(final Member owner, final Object key, + final Object oldValue, final Object newValue, final boolean expired) { + if (this.started.get() 
&& this.executor != null + && !this.executor.isShutdown()) { + this.executor.execute(new Runnable() { + public void run() { + doFireMapChanged(owner, key, oldValue, newValue, expired); + } + }); } } - - void doElection() { - this.coordinator=selectCordinator(this.members.values()); - if (isCoordinator() && this.started.get()) { - //send any inflight requests - Listlist = new ArrayList(); - synchronized(this.mapUpdateReplies) { - list.addAll(this.mapUpdateReplies); - this.mapUpdateReplies.clear(); + + void doFireMapChanged(Member owner, Object key, Object oldValue, + Object newValue, boolean expired) { + for (MapChangedListener l : this.mapChangedListeners) { + if (oldValue == null) { + l.mapInsert(owner, key, newValue); + } else if (newValue == null) { + l.mapRemove(owner, key, oldValue, expired); + } else { + l.mapUpdate(owner, key, oldValue, newValue); } - try { - for(Message msg:list) { - if (this.started.get()) { - this.producer.send(msg.getJMSReplyTo(), msg); + } + } + + void election(final Member member, final boolean memberStarted) { + if (this.started.get() && this.executor != null + && !this.executor.isShutdown()) { + this.executor.execute(new Runnable() { + public void run() { + doElection(member, memberStarted); } + }); + } + } + + void doElection(Member member, boolean memberStarted) { + if ((member == null || !member.equals(this.local)) + && this.electionFinished.compareAndSet(true, false)) { + boolean wasCoordinator = isCoordinator() && !isEmpty(); + // call an election + while (!callElection()) + ; + List members = new ArrayList(this.members.values()); + this.coordinator = selectCordinator(members); + if (isCoordinator()) { + broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); } - }catch(JMSException e) { - if(this.started.get()) { - LOG.error("Failed to resend replies",e); + if (memberStarted && member != null) { + if (wasCoordinator || isCoordinator() && this.started.get()) { + updateNewMemberMap(member); } } + if 
(!this.electionFinished.get()) { + try { + synchronized (this.electionFinished) { + this.electionFinished.wait(this.heartBeatInterval * 2); + } + } catch (InterruptedException e) { + } + } + if (!this.electionFinished.get()) { + // we must be the coordinator + this.coordinator = this.local; + this.electionFinished.set(true); + broadcastElectionType(ElectionMessage.MessageType.COORDINATOR); + } } - } - - void checkStarted() throws IllegalStateException{ + } + + boolean callElection() { + List members = new ArrayList(this.members.values()); + AsyncMapRequest request = new AsyncMapRequest(); + for (Member member : members) { + if (this.local.getId().compareTo(member.getId()) < 0) { + ElectionMessage msg = new ElectionMessage(); + msg.setMember(this.local); + msg.setType(ElectionMessage.MessageType.ELECTION); + sendAsyncRequest(request, member, msg); + } + } + return request.isSuccess(getHeartBeatInterval()); + } + + void processElectionMessage(ElectionMessage msg, Destination replyTo, + String correlationId) { + if (msg.isElection()) { + msg.setType(ElectionMessage.MessageType.ANSWER); + msg.setMember(this.local); + sendReply(msg, replyTo, correlationId); + } else if (msg.isCoordinator()) { + synchronized (this.electionFinished) { + this.coordinator = msg.getMember(); + this.electionFinished.set(true); + this.electionFinished.notifyAll(); + } + } + } + + void broadcastElectionType(ElectionMessage.MessageType type) { + if (started.get()) { + try { + ElectionMessage msg = new ElectionMessage(); + msg.setMember(this.local); + msg.setType(type); + ObjectMessage objMsg = this.session.createObjectMessage(msg); + this.producer.send(this.topic, objMsg); + } catch (javax.jms.IllegalStateException e) { + // ignore - we are stopping + } catch (JMSException e) { + if (this.started.get()) { + LOG.error("Failed to broadcast election message: " + type, + e); + } + } + } + } + + void checkStatus() throws IllegalStateException { if (!started.get()) { - throw new 
IllegalStateException("GroupMap " + this.local.getName() + " not started"); + throw new IllegalStateException("GroupMap " + this.local.getName() + + " not started"); + } + waitForElection(); + } + + void waitForElection() { + synchronized (this.electionFinished) { + while (started.get() && !this.electionFinished.get()) { + try { + this.electionFinished.wait(1000); + } catch (InterruptedException e) { + stop(); + Thread.currentThread().interrupt(); + } + } } - } + } } Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java (from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java&r1=679849&r2=683259&rev=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java Wed Aug 6 06:25:27 2008 @@ -20,13 +20,13 @@ * thrown when updating a key to map that the local client doesn't own * */ -public class IllegalAccessException extends java.lang.IllegalStateException { +public class GroupMapUpdateException extends RuntimeException { private static final long serialVersionUID = -7584658017201604560L; /** * @param message */ - public IllegalAccessException(String message) { + public GroupMapUpdateException(String message) { super(message); } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java Wed Aug 6 06:25:27 2008 @@ -23,11 +23,28 @@ public interface MapChangedListener { /** - * Called when the map has changed + * Called when a key/value pair is inserted into the map * @param owner * @param key + * @param value + */ + void mapInsert(Member owner,Object key, Object value); + + /** + * Called when a key value is updated in the map + * @param owner + * @param Key * @param oldValue * @param newValue */ - void mapChanged(Member owner,Object key, Object oldValue, Object newValue); + void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue); + + /** + * Called when a key value is removed from the map + * @param owner + * @param key + * @param value + * @param expired + */ + void mapRemove(Member owner,Object key, Object value,boolean expired); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java Wed Aug 6 06:25:27 2008 @@ -25,12 +25,13 @@ public class MapRequest { private final AtomicBoolean done = new AtomicBoolean(); private Object response; + private RequestCallback callback; - Object get(int timeout) { - synchronized (done) { - if 
(done.get() == false && response == null) { + Object get(long timeout) { + synchronized (this.done) { + if (this.done.get() == false && this.response == null) { try { - done.wait(timeout); + this.done.wait(timeout); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -39,15 +40,23 @@ return this.response; } - void put(Object response) { + void put(String id,Object response) { this.response = response; cancel(); + RequestCallback callback = this.callback; + if (callback != null) { + callback.finished(id); + } } void cancel() { - done.set(true); - synchronized (done) { - done.notifyAll(); + this.done.set(true); + synchronized (this.done) { + this.done.notifyAll(); } } + + void setCallback(RequestCallback callback) { + this.callback=callback; + } } Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java?rev=683259&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java (added) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java Wed Aug 6 06:25:27 2008 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + + +/** + * Return information about map update + * + */ +public interface RequestCallback{ + /** + * Optionally called when a request is finished + * @param id + */ + void finished(String id); +} Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java?rev=683259&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java Wed Aug 6 06:25:27 2008 @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.ArrayList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + + +public class GroupMapMemberTest extends TestCase { + protected BrokerService broker; + protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + /** + * Test method for + * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. 
+ * @throws Exception + */ + public void testGroup() throws Exception { + + int number = 20; + List<Connection> connections = new ArrayList<Connection>(); + List<GroupMap> groupMaps = new ArrayList<GroupMap>(); + ConnectionFactory factory = createConnectionFactory(); + for (int i =0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + GroupMap map = new GroupMap(connection,"map"+i); + map.setHeartBeatInterval(20000); + if(i ==number-1) { + map.setMinimumGroupSize(number); + } + map.start(); + groupMaps.add(map); + } + + int coordinator = 0; + for (GroupMap map:groupMaps) { + if (map.isCoordinator()) { + coordinator++; + } + } + + assertEquals(1,coordinator); + groupMaps.get(0).put("key", "value"); + Thread.sleep(2000); + for (GroupMap map:groupMaps) { + assertTrue(map.get("key").equals("value")); + } + for(GroupMap map:groupMaps) { + map.stop(); + } + for (Connection connection:connections) { + connection.stop(); + } + + } + + + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory()throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} + Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java ------------------------------------------------------------------------------ svn:eol-style = native