activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r801515 - /activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java
Date Thu, 06 Aug 2009 06:15:52 GMT
Author: rajdavies
Date: Thu Aug  6 06:15:52 2009
New Revision: 801515

URL: http://svn.apache.org/viewvc?rev=801515&view=rev
Log:
tidied up a little

Modified:
    activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java

Modified: activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java?rev=801515&r1=801514&r2=801515&view=diff
==============================================================================
--- activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java (original)
+++ activemq/trunk/activemq-groups/src/main/java/org/apache/activegroups/Group.java Thu Aug  6 06:15:52 2009
@@ -66,38 +66,31 @@
 
 /**
  * <P>
- * A <CODE>Group</CODE> is a distributed collaboration implementation that is
- * used to shared state and process messages amongst a distributed group of
- * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * A <CODE>Group</CODE> is a distributed collaboration implementation that is used to shared state and process
+ * messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
  * automatically using discovery.
  * <P>
- * The underlying transport is JMS and there are some optimizations that occur
- * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
- * with any JMS implementation.
+ * The underlying transport is JMS and there are some optimizations that occur for membership if used with ActiveMQ -
+ * but <CODE>Group</CODE> can be used with any JMS implementation.
  * 
  * <P>
- * Updates to the group shared map are controlled by a coordinator. The
- * coordinator is elected by the member with the lowest lexicographical id -
- * based on the bully algorithm [Silberschatz et al. 1993]
+ * Updates to the group shared map are controlled by a coordinator. The coordinator is elected by the member with the
+ * lowest lexicographical id - based on the bully algorithm [Silberschatz et al. 1993]
  * <P>
- * The {@link #selectCordinator(Collection<Member> members)} method may be
- * overridden to implement a custom mechanism for choosing how the coordinator
- * is elected for the map.
+ * The {@link #selectCordinator(Collection<Member> members)} method may be overridden to implement a custom mechanism
+ * for choosing how the coordinator is elected for the map.
  * <P>
- * New <CODE>Group</CODE> instances have their state updated by the
- * coordinator, and coordinator failure is handled automatically within the
- * group.
+ * New <CODE>Group</CODE> instances have their state updated by the coordinator, and coordinator failure is handled
+ * automatically within the group.
  * <P>
- * All map updates are totally ordered through the coordinator, whilst read
- * operations happen locally.
+ * All map updates are totally ordered through the coordinator, whilst read operations happen locally.
  * <P>
- * A <CODE>Group</CODE> supports the concept of owner only updates(write
- * locks), shared updates, entry expiration times and removal on owner exit -
- * all of which are optional. In addition, you can grab and release locks for
- * values in the map, independently of who created them.
+ * A <CODE>Group</CODE> supports the concept of owner only updates(write locks), shared updates, entry expiration
+ * times and removal on owner exit - all of which are optional. In addition, you can grab and release locks for values
+ * in the map, independently of who created them.
  * <P>
- * In addition, members of a group can broadcast messages and implement
- * request/response with other <CODE>Group</CODE> instances.
+ * In addition, members of a group can broadcast messages and implement request/response with other <CODE>Group</CODE>
+ * instances.
  * 
  * <P>
  * 
@@ -114,10 +107,8 @@
     public static final long DEFAULT_HEART_BEAT_INTERVAL = 1000;
     private static final long EXPIRATION_SWEEP_INTERVAL = 500;
     private static final Log LOG = LogFactory.getLog(Group.class);
-    private static final String STATE_PREFIX = "STATE." + Group.class.getName()
-            + ".";
-    private static final String GROUP_MESSAGE_PREFIX = "MESSAGE."
-            + Group.class.getName() + ".";
+    private static final String STATE_PREFIX = "STATE." + Group.class.getName() + ".";
+    private static final String GROUP_MESSAGE_PREFIX = "MESSAGE." + Group.class.getName() + ".";
     private static final String STATE_TYPE = "state";
     private static final String MESSAGE_TYPE = "message";
     private static final String MEMBER_ID_PROPERTY = "memberId";
@@ -184,8 +175,7 @@
     }
 
     /**
-     * Set the local map implementation to be used By default its a HashMap -
-     * but you could use a Cache for example
+     * Set the local map implementation to be used By default its a HashMap - but you could use a Cache for example
      * 
      * @param map
      */
@@ -209,31 +199,22 @@
                 }
             }
             this.connection.start();
