incubator-ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r1179070 - in /incubator/ambari/trunk/controller/src/main/java/org/apache/ambari: controller/ resource/statemachine/
Date Wed, 05 Oct 2011 07:10:36 GMT
Author: ddas
Date: Wed Oct  5 07:10:36 2011
New Revision: 1179070

URL: http://svn.apache.org/viewvc?rev=1179070&view=rev
Log:
AMBARI-37. Makes further improvements to the statemachine. Contributed by Devaraj Das.

Modified:
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceEventType.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1179070&r1=1179069&r2=1179070&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java Wed Oct  5 07:10:36 2011
@@ -68,8 +68,10 @@ public class HeartbeatHandler {
     
     short responseId = 
         (short) (Short.parseShort(response.getResponseId()) + 1);
+    
+    String hostname = heartbeat.getHostname();
 
-    Node node = Nodes.getInstance().getNodes().get(heartbeat.getHostname());
+    Node node = Nodes.getInstance().getNodes().get(hostname);
     NodeState state = node.getNodeState();
     GregorianCalendar c = new GregorianCalendar();
     c.setTime(new Date());
@@ -93,7 +95,7 @@ public class HeartbeatHandler {
       
       //what servers are running currently
       //ADD LOGIC FOR CAPTURING THE CLUSTER-ID THE SERVERS BELONG TO
-      //IF THEY BELONG TO THE CLUSTER-ID THIS AGENT IS PART OF, WELL AND GOOD
+      //IF THEY BELONG TO THE CLUSTER-ID THIS NODE IS PART OF, WELL AND GOOD
       //IF NOT, THEN SEND COMMANDS TO STOP THE SERVERS
       StartedComponentServers componentServers = new StartedComponentServers();
       for (ServerStatus status : roleStatuses) {
@@ -120,22 +122,22 @@ public class HeartbeatHandler {
           //check whether the agent should start any server
           if (role.shouldStart()) {
             if (!roleServerRunning) {
-              short retryCount = retryCountForRole.get(role);
+              short retryCount = retryCountForRole.get(hostname);
               if (retryCount > MAX_RETRY_COUNT) {
                 //LOG the failure to start the role server
                 StateMachineInvoker.getAMBARIEventHandler()
                 .handle(new RoleEvent(RoleEventType.START_FAILURE, role));
-                retryCountForRole.reset(role);
+                retryCountForRole.resetAttemptCount(hostname);
                 continue;
               }
               List<Action> actions = 
                   plugin.startRoleServer(clusterContext, role.getRoleName());
               allActions.addAll(actions);
-              retryCountForRole.incr(role);
+              retryCountForRole.incrAttemptCount(hostname);
             }
             //raise an event to the state machine for a successful role-start
             if (roleServerRunning) {
-              retryCountForRole.reset(role);
+              retryCountForRole.resetAttemptCount(hostname);
               StateMachineInvoker.getAMBARIEventHandler()
               .handle(new RoleEvent(RoleEventType.START_SUCCESS, role));
             }
@@ -143,22 +145,22 @@ public class HeartbeatHandler {
           //check whether the agent should stop any server
           if (role.shouldStop()) {
             if (roleServerRunning) {
-              short retryCount = retryCountForRole.get(role);
+              short retryCount = retryCountForRole.get(hostname);
               if (retryCount > MAX_RETRY_COUNT) {
                 //LOG the failure to stop the role server
                 StateMachineInvoker.getAMBARIEventHandler()
                 .handle(new RoleEvent(RoleEventType.STOP_FAILURE, role));
-                retryCountForRole.reset(role);
+                retryCountForRole.resetAttemptCount(hostname);
                 continue;
               }
               List<Action> actions = 
                   plugin.stopRoleServer(clusterContext, role.getRoleName());
               allActions.addAll(actions);
-              retryCountForRole.incr(role);
+              retryCountForRole.incrAttemptCount(hostname);
             }
             //raise an event to the state machine for a successful role-stop
             if (!roleServerRunning) {
-              retryCountForRole.reset(role);
+              retryCountForRole.resetAttemptCount(hostname);
               StateMachineInvoker.getAMBARIEventHandler()
               .handle(new RoleEvent(RoleEventType.STOP_SUCCESS, role));
             }
@@ -198,19 +200,19 @@ public class HeartbeatHandler {
   }
   
   private static class RetryCountForRoleServerAction {
-    private Map<Role, Short> countMap = new HashMap<Role, Short>();
-    public short get(Role role) {
-      return countMap.get(role);
+    private Map<String, Short> countMap = new HashMap<String, Short>();
+    public short get(String hostname) {
+      return countMap.get(hostname);
     }
-    public void incr(Role role) {
+    public void incrAttemptCount(String hostname) {
       Short currentCount = 0;
-      if ((currentCount = countMap.get(role)) == null) {
+      if ((currentCount = countMap.get(hostname)) == null) {
         currentCount = 0;
       }
-      countMap.put(role, (short) (currentCount + 1));
+      countMap.put(hostname, (short) (currentCount + 1));
     }
-    public void reset(Role role) {
-      countMap.remove(role);
+    public void resetAttemptCount(String hostname) {
+      countMap.remove(hostname);
     }
   }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1179070&r1=1179069&r2=1179070&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java Wed Oct  5 07:10:36 2011
@@ -24,18 +24,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.ambari.common.state.MultipleArcTransition;
 import org.apache.ambari.common.state.SingleArcTransition;
 import org.apache.ambari.common.state.StateMachine;
 import org.apache.ambari.common.state.StateMachineFactory;
-import org.apache.ambari.components.ClusterContext;
 import org.apache.ambari.event.EventHandler;
 
 public class ClusterImpl implements Cluster, EventHandler<ClusterEvent> {
@@ -67,10 +63,11 @@ public class ClusterImpl implements Clus
       ClusterEventType.START_FAILURE)
       
   .addTransition(ClusterState.ACTIVE, ClusterState.STOPPING, 
-      ClusterEventType.STOP)
+      ClusterEventType.STOP, new StopClusterTransition())
       
-  .addTransition(ClusterState.STOPPING, ClusterState.INACTIVE, 
-      ClusterEventType.STOP_SUCCESS)
+  .addTransition(ClusterState.STOPPING, EnumSet.of(ClusterState.INACTIVE,
+      ClusterState.STOPPING), ClusterEventType.STOP_SUCCESS,
+      new ServiceStoppedTransition())
       
   .addTransition(ClusterState.STOPPING, ClusterState.UNCLEAN_STOP, 
       ClusterEventType.STOP_FAILURE)
@@ -95,7 +92,6 @@ public class ClusterImpl implements Clus
   private List<Service> services;
   private StateMachine<ClusterState, ClusterEventType, ClusterEvent> 
           stateMachine;
-  private int totalEnabledServices;
   private Lock readLock;
   private Lock writeLock;
   private String clusterName;
@@ -150,10 +146,6 @@ public class ClusterImpl implements Clus
     return null;
   }
   
-  private int getTotalServiceCount() {
-    return totalEnabledServices;
-  }
-  
   static class StartClusterTransition implements 
   SingleArcTransition<ClusterImpl, ClusterEvent>  {
 
@@ -168,13 +160,44 @@ public class ClusterImpl implements Clus
     
   }
   
+  static class StopClusterTransition implements
+  SingleArcTransition<ClusterImpl, ClusterEvent>  {
+    
+    @Override
+    public void transition(ClusterImpl operand, ClusterEvent event) {
+      Service service = operand.getFirstService();
+      if (service != null) {
+        StateMachineInvoker.getAMBARIEventHandler().handle(
+            new ServiceEvent(ServiceEventType.STOP, service));
+      }
+    }
+  }
+  
+  static class ServiceStoppedTransition implements
+  MultipleArcTransition<ClusterImpl, ClusterEvent, ClusterState> {
+
+    @Override
+    public ClusterState transition(ClusterImpl operand, ClusterEvent event) {
+      //check whether all services stopped, and if not remain in the STOPPING
+      //state, else move to the INACTIVE state
+      Service service = operand.getNextService();
+      if (service != null) {
+        StateMachineInvoker.getAMBARIEventHandler().handle(new ServiceEvent(
+            ServiceEventType.STOP, service));
+        return ClusterState.STOPPING;
+      }
+      return ClusterState.INACTIVE;
+    }
+    
+  }
+  
   static class ServiceStartedTransition implements 
   MultipleArcTransition<ClusterImpl, ClusterEvent, ClusterState>  {
     @Override
     public ClusterState transition(ClusterImpl operand, ClusterEvent event) {
       //check whether all services started, and if not remain in the STARTING
       //state, else move to the ACTIVE state
-      Service service = operand.getNextService();
+      Service service = operand.getFirstService();
       if (service != null) {
         StateMachineInvoker.getAMBARIEventHandler().handle(new ServiceEvent(
             ServiceEventType.START, service));

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1179070&r1=1179069&r2=1179070&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java Wed Oct  5 07:10:36 2011
@@ -18,8 +18,6 @@
 package org.apache.ambari.resource.statemachine;
 
 import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
 
 import org.apache.ambari.common.state.MultipleArcTransition;
 import org.apache.ambari.common.state.SingleArcTransition;
@@ -31,10 +29,10 @@ public class RoleImpl implements Role, E
 
   private RoleState myState;
   private String roleName;
-  private int totalRolesRequired;
-  private int totalRolesStarted;
+  private int totalInstancesRequired;
+  private int totalRoleInstancesStarted;
   private int totalRolesFailedToStart;
-  private int totalRoles;
+  private int totalInstancesDesired;
   private Service service;
   
   /* The state machine for the role looks like:
@@ -72,7 +70,7 @@ public class RoleImpl implements Role, E
              RoleEventType.STOP)
              
          .addTransition(RoleState.STOPPING, RoleState.INACTIVE,
-             RoleEventType.STOP_SUCCESS)
+             RoleEventType.STOP_SUCCESS, new RoleStopTransition())
              
          .addTransition(RoleState.STOPPING, RoleState.UNCLEAN_STOP,
              RoleEventType.STOP_FAILURE)
@@ -100,12 +98,12 @@ public class RoleImpl implements Role, E
     this(service, roleName, 1, 1);
   }
   
-  public RoleImpl(Service service, String roleName, int totalRoles, int totalRolesRequired) {
+  public RoleImpl(Service service, String roleName, int totalInstancesDesired, int totalInstancesRequired) {
     this.roleName = roleName;
     this.service = service;
     this.myState = RoleState.INACTIVE;
-    this.totalRolesRequired = totalRolesRequired;
-    this.totalRoles = totalRoles;
+    this.totalInstancesRequired = totalInstancesRequired;
+    this.totalInstancesDesired = totalInstancesDesired;
     stateMachine = stateMachineFactory.make(this);
   }
   
@@ -138,9 +136,9 @@ public class RoleImpl implements Role, E
 
     @Override
     public RoleState transition(RoleImpl operand, RoleEvent event) {
-      ServiceImpl service = (ServiceImpl)operand.getAssociatedService();
-      ++operand.totalRolesStarted;
-      if (operand.totalRolesRequired <= operand.totalRolesStarted) {
+      Service service = operand.getAssociatedService();
+      ++operand.totalRoleInstancesStarted;
+      if (operand.totalInstancesRequired <= operand.totalRoleInstancesStarted){
         StateMachineInvoker.getAMBARIEventHandler().handle(
             new ServiceEvent(ServiceEventType.ROLE_STARTED, service, 
                 operand));
@@ -157,14 +155,15 @@ public class RoleImpl implements Role, E
 
     @Override
     public RoleState transition(RoleImpl operand, RoleEvent event) {
-      ServiceImpl service = (ServiceImpl)operand.getAssociatedService();
+      Service service = operand.getAssociatedService();
       ++operand.totalRolesFailedToStart;
-      //if number of remaining roles required to declare a role as 'started'
-      //is more than the total number of roles that haven't reported back
+      //if number of remaining instances required to declare a role as 'started'
+      //is more than the total number of available nodes that haven't reported
       //declare the role failed to start
-      if ((operand.totalRolesRequired - operand.totalRolesStarted) 
-          >= (operand.totalRoles - 
-              (operand.totalRolesStarted + operand.totalRolesFailedToStart))) {
+      if ((operand.totalInstancesRequired - operand.totalRoleInstancesStarted) 
+          >= (operand.totalInstancesDesired - 
+              (operand.totalRoleInstancesStarted + 
+                  operand.totalRolesFailedToStart))) {
         StateMachineInvoker.getAMBARIEventHandler().handle(
             new ServiceEvent(ServiceEventType.START_FAILURE, service, 
                 operand));
@@ -174,6 +173,18 @@ public class RoleImpl implements Role, E
       }
     }
   }
+  
+  static class RoleStopTransition implements
+  SingleArcTransition<RoleImpl, RoleEvent> {
+    
+    @Override
+    public void transition(RoleImpl operand, RoleEvent event) {
+      Service service = operand.getAssociatedService();
+      StateMachineInvoker.getAMBARIEventHandler().handle(
+          new ServiceEvent(ServiceEventType.ROLE_STOPPED, service,
+              operand));
+    }
+  }
 
   @Override
   public void activate() {
@@ -187,11 +198,11 @@ public class RoleImpl implements Role, E
 
   @Override
   public boolean shouldStop() {
-    return myState == RoleState.STOPPING;
+    return myState == RoleState.STOPPING || myState == RoleState.INACTIVE;
   }
 
   @Override
   public boolean shouldStart() {
-    return myState == RoleState.STARTING;
+    return myState == RoleState.STARTING || myState == RoleState.ACTIVE;
   }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceEventType.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceEventType.java?rev=1179070&r1=1179069&r2=1179070&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceEventType.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceEventType.java Wed Oct  5 07:10:36 2011
@@ -38,5 +38,8 @@ public enum ServiceEventType {
   STOP_FAILURE,
   
   //Producer: Role
-  ROLE_STARTED
+  ROLE_STARTED,
+  
+  //Producer: Role
+  ROLE_STOPPED
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java?rev=1179070&r1=1179069&r2=1179070&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java (original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java Wed Oct  5 07:10:36 2011
@@ -203,14 +203,22 @@ public class ServiceImpl implements Serv
 
     @Override
     public ServiceState transition(ServiceImpl operand, ServiceEvent event) {
-      //check whether all roles stopped, and if not remain in the STOPPING
-      //state, else move to the ACTIVE state
+      //check whether all roles stopped, and if not, remain in the STOPPING
+      //state, else move to the INACTIVE state
       Role role = operand.getNextRole();
       if (role != null) {
         StateMachineInvoker.getAMBARIEventHandler().handle(new RoleEvent(
             RoleEventType.STOP, role));
         return ServiceState.STOPPING;
       } else {
+        if (operand.getAssociatedCluster().getState() == ClusterState.STOPPING) {
+          //since we support stopping services explicitly (without stopping the 
+          //associated cluster), we need to check what the cluster state is
+          //before sending it any event
+          StateMachineInvoker.getAMBARIEventHandler().handle(
+              new ClusterEvent(ClusterEventType.STOP_SUCCESS, 
+                  operand.getAssociatedCluster()));
+        }
         return ServiceState.INACTIVE;
       }
     }



Mime
View raw message