ambari-commits mailing list archives

From jmar...@apache.org
Subject [28/58] [partial] ambari git commit: [RTC 136620]: Introduce BigInsights stacks on Ambari 2.4 branch
Date Wed, 17 Aug 2016 00:33:22 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_namenode.py
new file mode 100644
index 0000000..ad0daf6
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_namenode.py
@@ -0,0 +1,488 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import os.path
+import time
+
+from resource_management import *
+from resource_management.core.exceptions import ComponentIsNotRunning
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.functions import Direction
+
+from resource_management.core.shell import as_user
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core import shell
+from resource_management.core.source import Template
+from resource_management.core.resources.system import File, Execute, Directory
+from resource_management.core.resources.service import Service
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop
+from ambari_commons import OSCheck, OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
+from utils import get_dfsadmin_base_command
+from utils import service, safe_zkfc_op, is_previous_fs_image
+
+if OSCheck.is_windows_family():
+  from resource_management.libraries.functions.windows_service_utils import check_windows_service_status
+
+from resource_management.core.shell import as_user
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+
+from setup_ranger_hdfs import setup_ranger_hdfs, create_ranger_audit_hdfs_directories
+
+
+@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None, env=None):
+  if action is None:
+    raise Fail('"action" parameter is required for function namenode().')
+
+  if action in ["start", "stop"] and hdfs_binary is None:
+    raise Fail('"hdfs_binary" parameter is required for function namenode().')
+
+  if action == "configure":
+    import params
+    # This directory must exist before any action runs (it is needed for the
+    # HA manual steps on the additional NameNode).
+    create_name_dirs(params.dfs_name_dir)
+  elif action == "start":
+    Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type)))
+    setup_ranger_hdfs(upgrade_type=upgrade_type)
+    import params
+    if do_format:
+      format_namenode()
+      pass
+
+    File(params.exclude_file_path,
+         content=Template("exclude_hosts_list.j2"),
+         owner=params.hdfs_user,
+         group=params.user_group
+    )
+
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+
+    if params.dfs_ha_enabled and \
+      params.dfs_ha_namenode_standby is not None and \
+      params.hostname == params.dfs_ha_namenode_standby:
+        # if the current host is the standby NameNode in an HA deployment
+        # run the bootstrap command, to start the NameNode in standby mode
+        # this requires that the active NameNode is already up and running,
+        # so this execute should be re-tried upon failure, up to a timeout
+        success = bootstrap_standby_namenode(params)
+        if not success:
+          raise Fail("Could not bootstrap standby namenode")
+
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
+      # ZKFC is most likely up, since the rolling upgrade initiates the failover command. However, if that
+      # failed, ZKFC may have been killed manually, so start it here if it is not already running.
+      safe_zkfc_op(action, env)
+
+    #options = "-rollingUpgrade started" if rolling_restart else ""
+    options = ""
+    if upgrade_type == "rolling":
+      if params.upgrade_direction == Direction.UPGRADE:
+        options = "-rollingUpgrade started"
+      elif params.upgrade_direction == Direction.DOWNGRADE:
+        options = "-rollingUpgrade downgrade"
+    elif upgrade_type == "nonrolling":
+      is_previous_image_dir = is_previous_fs_image()
+      Logger.info(format("Previous file system image dir present is {is_previous_image_dir}"))
+
+      if params.upgrade_direction == Direction.UPGRADE:
+        options = "-rollingUpgrade started"
+      elif params.upgrade_direction == Direction.DOWNGRADE:
+        options = "-rollingUpgrade downgrade"
+
+    Logger.info(format("Option for start command: {options}"))
+
+    service(
+      action="start",
+      name="namenode",
+      user=params.hdfs_user,
+      options=options,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+
+
+    if params.security_enabled:
+      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
+              user = params.hdfs_user)
+    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary, use_specific_namenode=True)
+    is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"
+    if params.dfs_ha_enabled:
+      is_active_namenode_cmd = as_user(format("{hdfs_binary} --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    else:
+      is_active_namenode_cmd = True
+    
+    # During NonRolling Upgrade, both NameNodes are initially down,
+    # so no point in checking if this is the active or standby.
+    if upgrade_type == "nonrolling":
+      is_active_namenode_cmd = False
+
+    # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
+    # no-HA                 | ON -> OFF                | Yes                      |
+    # HA and active         | ON -> OFF                | Yes                      |
+    # HA and standby        | no change                | no check                 |
+    # RU with HA on active  | ON -> OFF                | Yes                      |
+    # RU with HA on standby | ON -> OFF                | Yes                      |
+    # EU with HA on active  | no change                | no check                 |
+    # EU with HA on standby | no change                | no check                 |
+    # EU non-HA             | no change                | no check                 |
+
+    check_for_safemode_off = False
+    msg = ""
+    if params.dfs_ha_enabled:
+      if upgrade_type is not None:
+        check_for_safemode_off = True
+        msg = "Must wait to leave safemode since High Availability is enabled during a Stack Upgrade"
+      else:
+        Logger.info("Wait for NameNode to become active.")
+        if is_active_namenode(hdfs_binary): # active
+          check_for_safemode_off = True
+          msg = "Must wait to leave safemode since High Availability is enabled and this is the Active NameNode."
+        else:
+          msg = "Will remain in the current safemode state."
+    else:
+      msg = "Must wait to leave safemode since High Availability is not enabled."
+      check_for_safemode_off = True
+
+    Logger.info(msg)
+
+    # During a NonRolling (aka Express Upgrade), stay in safemode since the DataNodes are down.
+    stay_in_safe_mode = False
+    if upgrade_type == "nonrolling":
+      stay_in_safe_mode = True
+
+    if check_for_safemode_off:
+      Logger.info("Stay in safe mode: {0}".format(stay_in_safe_mode))
+      if not stay_in_safe_mode:
+        Logger.info("Waiting to leave safemode since it must transition from ON to OFF.")
+        try:
+          # Wait up to 30 mins
+          Execute(is_namenode_safe_mode_off,
+                  tries=180,
+                  try_sleep=10,
+                  user=params.hdfs_user,
+                  logoutput=True
+          )
+        except Fail:
+          Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")
+
+    # Always run this on non-HA, or active NameNode during HA.
+    create_hdfs_directories(is_active_namenode_cmd)
+    create_ranger_audit_hdfs_directories(is_active_namenode_cmd)
+
+  elif action == "stop":
+    import params
+    service(
+      action="stop", name="namenode", 
+      user=params.hdfs_user
+    )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.namenode_pid_file)
+  elif action == "decommission":
+    decommission()
+
+@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
+def namenode(action=None, do_format=True, upgrade_type=None, env=None):
+  if action == "configure":
+    pass
+  elif action == "start":
+    import params
+    #TODO: Replace with format_namenode()
+    namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
+    if not os.path.exists(namenode_format_marker):
+      hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
+      Execute("%s namenode -format" % (hadoop_cmd))
+      open(namenode_format_marker, 'a').close()
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "stop":
+    import params
+    Service(params.namenode_win_service_name, action=action)
+  elif action == "status":
+    import status_params
+    check_windows_service_status(status_params.namenode_win_service_name)
+  elif action == "decommission":
+    decommission()
+    
+def create_name_dirs(directories):
+  import params
+
+  dirs = directories.split(",")
+  Directory(dirs,
+            mode=0755,
+            owner=params.hdfs_user,
+            group=params.user_group,
+            recursive=True,
+            cd_access="a",
+  )
+
+
+def create_hdfs_directories(check):
+  import params
+
+  params.HdfsResource("/tmp",
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.hdfs_user,
+                       mode=0777,
+                       only_if=check
+  )
+  params.HdfsResource(params.smoke_hdfs_user_dir,
+                       type="directory",
+                       action="create_on_execute",
+                       owner=params.smoke_user,
+                       mode=params.smoke_hdfs_user_mode,
+                       only_if=check
+  )
+  params.HdfsResource(None, 
+                      action="execute",
+                      only_if=check #skip creation when HA not active
+  )
+
+def format_namenode(force=None):
+  import params
+
+  old_mark_dir = params.namenode_formatted_old_mark_dirs
+  mark_dir = params.namenode_formatted_mark_dirs
+  dfs_name_dir = params.dfs_name_dir
+  hdfs_user = params.hdfs_user
+  hadoop_conf_dir = params.hadoop_conf_dir
+
+  if not params.dfs_ha_enabled:
+    if force:
+      ExecuteHadoop('namenode -format',
+                    kinit_override=True,
+                    bin_dir=params.hadoop_bin_dir,
+                    conf_dir=hadoop_conf_dir)
+    else:
+      if not is_namenode_formatted(params):
+        Execute(format("yes Y | hdfs --config {hadoop_conf_dir} namenode -format"),
+                user = params.hdfs_user,
+                path = [params.hadoop_bin_dir]
+        )
+        for m_dir in mark_dir:
+          Directory(m_dir,
+            recursive = True
+          )
+  else:
+    if params.dfs_ha_namenode_active is not None and \
+       params.hostname == params.dfs_ha_namenode_active:
+      # check and run the format command in the HA deployment scenario
+      # only format the "active" namenode in an HA deployment
+      if force:
+        ExecuteHadoop('namenode -format',
+                      kinit_override=True,
+                      bin_dir=params.hadoop_bin_dir,
+                      conf_dir=hadoop_conf_dir)
+      else:
+        if not is_namenode_formatted(params):
+          Execute(format("yes Y | hdfs --config {hadoop_conf_dir} namenode -format"),
+                  user = params.hdfs_user,
+                  path = [params.hadoop_bin_dir]
+          )
+          for m_dir in mark_dir:
+            Directory(m_dir,
+              recursive = True
+            )
+
+def is_namenode_formatted(params):
+  old_mark_dirs = params.namenode_formatted_old_mark_dirs
+  mark_dirs = params.namenode_formatted_mark_dirs
+  nn_name_dirs = params.dfs_name_dir.split(',')
+  marked = False
+  # Check if name directories have been marked as formatted
+  for mark_dir in mark_dirs:
+    if os.path.isdir(mark_dir):
+      marked = True
+      print format("{mark_dir} exists. Namenode DFS already formatted")
+    
+  # Ensure that all mark dirs are created for all name directories
+  if marked:
+    for mark_dir in mark_dirs:
+      Directory(mark_dir,
+        recursive = True
+      )      
+    return marked  
+  
+  # Move all old format markers to new place
+  for old_mark_dir in old_mark_dirs:
+    if os.path.isdir(old_mark_dir):
+      for mark_dir in mark_dirs:
+        Execute(('cp', '-ar', old_mark_dir, mark_dir),
+                sudo = True
+        )
+        marked = True
+      Directory(old_mark_dir,
+        action = "delete"
+      )    
+    elif os.path.isfile(old_mark_dir):
+      for mark_dir in mark_dirs:
+        Directory(mark_dir,
+                  recursive = True,
+        )
+      Directory(old_mark_dir,
+        action = "delete"
+      )
+      marked = True
+      
+  # Check if name dirs are not empty
+  for name_dir in nn_name_dirs:
+    try:
+      Execute(format("ls {name_dir} | wc -l  | grep -q ^0$"),
+      )
+      marked = False
+    except Exception:
+      marked = True
+      print format("ERROR: NameNode directory(ies) are not empty. Will not format the NameNode. Non-empty NameNode dirs: {nn_name_dirs}")
+      break
+       
+  return marked    
+      
+def decommission():
+  import params
+
+  hdfs_user = params.hdfs_user
+  conf_dir = params.hadoop_conf_dir
+  user_group = params.user_group
+  nn_kinit_cmd = params.nn_kinit_cmd
+  
+  File(params.exclude_file_path,
+       content=Template("exclude_hosts_list.j2"),
+       owner=hdfs_user,
+       group=user_group
+  )
+  
+  if not params.update_exclude_file_only:
+    Execute(nn_kinit_cmd,
+            user=hdfs_user
+    )
+
+    if params.dfs_ha_enabled:
+      # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
+      # need to execute each command scoped to a particular namenode
+      nn_refresh_cmd = format('dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
+    else:
+      nn_refresh_cmd = format('dfsadmin -fs {namenode_address} -refreshNodes')
+    ExecuteHadoop(nn_refresh_cmd,
+                  user=hdfs_user,
+                  conf_dir=conf_dir,
+                  kinit_override=True,
+                  bin_dir=params.hadoop_bin_dir)
+
+def bootstrap_standby_namenode(params, use_path=False):
+
+  bin_path = os.path.join(params.hadoop_bin_dir, '') if use_path else ""
+
+  try:
+    iterations = 50
+    bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive")
+    # Blueprint-based deployments start both NameNodes in parallel, and occasionally
+    # the first attempt to bootstrap may fail. Depending on how it fails, the
+    # second attempt may not succeed (e.g. it may find the folder and decide that
+    # bootstrap already succeeded). The solution is to pass the -force option, but
+    # only during the initial start.
+    if params.command_phase == "INITIAL_START":
+      bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive -force")
+    Logger.info("Bootstrapping standby namenode: %s" % (bootstrap_cmd))
+    for i in range(iterations):
+      Logger.info('Try %d out of %d' % (i+1, iterations))
+      code, out = shell.call(bootstrap_cmd, logoutput=False, user=params.hdfs_user)
+      if code == 0:
+        Logger.info("Standby namenode bootstrapped successfully")
+        return True
+      elif code == 5:
+        Logger.info("Standby namenode already bootstrapped")
+        return True
+      else:
+        Logger.warning('Bootstrap standby namenode failed with error code %d. Will retry' % (code))
+  except Exception as ex:
+    Logger.error('Bootstrap standby namenode threw an exception. Reason %s' %(str(ex)))
+  return False
+
+
+def is_active_namenode(hdfs_binary):
+  """
+  Checks if current NameNode is active. Waits up to 30 seconds. If other NameNode is active returns False.
+  :return: True if current NameNode is active, False otherwise
+  """
+  import params
+
+  if params.dfs_ha_enabled:
+    is_active_this_namenode_cmd = as_user(format("{hdfs_binary} --config {hadoop_conf_dir} haadmin -getServiceState {namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+    is_active_other_namenode_cmd = as_user(format("{hdfs_binary} --config {hadoop_conf_dir} haadmin -getServiceState {other_namenode_id} | grep active"), params.hdfs_user, env={'PATH':params.hadoop_bin_dir})
+
+    for i in range(0, 5):
+      code, out = shell.call(is_active_this_namenode_cmd) # If active NN, code will be 0
+      if code == 0: # active
+        return True
+
+      code, out = shell.call(is_active_other_namenode_cmd) # If other NN is active, code will be 0
+      if code == 0: # other NN is active
+        return False
+
+      if i < 4: # Do not sleep after last iteration
+        time.sleep(6)
+
+    Logger.info("Active NameNode is not found.")
+    return False
+
+  else:
+    return True
+
+def wait_for_safemode_off(afterwait_sleep=0, execute_kinit=False):
+  """
+  During a NonRolling (aka Express) Upgrade, after starting the NameNode, which is still in safemode, and then starting
+  all of the DataNodes, we need the NameNode to receive all of the block reports and leave safemode.
+  If HA is present, then this command will run individually on each NameNode, which checks for its own address.
+  """
+  import params
+
+  Logger.info("Waiting to leave safemode since it must transition from ON to OFF.")
+
+  if params.security_enabled and execute_kinit:
+    kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
+    Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+  try:
+    # Note, this fails if namenode_address isn't prefixed with "params."
+
+    dfsadmin_base_command = get_dfsadmin_base_command('hdfs', use_specific_namenode=True)
+    is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"
+
+    # Wait up to about 19 minutes (115 tries, 10 seconds apart)
+    Execute(is_namenode_safe_mode_off,
+            tries=115,
+            try_sleep=10,
+            user=params.hdfs_user,
+            logoutput=True
+            )
+
+    # Wait a bit more since YARN still depends on block reports coming in.
+    # Also saw intermittent errors with HBASE service check if it was done too soon.
+    time.sleep(afterwait_sleep)
+  except Fail:
+    Logger.error("NameNode is still in safemode, please be careful with commands that need safemode OFF.")

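Editor's note on the safemode handling in hdfs_namenode.py above: the scenario table reduces to a small decision (wait for safemode OFF or not) plus a bounded retry around the dfsadmin "-safemode get | grep 'Safe mode is OFF'" probe executed with tries=180/try_sleep=10. A minimal, self-contained sketch of that logic follows; should_wait_for_safemode_off() and poll_until() are illustrative helper names, not part of this patch.

# Sketch of the safemode-wait decision encoded by the scenario table in
# hdfs_namenode.py; helper names here are illustrative, not from the patch.
import time

def should_wait_for_safemode_off(dfs_ha_enabled, upgrade_type, is_active):
    """Return (check_for_safemode_off, stay_in_safe_mode)."""
    # Express (nonrolling) upgrade: DataNodes are down, so stay in safemode.
    stay_in_safe_mode = (upgrade_type == "nonrolling")
    if dfs_ha_enabled:
        # Wait during any stack upgrade, or when this host is the active NameNode.
        check = upgrade_type is not None or is_active
    else:
        check = True
    return check, stay_in_safe_mode

def poll_until(probe, tries=180, try_sleep=10):
    """Bounded retry loop, mirroring Execute(..., tries=180, try_sleep=10)."""
    for _ in range(tries):
        if probe():
            return True
        time.sleep(try_sleep)
    return False

if __name__ == "__main__":
    check, stay = should_wait_for_safemode_off(True, "rolling", is_active=True)
    if check and not stay:
        # The real probe is "<dfsadmin> -safemode get | grep 'Safe mode is OFF'".
        print("Left safemode: %s" % poll_until(lambda: True, tries=3, try_sleep=0))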
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_nfsgateway.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_nfsgateway.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_nfsgateway.py
new file mode 100644
index 0000000..d874b2e
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_nfsgateway.py
@@ -0,0 +1,72 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core.exceptions import Fail
+from resource_management.core.logger import Logger
+from resource_management.core.resources import Directory
+from resource_management.core import shell
+from utils import service
+import subprocess,os
+
+# NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
+# on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542
+
+def prepare_rpcbind():
+  Logger.info("check if native nfs server is running")
+  p, output = shell.call("pgrep nfsd")
+  if p == 0 :
+    Logger.info("native nfs server is running. shutting it down...")
+    # shutdown nfs
+    shell.call("service nfs stop")
+    shell.call("service nfs-kernel-server stop")
+    Logger.info("check if the native nfs server is down...")
+    p, output = shell.call("pgrep nfsd")
+    if p == 0 :
+      raise Fail("Failed to shutdown native nfs service")
+
+  Logger.info("check if rpcbind or portmap is running")
+  p, output = shell.call("pgrep rpcbind")
+  q, output = shell.call("pgrep portmap")
+
+  if p!=0 and q!=0 :
+    Logger.info("no portmap or rpcbind running. starting one...")
+    p, output = shell.call(("service", "rpcbind", "start"), sudo=True)
+    q, output = shell.call(("service", "portmap", "start"), sudo=True)
+    if p!=0 and q!=0 :
+      raise Fail("Failed to start rpcbind or portmap")
+
+  Logger.info("now we are ready to start nfs gateway")
+
+
+def nfsgateway(action=None, format=False):
+  import params
+
+  if action== "start":
+    prepare_rpcbind()
+
+  if action == "configure":
+    return
+  elif action == "start" or action == "stop":
+    service(
+      action=action,
+      name="nfs3",
+      user=params.root_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )

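Editor's note: prepare_rpcbind() above runs its checks through resource_management's shell.call. A rough standalone equivalent of the same pgrep/service sequence using plain subprocess (sudo handling and distro differences simplified, functions defined only, not executed) might look like this:

# Standalone sketch of the prepare_rpcbind() flow; the real script goes through
# resource_management's shell.call and runs the service commands with sudo.
import subprocess

def _call(cmd):
    """Run a shell command, returning (exit_code, combined_output)."""
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, _ = proc.communicate()
    return proc.returncode, out

def prepare_rpcbind():
    # A native NFS server holds the ports the HDFS NFS gateway needs.
    if _call("pgrep nfsd")[0] == 0:
        _call("service nfs stop")
        _call("service nfs-kernel-server stop")
        if _call("pgrep nfsd")[0] == 0:
            raise RuntimeError("Failed to shut down native nfs service")

    # The gateway needs either rpcbind (newer distros) or portmap (older ones).
    rpcbind_missing = _call("pgrep rpcbind")[0] != 0
    portmap_missing = _call("pgrep portmap")[0] != 0
    if rpcbind_missing and portmap_missing:
        started_rpcbind = _call("service rpcbind start")[0] == 0
        started_portmap = _call("service portmap start")[0] == 0
        if not (started_rpcbind or started_portmap):
            raise RuntimeError("Failed to start rpcbind or portmap")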
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_rebalance.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_rebalance.py
new file mode 100644
index 0000000..1dc545e
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_rebalance.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import re
+
+class HdfsParser():
+  def __init__(self):
+    self.initialLine = None
+    self.state = None
+  
+  def parseLine(self, line):
+    hdfsLine = HdfsLine()
+    type, matcher = hdfsLine.recognizeType(line)
+    if(type == HdfsLine.LineType.HeaderStart):
+      self.state = 'PROCESS_STARTED'
+    elif (type == HdfsLine.LineType.Progress):
+      self.state = 'PROGRESS'
+      hdfsLine.parseProgressLog(line, matcher)
+      if(self.initialLine == None): self.initialLine = hdfsLine
+      
+      return hdfsLine 
+    elif (type == HdfsLine.LineType.ProgressEnd):
+      self.state = 'PROCESS_FINISHED'
+    return None
+    
+class HdfsLine():
+  
+  class LineType:
+    HeaderStart, Progress, ProgressEnd, Unknown = range(4)
+  
+  
+  MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
+  MEMORY_PATTERN = '(?P<memmult_%d>(?P<memory_%d>(\d+)(\.|,)?(\d+)?) (?P<mult_%d>'+"|".join(MEMORY_SUFFIX)+'))'
+  
+  HEADER_BEGIN_PATTERN = re.compile('Time Stamp\s+Iteration#\s+Bytes Already Moved\s+Bytes Left To Move\s+Bytes Being Moved')
+  PROGRESS_PATTERN = re.compile(
+                            "(?P<date>.*?)\s+" + 
+                            "(?P<iteration>\d+)\s+" + 
+                            MEMORY_PATTERN % (1,1,1) + "\s+" + 
+                            MEMORY_PATTERN % (2,2,2) + "\s+" +
+                            MEMORY_PATTERN % (3,3,3)
+                            )
+  PROGRESS_END_PATTERN = re.compile('(The cluster is balanced. Exiting...|The cluster is balanced. Exiting...)')
+  
+  def __init__(self):
+    self.date = None
+    self.iteration = None
+    self.bytesAlreadyMoved = None 
+    self.bytesLeftToMove = None
+    self.bytesBeingMoved = None 
+    self.bytesAlreadyMovedStr = None 
+    self.bytesLeftToMoveStr = None
+    self.bytesBeingMovedStr = None 
+  
+  def recognizeType(self, line):
+    for (type, pattern) in (
+                            (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
+                            (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN), 
+                            (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN)
+                            ):
+      m = re.match(pattern, line)
+      if m:
+        return type, m
+    return HdfsLine.LineType.Unknown, None
+    
+  def parseProgressLog(self, line, m):
+    '''
+    Parse the line of 'hdfs rebalancer' output. The example output being parsed:
+    
+    Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
+    Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 GB            9.79 GB
+    Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 GB            9.79 GB
+    
+    Throws AmbariException in case of parsing errors
+
+    '''
+    m = re.match(self.PROGRESS_PATTERN, line)
+    if m:
+      self.date = m.group('date') 
+      self.iteration = int(m.group('iteration'))
+       
+      self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1')) 
+      self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2')) 
+      self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3'))
+       
+      self.bytesAlreadyMovedStr = m.group('memmult_1') 
+      self.bytesLeftToMoveStr = m.group('memmult_2')
+      self.bytesBeingMovedStr = m.group('memmult_3') 
+    else:
+      raise AmbariException("Failed to parse line [%s]" % line)
+  
+  def parseMemory(self, memorySize, multiplier_type):
+    try:
+      factor = self.MEMORY_SUFFIX.index(multiplier_type)
+    except ValueError:
+      raise AmbariException("Failed to parse memory value [%s %s]" % (memorySize, multiplier_type))
+    
+    return float(memorySize) * (1024 ** factor)
+  def toJson(self):
+    return {
+            'timeStamp' : self.date,
+            'iteration' : self.iteration,
+            
+            'dataMoved': self.bytesAlreadyMovedStr,
+            'dataLeft' : self.bytesLeftToMoveStr,
+            'dataBeingMoved': self.bytesBeingMovedStr,
+            
+            'bytesMoved': self.bytesAlreadyMoved,
+            'bytesLeft' : self.bytesLeftToMove,
+            'bytesBeingMoved': self.bytesBeingMoved,
+          }
+  def __str__(self):
+    return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved)
\ No newline at end of file

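Editor's note: the parseProgressLog() docstring above includes sample balancer output. As a quick usage sketch (assuming hdfs_rebalance.py is importable from the current directory), the parser can be exercised directly against those lines:

# Drive HdfsParser with the sample balancer output shown in the docstring.
from hdfs_rebalance import HdfsParser

sample_output = [
    "Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved",
    "Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 GB            9.79 GB",
    "Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 GB            9.79 GB",
]

parser = HdfsParser()
for line in sample_output:
    parsed = parser.parseLine(line)   # returns an HdfsLine only for progress lines
    if parsed:
        print(parsed.toJson())        # timeStamp, iteration, bytes moved/left/being moved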
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_snamenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_snamenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_snamenode.py
new file mode 100644
index 0000000..2675637
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_snamenode.py
@@ -0,0 +1,53 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from utils import service
+
+def snamenode(action=None, format=False):
+  import params
+
+  if action == "configure":
+    for fs_checkpoint_dir in params.fs_checkpoint_dirs:
+      Directory(fs_checkpoint_dir,
+                recursive=True,
+                cd_access="a",
+                mode=0755,
+                owner=params.hdfs_user,
+                group=params.user_group)
+    File(params.exclude_file_path,
+         content=Template("exclude_hosts_list.j2"),
+         owner=params.hdfs_user,
+         group=params.user_group)
+  elif action == "start" or action == "stop":
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    service(
+      action=action,
+      name="secondarynamenode",
+      user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+  elif action == "status":
+    import status_params
+    check_process_status(status_params.snamenode_pid_file)

http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/install_params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/install_params.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/install_params.py
new file mode 100644
index 0000000..fe488c3
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/install_params.py
@@ -0,0 +1,39 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+from ambari_commons import OSCheck
+
+# These parameters are supposed to be referenced at installation time, before the Hadoop environment variables have been set
+if OSCheck.is_windows_family():
+  exclude_packages = []
+else:
+  from resource_management.libraries.functions.default import default
+  from resource_management.libraries.functions.get_lzo_packages import get_lzo_packages
+  from resource_management.libraries.script.script import Script
+
+  _config = Script.get_config()
+  stack_version_unformatted = str(_config['hostLevelParams']['stack_version'])
+
+  # The logic for LZO also exists in OOZIE's params.py
+  io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None)
+  lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower()
+  lzo_packages = get_lzo_packages(stack_version_unformatted)
+
+  exclude_packages = []
+  if not lzo_enabled:
+    exclude_packages += lzo_packages

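Editor's note: the exclude_packages logic above hinges on whether com.hadoop.compression.lzo appears in io.compression.codecs. A tiny standalone sketch of that decision (the config values and package names below are made up for illustration):

# Illustrative sketch of the LZO package-exclusion decision in install_params.py.
def compute_exclude_packages(core_site, lzo_packages):
    codecs = core_site.get("io.compression.codecs")
    lzo_enabled = codecs is not None and "com.hadoop.compression.lzo" in codecs.lower()
    return [] if lzo_enabled else list(lzo_packages)

# LZO codec not configured -> the (hypothetical) LZO packages are excluded.
core_site = {"io.compression.codecs": "org.apache.hadoop.io.compress.GzipCodec"}
print(compute_exclude_packages(core_site, ["lzo", "hadoop-lzo"]))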
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode.py
new file mode 100644
index 0000000..b970f04
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode.py
@@ -0,0 +1,169 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management import *
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import iop_select
+from resource_management.libraries.functions.version import compare_versions, \
+  format_hdp_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+  FILE_TYPE_XML
+
+from utils import service
+from hdfs import hdfs
+import journalnode_upgrade
+
+
+class JournalNode(Script):
+
+  def get_stack_to_component(self):
+    return {"BigInsights": "hadoop-hdfs-journalnode"}
+
+  def install(self, env):
+    import params
+
+    self.install_packages(env, params.exclude_packages)
+    env.set_params(params)
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '4.0.0.0') >= 0:
+      conf_select.select(params.stack_name, "hadoop", params.version)
+      iop_select.select("hadoop-hdfs-journalnode", params.version)
+      #Execute(format("iop-select set hadoop-hdfs-journalnode {version}"))
+
+  def start(self, env, upgrade_type=None):
+    import params
+
+    env.set_params(params)
+    self.configure(env)
+    Directory(params.hadoop_pid_dir_prefix,
+              mode=0755,
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    service(
+      action="start", name="journalnode", user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    if upgrade_type == "nonrolling":
+      return
+    
+    Logger.info("Executing Stack Upgrade post-restart")
+    import params
+    env.set_params(params)
+    journalnode_upgrade.post_upgrade_check()
+
+  def stop(self, env, upgrade_type=None):
+    import params
+
+    env.set_params(params)
+    service(
+      action="stop", name="journalnode", user=params.hdfs_user,
+      create_pid_dir=True,
+      create_log_dir=True
+    )
+
+  def configure(self, env):
+    import params
+
+    Directory(params.jn_edits_dir,
+              recursive=True,
+              cd_access="a",
+              owner=params.hdfs_user,
+              group=params.user_group
+    )
+    env.set_params(params)
+    hdfs()
+    pass
+
+  def status(self, env):
+    import status_params
+
+    env.set_params(status_params)
+    check_process_status(status_params.journalnode_pid_file)
+
+  def security_status(self, env):
+    import status_params
+
+    env.set_params(status_params)
+    props_value_check = {"hadoop.security.authentication": "kerberos",
+                         "hadoop.security.authorization": "true"}
+    props_empty_check = ["hadoop.security.auth_to_local"]
+    props_read_check = None
+    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                props_read_check)
+
+    props_value_check = None
+    props_empty_check = ['dfs.journalnode.keytab.file',
+                         'dfs.journalnode.kerberos.principal']
+    props_read_check = ['dfs.journalnode.keytab.file']
+    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+                                                props_read_check)
+
+    hdfs_expectations = {}
+    hdfs_expectations.update(hdfs_site_expectations)
+    hdfs_expectations.update(core_site_expectations)
+
+    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+                                                 {'core-site.xml': FILE_TYPE_XML})
+    if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+        security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues:  # If all validations passed successfully
+        try:
+          # Double check the dict before calling execute
+          if ('hdfs-site' not in security_params or
+                  'dfs.journalnode.kerberos.keytab.file' not in security_params['hdfs-site'] or
+                  'dfs.journalnode.kerberos.principal' not in security_params['hdfs-site']):
+            self.put_structured_out({"securityState": "UNSECURED"})
+            self.put_structured_out(
+              {"securityIssuesFound": "Keytab file or principal is not set properly."})
+            return
+
+          cached_kinit_executor(status_params.kinit_path_local,
+                                status_params.hdfs_user,
+                                security_params['hdfs-site']['dfs.journalnode.kerberos.keytab.file'],
+                                security_params['hdfs-site']['dfs.journalnode.kerberos.principal'],
+                                status_params.hostname,
+                                status_params.tmp_dir)
+          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+        except Exception as e:
+          self.put_structured_out({"securityState": "ERROR"})
+          self.put_structured_out({"securityStateErrorInfo": str(e)})
+      else:
+        issues = []
+        for cf in result_issues:
+          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+
+
+if __name__ == "__main__":
+  JournalNode().execute()

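Editor's note: security_status() above relies on build_expectations() and validate_security_config_properties() from resource_management. Reduced to plain dictionaries (the function below illustrates the shape of the check and is not the Ambari API), the validation amounts to:

# Shape of the security_status() expectation check: required values must match,
# required keys must be non-empty. Illustrative only; not the Ambari API.
def validate_site(site_config, value_checks, non_empty_keys):
    issues = []
    for key, expected in (value_checks or {}).items():
        if site_config.get(key) != expected:
            issues.append("%s should be %r" % (key, expected))
    for key in (non_empty_keys or []):
        if not site_config.get(key):
            issues.append("%s must be set" % key)
    return issues

core_site = {"hadoop.security.authentication": "kerberos",
             "hadoop.security.authorization": "true",
             "hadoop.security.auth_to_local": "DEFAULT"}
issues = validate_site(core_site,
                       {"hadoop.security.authentication": "kerberos",
                        "hadoop.security.authorization": "true"},
                       ["hadoop.security.auth_to_local"])
print(issues or "all checks passed")   # empty list -> the kinit check is attempted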
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode_upgrade.py
new file mode 100644
index 0000000..ab17b2a
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/journalnode_upgrade.py
@@ -0,0 +1,140 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import time
+
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.default import default
+from resource_management.core.exceptions import Fail
+import utils
+from resource_management.libraries.functions.jmx import get_value_from_jmx
+import namenode_ha_state
+from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
+from utils import get_dfsadmin_base_command
+
+
+def post_upgrade_check():
+  """
+  Ensure all journal nodes are up and quorum is established during Rolling Upgrade.
+  :return:
+  """
+  import params
+  Logger.info("Ensuring Journalnode quorum is established")
+
+  if params.security_enabled:
+    Execute(params.jn_kinit_cmd, user=params.hdfs_user)
+
+  time.sleep(5)
+  hdfs_roll_edits()
+  time.sleep(5)
+
+  all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", [])
+
+  if len(all_journal_node_hosts) < 3:
+    raise Fail("Need at least 3 Journalnodes to maintain a quorum")
+
+  try:
+    namenode_ha = namenode_ha_state.NamenodeHAState()
+  except ValueError, err:
+    raise Fail("Could not retrieve Namenode HA addresses. Error: " + str(err))
+
+  Logger.info(str(namenode_ha))
+  nn_address = namenode_ha.get_address(NAMENODE_STATE.ACTIVE)
+
+  nn_data = utils.get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo',
+                         namenode_ha.is_encrypted(), params.security_enabled)
+  if not nn_data:
+    raise Fail("Could not retrieve JournalTransactionInfo from JMX")
+
+  try:
+    last_txn_id = int(nn_data['LastAppliedOrWrittenTxId'])
+    success = ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id)
+
+    if not success:
+      raise Fail("Could not ensure that all Journal nodes have a new log transaction id")
+  except KeyError:
+    raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info")
+
+
+def hdfs_roll_edits():
+  """
+  HDFS_CLIENT needs to be a dependency of JOURNALNODE
+  Roll the logs so that Namenode will be able to connect to the Journalnode.
+  Must kinit before calling this command.
+  """
+  import params
+
+  # TODO: this will need to be documented, since existing IOP 4.0 clusters will need HDFS_CLIENT on all JOURNALNODE hosts
+  dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
+  command = dfsadmin_base_command + ' -rollEdits'
+  Execute(command, user=params.hdfs_user, tries=1)
+
+
+def ensure_jns_have_new_txn(nodes, last_txn_id):
+  """
+  :param nodes: List of Journalnodes
+  :param last_txn_id: Integer of last transaction id
+  :return: Return true on success, false otherwise
+  """
+  import params
+
+  num_of_jns = len(nodes)
+  actual_txn_ids = {}
+  jns_updated = 0
+
+  if params.journalnode_address is None:
+    raise Fail("Could not retrieve Journal node address")
+
+  if params.journalnode_port is None:
+    raise Fail("Could not retrieve Journalnode port")
+
+  time_out_secs = 3 * 60
+  step_time_secs = 10
+  iterations = int(time_out_secs/step_time_secs)
+
+  protocol = "https" if params.https_only else "http"
+
+  Logger.info("Checking if all Journalnodes are updated.")
+  for i in range(iterations):
+    Logger.info('Try %d out of %d' % (i+1, iterations))
+    for node in nodes:
+      # if all JNs are already updated, we are done
+      if jns_updated == num_of_jns:
+        Logger.info("All journal nodes are updated")
+        return True
+
+      # JN already meets condition, skip it
+      if node in actual_txn_ids and actual_txn_ids[node] and actual_txn_ids[node] >= last_txn_id:
+        continue
+
+      url = '%s://%s:%s' % (protocol, node, params.journalnode_port)
+      data = utils.get_jmx_data(url, 'Journal-', 'LastWrittenTxId', params.https_only, params.security_enabled)
+      if data:
+        actual_txn_ids[node] = int(data)
+        if actual_txn_ids[node] >= last_txn_id:
+          Logger.info("Journalnode %s has a higher transaction id: %s" % (node, str(data)))
+          jns_updated += 1
+        else:
+          Logger.info("Journalnode %s is still on transaction id: %s" % (node, str(data)))
+
+    Logger.info("Sleeping for %d secs" % step_time_secs)
+    time.sleep(step_time_secs)
+
+  return jns_updated == num_of_jns
\ No newline at end of file

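Editor's note: ensure_jns_have_new_txn() above polls each JournalNode's JMX LastWrittenTxId until all of them reach the NameNode's last transaction id. A stripped-down sketch of that polling loop, with a stub standing in for utils.get_jmx_data, could look like:

# Sketch of the JournalNode catch-up loop; fetch_last_written_txid is a stub
# in place of the JMX query done via utils.get_jmx_data in the real script.
import time

def ensure_jns_have_new_txn(nodes, last_txn_id, fetch_last_written_txid,
                            timeout_secs=180, step_secs=10):
    caught_up = set()
    for _ in range(int(timeout_secs / step_secs)):
        for node in nodes:
            if node in caught_up:
                continue
            txid = fetch_last_written_txid(node)
            if txid is not None and txid >= last_txn_id:
                caught_up.add(node)
        if len(caught_up) == len(nodes):
            return True
        time.sleep(step_secs)
    return False

# Example with a fake JMX fetcher that always reports transaction id 120.
print(ensure_jns_have_new_txn(["jn1", "jn2", "jn3"], 100,
                              lambda node: 120, timeout_secs=10, step_secs=1))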
http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode.py
new file mode 100644
index 0000000..eef2905
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode.py
@@ -0,0 +1,334 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import sys
+import os
+import ambari_simplejson as json # simplejson is much faster compared to the Python 2.6 json module and has the same function set.
+import tempfile
+from resource_management import Script
+from resource_management.core.resources.system import Execute, File
+from resource_management.core import shell
+from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions import iop_select
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
+  FILE_TYPE_XML
+from resource_management.libraries.functions.version import compare_versions, \
+  format_hdp_stack_version
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.check_process_status import check_process_status
+from resource_management.core.exceptions import Fail
+from resource_management.core.shell import as_user
+from resource_management.core.logger import Logger
+
+import namenode_upgrade
+from hdfs_namenode import namenode, wait_for_safemode_off
+from hdfs import hdfs
+import hdfs_rebalance
+from utils import initiate_safe_zkfc_failover, get_hdfs_binary, get_dfsadmin_base_command
+
+
+
+# hashlib is supplied as of Python 2.5 as the replacement interface for md5
+# and other secure hashes.  In 2.6, md5 is deprecated.  Import hashlib if
+# available, avoiding a deprecation warning under 2.6.  Import md5 otherwise,
+# preserving 2.4 compatibility.
+try:
+  import hashlib
+  _md5 = hashlib.md5
+except ImportError:
+  import md5
+  _md5 = md5.new
+
+class NameNode(Script):
+
+  def get_stack_to_component(self):
+    return {"BigInsights": "hadoop-hdfs-namenode"}
+
+  def get_hdfs_binary(self):
+    """
+    Get the name or path to the hdfs binary depending on the stack and version.
+    """
+    import params
+    stack_to_comp = self.get_stack_to_component()
+    if params.stack_name in stack_to_comp:
+      return get_hdfs_binary(stack_to_comp[params.stack_name])
+    return "hdfs"
+  def install(self, env):
+    import params
+
+    self.install_packages(env, params.exclude_packages)
+    env.set_params(params)
+    #TODO we need this for HA because of manual steps
+    self.configure(env)
+
+  def prepare_rolling_upgrade(self, env):
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)
+  
+  def wait_for_safemode_off(self, env):
+    wait_for_safemode_off(30, True)
+
+  def finalize_non_rolling_upgrade(self, env):
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("nonrolling", hdfs_binary)
+
+  def finalize_rolling_upgrade(self, env):
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.finalize_upgrade("rolling", hdfs_binary)
+
+  def pre_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade pre-restart")
+    import params
+    env.set_params(params)
+
+    if params.version and compare_versions(format_hdp_stack_version(params.version), '4.0.0.0') >= 0:
+      conf_select.select(params.stack_name, "hadoop", params.version)
+      iop_select.select("hadoop-hdfs-namenode", params.version)
+      #Execute(format("iop-select set hadoop-hdfs-namenode {version}"))
+
+  def start(self, env, upgrade_type=None):
+    import params
+
+    env.set_params(params)
+    self.configure(env)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
+
+  def post_upgrade_restart(self, env, upgrade_type=None):
+    Logger.info("Executing Stack Upgrade post-restart")
+    import params
+    env.set_params(params)
+
+    hdfs_binary = self.get_hdfs_binary()
+    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+    dfsadmin_cmd = dfsadmin_base_command + " -report -live"
+    Execute(dfsadmin_cmd,
+            user=params.hdfs_user,
+            tries=60,
+            try_sleep=10
+    )
+
+  def stop(self, env, upgrade_type=None):
+    import params
+    env.set_params(params)
+    hdfs_binary = self.get_hdfs_binary()
+    if upgrade_type == "rolling" and params.dfs_ha_enabled:
+      if params.dfs_ha_automatic_failover_enabled:
+        initiate_safe_zkfc_failover()
+      else:
+        raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart")
+
+    namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env)
+
+  def configure(self, env):
+    import params
+
+    env.set_params(params)
+    hdfs("namenode")
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="configure", hdfs_binary=hdfs_binary, env=env)
+
+  def status(self, env):
+    import status_params
+
+    env.set_params(status_params)
+    check_process_status(status_params.namenode_pid_file)
+    pass
+
+  def security_status(self, env):
+    import status_params
+
+    env.set_params(status_params)
+    props_value_check = {"hadoop.security.authentication": "kerberos",
+                         "hadoop.security.authorization": "true"}
+    props_empty_check = ["hadoop.security.auth_to_local"]
+    props_read_check = None
+    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                props_read_check)
+    props_value_check = None
+    props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal',
+                         'dfs.namenode.keytab.file',
+                         'dfs.namenode.kerberos.principal']
+    props_read_check = ['dfs.namenode.keytab.file']
+    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
+                                                props_read_check)
+
+    hdfs_expectations = {}
+    hdfs_expectations.update(core_site_expectations)
+    hdfs_expectations.update(hdfs_site_expectations)
+
+    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
+                                                 {'core-site.xml': FILE_TYPE_XML,
+                                                  'hdfs-site.xml': FILE_TYPE_XML})
+    if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
+        security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues:  # If all validations passed successfully
+        try:
+          # Double check the dict before calling execute
+          if ( 'hdfs-site' not in security_params
+               or 'dfs.namenode.keytab.file' not in security_params['hdfs-site']
+               or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']):
+            self.put_structured_out({"securityState": "UNSECURED"})
+            self.put_structured_out(
+              {"securityIssuesFound": "Keytab file or principal is not set properly."})
+            return
+          cached_kinit_executor(status_params.kinit_path_local,
+                                status_params.hdfs_user,
+                                security_params['hdfs-site']['dfs.namenode.keytab.file'],
+                                security_params['hdfs-site']['dfs.namenode.kerberos.principal'],
+                                status_params.hostname,
+                                status_params.tmp_dir)
+          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+        except Exception as e:
+          self.put_structured_out({"securityState": "ERROR"})
+          self.put_structured_out({"securityStateErrorInfo": str(e)})
+      else:
+        issues = []
+        for cf in result_issues:
+          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
+        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+
+
+  def decommission(self, env):
+    import params
+
+    env.set_params(params)
+    hdfs_binary = self.get_hdfs_binary()
+    namenode(action="decommission", hdfs_binary=hdfs_binary)
+
+
+  def rebalancehdfs(self, env):
+    import params
+    env.set_params(params)
+
+    name_node_parameters = json.loads( params.name_node_params )
+    threshold = name_node_parameters['threshold']
+    _print("Starting balancer with threshold = %s\n" % threshold)
+
+    rebalance_env = {'PATH': params.hadoop_bin_dir}
+
+    if params.security_enabled:
+      # Create the kerberos credentials cache (ccache) file and set it in the environment to use
+      # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file
+      # to generate a (relatively) unique cache filename so that we can use it as needed.
+      # TODO: params.tmp_dir=/var/lib/ambari-agent/data/tmp. However hdfs user doesn't have access to this path.
+      # TODO: Hence using /tmp
+      ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest()
+      ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name)
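+      # Illustrative resulting path (assuming the default temp dir): /tmp/hdfs_rebalance_cc_<md5-hex-digest>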
+      rebalance_env['KRB5CCNAME'] = ccache_file_path
+
+      # If there are no tickets in the cache or they are expired, perform a kinit, else use what
+      # is in the cache
+      klist_cmd = format("{klist_path_local} -s {ccache_file_path}")
+      kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}")
+      if shell.call(klist_cmd, user=params.hdfs_user)[0] != 0:
+        Execute(kinit_cmd, user=params.hdfs_user)
+
+    def calculateCompletePercent(first, current):
+      # Cast to float to avoid Python 2 integer division when bytesLeftToMove is parsed as an int.
+      return 1.0 - float(current.bytesLeftToMove) / float(first.bytesLeftToMove)
+
+
+    def startRebalancingProcess(threshold, rebalance_env):
+      rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}')
+      return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env)
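+    # Illustrative shape of the command built above (the exact su/sudo wrapping depends on as_user):
+    #   hdfs --config /etc/hadoop/conf balancer -threshold 10
+    # run as the hdfs user, with KRB5CCNAME pointing at the ccache file when security is enabled.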
+
+    command = startRebalancingProcess(threshold, rebalance_env)
+
+    basedir = os.path.join(env.config.basedir, 'scripts')
+    if threshold == 'DEBUG':  # FIXME TODO: remove this on PROD
+      basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator')
+      command = ['python', 'hdfs-command.py']
+
+    _print("Executing command %s\n" % command)
+
+    parser = hdfs_rebalance.HdfsParser()
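+    # The parser consumes the balancer's stdout; a progress line is assumed to look roughly like
+    # "<timestamp>  <iteration#>  <bytes already moved>  <bytes left to move>  <bytes being moved>"
+    # (the exact format may vary by Hadoop version).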
+
+    def handle_new_line(line, is_stderr):
+      if is_stderr:
+        return
+
+      _print('[balancer] %s' % (line))
+      pl = parser.parseLine(line)
+      if pl:
+        res = pl.toJson()
+        res['completePercent'] = calculateCompletePercent(parser.initialLine, pl)
+
+        self.put_structured_out(res)
+      elif parser.state == 'PROCESS_FINISHED':
+        _print('[balancer] %s' % 'Process is finished')
+        self.put_structured_out({'completePercent' : 1})
+        return
+
+    Execute(command,
+            on_new_line = handle_new_line,
+            logoutput = False,
+    )
+
+    if params.security_enabled and os.path.exists(ccache_file_path):
+      # Delete the kerberos credentials cache (ccache) file
+      os.remove(ccache_file_path)
+
+  def prepare_express_upgrade(self, env):
+    """
+    During an Express Upgrade.
+    If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and
+    make sure that there is no "/previous" directory.
+
+    Create a list of all the DataNodes in the cluster.
+    hdfs dfsadmin -report > dfs-old-report-1.log
+
+    hdfs dfsadmin -safemode enter
+    hdfs dfsadmin -saveNamespace
+
+    Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory.
+
+    Finalize any prior HDFS upgrade,
+    hdfs dfsadmin -finalizeUpgrade
+
+    Prepare for a NameNode rolling upgrade in order to not lose any data.
+    hdfs dfsadmin -rollingUpgrade prepare
+    """
+    import params
+    Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.")
+
+    if params.security_enabled:
+      kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
+      Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+    hdfs_binary = self.get_hdfs_binary()
+    namenode_upgrade.prepare_upgrade_check_for_previous_dir()
+    namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary)
+    namenode_upgrade.prepare_upgrade_backup_namenode_dir()
+    namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary)
+
+    # Call -rollingUpgrade prepare
+    namenode_upgrade.prepare_rolling_upgrade(hdfs_binary)
+
+def _print(line):
+  sys.stdout.write(line)
+  sys.stdout.flush()
+
+if __name__ == "__main__":
+  NameNode().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_ha_state.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_ha_state.py
new file mode 100644
index 0000000..bff72cb
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_ha_state.py
@@ -0,0 +1,216 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.core import shell
+from resource_management.core.logger import Logger
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.jmx import get_value_from_jmx
+
+
+class NAMENODE_STATE:
+  ACTIVE = "active"
+  STANDBY = "standby"
+  UNKNOWN = "unknown"
+
+
+class NamenodeHAState:
+  """
+  Represents the current state of the Namenode Hosts in High Availability Mode
+  """
+
+  def __init__(self):
+    """
+    Initializes all fields by querying the Namenode state.
+    Raises a ValueError if unable to construct the object.
+    """
+    import params
+
+    self.name_service = default("/configurations/hdfs-site/dfs.nameservices", None)
+    if not self.name_service:
+      raise ValueError("Could not retrieve property dfs.nameservices")
+
+    nn_unique_ids_key = "dfs.ha.namenodes." + str(self.name_service)
+    # List of the nn unique ids
+    self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None)
+    if not self.nn_unique_ids:
+      raise ValueError("Could not retrieve property " + nn_unique_ids_key)
+
+    self.nn_unique_ids = self.nn_unique_ids.split(",")
+    self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids]
+
+    policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY")
+    self.encrypted = policy.upper() == "HTTPS_ONLY"
+
+    jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"
+    namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
+    namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}"
+
+    # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames
+    self.namenode_state_to_hostnames = {}
+
+    # Dictionary from nn unique id name to a tuple of (http address, https address)
+    self.nn_unique_id_to_addresses = {}
+    for nn_unique_id in self.nn_unique_ids:
+      http_key = namenode_http_fragment.format(self.name_service, nn_unique_id)
+      https_key = namenode_https_fragment.format(self.name_service, nn_unique_id)
+
+      http_value = default("/configurations/hdfs-site/" + http_key, None)
+      https_value = default("/configurations/hdfs-site/" + https_key, None)
+      actual_value = https_value if self.encrypted else http_value
+      hostname = actual_value.split(":")[0].strip() if actual_value and ":" in actual_value else None
+
+      self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value)
+      try:
+        if not hostname:
+          raise Exception("Could not retrieve hostname from address " + actual_value)
+
+        jmx_uri = jmx_uri_fragment.format(actual_value)
+        state = get_value_from_jmx(jmx_uri, "tag.HAState", params.security_enabled, params.hdfs_user, params.is_https_enabled)
+
+        # If JMX parsing failed
+        if not state:
+          run_user = default("/configurations/hadoop-env/hdfs_user", "hdfs")
+          check_service_cmd = "hdfs haadmin -getServiceState {0}".format(nn_unique_id)
+          code, out = shell.call(check_service_cmd, logoutput=True, user=run_user)
+          if code == 0 and out:
+            if NAMENODE_STATE.STANDBY in out:
+              state = NAMENODE_STATE.STANDBY
+            elif NAMENODE_STATE.ACTIVE in out:
+              state = NAMENODE_STATE.ACTIVE
+
+        if not state:
+          raise Exception("Could not retrieve Namenode state from URL " + jmx_uri)
+
+        state = state.lower()
+
+        if state not in [NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY]:
+          state = NAMENODE_STATE.UNKNOWN
+
+        if state in self.namenode_state_to_hostnames:
+          self.namenode_state_to_hostnames[state].add(hostname)
+        else:
+          hostnames = set([hostname, ])
+          self.namenode_state_to_hostnames[state] = hostnames
+      except:
+        Logger.error("Could not get namenode state for " + nn_unique_id)
+
+  def __str__(self):
+    return "Namenode HA State: {\n" + \
+           ("IDs: %s\n"       % ", ".join(self.nn_unique_ids)) + \
+           ("Addresses: %s\n" % str(self.nn_unique_id_to_addresses)) + \
+           ("States: %s\n"    % str(self.namenode_state_to_hostnames)) + \
+           ("Encrypted: %s\n" % str(self.encrypted)) + \
+           ("Healthy: %s\n"   % str(self.is_healthy())) + \
+           "}"
+
+  def is_encrypted(self):
+    """
+    :return: Returns a bool indicating if HTTPS is enabled
+    """
+    return self.encrypted
+
+  def get_nn_unique_ids(self):
+    """
+    :return: Returns a list of the nn unique ids
+    """
+    return self.nn_unique_ids
+
+  def get_nn_unique_id_to_addresses(self):
+    """
+    :return: Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address).
+    Each address is of the form hostname:port.
+    """
+    return self.nn_unique_id_to_addresses
+
+  def get_address_for_nn_id(self, id):
+    """
+    :param id: Namenode ID
+    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id.
+    """
+    if id in self.nn_unique_id_to_addresses:
+      addresses = self.nn_unique_id_to_addresses[id]
+      if addresses and len(addresses) == 2:
+        return addresses[1] if self.encrypted else addresses[0]
+    return None
+
+  def get_address_for_host(self, hostname):
+    """
+    :param hostname: Host name
+    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host.
+    """
+    for id, addresses in self.nn_unique_id_to_addresses.iteritems():
+      if addresses and len(addresses) == 2:
+        if ":" in addresses[0]:
+          nn_hostname = addresses[0].split(":")[0].strip()
+          if nn_hostname == hostname:
+            # Found the host
+            return addresses[1] if self.encrypted else addresses[0]
+    return None
+
+  def get_namenode_state_to_hostnames(self):
+    """
+    :return: Returns a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames.
+    """
+    return self.namenode_state_to_hostnames
+
+  def get_address(self, namenode_state):
+    """
+    :param namenode_state: Member of NAMENODE_STATE
+    :return: Returns the address that corresponds to the first host with the given state
+    """
+    hosts = self.namenode_state_to_hostnames[namenode_state] if namenode_state in self.namenode_state_to_hostnames else []
+    if hosts and len(hosts) > 0:
+      hostname = list(hosts)[0]
+      return self.get_address_for_host(hostname)
+    return None
+
+  def is_active(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the active NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE)
+
+  def is_standby(self, host_name):
+    """
+    :param host_name: Host name
+    :return: Return True if this is the standby NameNode, otherwise, False.
+    """
+    return self._is_in_state(host_name, NAMENODE_STATE.STANDBY)
+
+  def _is_in_state(self, host_name, state):
+    """
+    :param host_name: Host name
+    :param state: State to check
+    :return: Return True if this NameNode is in the specified state, otherwise, False.
+    """
+    mapping = self.get_namenode_state_to_hostnames()
+    if state in mapping:
+      hosts_in_state = mapping[state]
+      if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower():
+        return True
+    return False
+
+  def is_healthy(self):
+    """
+    :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.
+    """
+    active_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.ACTIVE] if NAMENODE_STATE.ACTIVE in self.namenode_state_to_hostnames else []
+    standby_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.STANDBY] if NAMENODE_STATE.STANDBY in self.namenode_state_to_hostnames else []
+    return len(active_hosts) == 1 and len(standby_hosts) == 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/66984d9a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_upgrade.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_upgrade.py
new file mode 100644
index 0000000..44c2e37
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/namenode_upgrade.py
@@ -0,0 +1,269 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+import os
+
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.core import shell
+from resource_management.core.shell import as_user
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions import get_unique_id_and_date
+from resource_management.libraries.functions import Direction, SafeMode
+from utils import get_dfsadmin_base_command
+from namenode_ha_state import NamenodeHAState
+
+safemode_to_instruction = {SafeMode.ON: "enter",
+                           SafeMode.OFF: "leave"}
+
+def prepare_upgrade_check_for_previous_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data.
+  Check that there is no "previous" folder inside the NameNode Name Dir.
+  """
+  import params
+
+  if params.dfs_ha_enabled:
+    namenode_ha = NamenodeHAState()
+    if namenode_ha.is_active(params.hostname):
+      Logger.info("NameNode High Availability is enabled and this is the Active NameNode.")
+
+      problematic_previous_namenode_dirs = set()
+      nn_name_dirs = params.dfs_name_dir.split(',')
+      for nn_dir in nn_name_dirs:
+        if os.path.isdir(nn_dir):
+          # Check for a previous folder, which is not allowed.
+          previous_dir = os.path.join(nn_dir, "previous")
+          if os.path.isdir(previous_dir):
+            problematic_previous_namenode_dirs.add(previous_dir)
+
+      if len(problematic_previous_namenode_dirs) > 0:
+        message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \
+                  'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \
+                  'NameNode Name Dir(s): {0}\n' \
+                  '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs))
+        Logger.error(message)
+        raise Fail(message)
+
+def prepare_upgrade_enter_safe_mode(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+  safe_mode_enter_cmd = dfsadmin_base_command + " -safemode enter"
+  try:
+    # Safe to call if already in Safe Mode
+    desired_state = SafeMode.ON
+    safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, params.dfs_ha_enabled, hdfs_binary)
+    Logger.info("Transition successful: {0}, original state: {1}".format(str(safemode_transition_successful), str(original_state)))
+    if not safemode_transition_successful:
+      raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(desired_state))
+  except Exception, e:
+    message = "Could not enter safemode. Error: {0}. As the HDFS user, call this command: {1}".format(str(e), safe_mode_enter_cmd)
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_save_namespace(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+  save_namespace_cmd = dfsadmin_base_command + " -saveNamespace"
+  try:
+    Logger.info("Checkpoint the current namespace.")
+    as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir})
+  except Exception, e:
+    message = format("Could not save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}")
+    Logger.error(message)
+    raise Fail(message)
+
+def prepare_upgrade_backup_namenode_dir():
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs.
+  """
+  import params
+
+  i = 0
+  failed_paths = []
+  nn_name_dirs = params.dfs_name_dir.split(',')
+  backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted)
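+  # Illustrative backup layout: /tmp/upgrades/<stack_version>/namenode_<unique_id_and_date>_<n>/current/...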
+  if len(nn_name_dirs) > 0:
+    Logger.info("Backup the NameNode name directory's CURRENT folder.")
+  for nn_dir in nn_name_dirs:
+    i += 1
+    namenode_current_image = os.path.join(nn_dir, "current")
+    unique = get_unique_id_and_date() + "_" + str(i)
+    # Note that /tmp may not be writeable.
+    backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique)
+
+    if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder):
+      try:
+        os.makedirs(backup_current_folder)
+        Execute(('cp', '-ar', namenode_current_image, backup_current_folder),
+                sudo=True
+        )
+      except Exception, e:
+        failed_paths.append(namenode_current_image)
+  if len(failed_paths) > 0:
+    Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is "
+                 "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir,
+                                                                                           ", ".join(failed_paths)))
+
+def prepare_upgrade_finalize_previous_upgrades(hdfs_binary):
+  """
+  During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress.
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+  finalize_command = dfsadmin_base_command + " -rollingUpgrade finalize"
+  try:
+    Logger.info("Attempt to Finalize if there are any in-progress upgrades. "
+                "This will return 255 if no upgrades are in progress.")
+    code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user)
+    if out:
+      expected_substring = "there is no rolling upgrade in progress"
+      if expected_substring not in out.lower():
+        Logger.warning('Finalize command output did not contain substring: %s' % expected_substring)
+    else:
+      Logger.warning("Finalize command did not return any output.")
+  except Exception, e:
+    Logger.warning("Could not finalize any in-progress rolling upgrade (this is expected if none is in progress): " + str(e))
+
+def reach_safemode_state(user, safemode_state, in_ha, hdfs_binary):
+  """
+  Enter or leave safemode for the Namenode.
+  :param user: user to perform action as
+  :param safemode_state: Desired state of ON or OFF
+  :param in_ha: bool indicating if Namenode High Availability is enabled
+  :param hdfs_binary: name/path of the HDFS binary to use
+  :return: Returns a tuple of (transition success, original state). If no change is needed, the indicator of
+  success will be True
+  """
+  Logger.info("Prepare to transition into safemode state %s" % safemode_state)
+  import params
+  original_state = SafeMode.UNKNOWN
+
+  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+  safemode_base_command = dfsadmin_base_command + " -safemode "
+  safemode_check_cmd = safemode_base_command + "get"
+
+  grep_pattern = format("Safe mode is {safemode_state}")
+  safemode_check_with_grep = format("{safemode_check_cmd} | grep '{grep_pattern}'")
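+  # "dfsadmin -safemode get" is expected to print a line such as "Safe mode is ON" or "Safe mode is OFF"
+  # (in HA deployments the NameNode address may be appended); both the grep and the regex below rely on this.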
+
+  code, out = shell.call(safemode_check_cmd, user=user, logoutput=True)
+  Logger.info("Command: %s\nCode: %d." % (safemode_check_cmd, code))
+  if code == 0 and out is not None:
+    Logger.info(out)
+    re_pattern = r"Safe mode is (\S*)"
+    Logger.info("Pattern to search: {0}".format(re_pattern))
+    m = re.search(re_pattern, out, re.IGNORECASE)
+    if m and len(m.groups()) >= 1:
+      original_state = m.group(1).upper()
+
+      if original_state == safemode_state:
+        return (True, original_state)
+      else:
+        # Make a transition
+        command = safemode_base_command + safemode_to_instruction[safemode_state]
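+        # e.g. "<dfsadmin base> -safemode enter" or "<dfsadmin base> -safemode leave"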
+        Execute(command,
+                user=user,
+                logoutput=True,
+                path=[params.hadoop_bin_dir])
+
+        code, out = shell.call(safemode_check_with_grep, user=user)
+        Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check_with_grep, code, out))
+        if code == 0:
+          return (True, original_state)
+  return (False, original_state)
+
+
+def prepare_rolling_upgrade(hdfs_binary):
+  """
+  This can be called during either Rolling Upgrade or Express Upgrade (aka nonrolling)
+
+  Rolling Upgrade for HDFS Namenode requires the following.
+  0. Namenode must be up
+  1. If HA: leave safemode if the safemode status is not OFF
+  2. Execute a rolling upgrade "prepare"
+  3. Execute a rolling upgrade "query"
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  import params
+
+  if not params.upgrade_direction or params.upgrade_direction not in [Direction.UPGRADE, Direction.DOWNGRADE]:
+    raise Fail("Could not retrieve upgrade direction: %s" % str(params.upgrade_direction))
+  Logger.info(format("Performing a(n) {params.upgrade_direction} of HDFS"))
+
+  if params.security_enabled:
+    kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") 
+    Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+  if params.upgrade_direction == Direction.UPGRADE:
+    if params.dfs_ha_enabled:
+      Logger.info('High Availability is enabled, must leave safemode before calling "-rollingUpgrade prepare"')
+      desired_state = SafeMode.OFF
+      safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, True, hdfs_binary)
+      if not safemode_transition_successful:
+        raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(desired_state))
+
+    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+    prepare = dfsadmin_base_command + " -rollingUpgrade prepare"
+    query = dfsadmin_base_command + " -rollingUpgrade query"
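+    # "prepare" creates the rollback fsimage; "query" is informational and reports whether the
+    # rolling upgrade has started and whether the rollback image has been created.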
+    Execute(prepare,
+            user=params.hdfs_user,
+            logoutput=True)
+    Execute(query,
+            user=params.hdfs_user,
+            logoutput=True)
+
+def finalize_upgrade(upgrade_type, hdfs_binary):
+  """
+  Finalize the Namenode upgrade, at which point it cannot be downgraded.
+  :param upgrade_type: rolling or nonrolling
+  :param hdfs_binary: name/path of the HDFS binary to use
+  """
+  Logger.info("Executing Rolling Upgrade finalize")
+  import params
+
+  if params.security_enabled:
+    kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") 
+    Execute(kinit_command, user=params.hdfs_user, logoutput=True)
+
+  dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
+  finalize_cmd = dfsadmin_base_command + " -rollingUpgrade finalize"
+  query_cmd = dfsadmin_base_command + " -rollingUpgrade query"
+
+  Execute(query_cmd,
+          user=params.hdfs_user,
+          logoutput=True)
+  Execute(finalize_cmd,
+          user=params.hdfs_user,
+          logoutput=True)
+  Execute(query_cmd,
+          user=params.hdfs_user,
+          logoutput=True)
\ No newline at end of file

