ambari-commits mailing list archives

From: alejan...@apache.org
Subject: ambari git commit: AMBARI-8703. Rolling Upgrade - Upgrade Pack to run preupgrade tasks only on master for NameNode (alejandro)
Date: Tue, 16 Dec 2014 03:04:53 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 7691aa4a5 -> a1348d3c2


AMBARI-8703. Rolling Upgrade - Upgrade Pack to run preupgrade tasks only on master for NameNode (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a1348d3c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a1348d3c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a1348d3c

Branch: refs/heads/trunk
Commit: a1348d3c20bacef12fbb66f463b9b1c2be50382c
Parents: 7691aa4
Author: Alejandro Fernandez <afernandez@hortonworks.com>
Authored: Fri Dec 12 15:00:36 2014 -0800
Committer: Alejandro Fernandez <afernandez@hortonworks.com>
Committed: Mon Dec 15 18:52:28 2014 -0800

----------------------------------------------------------------------
 .../dynamic_variable_interpretation.py          |   9 +-
 .../internal/UpgradeResourceProvider.java       |   5 +-
 .../apache/ambari/server/stack/HostsType.java   |  43 +++++
 .../ambari/server/stack/MasterHostResolver.java | 156 +++++++++++++++++++
 .../org/apache/ambari/server/state/Cluster.java |  10 ++
 .../ambari/server/state/UpgradeHelper.java      | 131 ++++------------
 .../server/state/cluster/ClusterImpl.java       |  24 +++
 .../state/stack/upgrade/ClusterGrouping.java    |  10 +-
 .../state/stack/upgrade/ColocatedGrouping.java  |  14 +-
 .../server/state/stack/upgrade/ExecuteTask.java |   7 +
 .../server/state/stack/upgrade/Grouping.java    |  21 ++-
 .../state/stack/upgrade/StageWrapper.java       |   2 +-
 .../stack/upgrade/StageWrapperBuilder.java      |   5 +-
 .../ambari/server/state/stack/upgrade/Task.java |   3 -
 .../state/stack/upgrade/TaskWrapperBuilder.java |  76 +++++++++
 .../custom_actions/scripts/ru_execute_tasks.py  |   1 +
 .../services/HDFS/package/scripts/utils.py      |   5 +-
 .../stacks/HDP/2.2/upgrades/upgrade-2.2.xml     |  26 ++--
 .../ambari/server/state/UpgradeHelperTest.java  |  24 ++-
 .../stacks/HDP/2.1.1/upgrades/upgrade_test.xml  |   2 +-
 20 files changed, 427 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py b/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
index 5f952d3..7a27322 100644
--- a/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/dynamic_variable_interpretation.py
@@ -148,7 +148,7 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owne
   tmpfile = tempfile.NamedTemporaryFile()
   out = None
   with open(tmpfile.name, 'r+') as file:
-    get_hdp_version_cmd = '/usr/bin/hdp-select versions > %s' % tmpfile.name
+    get_hdp_version_cmd = '/usr/bin/hdp-select status > %s' % tmpfile.name
     code, stdoutdata = shell.call(get_hdp_version_cmd)
     out = file.read()
   pass
@@ -157,7 +157,12 @@ def copy_tarballs_to_hdfs(tarball_prefix, component_user, file_owner, group_owne
                    (get_hdp_version_cmd, str(code), str(out)))
     return 1
 
-  hdp_version = out.strip() # this should include the build number
+  matches = re.findall(r"([\d\.]+\-\d+)", out)
+  hdp_version = matches[0] if matches and len(matches) > 0 else None
+
+  if not hdp_version:
+    Logger.error("Could not parse HDP version from output of hdp-select: %s" % str(out))
+    return 1
 
   file_name = os.path.basename(component_tar_source_file)
   destination_file = os.path.join(component_tar_destination_folder, file_name)
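
The version lookup above now runs "hdp-select status" and pulls out the first token that looks like a dotted version with a build number, rather than trusting the raw output of "hdp-select versions". A minimal standalone sketch of that extraction, assuming (hypothetically) that the command prints lines of the form "<package> - <version>-<build>":

import re

# Hypothetical hdp-select status output; the exact format is an assumption.
out = """hadoop-client - 2.2.0.0-2041
hadoop-hdfs-namenode - 2.2.0.0-2041
"""

# Same pattern as the patch: dotted version digits followed by a build number.
matches = re.findall(r"([\d\.]+\-\d+)", out)
hdp_version = matches[0] if matches else None

print(hdp_version)  # prints 2.2.0.0-2041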

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index d6136eb..71bb22c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -62,6 +62,7 @@ import org.apache.ambari.server.orm.entities.UpgradeEntity;
 import org.apache.ambari.server.orm.entities.UpgradeGroupEntity;
 import org.apache.ambari.server.orm.entities.UpgradeItemEntity;
 import org.apache.ambari.server.serveraction.upgrades.ManualStageAction;
+import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.StackId;
@@ -357,8 +358,10 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     Cluster cluster = getManagementController().getClusters().getCluster(clusterName);
     ConfigHelper configHelper = getManagementController().getConfigHelper();
 
+    MasterHostResolver mhr = new MasterHostResolver(cluster);
     UpgradeHelper helper = new UpgradeHelper();
-    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, pack);
+
+    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, mhr, pack);
     List<UpgradeGroupEntity> groupEntities = new ArrayList<UpgradeGroupEntity>();
 
     final String version = (String) requestMap.get(UPGRADE_VERSION);

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java
new file mode 100644
index 0000000..601ee4b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/HostsType.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.stack;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Wrapper around a collection of hosts that may also identify a master and a secondary host.
+ */
+public class HostsType {
+
+  public String master;
+
+  public String secondary;
+
+  /**
+   * Ordered collection of hosts.
+   */
+  public Set<String> hosts;
+
+  public HostsType () {
+    this.master = null;
+    this.secondary = null;
+    this.hosts = new HashSet<String>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
new file mode 100644
index 0000000..3baf187
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/stack/MasterHostResolver.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.stack;
+
+import com.google.common.reflect.TypeToken;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.utils.HTTPUtils;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MasterHostResolver {
+
+  private static Logger LOG = LoggerFactory.getLogger(MasterHostResolver.class);
+
+  private Cluster cluster;
+
+  enum Service {
+    HDFS,
+    HBASE
+  }
+
+  /**
+   * Union of status for several services.
+   */
+  enum Status {
+    ACTIVE,
+    STANDBY
+  }
+
+  public MasterHostResolver() {
+    ;
+  }
+
+  public MasterHostResolver(Cluster cluster) {
+    this.cluster = cluster;
+  }
+
+  /**
+   * Get the hosts of the given service and component, identifying the master and secondary where possible.
+   * @param serviceName Service
+   * @param componentName Component
+   * @return HostsType with the hosts (and, when resolvable, master and secondary) populated, or null if the service and component have no hosts.
+   */
+  public HostsType getMasterAndHosts(String serviceName, String componentName) {
+    HostsType hostsType = new HostsType();
+
+    if (serviceName == null || componentName == null) {
+      return null;
+    }
+
+    Service s = Service.valueOf(serviceName.toUpperCase());
+
+    Set<String> componentHosts = cluster.getHosts(serviceName, componentName);
+    if (0 == componentHosts.size()) {
+      return null;
+    }
+
+    hostsType.hosts = componentHosts;
+
+    switch (s) {
+      case HDFS:
+        if (componentName.equalsIgnoreCase("NAMENODE")) {
+          Map<Status, String> pair = getNameNodePair(componentHosts);
+          if (pair != null) {
+            hostsType.master = pair.containsKey(Status.ACTIVE) ? pair.get(Status.ACTIVE) :  null;
+            hostsType.secondary = pair.containsKey(Status.STANDBY) ? pair.get(Status.STANDBY) :  null;
+          }
+        }
+        break;
+      case HBASE:
+        if (componentName.equalsIgnoreCase("HBASE_REGIONSERVER")) {
+          // TODO Rolling Upgrade, fill for this Component.
+          ;
+        }
+        break;
+    }
+    return hostsType;
+  }
+
+  /**
+   * Get mapping of the HDFS Namenodes from the state ("active" or "standby") to the hostname.
+   * @param hosts Hosts to lookup.
+   * @return Returns a map from the state ("active" or "standby") to the hostname with that state.
+   */
+  private Map<Status, String> getNameNodePair(Set<String> hosts) {
+    Map<Status, String> stateToHost = new HashMap<Status, String>();
+    if (hosts != null && hosts.size() == 2) {
+      Iterator iter = hosts.iterator();
+
+      while(iter.hasNext()) {
+        String hostname = (String) iter.next();
+        try {
+          // TODO Rolling Upgrade, don't hardcode jmx port number
+          // E.g.,
+          // dfs.namenode.http-address.dev.nn1 : c6401.ambari.apache.org:50070
+          // dfs.namenode.http-address.dev.nn2 : c6402.ambari.apache.org:50070
+          String endpoint = "http://" + hostname + ":50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus";
+          String response = HTTPUtils.requestURL(endpoint);
+
+          if (response != null && !response.isEmpty()) {
+            Map<String, ArrayList<HashMap<String, String>>> nameNodeInfo = new HashMap<String, ArrayList<HashMap<String, String>>>();
+            Type type = new TypeToken<Map<String, ArrayList<HashMap<String, String>>>>() {}.getType();
+            nameNodeInfo = StageUtils.getGson().fromJson(response, type);
+
+            try {
+              String state = nameNodeInfo.get("beans").get(0).get("State");
+
+              if (state.equalsIgnoreCase(Status.ACTIVE.toString()) || state.equalsIgnoreCase(Status.STANDBY.toString())) {
+                Status status = Status.valueOf(state.toUpperCase());
+                stateToHost.put(status, hostname);
+              }
+            } catch (Exception e) {
+              throw new Exception("Response from endpoint " + endpoint + " was not formatted correctly. Value: " + response);
+            }
+          } else {
+            throw new Exception("Response from endpoint " + endpoint + " was empty.");
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to parse namenode jmx endpoint to get state for host " + hostname + ". Error: " + e.getMessage());
+        }
+      }
+
+      if (stateToHost.containsKey(Status.ACTIVE) && stateToHost.containsKey(Status.STANDBY) && !stateToHost.get(Status.ACTIVE).equalsIgnoreCase(stateToHost.get(Status.STANDBY))) {
+        return stateToHost;
+      }
+    }
+
+    return null;
+  }
+}
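
MasterHostResolver decides which NameNode is active and which is standby by querying each candidate host's NameNodeStatus JMX bean and reading its State field, replacing the old regex over the full /jmx dump. A rough out-of-band sketch of the same lookup in Python, assuming the still-hardcoded port 50070 and an unsecured HTTP endpoint:

import json
import urllib2  # Python 2 standard library

def namenode_state(hostname):
    # Same endpoint and query the resolver calls; 50070 is hardcoded for now (see TODO above).
    url = "http://%s:50070/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" % hostname
    response = urllib2.urlopen(url, timeout=10).read()
    # Expected shape, per the Gson parsing above: {"beans": [{"State": "active", ...}]}
    return json.loads(response)["beans"][0]["State"]

# Hypothetical HA pair, matching the hosts used in the code comments.
for host in ["c6401.ambari.apache.org", "c6402.ambari.apache.org"]:
    print("%s reports %s" % (host, namenode_state(host)))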

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
index 0cc6a24..e6067f5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java
@@ -77,6 +77,16 @@ public interface Cluster {
    */
   public List<ServiceComponentHost> getServiceComponentHosts(String hostname);
 
+
+  /**
+   * Get all of the hosts running the provided service and component.
+   * @param serviceName name of the service
+   * @param componentName name of the component
+   * @return the set of hostnames running the given component; empty if none
+   */
+  public Set<String> getHosts(String serviceName, String componentName);
+
+
   /**
    * Remove ServiceComponentHost from cluster
    * @param svcCompHost

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
index 15e3ddc..d7c09ea 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java
@@ -17,17 +17,14 @@
  */
 package org.apache.ambari.server.state;
 
+import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.LinkedHashSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.internal.RequestResourceProvider;
 import org.apache.ambari.server.controller.internal.StageResourceProvider;
 import org.apache.ambari.server.controller.predicate.AndPredicate;
@@ -43,6 +40,9 @@ import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
 import org.apache.ambari.server.controller.utilities.ClusterControllerHelper;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.stack.HostsType;
+import org.apache.ambari.server.stack.MasterHostResolver;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.ambari.server.state.stack.upgrade.ClusterGrouping;
@@ -50,7 +50,7 @@ import org.apache.ambari.server.state.stack.upgrade.Grouping;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapper;
 import org.apache.ambari.server.state.stack.upgrade.StageWrapperBuilder;
 import org.apache.ambari.server.state.stack.upgrade.TaskWrapper;
-import org.apache.ambari.server.utils.HTTPUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,67 +62,13 @@ public class UpgradeHelper {
   private static Logger LOG = LoggerFactory.getLogger(UpgradeHelper.class);
 
   /**
-   * Tuple of namenode states
-   */
-  public static class NameNodePair {
-    String activeHostName;
-    String standbyHostName;
-  }
-
-  /**
-   * Retrieve a class that represents a tuple of the active and standby namenodes. This should be called in an HA cluster.
-   * @param hosts
-   * @return
-   */
-  public static NameNodePair getNameNodePair(Set<String> hosts) {
-    if (hosts != null && hosts.size() == 2) {
-      Iterator iter = hosts.iterator();
-      HashMap<String, String> stateToHost = new HashMap<String, String>();
-      Pattern pattern = Pattern.compile("^.*org\\.apache\\.hadoop\\.hdfs\\.server\\.namenode\\.NameNode\".*?\"State\"\\s*:\\s*\"(.+?)\".*$");
-
-      while(iter.hasNext()) {
-        String hostname = (String) iter.next();
-        try {
-          // TODO Rolling Upgrade, don't hardcode jmx port number
-          // E.g.,
-          // dfs.namenode.http-address.dev.nn1 : c6401.ambari.apache.org:50070
-          // dfs.namenode.http-address.dev.nn2 : c6402.ambari.apache.org:50070
-          String endpoint = "http://" + hostname + ":50070/jmx";
-          String response = HTTPUtils.requestURL(endpoint);
-
-          if (response != null && !response.isEmpty()) {
-            Matcher matcher = pattern.matcher(response);
-            if (matcher.matches()) {
-              String state = matcher.group(1);
-              stateToHost.put(state.toLowerCase(), hostname);
-            }
-          } else {
-            throw new Exception("Response from endpoint " + endpoint + " was empty.");
-          }
-        } catch (Exception e) {
-          LOG.warn("Failed to parse namenode jmx endpoint to get state for host " + hostname + ". Error: " + e.getMessage());
-        }
-      }
-
-      if (stateToHost.containsKey("active") && stateToHost.containsKey("standby") && !stateToHost.get("active").equalsIgnoreCase(stateToHost.get("standby"))) {
-        NameNodePair pair = new NameNodePair();
-        pair.activeHostName = stateToHost.get("active");
-        pair.standbyHostName = stateToHost.get("standby");
-        return pair;
-      }
-    }
-
-    return null;
-  }
-
-  /**
    * Generates a list of UpgradeGroupHolder items that are used to execute an upgrade
    * @param cluster the cluster
+   * @param mhr Master Host Resolver needed to get master and secondary hosts of several components like NAMENODE
    * @param upgradePack the upgrade pack
    * @return the list of holders
    */
-  public List<UpgradeGroupHolder> createUpgrade(Cluster cluster, UpgradePack upgradePack) {
-
+  public List<UpgradeGroupHolder> createUpgrade(Cluster cluster, MasterHostResolver mhr, UpgradePack upgradePack) throws AmbariException {
     Map<String, Map<String, ProcessingComponent>> allTasks = upgradePack.getTasks();
 
     List<UpgradeGroupHolder> groups = new ArrayList<UpgradeGroupHolder>();
@@ -154,25 +100,38 @@ public class UpgradeHelper {
             continue;
           }
 
-          Set<String> componentHosts = getClusterHosts(cluster, service.serviceName, component);
+          Set<String> componentHosts = cluster.getHosts(service.serviceName, component);
           if (0 == componentHosts.size()) {
             continue;
           }
+          HostsType hostsType = new HostsType();
+          hostsType.hosts = componentHosts;
 
           ProcessingComponent pc = allTasks.get(service.serviceName).get(component);
 
           // Special case for NAMENODE
           if (service.serviceName.equalsIgnoreCase("HDFS") && component.equalsIgnoreCase("NAMENODE")) {
-              NameNodePair pair = getNameNodePair(componentHosts);
-              if (pair != null ) {
-                // The order is important, first do the standby, then the active namenode.
-                Set<String> order = new LinkedHashSet<String>();
-                order.add(pair.standbyHostName);
-                order.add(pair.activeHostName);
-                builder.add(order, service.serviceName, pc);
-              }
-          } else {
-            builder.add(componentHosts, service.serviceName, pc);
+            hostsType = mhr.getMasterAndHosts(service.serviceName, component);
+            if (hostsType != null && hostsType.master != null && componentHosts.contains(hostsType.master) && hostsType.secondary != null && componentHosts.contains(hostsType.secondary)) {
+              // The order is important, first do the standby, then the active namenode.
+              Set<String> order = new LinkedHashSet<String>();
+
+              // TODO Upgrade Pack, somehow, running the secondary first causes them to swap, even before the restart.
+              order.add(hostsType.master);
+              order.add(hostsType.secondary);
+
+              // Override the hosts with the ordered collection
+              hostsType.hosts = order;
+              builder.add(hostsType, service.serviceName, pc);
+            } else {
+              throw new AmbariException(MessageFormat.format("Could not find active and standby namenodes using hosts: {0}", StringUtils.join(componentHosts, ", ").toString()));
+            }
+          }
+          /*
+          TODO Rolling Upgrade, write logic for HBASE
+          */
+          else {
+            builder.add(hostsType, service.serviceName, pc);
           }
         }
       }
@@ -305,30 +264,6 @@ public class UpgradeHelper {
   }
 
   /**
-   * @param cluster the cluster
-   * @param serviceName name of the service
-   * @param componentName name of the component
-   * @return the set of hosts for the provided service and component
-   */
-  public Set<String> getClusterHosts(Cluster cluster, String serviceName, String componentName) {
-    Map<String, Service> services = cluster.getServices();
-
-    if (!services.containsKey(serviceName)) {
-      return Collections.emptySet();
-    }
-
-    Service service = services.get(serviceName);
-    Map<String, ServiceComponent> components = service.getServiceComponents();
-
-    if (!components.containsKey(componentName) ||
-        components.get(componentName).getServiceComponentHosts().size() == 0) {
-      return Collections.emptySet();
-    }
-
-    return components.get(componentName).getServiceComponentHosts().keySet();
-  }
-
-  /**
    * Special handling for ClusterGrouping.
    * @param cluster the cluster
    * @param grouping the grouping
@@ -336,7 +271,7 @@ public class UpgradeHelper {
    */
   private UpgradeGroupHolder getClusterGroupHolder(Cluster cluster, ClusterGrouping grouping) {
 
-    grouping.getBuilder().setHelpers(this, cluster);
+    grouping.getBuilder().setHelpers(cluster);
     List<StageWrapper> wrappers = grouping.getBuilder().build();
 
     if (wrappers.size() > 0) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index c54e13a..e3f4893 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -2311,6 +2311,30 @@ public class ClusterImpl implements Cluster {
     return failedEvents;
   }
 
+  /**
+   * @param serviceName name of the service
+   * @param componentName name of the component
+   * @return the set of hosts for the provided service and component
+   */
+  @Override
+  public Set<String> getHosts(String serviceName, String componentName) {
+    Map<String, Service> services = this.getServices();
+
+    if (!services.containsKey(serviceName)) {
+      return Collections.emptySet();
+    }
+
+    Service service = services.get(serviceName);
+    Map<String, ServiceComponent> components = service.getServiceComponents();
+
+    if (!components.containsKey(componentName) ||
+        components.get(componentName).getServiceComponentHosts().size() == 0) {
+      return Collections.emptySet();
+    }
+
+    return components.get(componentName).getServiceComponentHosts().keySet();
+  }
+
   private ClusterHealthReport getClusterHealthReport() throws AmbariException {
 
     int staleConfigsHosts = 0;

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
index 984e270..77e9922 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java
@@ -30,8 +30,10 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.UpgradeHelper;
+import org.apache.ambari.server.state.cluster.ClusterImpl;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 
@@ -71,19 +73,17 @@ public class ClusterGrouping extends Grouping {
   }
 
   public class ClusterBuilder extends StageWrapperBuilder {
-    private UpgradeHelper m_helper = null;
     private Cluster m_cluster = null;
 
     /**
      * @param cluster the cluster to use with this builder
      */
-    public void setHelpers(UpgradeHelper helper, Cluster cluster) {
-      m_helper = helper;
+    public void setHelpers(Cluster cluster) {
       m_cluster = cluster;
     }
 
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
       // !!! no-op in this case
     }
 
@@ -101,7 +101,7 @@ public class ClusterGrouping extends Grouping {
         StageWrapper wrapper = null;
 
         if (null != execution.service && null != execution.component) {
-          Set<String> hosts = m_helper.getClusterHosts(m_cluster, execution.service, execution.component);
+          Set<String> hosts = m_cluster.getHosts(execution.service, execution.component);
           // !!! FIXME other types
           if (hosts.size() > 0 && task.getType() == Task.Type.EXECUTE) {
             wrapper = new StageWrapper(

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
index 677463d..cc8b57b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -65,13 +66,16 @@ public class ColocatedGrouping extends Grouping {
     }
 
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
 
       int count = Double.valueOf(Math.ceil(
-          (double) batch.percent / 100 * hosts.size())).intValue();
+          (double) batch.percent / 100 * hostsType.hosts.size())).intValue();
 
       int i = 0;
-      for (String host : hosts) {
+      for (String host : hostsType.hosts) {
+        // This grouping processes one host at a time, so wrap each host in its own HostsType
+        HostsType singleHostsType = new HostsType();
+        singleHostsType.hosts = Collections.singleton(host);
 
         Map<String, List<TaskProxy>> targetMap = ((i++) < count) ? initialBatch : finalBatches;
         List<TaskProxy> targetList = targetMap.get(host);
@@ -85,7 +89,7 @@ public class ColocatedGrouping extends Grouping {
         if (null != pc.preTasks && pc.preTasks.size() > 0) {
           proxy = new TaskProxy();
           proxy.message = getStageText("Preparing", pc.name, Collections.singleton(host));
-          proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), pc.preTasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, pc.preTasks));
           proxy.service = service;
           proxy.component = pc.name;
           targetList.add(proxy);
@@ -110,7 +114,7 @@ public class ColocatedGrouping extends Grouping {
           proxy = new TaskProxy();
           proxy.component = pc.name;
           proxy.service = service;
-          proxy.tasks.add(new TaskWrapper(service, pc.name, Collections.singleton(host), pc.postTasks));
+          proxy.tasks.addAll(TaskWrapperBuilder.getTaskList(service, pc.name, singleHostsType, pc.postTasks));
           proxy.message = getStageText("Completing", pc.name, Collections.singleton(host));
           targetList.add(proxy);
         }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
index 56e19eb..a70e08f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ExecuteTask.java
@@ -23,6 +23,7 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.XmlAttribute;
 
 /**
  * Used to represent an execution that should occur on an agent.
@@ -36,6 +37,12 @@ public class ExecuteTask extends Task {
   private Task.Type type = Task.Type.EXECUTE;
 
   /**
+   * Host to run on; if not specified, run on all hosts. Other values include "master".
+   */
+  @XmlAttribute
+  public String hosts;
+
+  /**
    * Command to run under normal conditions.
    */
   @XmlElement(name="command")

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
index 4f4b034..d68754d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlSeeAlso;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 import org.apache.commons.lang.StringUtils;
@@ -62,17 +63,20 @@ public class Grouping {
     /**
      * Add stages where the restart stages are ordered
      * E.g., preupgrade, restart hosts(0), ..., restart hosts(n-1), postupgrade
-     * @param hosts the hosts
+     * @param hostsType the ordered collection of hosts, which may include a master and a secondary
      * @param service the service name
      * @param pc the ProcessingComponent derived from the upgrade pack.
      */
     @Override
-    public void add(Set<String> hosts, String service, ProcessingComponent pc) {
+    public void add(HostsType hostsType, String service, ProcessingComponent pc) {
       if (null != pc.preTasks && pc.preTasks.size() > 0) {
+        List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, pc.preTasks);
+        Set<String> preTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(preTasks);
         StageWrapper stage = new StageWrapper(
             StageWrapper.Type.RU_TASKS,
-            getStageText("Preparing", pc.name, hosts),
-            new TaskWrapper(service, pc.name, hosts, pc.preTasks));
+            getStageText("Preparing", pc.name, preTasksEffectiveHosts),
+            preTasks
+            );
         stages.add(stage);
       }
 
@@ -80,7 +84,7 @@ public class Grouping {
       if (null != pc.tasks && 1 == pc.tasks.size()) {
         Task t = pc.tasks.get(0);
         if (RestartTask.class.isInstance(t)) {
-          for (String hostName : hosts) {
+          for (String hostName : hostsType.hosts) {
             StageWrapper stage = new StageWrapper(
                 StageWrapper.Type.RESTART,
                 getStageText("Restarting", pc.name, Collections.singleton(hostName)),
@@ -91,15 +95,16 @@ public class Grouping {
       }
 
       if (null != pc.postTasks && pc.postTasks.size() > 0) {
+        List<TaskWrapper> postTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, pc.postTasks);
+        Set<String> postTasksEffectiveHosts = TaskWrapperBuilder.getEffectiveHosts(postTasks);
         StageWrapper stage = new StageWrapper(
             StageWrapper.Type.RU_TASKS,
-            getStageText("Completing", pc.name, hosts),
-            new TaskWrapper(service, pc.name, hosts, pc.postTasks));
+            getStageText("Completing", pc.name, postTasksEffectiveHosts),
+            postTasks);
         stages.add(stage);
       }
 
       serviceChecks.add(service);
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
index ae1558f..0c4d363 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
@@ -40,7 +40,7 @@ public class StageWrapper {
     this(type, text, Arrays.asList(tasks));
   }
 
-  private StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
+  public StageWrapper(Type type, String text, List<TaskWrapper> tasks) {
     this.type = type;
     this.text = text;
     this.tasks = tasks;

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
index bc5d4a1..0015824 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.state.stack.upgrade;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.ambari.server.stack.HostsType;
 import org.apache.ambari.server.state.stack.UpgradePack.ProcessingComponent;
 
 /**
@@ -30,11 +31,11 @@ public abstract class StageWrapperBuilder {
   /**
    * Adds a processing component that will be built into stage wrappers.
    *
-   * @param hosts the hosts
+   * @param hostsType the hosts, along with their type
    * @param service the service name
    * @param pc the ProcessingComponent derived from the upgrade pack.
    */
-  public abstract void add(Set<String> hosts, String service, ProcessingComponent pc);
+  public abstract void add(HostsType hostsType, String service, ProcessingComponent pc);
 
   /**
    * Builds the stage wrappers.

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
index 8e7c577..c1e65ca 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
@@ -74,8 +74,5 @@ public abstract class Task {
     public boolean isCommand() {
       return this == RESTART;
     }
-
-
-
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
new file mode 100644
index 0000000..81ec2ff
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.stack.upgrade;
+
+import org.apache.ambari.server.stack.HostsType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.*;
+
+
+/**
+ * Generates a collection of tasks that need to run on a set of hosts during an upgrade.
+ */
+public class TaskWrapperBuilder {
+
+  private static Logger LOG = LoggerFactory.getLogger(TaskWrapperBuilder.class);
+
+  /**
+   * Creates a collection of tasks based on the set of hosts they are allowed to run on
+   * by analyzing the "hosts" attribute of any ExecuteTask objects.
+   * @param service the service name for the tasks
+   * @param component the component name for the tasks
+   * @param hostsType the collection of hosts, along with which host is the master and which is the secondary
+   * @param tasks collection of tasks
+   */
+  public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks) {
+    List<TaskWrapper> collection = new ArrayList<TaskWrapper>();
+    for (Task t : tasks) {
+      if (t.getType().equals(Task.Type.EXECUTE)) {
+        if (((ExecuteTask) t).hosts != null && ((ExecuteTask) t).hosts.equalsIgnoreCase("master")) {
+          if (hostsType.master != null) {
+            collection.add(new TaskWrapper(service, component, Collections.singleton(hostsType.master), t));
+            continue;
+          } else {
+            LOG.error(MessageFormat.format("Found an Execute task for {0} and {1} meant to run on a master but could not find any masters to run on. Skipping this task.", service, component));
+            continue;
+          }
+        }
+      }
+
+      collection.add(new TaskWrapper(service, component, hostsType.hosts, t));
+    }
+
+    return collection;
+  }
+
+  /**
+   * Given a collection of tasks, get the union of the hosts.
+   * @param tasks Collection of tasks
+   * @return Returns the union of the hosts scheduled to perform the tasks.
+   */
+  public static Set<String> getEffectiveHosts(List<TaskWrapper> tasks) {
+    Set<String> effectiveHosts = new HashSet<String>();
+    for(TaskWrapper t : tasks) {
+      effectiveHosts.addAll(t.getHosts());
+    }
+    return effectiveHosts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py b/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
index 018dc6f..645666d 100644
--- a/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
+++ b/ambari-server/src/main/resources/custom_actions/scripts/ru_execute_tasks.py
@@ -35,6 +35,7 @@ from resource_management.libraries.functions.list_ambari_managed_repos import *
 # TODO, HACK
 def replace_variables(cmd, host_name, version):
   if cmd:
+    cmd = cmd.replace("{{host_name}}", "{host_name}")
     cmd = cmd.replace("0.0.0.0", "{host_name}")
     cmd = cmd.replace("{{version}}", "{version}")
     cmd = format(cmd)
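
The added replace lets upgrade pack commands carry a {{host_name}} placeholder (used by the new safemode grep checks in upgrade-2.2.xml below) and have it resolved to the target host at execution time, just as {{version}} already is. A plain-Python sketch of the substitution, with the resource_management format() call swapped for an explicit str.format purely for illustration:

def replace_variables(cmd, host_name, version):
    # Mirror the custom action: turn upgrade-pack placeholders into
    # format() placeholders, then substitute the runtime values.
    if cmd:
        cmd = cmd.replace("{{host_name}}", "{host_name}")
        cmd = cmd.replace("0.0.0.0", "{host_name}")
        cmd = cmd.replace("{{version}}", "{version}")
        # The real code uses resource_management's format(); str.format is a stand-in here.
        cmd = cmd.format(host_name=host_name, version=version)
    return cmd

print(replace_variables(
    "hdfs dfsadmin -safemode get | grep 'Safe mode is ON in {{host_name}}'",
    "c6401.ambari.apache.org", "2.2.0.0-2041"))
# hdfs dfsadmin -safemode get | grep 'Safe mode is ON in c6401.ambari.apache.org'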

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
index a2c2400..6f421b6 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/package/scripts/utils.py
@@ -76,7 +76,8 @@ def failover_namenode():
     Execute(check_standby_cmd,
             user=params.hdfs_user,
             tries=30,
-            try_sleep=6)
+            try_sleep=6,
+            logoutput=True)
   else:
     Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname))
 
@@ -97,7 +98,7 @@ def kill_zkfc(zkfc_user):
       if code == 0:
         Logger.debug("ZKFC is running and will be killed to initiate namenode failover.")
         kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1")
-        checked_call(kill_command)
+        checked_call(kill_command, verbose=True)
 
 
 def get_service_pid_file(name, user):

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
index e9546ee..285e876 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml
@@ -53,7 +53,6 @@
       <service name="YARN">
         <component>APP_TIMELINE_SERVER</component>
         <component>RESOURCEMANAGER</component>
-        <component>NODEMANAGER</component>
       </service>
       <service name="HBASE">
         <component>HBASE_MASTER</component>
@@ -159,27 +158,19 @@
           FINALIZE rolling upgrade ...
           There is no rolling upgrade in progress or rolling upgrade has already been finalized.
           -->
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode enter'</command>
+          <task xsi:type="execute" hosts="master">
+            <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode get | grep "Safe mode is ON in {{host_name}}"'</first>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
             <upto>10</upto>
             <every>6</every>
           </task>
 
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade prepare'</command>
-            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
           </task>
 
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade query'</command>
-            <onfailure>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</onfailure>   <!-- TODO, stay in safemode if in HA. -->
-          </task>
-
-          <!-- Apparently, the HDFS Namenode restart requires safemode to be OFF when not in HA. -->
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
-            <upto>60</upto>
-            <every>1</every>
           </task>
         </pre-upgrade>
 
@@ -189,11 +180,12 @@
 
         <!-- This step should be done once the user clicks on the "Finalize" button. So the name post-upgrade is misleading. -->
         <post-upgrade>
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -rollingUpgrade finalize'</command>
           </task>
-          <task xsi:type="execute">
-            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>       <!-- TODO, stay in safemode if in HA. -->
+          <task xsi:type="execute" hosts="master">
+            <first>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode get | grep "Safe mode is ON in {{host_name}}"'</first>
+            <command>su - {{hadoop-env/hdfs_user}} -c 'hdfs dfsadmin -safemode leave'</command>
           </task>
         </post-upgrade>
       </component>

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java
index 54c341f..6646b5e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/UpgradeHelperTest.java
@@ -29,6 +29,8 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.stack.HostsType;
+import org.apache.ambari.server.stack.MasterHostResolver;
 import org.apache.ambari.server.state.UpgradeHelper.UpgradeGroupHolder;
 import org.apache.ambari.server.state.stack.UpgradePack;
 import org.junit.After;
@@ -38,6 +40,10 @@ import org.junit.Test;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the {@link UpgradeHelper} class
@@ -46,6 +52,7 @@ public class UpgradeHelperTest {
 
   private Injector injector;
   private AmbariMetaInfo ambariMetaInfo;
+  private MasterHostResolver m_masterHostResolver;
 
   @Before
   public void before() throws Exception {
@@ -55,6 +62,8 @@ public class UpgradeHelperTest {
 
     ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
     ambariMetaInfo.init();
+    
+    m_masterHostResolver = mock(MasterHostResolver.class);
   }
 
   @After
@@ -74,8 +83,14 @@ public class UpgradeHelperTest {
 
     Cluster cluster = makeCluster();
 
+    HostsType hostsType = new HostsType();
+    hostsType.hosts = cluster.getHosts("HDFS", "NAMENODE");
+    hostsType.master = "h1";
+    hostsType.secondary = "h2";
+    when(m_masterHostResolver.getMasterAndHosts(Mockito.matches("HDFS"), Mockito.matches("NAMENODE"))).thenReturn(hostsType);
+
     UpgradeHelper helper = new UpgradeHelper();
-    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, upgrade);
+    List<UpgradeGroupHolder> groups = helper.createUpgrade(cluster, m_masterHostResolver, upgrade);
 
     assertEquals(5, groups.size());
 
@@ -86,10 +101,14 @@ public class UpgradeHelperTest {
     assertEquals("POST_CLUSTER", groups.get(4).name);
 
     assertEquals(6, groups.get(1).items.size());
-    assertEquals(2, groups.get(2).items.size());
+    assertEquals(6, groups.get(2).items.size());
     assertEquals(7, groups.get(3).items.size());
   }
 
+  /**
+   * Create an HA cluster
+   * @throws AmbariException
+   */
   public Cluster makeCluster() throws AmbariException {
     Clusters clusters = injector.getInstance(Clusters.class);
     ServiceFactory serviceFactory = injector.getInstance(ServiceFactory.class);
@@ -123,6 +142,7 @@ public class UpgradeHelperTest {
     Service s = c.getService("HDFS");
     ServiceComponent sc = s.addServiceComponent("NAMENODE");
     sc.addServiceComponentHost("h1");
+    sc.addServiceComponentHost("h2");
     sc = s.addServiceComponent("DATANODE");
     sc.addServiceComponentHost("h2");
     sc.addServiceComponentHost("h3");

http://git-wip-us.apache.org/repos/asf/ambari/blob/a1348d3c/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
index e93e985..70328ad 100644
--- a/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
+++ b/ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml
@@ -107,7 +107,7 @@
     <service name="HDFS">
       <component name="NAMENODE">
         <pre-upgrade>
-          <task xsi:type="execute">
+          <task xsi:type="execute" hosts="master">
             <command>su - {hdfs-user} -c 'dosomething'</command>
           </task>
           <task xsi:type="configure">