-            this.stateSession = this.connection.createSession(false,
-                    Session.AUTO_ACKNOWLEDGE);
-            this.messageSession = this.connection.createSession(false,
-                    Session.AUTO_ACKNOWLEDGE);
+            this.stateSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.messageSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             this.stateProducer = this.stateSession.createProducer(null);
             this.stateProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
             this.inboxTopic = this.stateSession.createTemporaryTopic();
             String stateTopicName = STATE_PREFIX + this.groupName;
             this.stateTopic = this.stateSession.createTopic(stateTopicName);
-            this.heartBeatTopic = this.stateSession.createTopic(stateTopicName
-                    + ".heartbeat");
-            String messageDestinationName = GROUP_MESSAGE_PREFIX
-                    + this.groupName;
-            this.messageTopic = this.messageSession
-                    .createTopic(messageDestinationName);
-            this.messageQueue = this.messageSession
-                    .createQueue(messageDestinationName);
-            MessageConsumer privateInbox = this.messageSession
-                    .createConsumer(this.inboxTopic);
-            MessageConsumer memberChangeConsumer = this.stateSession
-                    .createConsumer(this.stateTopic);
+            this.heartBeatTopic = this.stateSession.createTopic(stateTopicName + ".heartbeat");
+            String messageDestinationName = GROUP_MESSAGE_PREFIX + this.groupName;
+            this.messageTopic = this.messageSession.createTopic(messageDestinationName);
+            this.messageQueue = this.messageSession.createQueue(messageDestinationName);
+            MessageConsumer privateInbox = this.messageSession.createConsumer(this.inboxTopic);
+            MessageConsumer memberChangeConsumer = this.stateSession.createConsumer(this.stateTopic);
             String memberId = null;
             if (memberChangeConsumer instanceof ActiveMQMessageConsumer) {
-                memberId = ((ActiveMQMessageConsumer) memberChangeConsumer)
-                        .getConsumerId().toString();
+                memberId = ((ActiveMQMessageConsumer) memberChangeConsumer).getConsumerId().toString();
             } else {
                 memberId = this.idGenerator.generateId();
             }
@@ -252,63 +233,53 @@
             });
             this.messageProducer = this.messageSession.createProducer(null);
             this.messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-            MessageConsumer topicMessageConsumer = this.messageSession
-                    .createConsumer(this.messageTopic);
+            MessageConsumer topicMessageConsumer = this.messageSession.createConsumer(this.messageTopic);
             topicMessageConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
                     processJMSMessage(message);
                 }
             });
-            MessageConsumer queueMessageConsumer = this.messageSession
-                    .createConsumer(this.messageQueue);
+            MessageConsumer queueMessageConsumer = this.messageSession.createConsumer(this.messageQueue);
             queueMessageConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
                     processJMSMessage(message);
                 }
             });
-            MessageConsumer heartBeatConsumer = this.stateSession
-                    .createConsumer(this.heartBeatTopic);
+            MessageConsumer heartBeatConsumer = this.stateSession.createConsumer(this.heartBeatTopic);
             heartBeatConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
                     handleHeartbeats(message);
                 }
             });
-            this.consumerEvents = new ConsumerEventSource(this.connection,
-                    this.stateTopic);
+            this.consumerEvents = new ConsumerEventSource(this.connection, this.stateTopic);
             this.consumerEvents.setConsumerListener(new ConsumerListener() {
                 public void onConsumerEvent(ConsumerEvent event) {
                     handleConsumerEvents(event);
                 }
             });
             this.consumerEvents.start();
-            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L,
-                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-                    new ThreadFactory() {
+            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                         public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable, "Election{"
-                                    + Group.this.local + "}");
-                            thread.setDaemon(true);
-                            return thread;
-                        }
-                    });
-            this.stateExecutor = Executors
-                    .newSingleThreadExecutor(new ThreadFactory() {
-                        public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable, "Group State{"
-                                    + Group.this.local + "}");
-                            thread.setDaemon(true);
-                            return thread;
-                        }
-                    });
-            this.messageExecutor = Executors
-                    .newSingleThreadExecutor(new ThreadFactory() {
-                        public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable,
-                                    "Group Messages{" + Group.this.local + "}");
+                            Thread thread = new Thread(runnable, "Election{" + Group.this.local + "}");
                             thread.setDaemon(true);
                             return thread;
                         }
                     });
+            this.stateExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "Group State{" + Group.this.local + "}");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            this.messageExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "Group Messages{" + Group.this.local + "}");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
             sendHeartBeat();
             this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
                 public void run() {
@@ -326,19 +297,13 @@
                 }
             });
             this.timer = new Timer("Distributed heart beat", true);
