cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r833578 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: gms/ApplicationState.java gms/Gossiper.java gms/IEndPointStateChangeSubscriber.java service/StorageLoadBalancer.java service/StorageService.java
Date Fri, 06 Nov 2009 22:17:19 GMT
Author: jbellis
Date: Fri Nov  6 22:17:18 2009
New Revision: 833578

URL: http://svn.apache.org/viewvc?rev=833578&view=rev
Log:
split out onJoin from onChange, so we can pass a single ApplicationState object to onChange
to emphasize that it gets called once per AS; rename ApplicationState.getState -> getValue
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-525

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=833578&r1=833577&r2=833578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Fri Nov  6 22:17:18 2009
@@ -70,7 +70,7 @@
         version_ = VersionGenerator.getNextVersion();
     }
         
-    public String getState()
+    public String getValue()
     {
         return state_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=833578&r1=833577&r2=833578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Nov  6 22:17:18 2009
@@ -25,7 +25,6 @@
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -458,7 +457,7 @@
                         reqdEndPointState = new EndPointState(epState.getHeartBeatState());
                     }
                     if (logger_.isTraceEnabled())
-                        logger_.trace("Adding state " + key + ": " + appState.getState());
+                        logger_.trace("Adding state " + key + ": " + appState.getValue());
                     reqdEndPointState.addApplicationState(key, appState);
                 }
             }
@@ -565,11 +564,12 @@
     private void handleNewJoin(InetAddress ep, EndPointState epState)
     {
     	logger_.info("Node " + ep + " has now joined.");
-        /* Mark this endpoint as "live" */
         endPointStateMap_.put(ep, epState);
         isAlive(ep, epState, true);
-        /* Notify interested parties about endpoint state change */
-        doNotifications(ep, epState);
+        for (IEndPointStateChangeSubscriber subscriber : subscribers_)
+        {
+            subscriber.onJoin(ep, epState);
+        }
     }
 
     synchronized void applyStateLocally(Map<InetAddress, EndPointState> epStateMap)
@@ -653,10 +653,7 @@
             if ( localAppState == null )
             {
                 localStatePtr.addApplicationState(remoteKey, remoteAppState);
-                /* notify interested parties of endpoint state change */
-                EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
-                deltaState.addApplicationState(remoteKey, remoteAppState);
-                doNotifications(addr, deltaState);
+                doNotifications(addr, remoteKey, remoteAppState);
                 continue;
             }
 
@@ -668,10 +665,7 @@
             if ( remoteGeneration > localGeneration )
             {
                 localStatePtr.addApplicationState(remoteKey, remoteAppState);
-                /* notify interested parties of endpoint state change */
-                EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
-                deltaState.addApplicationState(remoteKey, remoteAppState);
-                doNotifications(addr, deltaState);
+                doNotifications(addr, remoteKey, remoteAppState);
                 continue;
             }
 
@@ -684,20 +678,17 @@
                 if ( remoteVersion > localVersion )
                 {
                     localStatePtr.addApplicationState(remoteKey, remoteAppState);
-                    /* notify interested parties of endpoint state change */
-                    EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
-                    deltaState.addApplicationState(remoteKey, remoteAppState);
-                    doNotifications(addr, deltaState);
+                    doNotifications(addr, remoteKey, remoteAppState);
                 }
             }
         }
     }
 
-    void doNotifications(InetAddress addr, EndPointState epState)
+    void doNotifications(InetAddress addr, String stateName, ApplicationState state)
     {
-        for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
+        for (IEndPointStateChangeSubscriber subscriber : subscribers_)
         {
-            subscriber.onChange(addr, epState);
+            subscriber.onChange(addr, stateName, state);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=833578&r1=833577&r2=833578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Fri Nov  6 22:17:18 2009
@@ -38,7 +38,9 @@
      * @param endpoint endpoint for which the state change occurred.
      * @param epState state that actually changed for the above endpoint.
      */
-    public void onChange(InetAddress endpoint, EndPointState epState);
+    public void onJoin(InetAddress endpoint, EndPointState epState);
+
+    public void onChange(InetAddress endpoint, String stateName, ApplicationState state);
 
     public void onAlive(InetAddress endpoint, EndPointState state);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=833578&r1=833577&r2=833578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Nov  6 22:17:18 2009
@@ -205,28 +205,27 @@
         Gossiper.instance().register(this);
     }
 
-    public void onChange(InetAddress endpoint, EndPointState epState)
+    public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
     {
-        // load information for this specified endpoint for load balancing 
-        ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
-        if ( loadInfoState != null )
-        {
-            loadInfo_.put(endpoint, Double.parseDouble(loadInfoState.getState()));
-
-            /*
-            // clone load information to perform calculations
-            loadInfo2_.putAll(loadInfo_);
-            // Perform the analysis for load balance operations
-            if ( isHeavyNode() )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
-                // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
-            }
-            */
-        }       
+        if (!stateName.equals(LoadDisseminator.loadInfo_))
+            return;
+        loadInfo_.put(endpoint, Double.parseDouble(state.getValue()));
+
+        /*
+        // clone load information to perform calculations
+        loadInfo2_.putAll(loadInfo_);
+        // Perform the analysis for load balance operations
+        if ( isHeavyNode() )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug(StorageService.getLocalStorageEndPoint() + " is a heavy node with load " + localLoad());
+            // lb_.schedule( new LoadBalancer(), StorageLoadBalancer.delay_, TimeUnit.MINUTES );
+        }
+        */
     }
 
+    public void onJoin(InetAddress endpoint, EndPointState epState) {}
+
     public void onAlive(InetAddress endpoint, EndPointState state) {}
 
     public void onDead(InetAddress endpoint, EndPointState state) {}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=833578&r1=833577&r2=833578&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Nov  6 22:17:18 2009
@@ -355,26 +355,22 @@
      *  we are interested in new tokens as a result of a new node or an
      *  existing node moving to a new location on the ring.
     */
-    public void onChange(InetAddress endpoint, EndPointState epState)
+    public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
     {
-        /* node identifier for this endpoint on the identifier space */
-        ApplicationState nodeIdState = epState.getApplicationState(StorageService.NODE_ID);
-        /* Check if this has a bootstrapping state message */
-        ApplicationState modeState = epState.getApplicationState(StorageService.MODE);
-        if (modeState != null)
+        if (StorageService.MODE.equals(stateName))
         {
-            String mode = modeState.getState();
+            String mode = state.getValue();
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " is in " + mode + " mode");
             boolean bootstrapState = mode.equals(MODE_MOVING);
             tokenMetadata_.setBootstrapping(endpoint,  bootstrapState);
         }
 
-        if (nodeIdState != null)
+        if (StorageService.NODE_ID.equals(stateName))
         {
-            Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
+            Token newToken = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
-              logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
+              logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + newToken);
 
             if (tokenMetadata_.isMember(endpoint))
             {
@@ -402,6 +398,8 @@
         }
     }
 
+    public void onJoin(InetAddress endpoint, EndPointState epState) {}
+
     public void onAlive(InetAddress endpoint, EndPointState state)
     {
         deliverHints(endpoint);



Mime
View raw message