incubator-ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r1177376 - in /incubator/ambari/trunk/controller/src/main/java/org/apache/ambari: components/ components/impl/ controller/ resource/statemachine/
Date Thu, 29 Sep 2011 17:29:22 GMT
Author: ddas
Date: Thu Sep 29 17:29:21 2011
New Revision: 1177376

URL: http://svn.apache.org/viewvc?rev=1177376&view=rev
Log:
AMBARI-10. Heartbeat iteration.

Modified:
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ClusterContext.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/ClusterContextImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/HDFSPluginImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Cluster.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Role.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Service.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java
    incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ClusterContext.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ClusterContext.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ClusterContext.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ClusterContext.java	Thu Sep 29 17:29:21 2011
@@ -37,12 +37,6 @@ public interface ClusterContext {
   String[] getAllRoles();
   
   /**
-   * Get all of the components that are currently active in the cluster
-   * @return a list of all of the services for this node
-   */
-  String[] getClusterComponents();
-  
-  /**
    * Get the directory name for the directory that should contain the software.
    * @return the full pathname of the directory
    */

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/ComponentPlugin.java	Thu Sep 29 17:29:21 2011
@@ -31,11 +31,18 @@ import org.apache.ambari.common.rest.ent
 public abstract class ComponentPlugin {
   
   /**
-   * Get the roles for this component.
+   * Get the inactive roles for this component.
+   * @return the list of roles that need to be installed, but don't have servers.
+   * @throws IOException
+   */
+  public abstract String[] getInactiveRoles() throws IOException;
+  
+  /**
+   * Get the active roles (ie. with servers) for this component.
    * @return the list of roles in the order that they should be started
    * @throws IOException
    */
-  public abstract String[] getRoles() throws IOException;
+  public abstract String[] getActiveRoles() throws IOException;
   
   /**
    * Get the components that this one depends on.

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/ClusterContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/ClusterContextImpl.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/ClusterContextImpl.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/ClusterContextImpl.java	Thu Sep 29 17:29:21 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.ambari.components.impl;
 
-import java.util.List;
-
 import org.apache.ambari.common.rest.entities.Blueprint;
 import org.apache.ambari.common.rest.entities.Cluster;
 import org.apache.ambari.common.rest.entities.ClusterDefinition;
@@ -70,10 +68,4 @@ public class ClusterContextImpl implemen
     return null;
   }
 
-  @Override
-  public String[] getClusterComponents() {
-    List<String> roles = cluster.getClusterDefinition().getActiveServices();
-    return roles.toArray(new String[1]);
-  }
-
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/HDFSPluginImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/HDFSPluginImpl.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/HDFSPluginImpl.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/components/impl/HDFSPluginImpl.java	Thu Sep 29 17:29:21 2011
@@ -28,12 +28,6 @@ import org.apache.ambari.components.Comp
 public class HDFSPluginImpl extends ComponentPlugin {
 
   @Override
-  public String[] getRoles() throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
   public String[] getRequiredComponents() throws IOException {
     // TODO Auto-generated method stub
     return null;
@@ -78,4 +72,16 @@ public class HDFSPluginImpl extends Comp
     return null;
   }
 
+  @Override
+  public String[] getInactiveRoles() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public String[] getActiveRoles() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/controller/HeartbeatHandler.java	Thu Sep 29 17:29:21 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.controller;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.GregorianCalendar;
@@ -38,9 +39,14 @@ import org.apache.ambari.common.rest.ent
 import org.apache.ambari.common.rest.entities.agent.ControllerResponse;
 import org.apache.ambari.common.rest.entities.agent.HeartBeat;
 import org.apache.ambari.common.rest.entities.agent.ServerStatus;
+import org.apache.ambari.common.rest.entities.agent.ServerStatus.State;
 import org.apache.ambari.components.ClusterContext;
 import org.apache.ambari.components.impl.ClusterContextImpl;
+import org.apache.ambari.components.impl.HDFSPluginImpl;
+import org.apache.ambari.resource.statemachine.Role;
+import org.apache.ambari.resource.statemachine.Service;
 import org.apache.ambari.resource.statemachine.StateMachineInvoker;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
 public class HeartbeatHandler {
   
@@ -63,7 +69,7 @@ public class HeartbeatHandler {
   }
   
   public ControllerResponse processHeartBeat(HeartBeat heartbeat) 
-      throws DatatypeConfigurationException {
+      throws DatatypeConfigurationException, IOException {
     ControllerResponse response = 
         agentToHeartbeatResponseMap.get(heartbeat.getHostname());
     if (response != null) {
@@ -71,6 +77,9 @@ public class HeartbeatHandler {
         return response; //duplicate heartbeat
       }
     }
+    
+    short responseId = 
+        (short) (Short.parseShort(response.getResponseId()) + 1);
 
     Node node = Nodes.getInstance().getNodes().get(heartbeat.getHostname());
     NodeState state = node.getNodeState();
@@ -79,25 +88,81 @@ public class HeartbeatHandler {
     state.setLastHeartbeatTime(
         DatatypeFactory.newInstance().newXMLGregorianCalendar(c));
     
-    Cluster cluster = Clusters.getInstance().getClusterByName(state.getClusterName());
+    Cluster cluster = 
+        Clusters.getInstance().getClusterByName(state.getClusterName());
     ClusterContext clusterContext = new ClusterContextImpl(cluster, node);
-
-    //get what is currently running on the node
-    List<ServerStatus> servers = heartbeat.getServersStatus();    
     
-    //get the state machine reference to the cluster
-    org.apache.ambari.resource.statemachine.Cluster stateMachineCluster = 
-        StateMachineInvoker.getStateMachineClusterInstance(state.getClusterName());
+    List <Action> allActions = new ArrayList<Action>();
     
-    
-    List<Action> actions = new ArrayList<Action>();
-    synchronized (this) {
-      actions = responseMap.get(node.getName());
+    if (heartbeat.getIdle()) {
+      //if the command-execution takes longer than one heartbeat interval
+      //the check for idleness will prevent the same node getting the same 
+      //command more than once. In the future this could be improved
+      //to reflect the command execution state more accurately.
+      
+      //get what is currently running on the node
+      List<ServerStatus> serverStatuses = heartbeat.getServersStatus();
+      
+      //CHECK what servers moved the role to ACTIVE state
+      StartedComponentServers componentServers = new StartedComponentServers();
+      for (ServerStatus status : serverStatuses) {
+        if (status.getState() == State.STARTED) {
+          componentServers.serverStarted(status.getComponent(), 
+              status.getServerName());
+        }
+      }
+
+      //get the state machine reference to the cluster
+      org.apache.ambari.resource.statemachine.Cluster clusterSMobject = 
+          StateMachineInvoker
+          .getStateMachineClusterInstance(state.getClusterName());
+      //the state machine reference to the services
+      List<Service> clusterServices = clusterSMobject.getServices();
+      //go through all the services, and check which role should be started
+      //Get the corresponding commands
+      for (Service service : clusterServices) {
+        List<Role> roles = service.getRoles();
+        for (Role role : roles) {
+          if (role.shouldStart() && 
+              !componentServers.isStarted(
+                  role.getAssociatedService().getServiceName(),
+                  role.getRoleName())) {
+            //TODO: get reference to the plugin impl
+            HDFSPluginImpl plugin = new HDFSPluginImpl();
+            List<Action> actions = 
+                plugin.startRoleServer(clusterContext, role.getRoleName());
+            allActions.addAll(actions);
+          }
+        }
+      }
     }
     ControllerResponse r = new ControllerResponse();
-    r.setActions(actions);
+    r.setResponseId(String.valueOf(responseId));
+    r.setActions(allActions);
     agentToHeartbeatResponseMap.put(heartbeat.getHostname(), r);
-    
     return r;
   }
+  
+  private static class StartedComponentServers {
+    private Map<String, Map<String, Boolean>> startedComponentServerMap =
+        new HashMap<String, Map<String, Boolean>>();
+    void serverStarted(String component, String server) {
+      Map<String, Boolean> serverStartedMap = null;
+      if ((serverStartedMap = startedComponentServerMap.get(component))
+          != null) {
+        serverStartedMap.put(server, true);
+        return;
+      }
+      serverStartedMap = new HashMap<String, Boolean>();
+      serverStartedMap.put(server, true);
+      startedComponentServerMap.put(component, serverStartedMap);
+    }
+    boolean isStarted(String component, String server) {
+      Map<String, Boolean> startedServerMap;
+      if ((startedServerMap=startedComponentServerMap.get(component)) != null){
+        return startedServerMap.get(server) != null;
+      }
+      return false;
+    }
+  }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Cluster.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Cluster.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Cluster.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Cluster.java	Thu Sep 29 17:29:21 2011
@@ -20,8 +20,6 @@ package org.apache.ambari.resource.state
 import java.util.List;
 import java.util.Map;
 
-import org.apache.ambari.components.ClusterContext;
-
 public interface Cluster extends LifeCycle {
   public List<Service> getServices();
   public ClusterState getClusterState();

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ClusterImpl.java	Thu Sep 29 17:29:21 2011
@@ -213,5 +213,4 @@ public class ClusterImpl implements Clus
   public String getClusterName() {
     return clusterName;
   }
-
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Role.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Role.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Role.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Role.java	Thu Sep 29 17:29:21 2011
@@ -25,4 +25,7 @@ public interface Role extends LifeCycle 
   
   public Service getAssociatedService();
   
+  public boolean shouldStop();
+  
+  public boolean shouldStart();
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/RoleImpl.java	Thu Sep 29 17:29:21 2011
@@ -121,6 +121,16 @@ public class RoleImpl implements Role, E
 
   @Override
   public void deactivate() {
-    //load the plugin and get the commands for stopping the role
+    
+  }
+
+  @Override
+  public boolean shouldStop() {
+    return myState == RoleState.STOPPING;
+  }
+
+  @Override
+  public boolean shouldStart() {
+    return myState == RoleState.STARTING;
   }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Service.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Service.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Service.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/Service.java	Thu Sep 29 17:29:21 2011
@@ -17,6 +17,8 @@
 */
 package org.apache.ambari.resource.statemachine;
 
+import java.util.List;
+
 public interface Service extends LifeCycle {
   
   public ServiceState getServiceState();
@@ -24,5 +26,9 @@ public interface Service extends LifeCyc
   public String getServiceName();
   
   public Cluster getAssociatedCluster();
+  
+  public boolean isActive();
+  
+  public List<Role> getRoles();
 
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/ServiceImpl.java	Thu Sep 29 17:29:21 2011
@@ -75,7 +75,7 @@ public class ServiceImpl implements Serv
     this.myState = ServiceState.INACTIVE;
     //load plugin and get the roles and create them
     this.plugin = new HDFSPluginImpl();
-    String[] roles = this.plugin.getRoles();
+    String[] roles = this.plugin.getActiveRoles();
     for (String role : roles) {
       RoleImpl roleImpl = new RoleImpl(this, role);
       serviceRoles.add(roleImpl);
@@ -161,4 +161,14 @@ public class ServiceImpl implements Serv
     StateMachineInvoker.getAMBARIEventHandler().handle(
               new ServiceEvent(ServiceEventType.S_STOP, this));
   }
+
+  @Override
+  public boolean isActive() {
+    return myState == ServiceState.ACTIVE;
+  }
+
+  @Override
+  public List<Role> getRoles() {
+    return serviceRoles;
+  }
 }

Modified: incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java?rev=1177376&r1=1177375&r2=1177376&view=diff
==============================================================================
--- incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java	(original)
+++ incubator/ambari/trunk/controller/src/main/java/org/apache/ambari/resource/statemachine/StateMachineInvoker.java	Thu Sep 29 17:29:21 2011
@@ -20,11 +20,9 @@ package org.apache.ambari.resource.state
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.ambari.components.ClusterContext;
 import org.apache.ambari.event.AsyncDispatcher;
 import org.apache.ambari.event.Dispatcher;
 import org.apache.ambari.event.EventHandler;



Mime
View raw message