-            this.timer.scheduleAtFixedRate(this.heartBeatTask,
-                    getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
-            this.timer.scheduleAtFixedRate(this.checkMembershipTask,
-                    getHeartBeatInterval(), getHeartBeatInterval());
-            this.timer.scheduleAtFixedRate(this.expirationTask,
-                    EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
+            this.timer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
+            this.timer.scheduleAtFixedRate(this.checkMembershipTask, getHeartBeatInterval(), getHeartBeatInterval());
+            this.timer.scheduleAtFixedRate(this.expirationTask, EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
             // await for members to join
-            long timeout = (long) (this.heartBeatInterval
-                    * this.minimumGroupSize *1.5);
+            long timeout = (long) (this.heartBeatInterval * this.minimumGroupSize * 1.5);
             long deadline = System.currentTimeMillis() + timeout;
-            while ((this.members.size() < this.minimumGroupSize || !this.electionFinished
-                    .get())
-                    && timeout > 0) {
+            while ((this.members.size() < this.minimumGroupSize || !this.electionFinished.get()) && timeout > 0) {
                 synchronized (this.electionFinished) {
                     this.electionFinished.wait(timeout);
                 }
@@ -376,7 +341,6 @@
             } catch (Exception e) {
                 LOG.debug("Caught exception stopping", e);
             }
-            
         }
     }
 
@@ -421,8 +385,7 @@
 
     /**
      * @param alwaysLock -
-     *            set true if objects inserted will always be locked (default is
-     *            false)
+     *            set true if objects inserted will always be locked (default is false)
      */
     public void setAlwaysLock(boolean alwaysLock) {
         this.alwaysLock = alwaysLock;
@@ -520,8 +483,7 @@
     }
 
     /**
-     * Sets the policy for owned objects in the group If set to true, when this
-     * <code>GroupMap<code> stops,
+     * Sets the policy for owned objects in the group If set to true, when this <code>GroupMap<code> stops,
      * any objects it owns will be removed from the group map
      * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
      */
@@ -612,16 +574,14 @@
 
     public boolean containsKey(Object key) {
         synchronized (this.mapMutex) {
-            return this.localMap != null ? this.localMap.containsKey(key)
-                    : false;
+            return this.localMap != null ? this.localMap.containsKey(key) : false;
         }
     }
 
     public boolean containsValue(Object value) {
         EntryValue entryValue = new EntryValue(null, value);
         synchronized (this.mapMutex) {
-            return this.localMap != null ? this.localMap
-                    .containsValue(entryValue) : false;
+            return this.localMap != null ? this.localMap.containsValue(entryValue) : false;
         }
     }
 
@@ -669,10 +629,9 @@
      * @throws IllegalStateException
      * 
      */
-    public V put(K key, V value) throws GroupUpdateException,
-            IllegalStateException {
-        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
-                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+    public V put(K key, V value) throws GroupUpdateException, IllegalStateException {
+        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+                getLockTimeToLive());
     }
 
     /**
@@ -690,9 +649,8 @@
      * @throws IllegalStateException
      * 
      */
-    public V put(K key, V value, boolean lock, boolean removeOnExit,
-            boolean releaseLockOnExit, long timeToLive, long leaseTime)
-            throws GroupUpdateException, IllegalStateException {
+    public V put(K key, V value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
+            long leaseTime) throws GroupUpdateException, IllegalStateException {
         checkStatus();
         EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
         entryKey.setLocked(lock);
@@ -756,10 +714,9 @@
      * @throws GroupUpdateException
      * @throws IllegalStateException
      */
-    public void putAll(Map<? extends K, ? extends V> t)
-            throws GroupUpdateException, IllegalStateException {
-        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(),
-                isReleaseLockOnExit(), getTimeToLive(), getLockTimeToLive());
+    public void putAll(Map<? extends K, ? extends V> t) throws GroupUpdateException, IllegalStateException {
+        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+                getLockTimeToLive());
     }
 
     /**
@@ -774,13 +731,10 @@
      * @throws GroupUpdateException
      * @throws IllegalStateException
      */
-    public void putAll(Map<? extends K, ? extends V> t, boolean lock,
-            boolean removeOnExit, boolean releaseLockOnExit, long timeToLive,
-            long lockTimeToLive) throws GroupUpdateException,
-            IllegalStateException {
+    public void putAll(Map<? extends K, ? extends V> t, boolean lock, boolean removeOnExit, boolean releaseLockOnExit,
+            long timeToLive, long lockTimeToLive) throws GroupUpdateException, IllegalStateException {
         for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
-            put(entry.getKey(), entry.getValue(), lock, removeOnExit,
-                    releaseLockOnExit, timeToLive, lockTimeToLive);
+            put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive);
         }
     }
 
@@ -793,14 +747,12 @@
      * @throws IllegalStateException
      * 
      */
-    public V remove(Object key) throws GroupUpdateException,
-            IllegalStateException {
+    public V remove(Object key) throws GroupUpdateException, IllegalStateException {
         EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
         return doRemove(entryKey);
     }
 
-    V doRemove(EntryKey<K> key) throws GroupUpdateException,
-            IllegalStateException {
+    V doRemove(EntryKey<K> key) throws GroupUpdateException, IllegalStateException {
         checkStatus();
         EntryMessage entryMsg = new EntryMessage();
         entryMsg.setKey(key);
@@ -863,8 +815,7 @@
     }
 
     /**
-     * @return the local member that represents this <CODE>Group</CODE>
-     *         instance
+     * @return the local member that represents this <CODE>Group</CODE> instance
      */
     public Member getLocalMember() {
         return this.local;
@@ -881,8 +832,7 @@
         }
         boolean result = false;
         if (entryValue != null) {
-            result = entryValue.getKey().getOwner().getId().equals(
-                    this.local.getId());
+            result = entryValue.getKey().getOwner().getId().equals(this.local.getId());
         }
         return result;
     }
@@ -934,8 +884,7 @@
      */
     public void broadcastMessage(Object message) throws JMSException {
         checkStatus();
-        ObjectMessage objMsg = this.messageSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
         objMsg.setJMSCorrelationID(this.idGenerator.generateId());
         objMsg.setJMSType(MESSAGE_TYPE);
         objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -952,8 +901,7 @@
      * @return
      * @throws JMSException
      */
-    public Serializable broadcastMessageRequest(Object message, long timeout)
-            throws JMSException {
+    public Serializable broadcastMessageRequest(Object message, long timeout) throws JMSException {
         checkStatus();
         Object result = null;
         MapRequest request = new MapRequest();
@@ -961,8 +909,7 @@
         synchronized (this.messageRequests) {
             this.messageRequests.put(id, request);
         }
-        ObjectMessage objMsg = this.stateSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message);
         objMsg.setJMSReplyTo(this.inboxTopic);
         objMsg.setJMSCorrelationID(id);
         objMsg.setJMSType(MESSAGE_TYPE);
@@ -973,16 +920,14 @@
     }
 
     /**
-     * Send a message to the group - but only the least loaded member will
-     * process it
+     * Send a message to the group - but only the least loaded member will process it
      * 
      * @param message
      * @throws JMSException
      */
     public void sendMessage(Object message) throws JMSException {
         checkStatus();
-        ObjectMessage objMsg = this.messageSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
         objMsg.setJMSCorrelationID(this.idGenerator.generateId());
         objMsg.setJMSType(MESSAGE_TYPE);
         objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -998,8 +943,7 @@
      */
     public void sendMessage(Member member, Object message) throws JMSException {
         checkStatus();
-        ObjectMessage objMsg = this.messageSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
         objMsg.setJMSCorrelationID(this.idGenerator.generateId());
         objMsg.setJMSType(MESSAGE_TYPE);
         objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -1016,8 +960,7 @@
      * @return the request or null
      * @throws JMSException
      */
-    public Object sendMessageRequest(Member member, Object message, long timeout)
-            throws JMSException {
+    public Object sendMessageRequest(Member member, Object message, long timeout) throws JMSException {
         checkStatus();
         Object result = null;
         MapRequest request = new MapRequest();
@@ -1025,8 +968,7 @@
         synchronized (this.messageRequests) {
             this.messageRequests.put(id, request);
         }
-        ObjectMessage objMsg = this.stateSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.stateSession.createObjectMessage((Serializable) message);
         objMsg.setJMSReplyTo(this.inboxTopic);
         objMsg.setJMSCorrelationID(id);
         objMsg.setJMSType(MESSAGE_TYPE);
@@ -1044,11 +986,9 @@
      * @param message
      * @throws JMSException
      */
-    public void sendMessageResponse(Member member, String replyId,
-            Object message) throws JMSException {
+    public void sendMessageResponse(Member member, String replyId, Object message) throws JMSException {
         checkStatus();
-        ObjectMessage objMsg = this.messageSession
-                .createObjectMessage((Serializable) message);
+        ObjectMessage objMsg = this.messageSession.createObjectMessage((Serializable) message);
         objMsg.setJMSCorrelationID(replyId);
         objMsg.setJMSType(MESSAGE_TYPE);
         objMsg.setStringProperty(MEMBER_ID_PROPERTY, this.local.getId());
@@ -1056,24 +996,21 @@
     }
 
     /**
-     * Select a coordinator - coordinator weighting is used - or if everything
-     * is equal - a comparison of member ids.
+     * Select a coordinator - coordinator weighting is used - or if everything is equal - a comparison of member ids.
      * 
      * @param members
      * @return
      */
     protected Member selectCordinator(List<Member> list) {
         List<Member> sorted = sortMemberList(list);
-        Member result = sorted.isEmpty() ? this.local : sorted
-                .get(list.size() - 1);
+        Member result = sorted.isEmpty() ? this.local : sorted.get(list.size() - 1);
         return result;
     }
 
     protected List<Member> sortMemberList(List<Member> list) {
         Collections.sort(list, new Comparator<Member>() {
             public int compare(Member m1, Member m2) {
-                int result = m1.getCoordinatorWeight()
-                        - m2.getCoordinatorWeight();
+                int result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight();
                 if (result == 0) {
                     result = m1.getId().compareTo(m2.getId());
                 }
@@ -1091,8 +1028,7 @@
             this.stateRequests.put(id, request);
         }
         try {
-            ObjectMessage objMsg = this.stateSession
-                    .createObjectMessage(payload);
+            ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
             objMsg.setJMSReplyTo(this.inboxTopic);
             objMsg.setJMSCorrelationID(id);
             objMsg.setJMSType(STATE_TYPE);
@@ -1113,8 +1049,7 @@
         return result;
     }
 
-    void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member,
-            Serializable payload) {
+    void sendAsyncStateRequest(AsyncMapRequest asyncRequest, Member member, Serializable payload) {
         MapRequest request = new MapRequest();
         String id = this.idGenerator.generateId();
         asyncRequest.add(id, request);
@@ -1122,8 +1057,7 @@
             this.stateRequests.put(id, request);
         }
         try {
-            ObjectMessage objMsg = this.stateSession
-                    .createObjectMessage(payload);
+            ObjectMessage objMsg = this.stateSession.createObjectMessage(payload);
             objMsg.setJMSReplyTo(this.inboxTopic);
             objMsg.setJMSCorrelationID(id);
             objMsg.setJMSType(STATE_TYPE);
@@ -1142,8 +1076,7 @@
                     processRequest(id, reply);
                 } else {
                     try {
-                        ObjectMessage replyMsg = this.stateSession
-                                .createObjectMessage((Serializable) reply);
+                        ObjectMessage replyMsg = this.stateSession.createObjectMessage((Serializable) reply);
                         replyMsg.setJMSCorrelationID(id);
                         replyMsg.setJMSType(STATE_TYPE);
                         this.stateProducer.send(replyTo, replyMsg);
@@ -1162,8 +1095,7 @@
             try {
                 EntryMessage copy = entry.copy();
                 copy.setMapUpdate(true);
-                ObjectMessage objMsg = this.stateSession
-                        .createObjectMessage(copy);
+                ObjectMessage objMsg = this.stateSession.createObjectMessage(copy);
                 objMsg.setJMSCorrelationID(correlationId);
                 objMsg.setJMSType(STATE_TYPE);
                 this.stateProducer.send(this.stateTopic, objMsg);
@@ -1230,22 +1162,18 @@
         }
     }
 
-    void processLockUpdate(EntryMessage entryMsg, Destination replyTo,
-            String correlationId) {
+    void processLockUpdate(EntryMessage entryMsg, Destination replyTo, String correlationId) {
         waitForElection();
         synchronized (this.mapMutex) {
             boolean newLock = entryMsg.getKey().isLocked();
             Member newOwner = entryMsg.getKey().getOwner();
-            long newLockExpiration = newLock ? entryMsg.getKey()
-                    .getLockExpiration() : 0l;
+            long newLockExpiration = newLock ? entryMsg.getKey().getLockExpiration() : 0l;
             if (isCoordinator() && !entryMsg.isMapUpdate()) {
                 EntryKey originalKey = getKey(entryMsg.getKey().getKey());
                 if (originalKey != null) {
                     if (originalKey.isLocked()) {
-                        if (!originalKey.getOwner().equals(
-                                entryMsg.getKey().getOwner())) {
-                            Serializable reply = new GroupUpdateException(
-                                    "Owned by " + originalKey.getOwner());
+                        if (!originalKey.getOwner().equals(entryMsg.getKey().getOwner())) {
+                            Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner());
                             sendReply(reply, replyTo, correlationId);
                         } else {
                             originalKey.setLocked(newLock);
@@ -1271,13 +1199,11 @@
         }
     }
 
-    void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
-            String correlationId) {
+    void processEntryMessage(EntryMessage entryMsg, Destination replyTo, String correlationId) {
         waitForElection();
         if (isCoordinator()) {
             EntryKey<K> key = entryMsg.getKey();
-            EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg
-                    .getValue());
+            EntryValue<V> value = new EntryValue<V>(key, (V) entryMsg.getValue());
             boolean insert = entryMsg.isInsert();
             boolean containsKey = false;
             synchronized (this.mapMutex) {
@@ -1285,8 +1211,7 @@
             }
             if (containsKey) {
                 EntryKey originalKey = getKey(key.getKey());
-                if (originalKey.equals(key.getOwner())
-                        || !originalKey.isLocked()) {
+                if (originalKey.equals(key.getOwner()) || !originalKey.isLocked()) {
                     EntryValue<V> old = null;
                     if (insert) {
                         synchronized (this.mapMutex) {
@@ -1299,11 +1224,9 @@
                     }
                     entryMsg.setOldValue(old.getValue());
                     broadcastMapUpdate(entryMsg, correlationId);
-                    fireMapChanged(key.getOwner(), key.getKey(),
-                            old.getValue(), value.getValue(), false);
+                    fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), false);
                 } else {
-                    Serializable reply = new GroupUpdateException(
-                            "Owned by " + originalKey.getOwner());
+                    Serializable reply = new GroupUpdateException("Owned by " + originalKey.getOwner());
                     sendReply(reply, replyTo, correlationId);
                 }
             } else {
@@ -1312,8 +1235,7 @@
                         this.localMap.put(key.getKey(), value);
                     }
                     broadcastMapUpdate(entryMsg, correlationId);
-                    fireMapChanged(key.getOwner(), key.getKey(), null, value
-                            .getValue(), false);
+                    fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false);
                 } else {
                     sendReply(null, replyTo, correlationId);
                 }
@@ -1349,24 +1271,20 @@
                             value.setValue(null);
                         }
                     }
-                    fireMapChanged(key.getOwner(), key.getKey(),
-                            old.getValue(), value.getValue(), entryMsg
-                                    .isExpired());
+                    fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), value.getValue(), entryMsg.isExpired());
                 }
             } else {
                 if (insert) {
                     synchronized (this.mapMutex) {
                         this.localMap.put(key.getKey(), value);
                     }
-                    fireMapChanged(key.getOwner(), key.getKey(), null, value
-                            .getValue(), false);
+                    fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue(), false);
                 }
             }
         }
     }
 
-    void processGroupMessage(String memberId, String replyId,
-            Destination replyTo, Object payload) {
+    void processGroupMessage(String memberId, String replyId, Destination replyTo, Object payload) {
         Member member = this.members.get(memberId);
         if (member != null) {
             fireMemberMessage(member, replyId, payload);
@@ -1412,8 +1330,7 @@
 
     void handleConsumerEvents(ConsumerEvent event) {
         if (!event.isStarted()) {
-            Member member = this.members.remove(event.getConsumerId()
-                    .toString());
+            Member member = this.members.remove(event.getConsumerId().toString());
             if (member != null) {
                 fireMemberStopped(member);
                 election(member, false);
@@ -1423,8 +1340,7 @@
 
     void checkMembership() {
         if (this.started.get() && this.electionFinished.get()) {
-            long checkTime = System.currentTimeMillis()
-                    - getHeartBeatInterval();
+            long checkTime = System.currentTimeMillis() - getHeartBeatInterval();
             boolean doElection = false;
             for (Member member : this.members.values()) {
                 if (member.getTimeStamp() < checkTime) {
@@ -1442,8 +1358,7 @@
 
     void expirationSweep() {
         waitForElection();
-        if (isCoordinator() && this.started.get()
-                && this.electionFinished.get()) {
+        if (isCoordinator() && this.started.get() && this.electionFinished.get()) {
             List<EntryKey> expiredMessages = null;
             List<EntryKey> expiredLocks = null;
             synchronized (this.mapMutex) {
@@ -1455,13 +1370,12 @@
                         if (k.isExpired(currentTime)) {
                             if (expiredMessages == null) {
                                 expiredMessages = new ArrayList<EntryKey>();
-                                expiredMessages.add(k);
                             }
+                            expiredMessages.add(k);
                         } else if (k.isLockExpired(currentTime)) {
                             k.setLocked(false);
                             if (expiredLocks == null) {
                                 expiredLocks = new ArrayList<EntryKey>();
-                                expiredLocks.add(k);
                             }
                             expiredLocks.add(k);
                         }
@@ -1489,8 +1403,7 @@
     }
 
     void doMessageExpiration(List<EntryKey> list) {
-        if (this.started.get() && this.electionFinished.get()
-                && isCoordinator()) {
+        if (this.started.get() && this.electionFinished.get() && isCoordinator()) {
             for (EntryKey k : list) {
                 EntryValue<V> old = null;
                 synchronized (this.mapMutex) {
@@ -1503,16 +1416,14 @@
                     entryMsg.setKey(k);
                     entryMsg.setValue(old.getValue());
                     broadcastMapUpdate(entryMsg, "");
-                    fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
-                            null, true);
+                    fireMapChanged(k.getOwner(), k.getKey(), old.getValue(), null, true);
                 }
             }
         }
     }
 
     void doLockExpiration(List<EntryKey> list) {
-        if (this.started.get() && this.electionFinished.get()
-                && isCoordinator()) {
+        if (this.started.get() && this.electionFinished.get() && isCoordinator()) {
             for (EntryKey k : list) {
                 EntryMessage entryMsg = new EntryMessage();
                 entryMsg.setType(EntryMessage.MessageType.DELETE);
@@ -1530,8 +1441,7 @@
     void sendHeartBeat(Destination destination) {
         if (this.started.get()) {
             try {
-                ObjectMessage msg = this.stateSession
-                        .createObjectMessage(this.local);
+                ObjectMessage msg = this.stateSession.createObjectMessage(this.local);
                 msg.setJMSType(STATE_TYPE);
                 this.stateProducer.send(destination, msg);
             } catch (javax.jms.IllegalStateException e) {
@@ -1548,8 +1458,7 @@
         List<Map.Entry<K, EntryValue<V>>> list = new ArrayList<Map.Entry<K, EntryValue<V>>>();
         synchronized (this.mapMutex) {
             if (this.localMap != null) {
-                for (Map.Entry<K, EntryValue<V>> entry : this.localMap
-                        .entrySet()) {
+                for (Map.Entry<K, EntryValue<V>> entry : this.localMap.entrySet()) {
                     list.add(entry);
                 }
             }
@@ -1561,12 +1470,10 @@
                 entryMsg.setValue(entry.getValue().getValue());
                 entryMsg.setType(EntryMessage.MessageType.SYNC);
                 entryMsg.setMapUpdate(true);
-                ObjectMessage objMsg = this.stateSession
-                        .createObjectMessage(entryMsg);
+                ObjectMessage objMsg = this.stateSession.createObjectMessage(entryMsg);
                 if (!member.equals(entry.getValue().getKey().getOwner())) {
                     objMsg.setJMSType(STATE_TYPE);
-                    this.stateProducer.send(member.getInBoxDestination(),
-                            objMsg);
+                    this.stateProducer.send(member.getInBoxDestination(), objMsg);
                 }
             }
         } catch (javax.jms.IllegalStateException e) {
@@ -1615,16 +1522,13 @@
                 synchronized (this.mapMutex) {
                     value = this.localMap.remove(entryKey);
                 }
-                fireMapChanged(member, entryKey.getKey(), value.getValue(),
-                        null, false);
+                fireMapChanged(member, entryKey.getKey(), value.getValue(), null, false);
             }
         }
     }
 
-    void fireMemberMessage(final Member member, final String replyId,
-            final Object message) {
-        if (this.started.get() && this.stateExecutor != null
-                && !this.messageExecutor.isShutdown()) {
+    void fireMemberMessage(final Member member, final String replyId, final Object message) {
+        if (this.started.get() && this.stateExecutor != null && !this.messageExecutor.isShutdown()) {
             this.messageExecutor.execute(new Runnable() {
                 public void run() {
                     doFireMemberMessage(member, replyId, message);
@@ -1641,10 +1545,9 @@
         }
     }
 
-    void fireMapChanged(final Member owner, final Object key,
-            final Object oldValue, final Object newValue, final boolean expired) {
-        if (this.started.get() && this.stateExecutor != null
-                && !this.stateExecutor.isShutdown()) {
+    void fireMapChanged(final Member owner, final Object key, final Object oldValue, final Object newValue,
+            final boolean expired) {
+        if (this.started.get() && this.stateExecutor != null && !this.stateExecutor.isShutdown()) {
             this.stateExecutor.execute(new Runnable() {
                 public void run() {
                     doFireMapChanged(owner, key, oldValue, newValue, expired);
@@ -1653,8 +1556,7 @@
         }
     }
 
-    void doFireMapChanged(Member owner, Object key, Object oldValue,
-            Object newValue, boolean expired) {
+    void doFireMapChanged(Member owner, Object key, Object oldValue, Object newValue, boolean expired) {
         if (this.started.get()) {
             for (GroupStateChangedListener l : this.mapChangedListeners) {
                 if (oldValue == null) {
@@ -1670,28 +1572,24 @@
 
     void checkStatus() throws IllegalStateException {
         if (!started.get()) {
-            throw new IllegalStateException("GroupMap " + this.local.getName()
-                    + " not started");
+            throw new IllegalStateException("GroupMap " + this.local.getName() + " not started");
         }
         waitForElection();
     }
 
     public String toString() {
-        return "Group:" + getName() + "{id=" + this.local.getId()
-                + ",coordinator=" + isCoordinator() + ",inbox="
+        return "Group:" + getName() + "{id=" + this.local.getId() + ",coordinator=" + isCoordinator() + ",inbox="
                 + this.local.getInBoxDestination() + "}";
     }
 
     void election(final Member member, final boolean memberStarted) {
-        if (this.started.get() && this.stateExecutor != null
-                && !this.electionExecutor.isShutdown()) {
+        if (this.started.get() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
             synchronized (this.electionFinished) {
                 this.electionFinished.set(false);
             }
             synchronized (this.electionExecutor) {
                 // remove any queued election tasks
-                List<Runnable> list = new ArrayList<Runnable>(
-                        this.electionExecutor.getQueue());
+                List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
                 for (Runnable r : list) {
                     ElectionService es = (ElectionService) r;
                     es.stop();
@@ -1723,8 +1621,7 @@
         return result;
     }
 
-    void processElectionMessage(ElectionMessage msg, Destination replyTo,
-            String correlationId) {
+    void processElectionMessage(ElectionMessage msg, Destination replyTo, String correlationId) {
         if (msg.isElection()) {
             msg.setType(ElectionMessage.MessageType.ANSWER);
             msg.setMember(this.local);
@@ -1745,16 +1642,14 @@
                 ElectionMessage msg = new ElectionMessage();
                 msg.setMember(this.local);
                 msg.setType(type);
-                ObjectMessage objMsg = this.stateSession
-                        .createObjectMessage(msg);
+                ObjectMessage objMsg = this.stateSession.createObjectMessage(msg);
                 objMsg.setJMSType(STATE_TYPE);
                 this.stateProducer.send(this.stateTopic, objMsg);
             } catch (javax.jms.IllegalStateException e) {
                 // ignore - we are stopping
             } catch (JMSException e) {
                 if (this.started.get()) {
-                    LOG.error("Failed to broadcast election message: " + type,
-                            e);
+                    LOG.error("Failed to broadcast election message: " + type, e);
                 }
             }
         }
@@ -1795,30 +1690,26 @@
         }
 
         void doElection() {
-            if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members
-                    .size() == getMinimumGroupSize()))) {
+            if ((this.member == null || (!this.member.equals(Group.this.local) || Group.this.members.size() == getMinimumGroupSize()))) {
                 boolean wasCoordinator = isCoordinatorMatch() && !isEmpty();
                 // call an election
                 while (!callElection() && isStarted() && this.started.get())
                     ;
                 if (isStarted() && this.started.get()) {
-                    List<Member> members = new ArrayList<Member>(
-                            Group.this.members.values());
+                    List<Member> members = new ArrayList<Member>(Group.this.members.values());
                     Group.this.coordinator = selectCordinator(members);
                     if (isCoordinatorMatch()) {
                         broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
                     }
                     if (this.memberStarted && this.member != null) {
-                        if (wasCoordinator || isCoordinator()
-                                && this.started.get()) {
+                        if (wasCoordinator || isCoordinator() && this.started.get()) {
                             updateNewMemberMap(this.member);
                         }
                     }
                     if (!isElectionFinished() && this.started.get()) {
                         try {
                             synchronized (Group.this.electionFinished) {
-                                Group.this.electionFinished
-                                        .wait(Group.this.heartBeatInterval * 2);
+                                Group.this.electionFinished.wait(Group.this.heartBeatInterval * 2);
                             }
                         } catch (InterruptedException e) {
                         }



Mime
View raw message