activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r720544 - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/proto/ test/java/org/apache/activeblaze/cluster/
Date Tue, 25 Nov 2008 17:22:52 GMT
Author: rajdavies
Date: Tue Nov 25 09:22:50 2008
New Revision: 720544

URL: http://svn.apache.org/viewvc?rev=720544&view=rev
Log:
rename coordinated package to cluster

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
  (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
      - copied, changed from r720506, activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatorChangedListener.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
Tue Nov 25 09:22:50 2008
@@ -51,7 +51,7 @@
     protected AtomicLong sequence = new AtomicLong();
     protected AtomicLong session = new AtomicLong(1);
     private Processor broadcast;
-    private BlazeConfiguration configuration = new BlazeConfiguration();
+    protected BlazeConfiguration configuration = new BlazeConfiguration();
     private String id;
     private Buffer managementURI;
     private InetSocketAddress toAddress;

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
Tue Nov 25 09:22:50 2008
@@ -23,38 +23,38 @@
  * (elected leader) for the group
  * 
  */
-public interface BlazeCoordinatedGroupChannel  extends BlazeGroupChannel{
+public interface BlazeClusterGroupChannel  extends BlazeGroupChannel{
     /**
      * @return true if this Channel is the coordinator of the group
      * @throws Exception 
      */
-    public boolean isCoordinator() throws Exception;
+    public boolean isMaster() throws Exception;
     /**
      * @return the member of the group which is the coordinator
      * @throws Exception 
      */
-    public Member getCoordinator() throws Exception;
+    public Member getMaster() throws Exception;
     
     /**
-     * Add a listener for membership changes
+     * Add a listener for cluster changes
      * 
      * @param l
      * @throws Exception 
      */
-    public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+    public void addClusterChangedListener(ClusterChangedListener l) throws Exception;
 
     /**
-     * Remove a listener for membership changes
+     * Remove a listener for cluster changes
      * 
      * @param l
      * @throws Exception 
      */
-    public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+    public void removeClusterChangedListener(ClusterChangedListener l) throws Exception;
     
     /**
      * @return the configuration
      */
-    public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration();
+    public BlazeClusterGroupConfiguration getConfiguration();
     
     /**
      * waits for election in the group to finish

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java
Tue Nov 25 09:22:50 2008
@@ -23,20 +23,20 @@
 /**
  * Factory class for creating <Code>BlazeGroupChannel</CODE>
  */
-public class BlazeCoordinatedGroupChannelFactory extends BlazeGroupChannelFactory {
+public class BlazeClusterGroupChannelFactory extends BlazeGroupChannelFactory {
     
     /**
      * Default Constructor
      */
-    public BlazeCoordinatedGroupChannelFactory() {
-        super(new BlazeCoordinatedGroupConfiguration());
+    public BlazeClusterGroupChannelFactory() {
+        super(new BlazeClusterGroupConfiguration());
     }
     
     /**
      * Construct a factory to use the passed Configuration
      * @param config
      */
-    public BlazeCoordinatedGroupChannelFactory(BlazeCoordinatedGroupConfiguration config){
+    public BlazeClusterGroupChannelFactory(BlazeClusterGroupConfiguration config){
         super(config);
     }
     
@@ -46,8 +46,8 @@
      * @return the Channel
      * @throws Exception 
      */
-    public BlazeCoordinatedGroupChannel createChannel(String name) throws Exception {
-        BlazeCoordinatedGroupChannelImpl result = new BlazeCoordinatedGroupChannelImpl(name);
+    public BlazeClusterGroupChannel createChannel(String name) throws Exception {
+        BlazeClusterGroupChannelImpl result = new BlazeClusterGroupChannelImpl(name);
         result.setConfiguration(getConfiguration().copy());
         return result;
     }
@@ -55,7 +55,7 @@
     /**
      * @return the configuration
      */
-    public BlazeCoordinatedGroupConfiguration getConfiguration() {
-        return (BlazeCoordinatedGroupConfiguration) super.getConfiguration();
+    public BlazeClusterGroupConfiguration getConfiguration() {
+        return (BlazeClusterGroupConfiguration) super.getConfiguration();
     }
 }

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
Tue Nov 25 09:22:50 2008
@@ -33,65 +33,65 @@
  * (elected leader) for the group
  * 
  */
-public class BlazeCoordinatedGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeCoordinatedGroupChannel {
-    private static final Log LOG = LogFactory.getLog(BlazeCoordinatedGroupChannelImpl.class);
-    private CoordinatedGroup coordinatedGroup;
+public class BlazeClusterGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeClusterGroupChannel {
+    private static final Log LOG = LogFactory.getLog(BlazeClusterGroupChannelImpl.class);
+    private ClusterGroup coordinatedGroup;
 
     /**
      * Constructor
      * 
      * @param name
      */
-    public BlazeCoordinatedGroupChannelImpl(String name) {
+    public BlazeClusterGroupChannelImpl(String name) {
         super(name);
     }
 
     /**
      * @param l
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#addCoordinatorChangedListener(org.apache.activeblaze.cluster.CoordinatorChangedListener)
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addClusterChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
      */
-    public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+    public void addClusterChangedListener(ClusterChangedListener l) throws Exception {
         init();
-        this.coordinatedGroup.addCoordinatorChangedListener(l);
+        this.coordinatedGroup.addClusterChangedListener(l);
     }
 
     /**
      * @return
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#getCoordinator()
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
      */
-    public Member getCoordinator() throws Exception {
+    public Member getMaster() throws Exception {
         init();
-        return this.coordinatedGroup.getCoordinator();
+        return this.coordinatedGroup.getMaster();
     }
 
     /**
      * @return
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#isCoordinator()
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
      */
-    public boolean isCoordinator() throws Exception {
+    public boolean isMaster() throws Exception {
         init();
-        return this.coordinatedGroup.isCoordinatorMatch();
+        return this.coordinatedGroup.isMasterMatch();
     }
 
     /**
      * @param l
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.CoordinatorChangedListener)
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
      */
-    public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+    public void removeClusterChangedListener(ClusterChangedListener l) throws Exception {
         init();
-        this.coordinatedGroup.removeCoordinatorChangedListener(l);
+        this.coordinatedGroup.removeClusterChangedListener(l);
     }
 
     /**
      * @return
-     * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#getCoordinatedGroupConfiguration()
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getCoordinatedGroupConfiguration()
      */
-    public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration() {
-        return (BlazeCoordinatedGroupConfiguration) getGroupConfiguration();
+    public BlazeClusterGroupConfiguration getConfiguration() {
+        return (BlazeClusterGroupConfiguration) this.configuration;
     }
     
     /**
@@ -121,11 +121,11 @@
     }
 
     protected MemberImpl createLocal(URI uri) throws Exception {
-        return new MemberImpl(getId(), getName(), getCoordinatedGroupConfiguration().getCoordinatorWeight(), uri);
+        return new MemberImpl(getId(), getName(), getConfiguration().getMasterWeight(), uri);
     }
 
     protected Group createGroup() {
-        this.coordinatedGroup = new CoordinatedGroup(this);
+        this.coordinatedGroup = new ClusterGroup(this);
         return this.coordinatedGroup;
     }
 
@@ -134,7 +134,7 @@
         ElectionMessage electionMessage = (ElectionMessage) type.createMessage();
         Buffer payload = data.getPayload();
         electionMessage.mergeFramed(payload);
-        CoordinatedGroup group = (CoordinatedGroup) getGroup();
+        ClusterGroup group = (ClusterGroup) getGroup();
         group.processElectionMessage(electionMessage, id);
     }
 }

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
Tue Nov 25 09:22:50 2008
@@ -23,8 +23,8 @@
  * Configuration for a BlazeCoordinatedGroupChannel
  *
  */
-public class BlazeCoordinatedGroupConfiguration extends BlazeGroupConfiguration{
-    private long coordinatorWeight = 0;
+public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration{
+    private long masterWeight = 0;
     private int minimumGroupSize = 1;
     private long  awaitGroupTimeout = getHeartBeatInterval()*2;
     
@@ -32,15 +32,15 @@
     /**
      * @return the coordinatorWeight
      */
-    public long getCoordinatorWeight() {
-        return this.coordinatorWeight;
+    public long getMasterWeight() {
+        return this.masterWeight;
     }
 
     /**
      * @param coordinatorWeight the coordinatorWeight to set
      */
-    public void setCoordinatorWeight(long coordinatorWeight) {
-        this.coordinatorWeight = coordinatorWeight;
+    public void setMasterWeight(long coordinatorWeight) {
+        this.masterWeight = coordinatorWeight;
     }
 
     /**
@@ -72,6 +72,6 @@
     }
     
     protected BlazeConfiguration newInstance() {
-        return new BlazeCoordinatedGroupConfiguration();
+        return new BlazeClusterGroupConfiguration();
     }
 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java?rev=720544&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
(added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
Tue Nov 25 09:22:50 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+/**
+ * A listener for coordinator changes to a group
+ *
+ */
+public interface ClusterChangedListener  {
+    /**
+     * Fired when a master changes in the group
+     * @param master the new master of the cluster
+     */
+    void ClusterChanged(Member master);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
Tue Nov 25 09:22:50 2008
@@ -41,13 +41,13 @@
  * Implementation of Group State
  * 
  */
-public class CoordinatedGroup extends Group {
-    static final Log LOG = LogFactory.getLog(CoordinatedGroup.class);
-    final BlazeCoordinatedGroupChannelImpl channel;
-    private final BlazeCoordinatedGroupConfiguration configuration;
+public class ClusterGroup extends Group {
+    static final Log LOG = LogFactory.getLog(ClusterGroup.class);
+    final BlazeClusterGroupChannelImpl channel;
+    private final BlazeClusterGroupConfiguration configuration;
     private ThreadPoolExecutor electionExecutor;
-    private MemberImpl coordinator;
-    private List<CoordinatorChangedListener> listeners = new CopyOnWriteArrayList<CoordinatorChangedListener>();
+    private MemberImpl master;
+    private List<ClusterChangedListener> listeners = new CopyOnWriteArrayList<ClusterChangedListener>();
     final AtomicBoolean electionFinished = new AtomicBoolean(false);
     private long startTime;
 
@@ -59,11 +59,11 @@
      * @param transport
      * @param config
      */
-    protected CoordinatedGroup(BlazeCoordinatedGroupChannelImpl channel) {
+    protected ClusterGroup(BlazeClusterGroupChannelImpl channel) {
         super(channel);
         this.channel = channel;
-        this.coordinator = this.channel.getLocalMember();
-        this.configuration = channel.getCoordinatedGroupConfiguration();
+        this.master = this.channel.getLocalMember();
+        this.configuration = channel.getConfiguration();
     }
 
     /**
@@ -78,8 +78,7 @@
             this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                     new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                         public Thread newThread(Runnable runnable) {
-                            Thread thread = new Thread(runnable, "Election{" + CoordinatedGroup.this.channel.getId()
-                                    + "}");
+                            Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
                             thread.setDaemon(true);
                             return thread;
                         }
@@ -161,20 +160,24 @@
     /**
      * @return true if the coordinator for the map
      */
-    protected boolean isCoordinatorMatch() {
-        String coordinatorId = this.coordinator != null ? this.coordinator.getId() : "";
-        return this.channel.getId().equals(coordinatorId);
+    protected boolean isMasterMatch() {
+        String masterId = this.master != null ? this.master.getId() : "";
+        return this.channel.getId().equals(masterId);
     }
 
-    protected MemberImpl getCoordinator() {
-        return this.coordinator;
+    protected MemberImpl getMaster() {
+        return this.master;
     }
 
-    protected void setCoordinator(MemberImpl member) {
-        this.coordinator = member;
+    protected void setMaster(MemberImpl member) {
+        MemberImpl oldMaster = this.master;
+        this.master = member;
+        if (oldMaster == null || (oldMaster != null && this.master != null && !this.master.equals(oldMaster))) {
+            fireClusterChanged(this.master);
+        }
     }
 
-    protected void addCoordinatorChangedListener(CoordinatorChangedListener l) {
+    protected void addClusterChangedListener(ClusterChangedListener l) {
         this.listeners.add(l);
     }
 
@@ -184,19 +187,19 @@
      * @param l
      * @throws Exception
      */
-    protected void removeCoordinatorChangedListener(CoordinatorChangedListener l) {
+    protected void removeClusterChangedListener(ClusterChangedListener l) {
         this.listeners.remove(l);
     }
 
-    protected void fireCoordinatorChanged(MemberImpl newCoordinator) {
-        for (CoordinatorChangedListener l : this.listeners) {
-            l.coordinatorChanged(newCoordinator);
+    protected void fireClusterChanged(MemberImpl newMaster) {
+        for (ClusterChangedListener l : this.listeners) {
+            l.ClusterChanged(newMaster);
         }
     }
 
     boolean callElection() throws Exception {
         List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
-        List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(members);
+        List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
         AsyncGroupRequest request = new AsyncGroupRequest();
         boolean doCall = false;
         for (Member member : sorted) {
@@ -216,16 +219,17 @@
     void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
         MemberImpl from = new MemberImpl(msg.getMember());
         if (from != null && !from.getId().equals(getLocalMember().getId())) {
-            LOG.debug(getLocalMember()+" Election message "+ msg.getElectionType() + " from " + from);
+            LOG.debug(getLocalMember() + " Election message " + msg.getElectionType() + " from " + from);
             if (msg.getElectionType().equals(ElectionType.ELECTION)) {
                 ElectionMessage reply = new ElectionMessage();
                 reply.setElectionType(ElectionType.ANSWER);
                 reply.setMember(this.channel.getLocalMember().getData());
                 this.channel.sendReply(from, msg.type(), reply, correlationId);
                 election(null, false);
-            } else if (msg.getElectionType().equals(ElectionType.COORDINATOR)) {
-                this.coordinator=from;
-                LOG.debug(getLocalMember()+" Coordinator is "+ from);
+            } else if (msg.getElectionType().equals(ElectionType.MASTER)) {
+                setMaster(from);
+                LOG.debug(getLocalMember() + " Master is " + from);
+                setMaster(from);
                 setElectionFinished(true);
             }
         }
@@ -262,7 +266,6 @@
         return !isStopped() && this.electionFinished.get();
     }
 
-
     protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
         Collections.sort(list, new Comparator<Member>() {
             public int compare(Member m1, Member m2) {
@@ -279,7 +282,7 @@
     /**
      * @return the configuration
      */
-    public BlazeCoordinatedGroupConfiguration getConfiguration() {
+    public BlazeClusterGroupConfiguration getConfiguration() {
         return this.configuration;
     }
 }

Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
(from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
Tue Nov 25 09:22:50 2008
@@ -61,10 +61,10 @@
  * 
  */
 
-public class  GroupState<String,V> implements Map<String,V>{
+public class  ClusterState<String,V> implements Map<String,V>{
     
-    private final BlazeCoordinatedGroupChannelImpl channel;
-    protected GroupState(BlazeCoordinatedGroupChannelImpl channel) {
+    private final BlazeClusterGroupChannelImpl channel;
+    protected ClusterState(BlazeClusterGroupChannelImpl channel) {
         this.channel=channel;
     }
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
Tue Nov 25 09:22:50 2008
@@ -17,8 +17,6 @@
 package org.apache.activeblaze.cluster;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import org.apache.activeblaze.BaseService;
 import org.apache.activeblaze.group.Member;
@@ -34,9 +32,9 @@
  */
 class ElectionService extends BaseService implements Runnable {
     private static final Log LOG = LogFactory.getLog(ElectionService.class);
-    private final CoordinatedGroup group;
+    private final ClusterGroup group;
     private Member member;
-    ElectionService(CoordinatedGroup group,Member member, boolean memberStarted) {
+    ElectionService(ClusterGroup group,Member member, boolean memberStarted) {
         this.group=group;
         this.member = member;
     }
@@ -59,9 +57,9 @@
                 ;
             if (this.group.isStarted() && isStarted()) {
                 
-                this.group.setCoordinator(selectCordinator(members));
-                if (this.group.isCoordinatorMatch()) {
-                    this.group.broadcastElectionType(ElectionType.COORDINATOR);
+                this.group.setMaster(selectCordinator(members));
+                if (this.group.isMasterMatch()) {
+                    this.group.broadcastElectionType(ElectionType.MASTER);
                 }
                 if (!this.group.isElectionFinished() && isStarted()) {
                     //ok - lets just wait for more members to show
@@ -75,17 +73,17 @@
                 }
                 if (!this.group.isElectionFinished() && isStarted()) {
                     // we must be the coordinator
-                    this.group.setCoordinator(this.group.getLocalMember());
+                    this.group.setMaster(this.group.getLocalMember());
                     this.group.setElectionFinished(true);
                     LOG.debug(this.group.getLocalMember()+" We are the Coordinator ");
-                    this.group.broadcastElectionType(ElectionType.COORDINATOR);
+                    this.group.broadcastElectionType(ElectionType.MASTER);
                 }
             }
         }
     }
     
     protected MemberImpl selectCordinator(List<MemberImpl> list) throws Exception {
-        List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(list);
+        List<MemberImpl> sorted = ClusterGroup.sortMemberList(list);
         MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted
                 .get(list.size() - 1);
         return result;

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
(original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
Tue Nov 25 09:22:50 2008
@@ -122,7 +122,7 @@
     /**
      * @return the configuration
      */
-    public BlazeGroupConfiguration getGroupConfiguration();
+    public BlazeGroupConfiguration getConfiguration();
 
     /**
      * @return a set of the members

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Nov 25 09:22:50 2008
@@ -96,7 +96,7 @@
             // so need to get the potentially new value
             unicastURI = transport.getLocalURI();
             // append configuration properties
-            String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
+            String groupManagementURIStr = getConfiguration().getGroupManagementURI();
             groupManagementURIStr = PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
             URI groupManagementURI = new URI(groupManagementURIStr);
             this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
@@ -190,8 +190,8 @@
      * @return this channel's configuration
      * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroupConfiguration()
      */
-    public BlazeGroupConfiguration getGroupConfiguration() {
-        return (BlazeGroupConfiguration) getConfiguration();
+    public BlazeGroupConfiguration getConfiguration() {
+        return (BlazeGroupConfiguration) this.configuration;
     }
 
     /**

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Tue Nov 25 09:22:50 2008
@@ -25,7 +25,7 @@
 public class BlazeGroupConfiguration extends BlazeConfiguration {
     private String groupManagementURI = "mcast://224.2.2.2:8888";
     
-    private int heartBeatInterval = 1000;
+    private int heartBeatInterval = 250;
    
     /**
      * @return the groupManagementUTI

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Nov 25 09:22:50 2008
@@ -60,14 +60,14 @@
      */
     protected Group(BlazeGroupChannelImpl channel) {
         this.channel = channel;
-        this.configuration = channel.getGroupConfiguration();
+        this.configuration = channel.getConfiguration();
     }
 
     /**
      * @return the Member of the Channel
      * @throws Exception
      */
-    public MemberImpl getLocalMember() throws Exception {
+    public MemberImpl getLocalMember(){
         return this.channel.getLocalMember();
     }
 
@@ -268,6 +268,10 @@
         }
         return result;
     }
+    
+    public String toString() {
+        return "Group "+getLocalMember().getName();
+    }
 
     /**
      * Process a new member

Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Nov 25 09:22:50 2008
@@ -78,7 +78,7 @@
     enum ElectionType {
     ELECTION = 0;
     ANSWER = 1;
-    COORDINATOR = 2;
+    MASTER = 2;
   }
     message ElectionMessage {
     //| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";

Copied: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (from r720506, activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?p2=activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java&p1=activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Tue Nov 25 09:22:50 2008
@@ -18,86 +18,118 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel;
-import org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannelFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.group.Member;
 import junit.framework.TestCase;
 
 /**
- * Test for coordinated channel
+ * Test for clustered channel
  * 
  */
-public class BlazeCoordinatedGroupChannelTest extends TestCase {
+public class BlazeClusterGroupChannelTest extends TestCase {
+    
     public void testGroup() throws Exception {
         final int number = 3;
-        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
-        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
         for (int i = 0; i < number; i++) {
-            BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
-            channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
+            channel.getConfiguration().setMinimumGroupSize(number);
             channel.start();
             channels.add(channel);
         }
         channels.get(number - 1).waitForElection(5000);
-        int coordinatorNumber = 0;
-        BlazeCoordinatedGroupChannel coordinator = null;
-        for (BlazeCoordinatedGroupChannel channel : channels) {
-            if (channel.isCoordinator()) {
-                coordinatorNumber++;
-                coordinator = channel;
-            }
-        }
-        assertNotNull(coordinator);
-        assertEquals(1, coordinatorNumber);
-        // kill the coordinator
-        coordinator.shutDown();
-        Thread.sleep(factory.getConfiguration().getHeartBeatInterval() * 2);
-        coordinatorNumber = 0;
-        coordinator = null;
-        for (BlazeCoordinatedGroupChannel channel : channels) {
-            if (channel.isCoordinator()) {
-                coordinatorNumber++;
-                coordinator = channel;
-            }
-        }
-        assertNotNull(coordinator);
-        assertEquals(1, coordinatorNumber);
-        for (BlazeCoordinatedGroupChannel channel : channels) {
+        int masterNumber = 0;
+        BlazeClusterGroupChannel master = null;
+        for (BlazeClusterGroupChannel channel : channels) {
+            if (channel.isMaster()) {
+                masterNumber++;
+                master = channel;
+            }
+        }
+        assertNotNull(master);
+        assertEquals(1, masterNumber);
+        // kill the master
+        master.shutDown();
+        Thread.sleep(1000);
+        masterNumber = 0;
+        master = null;
+        for (BlazeClusterGroupChannel channel : channels) {
+            if (channel.isMaster()) {
+                masterNumber++;
+                master = channel;
+            }
+        }
+        assertNotNull(master);
+        assertEquals(1, masterNumber);
+        for (BlazeClusterGroupChannel channel : channels) {
             channel.shutDown();
         }
     }
 
     public void testWeightedGroup() throws Exception {
         final int number = 4;
-        List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
-        BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
-        BlazeCoordinatedGroupChannel weightedCoordinator = null;
+        List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        BlazeClusterGroupChannel weightedMaster = null;
         for (int i = 0; i < number; i++) {
-            BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
-            channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+            BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
+            channel.getConfiguration().setMinimumGroupSize(number);
             if (i == number / 2) {
-                channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(10);
-                weightedCoordinator=channel;
-            }else {
-                channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(0);
-            }
-                channel.start();
-                channels.add(channel);
-            
+                channel.getConfiguration().setMasterWeight(10);
+                weightedMaster = channel;
+            } else {
+                channel.getConfiguration().setMasterWeight(0);
+            }
+            channel.start();
+            channels.add(channel);
         }
         channels.get(number - 1).waitForElection(5000);
-        int coordinatorNumber = 0;
-        BlazeCoordinatedGroupChannel coordinator = null;
-        for (BlazeCoordinatedGroupChannel channel : channels) {
-            if (channel.isCoordinator()) {
-                coordinatorNumber++;
-                coordinator = channel;
+        int masterNumber = 0;
+        BlazeClusterGroupChannel master = null;
+        for (BlazeClusterGroupChannel channel : channels) {
+            if (channel.isMaster()) {
+                masterNumber++;
+                master = channel;
             }
         }
-        assertNotNull(coordinator);
-        assertTrue(coordinator==weightedCoordinator);
-        assertEquals(1, coordinatorNumber);
-        for (BlazeCoordinatedGroupChannel channel : channels) {
+        assertNotNull(master);
+        assertTrue(master == weightedMaster);
+        assertEquals(1, masterNumber);
+        for (BlazeClusterGroupChannel channel : channels) {
             channel.shutDown();
         }
     }
+
+    public void testClusterChangedListener() throws Exception {
+        final AtomicBoolean result = new AtomicBoolean();
+        BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+        BlazeClusterGroupChannel master = factory.createChannel("master");
+        master.getConfiguration().setMasterWeight(10);
+        master.start();
+        
+        BlazeClusterGroupChannel channel = factory.createChannel("test1");
+        channel.addClusterChangedListener(new ClusterChangedListener() {
+            public void ClusterChanged(Member master) {
+               synchronized(result) {
+                   result.set(true);
+                   result.notifyAll();
+               }
+                
+            }
+        });
+        channel.start();
+        
+        synchronized(result) {
+            if (!result.get()) {
+               result.wait(3000); 
+            }
+        }
+        assertTrue(result.get());
+        channel.shutDown();
+        master.shutDown();
+        
+    }
 }



Mime
View raw message