ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yus...@apache.org
Subject [45/51] [partial] AMBARI-7621. Import initial contribution for Ambari support on Windows to branch-windows-dev. (Jayush Luniya and Florian Barca via yusaku)
Date Tue, 07 Oct 2014 22:53:13 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py b/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
new file mode 100644
index 0000000..d2db2f6
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo_linux.py
@@ -0,0 +1,477 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import glob
+import logging
+import pwd
+import re
+import time
+import subprocess
+import threading
+import shlex
+import platform
+from PackagesAnalyzer import PackagesAnalyzer
+from HostCheckReportFileHandler import HostCheckReportFileHandler
+from Hardware import Hardware
+from ambari_commons import OSCheck, OSConst
+import socket
+
+# Root logger: getLogger() is called without a name
+logger = logging.getLogger()
+
+# OS info, resolved once when the module is imported
+OS_VERSION = OSCheck().get_os_major_version()
+OS_TYPE = OSCheck.get_os_type()
+OS_FAMILY = OSCheck.get_os_family()
+
+# Default path of the "service" command used to build the status commands
+SERVICE_CMD = "/sbin/service"
+
+# On Debian family hosts the command is taken from /usr/sbin/service instead.
+# (The ufw firewall service of Ubuntu is handled by UbuntuFirewallChecks.)
+if OS_FAMILY == OSConst.DEBIAN_FAMILY:
+  SERVICE_CMD = "/usr/sbin/service"
+
+
+class HostInfo:
+  """
+  Collects facts about a Linux host (java processes, live services, users,
+  folders, packages, repositories, umask, firewall state ...) and stores them
+  in the map passed to register()
+  """
+
+  # List of project names to be used to find alternatives folders etc.
+  # register() passes them to etcAlternativesConf (where they are used as
+  # regular expression alternatives) and to checkFolders (where they are used
+  # as literal path components)
+  DEFAULT_PROJECT_NAMES = [
+    "hadoop*", "hadoop", "hbase", "hcatalog", "hive", "ganglia", "nagios",
+    "oozie", "sqoop", "hue", "zookeeper", "mapred", "hdfs", "flume",
+    "storm", "hive-hcatalog", "tez", "falcon", "ambari_qa", "hadoop_deploy",
+    "rrdcached", "hcat", "ambari-qa", "sqoop-ambari-qa", "sqoop-ambari_qa",
+    "webhcat", "hadoop-hdfs", "hadoop-yarn", "hadoop-mapreduce"
+  ]
+
+  # List of live services checked for on the host. Each entry is either a
+  # plain service name or a map of OS family -> service name
+  # (see checkLiveServices)
+  DEFAULT_LIVE_SERVICES = [
+    {OSConst.REDHAT_FAMILY: "ntpd", OSConst.SUSE_FAMILY: "ntp", OSConst.DEBIAN_FAMILY: "ntp"}
+  ]
+
+  # Set of default users (need to be replaced with the configured user names)
+  DEFAULT_USERS = [
+    "nagios", "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
+    "hdfs", "rrdcached", "zookeeper", "flume", "sqoop", "sqoop2",
+    "hue", "yarn"
+  ]
+
+  # Filters used to identify processes: a java process whose command line
+  # contains one of these strings is flagged as a hadoop process (see javaProcs)
+  PROC_FILTER = [
+    "hadoop", "zookeeper"
+  ]
+
+  # Additional path patterns to find existing directory
+  # NOTE(review): not referenced by any method of this class
+  DIRNAME_PATTERNS = [
+    "/tmp/hadoop-", "/tmp/hsperfdata_"
+  ]
+
+  # Default set of directories that are checked for existence of files and folders
+  DEFAULT_DIRS = [
+    "/etc", "/var/run", "/var/log", "/usr/lib", "/var/lib", "/var/tmp", "/tmp", "/var", "/hadoop"
+  ]
+
+  # Packages that are used to find repos (then repos are used to find other packages)
+  PACKAGES = [
+    "hadoop", "zookeeper", "webhcat", "*-manager-server-db", "*-manager-daemons"
+  ]
+
+  # Additional packages to look for (search packages that start with these)
+  ADDITIONAL_PACKAGES = [
+    "rrdtool", "rrdtool-python", "nagios", "ganglia", "gmond", "gweb", "libconfuse", "ambari-log4j",
+    "hadoop", "zookeeper", "oozie", "webhcat"
+  ]
+
+  # ignore packages from repos whose names start with these strings
+  IGNORE_PACKAGES_FROM_REPOS = [
+    "ambari", "installed"
+  ]
+
+  # ignore required packages
+  IGNORE_PACKAGES = [
+    "epel-release"
+  ]
+
+  # ignore repos from the list of repos to be cleaned
+  IGNORE_REPOS = [
+    "ambari", "HDP-UTILS"
+  ]
+
+  # default timeout for async invoked processes
+  # NOTE(review): not referenced by any method of this class
+  TIMEOUT_SECONDS = 60
+  # placeholder reported by register() when the repositories were not examined
+  RESULT_UNAVAILABLE = "unable_to_determine"
+
+  # Status command template; checkLiveServices splits it and replaces the
+  # service name with the service that is being checked
+  DEFAULT_SERVICE_NAME = "ntpd"
+  SERVICE_STATUS_CMD = "%s %s status" % (SERVICE_CMD, DEFAULT_SERVICE_NAME)
+
+  # File read by getTransparentHugePage
+  THP_FILE = "/sys/kernel/mm/redhat_transparent_hugepage/enabled"
+
+  # NOTE(review): not referenced by any method of this class
+  event = threading.Event()
+
+  # Cached umask; -1 means that it has not been read yet (see getUMask)
+  current_umask = -1
+
+  def __init__(self, config=None):
+    """
+    config: passed through to HostCheckReportFileHandler
+    """
+    self.packages = PackagesAnalyzer()
+    self.reportFileHandler = HostCheckReportFileHandler(config)
+
+  def dirType(self, path):
+    if not os.path.exists(path):
+      return 'not_exist'
+    elif os.path.islink(path):
+      return 'sym_link'
+    elif os.path.isdir(path):
+      return 'directory'
+    elif os.path.isfile(path):
+      return 'file'
+    return 'unknown'
+
+  def hadoopVarRunCount(self):
+    if not os.path.exists('/var/run/hadoop'):
+      return 0
+    pids = glob.glob('/var/run/hadoop/*/*.pid')
+    return len(pids)
+
+  def hadoopVarLogCount(self):
+    if not os.path.exists('/var/log/hadoop'):
+      return 0
+    logs = glob.glob('/var/log/hadoop/*/*.log')
+    return len(logs)
+
+  def etcAlternativesConf(self, projects, etcResults):
+    if not os.path.exists('/etc/alternatives'):
+      return []
+    projectRegex = "'" + '|'.join(projects) + "'"
+    files = [f for f in os.listdir('/etc/alternatives') if re.match(projectRegex, f)]
+    for conf in files:
+      result = {}
+      filePath = os.path.join('/etc/alternatives', conf)
+      if os.path.islink(filePath):
+        realConf = os.path.realpath(filePath)
+        result['name'] = conf
+        result['target'] = realConf
+        etcResults.append(result)
+
+  def checkLiveServices(self, services, result):
+    osType = OSCheck.get_os_family()
+    for service in services:
+      svcCheckResult = {}
+      if isinstance(service, dict):
+        serviceName = service[osType]
+      else:
+        serviceName = service
+
+      service_check_live = shlex.split(self.SERVICE_STATUS_CMD)
+      service_check_live[1] = serviceName
+
+      svcCheckResult['name'] = serviceName
+      svcCheckResult['status'] = "UNKNOWN"
+      svcCheckResult['desc'] = ""
+      try:
+        osStat = subprocess.Popen(service_check_live, stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 != osStat.returncode:
+          svcCheckResult['status'] = "Unhealthy"
+          svcCheckResult['desc'] = out
+          if len(out) == 0:
+            svcCheckResult['desc'] = err
+        else:
+          svcCheckResult['status'] = "Healthy"
+      except Exception, e:
+        svcCheckResult['status'] = "Unhealthy"
+        svcCheckResult['desc'] = repr(e)
+      result.append(svcCheckResult)
+
+  def checkUsers(self, users, results):
+    f = open('/etc/passwd', 'r')
+    for userLine in f:
+      fields = userLine.split(":")
+      if fields[0] in users:
+        result = {}
+        homeDir = fields[5]
+        result['name'] = fields[0]
+        result['homeDir'] = fields[5]
+        result['status'] = "Available"
+        if not os.path.exists(homeDir):
+          result['status'] = "Invalid home directory"
+        results.append(result)
+
+  def osdiskAvailableSpace(self, path):
+    diskInfo = {}
+    try:
+      df = subprocess.Popen(["df", "-kPT", path], stdout=subprocess.PIPE)
+      dfdata = df.communicate()[0]
+      return Hardware.extractMountInfo(dfdata.splitlines()[-1])
+    except:
+      pass
+    return diskInfo
+
+  def checkFolders(self, basePaths, projectNames, existingUsers, dirs):
+    foldersToIgnore = []
+    for user in existingUsers:
+      foldersToIgnore.append(user['homeDir'])
+    try:
+      for dirName in basePaths:
+        for project in projectNames:
+          path = os.path.join(dirName.strip(), project.strip())
+          if not path in foldersToIgnore and os.path.exists(path):
+            obj = {}
+            obj['type'] = self.dirType(path)
+            obj['name'] = path
+            dirs.append(obj)
+    except:
+      pass
+
+  def javaProcs(self, list):
+    try:
+      pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
+      for pid in pids:
+        cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
+        cmd = cmd.replace('\0', ' ')
+        if not 'AmbariServer' in cmd:
+          if 'java' in cmd:
+            dict = {}
+            dict['pid'] = int(pid)
+            dict['hadoop'] = False
+            for filter in self.PROC_FILTER:
+              if filter in cmd:
+                dict['hadoop'] = True
+            dict['command'] = cmd.strip()
+            for line in open(os.path.join('/proc', pid, 'status')):
+              if line.startswith('Uid:'):
+                uid = int(line.split()[1])
+                dict['user'] = pwd.getpwuid(uid).pw_name
+            list.append(dict)
+    except:
+      pass
+    pass
+
+  def getReposToRemove(self, repos, ignoreList):
+    reposToRemove = []
+    for repo in repos:
+      addToRemoveList = True
+      for ignoreRepo in ignoreList:
+        if self.packages.nameMatch(ignoreRepo, repo):
+          addToRemoveList = False
+          continue
+      if addToRemoveList:
+        reposToRemove.append(repo)
+    return reposToRemove
+
+  def getUMask(self):
+    if (self.current_umask == -1):
+     self.current_umask = os.umask(self.current_umask)
+     os.umask(self.current_umask)
+     return self.current_umask
+    else:
+     return self.current_umask
+
+  def getTransparentHugePage(self):
+    # This file exist only on redhat 6
+    thp_regex = "\[(.+)\]"
+    if os.path.isfile(self.THP_FILE):
+      with open(self.THP_FILE) as f:
+        file_content = f.read()
+        return re.search(thp_regex, file_content).groups()[0]
+    else:
+      return ""
+
+  def getFirewallObject(self):
+    if OS_TYPE == OSConst.OS_UBUNTU:
+      return UbuntuFirewallChecks()
+    elif OS_TYPE == OSConst.OS_FEDORA and int(OS_VERSION) >= 18:
+      return Fedora18FirewallChecks()
+    elif OS_FAMILY == OSConst.SUSE_FAMILY:
+      return SuseFirewallChecks()
+    else:
+      return FirewallChecks()
+
+  def getFirewallObjectTypes(self):
+    # To support test code, so tests can loop through the types
+    return (FirewallChecks,
+            UbuntuFirewallChecks,
+            Fedora18FirewallChecks,
+            SuseFirewallChecks)
+
+  def checkIptables(self):
+    return self.getFirewallObject().check_iptables()
+
+  """ Return various details about the host
+  componentsMapped: indicates if any components are mapped to this host
+  commandsInProgress: indicates if any commands are in progress
+  """
+  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+    dict['hostHealth'] = {}
+
+    java = []
+    self.javaProcs(java)
+    dict['hostHealth']['activeJavaProcs'] = java
+
+    liveSvcs = []
+    self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
+    dict['hostHealth']['liveServices'] = liveSvcs
+
+    dict['umask'] = str(self.getUMask())
+
+    dict['transparentHugePage'] = self.getTransparentHugePage()
+    dict['iptablesIsRunning'] = self.checkIptables()
+    dict['reverseLookup'] = self.checkReverseLookup()
+    # If commands are in progress or components are already mapped to this host
+    # Then do not perform certain expensive host checks
+    if componentsMapped or commandsInProgress:
+      dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
+      dict['installedPackages'] = []
+      dict['alternatives'] = []
+      dict['stackFoldersAndFiles'] = []
+      dict['existingUsers'] = []
+
+    else:
+      etcs = []
+      self.etcAlternativesConf(self.DEFAULT_PROJECT_NAMES, etcs)
+      dict['alternatives'] = etcs
+
+      existingUsers = []
+      self.checkUsers(self.DEFAULT_USERS, existingUsers)
+      dict['existingUsers'] = existingUsers
+
+      dirs = []
+      self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, existingUsers, dirs)
+      dict['stackFoldersAndFiles'] = dirs
+
+      installedPackages = []
+      availablePackages = []
+      self.packages.allInstalledPackages(installedPackages)
+      self.packages.allAvailablePackages(availablePackages)
+
+      repos = []
+      self.packages.getInstalledRepos(self.PACKAGES, installedPackages + availablePackages,
+                                      self.IGNORE_PACKAGES_FROM_REPOS, repos)
+      packagesInstalled = self.packages.getInstalledPkgsByRepo(repos, self.IGNORE_PACKAGES, installedPackages)
+      additionalPkgsInstalled = self.packages.getInstalledPkgsByNames(
+        self.ADDITIONAL_PACKAGES, installedPackages)
+      allPackages = list(set(packagesInstalled + additionalPkgsInstalled))
+      dict['installedPackages'] = self.packages.getPackageDetails(installedPackages, allPackages)
+
+      repos = self.getReposToRemove(repos, self.IGNORE_REPOS)
+      dict['existingRepos'] = repos
+
+      self.reportFileHandler.writeHostCheckFile(dict)
+      pass
+
+    # The time stamp must be recorded at the end
+    dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
+
+    pass
+
+  def checkReverseLookup(self):
+    """
+    Check if host fqdn resolves to current host ip
+    """
+    try:
+      host_name = socket.gethostname().lower()
+      host_ip = socket.gethostbyname(host_name)
+      host_fqdn = socket.getfqdn().lower()
+      fqdn_ip = socket.gethostbyname(host_fqdn)
+      return host_ip == fqdn_ip
+    except socket.error:
+      pass
+    return False
+
+
+class FirewallChecks(object):
+  def __init__(self):
+    self.FIREWALL_SERVICE_NAME = "iptables"
+    self.SERVICE_CMD = SERVICE_CMD
+    self.SERVICE_SUBCMD = "status"
+
+  def get_command(self):
+    return "%s %s %s" % (self.SERVICE_CMD, self.FIREWALL_SERVICE_NAME, self.SERVICE_SUBCMD)
+
+  def check_result(self, retcode, out, err):
+      return retcode == 0
+
+  def check_iptables(self):
+    retcode, out, err = self.run_os_command(self.get_command())
+    return self.check_result(retcode, out, err)
+
+  def get_running_result(self):
+    # To support test code.  Expected ouput from run_os_command.
+    return (0, "", "")
+
+  def get_stopped_result(self):
+    # To support test code.  Expected output from run_os_command.
+    return (3, "", "")
+
+  def run_os_command(self, cmd):
+    if type(cmd) == str:
+      cmd = shlex.split(cmd)
+
+    try:
+      process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
+        stderr=subprocess.PIPE)
+      (stdoutdata, stderrdata) = process.communicate()
+      return process.returncode, stdoutdata, stderrdata
+    except OSError:
+      return self.get_stopped_result()
+
+
+class UbuntuFirewallChecks(FirewallChecks):
+  def __init__(self):
+    super(UbuntuFirewallChecks, self).__init__()
+
+    self.FIREWALL_SERVICE_NAME = "ufw"
+    self.SERVICE_CMD = 'service'
+
+  def check_result(self, retcode, out, err):
+    # On ubuntu, the status command returns 0 whether running or not
+    return out and len(out) > 0 and out.strip() != "ufw stop/waiting"
+
+  def get_running_result(self):
+    # To support test code.  Expected ouput from run_os_command.
+    return (0, "ufw start/running", "")
+
+  def get_stopped_result(self):
+    # To support test code.  Expected output from run_os_command.
+    return (0, "ufw stop/waiting", "")
+
+
+class Fedora18FirewallChecks(FirewallChecks):
+  def __init__(self):
+    self.FIREWALL_SERVICE_NAME = "firewalld.service"
+
+  def get_command(self):
+    return "systemctl is-active firewalld.service"
+
+
+class SuseFirewallChecks(FirewallChecks):
+  def __init__(self):
+    self.FIREWALL_SERVICE_NAME = "SuSEfirewall2"
+
+  def get_command(self):
+    return "/sbin/SuSEfirewall2 status"
+
+
+def main(argv=None):
+  h = HostInfo()
+  struct = {}
+  h.register(struct)
+  print struct
+
+
+if __name__ == '__main__':
+  main()

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py b/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
new file mode 100644
index 0000000..2140426
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo_win.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import logging
+import time
+import subprocess
+from HostCheckReportFileHandler import HostCheckReportFileHandler
+from shell import shellRunner
+from ambari_commons.os_check import OSCheck, OSConst
+from ambari_commons.os_windows import run_powershell_script, CHECK_FIREWALL_SCRIPT
+import socket
+
+# Root logger: getLogger() is called without a name
+logger = logging.getLogger()
+
+# OS info, resolved once when the module is imported
+# NOTE(review): these three values are not read by the HostInfo class below
+OS_VERSION = OSCheck().get_os_major_version()
+OS_TYPE = OSCheck.get_os_type()
+OS_FAMILY = OSCheck.get_os_family()
+
+class HostInfo:
+  """
+  Collects facts about a Windows host (java processes, live services, users,
+  umask, firewall state ...) and stores them in the map passed to register().
+  The facts are gathered through PowerShell commands
+  """
+
+  # List of live services checked for on the host. Each entry is either a
+  # plain service name or a map of OS family -> service name
+  # (see checkLiveServices)
+  DEFAULT_LIVE_SERVICES = [
+    {OSConst.WINSRV_FAMILY: "W32Time"}
+  ]
+
+  # Set of default users (need to be replaced with the configured user names)
+  DEFAULT_USERS = [
+    "nagios", "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
+    "hdfs", "rrdcached", "zookeeper", "flume", "sqoop", "sqoop2",
+    "hue", "yarn"
+  ]
+
+  # Filters used to identify processes: a java process whose command line
+  # contains one of these strings is flagged as a hadoop process (see javaProcs)
+  PROC_FILTER = [
+    "hadoop", "zookeeper"
+  ]
+
+  # placeholder reported by register() when the repositories were not examined
+  RESULT_UNAVAILABLE = "unable_to_determine"
+
+  # PowerShell commands.
+  # SERVICE_STATUS_CMD is a str.format() template ({0} is the service name),
+  # which is why the braces of the PowerShell script blocks are doubled.
+  # GET_JAVA_PROC_CMD echoes three values per java.exe process: the process
+  # id, the command line and the owner
+  SERVICE_STATUS_CMD = 'If ((Get-Service | Where-Object {{$_.Name -eq \'{0}\'}}).Status -eq \'Running\') {{echo "Running"; $host.SetShouldExit(0)}} Else {{echo "Stopped"; $host.SetShouldExit(1)}}'
+  GET_USERS_CMD = '$accounts=(Get-WmiObject -Class Win32_UserAccount -Namespace "root\cimv2" -Filter "LocalAccount=\'$True\'" -ComputerName "LocalHost" -ErrorAction Stop); foreach ($acc in $accounts) {echo $acc.Name}'
+  GET_JAVA_PROC_CMD = 'foreach ($process in (gwmi Win32_Process -Filter "name = \'java.exe\'")){echo $process.ProcessId;echo $process.CommandLine; echo $process.GetOwner().User}'
+
+  # Cached umask; -1 means that it has not been read yet (see getUMask)
+  current_umask = -1
+
+  def __init__(self, config=None):
+    """
+    config: passed through to HostCheckReportFileHandler
+    """
+    self.reportFileHandler = HostCheckReportFileHandler(config)
+
+  def dirType(self, path):
+    if not os.path.exists(path):
+      return 'not_exist'
+    elif os.path.islink(path):
+      return 'sym_link'
+    elif os.path.isdir(path):
+      return 'directory'
+    elif os.path.isfile(path):
+      return 'file'
+    return 'unknown'
+
+  def checkLiveServices(self, services, result):
+    osType = OSCheck.get_os_family()
+    for service in services:
+      svcCheckResult = {}
+      if isinstance(service, dict):
+        serviceName = service[osType]
+      else:
+        serviceName = service
+
+      service_check_live = ["powershell",'-noProfile', '-NonInteractive',  '-nologo', "-Command", self.SERVICE_STATUS_CMD.format(serviceName)]
+      svcCheckResult['name'] = serviceName
+      svcCheckResult['status'] = "UNKNOWN"
+      svcCheckResult['desc'] = ""
+      try:
+        osStat = subprocess.Popen(service_check_live, stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 != osStat.returncode:
+          svcCheckResult['status'] = "Unhealthy"
+          svcCheckResult['desc'] = out
+          if len(out) == 0:
+            svcCheckResult['desc'] = err
+        else:
+          svcCheckResult['status'] = "Healthy"
+      except Exception, e:
+        svcCheckResult['status'] = "Unhealthy"
+        svcCheckResult['desc'] = repr(e)
+      result.append(svcCheckResult)
+
+  #TODO get user directory
+  def checkUsers(self, users, results):
+    get_users_cmd = ["powershell",'-noProfile', '-NonInteractive',  '-nologo', "-Command", self.GET_USERS_CMD]
+    try:
+      osStat = subprocess.Popen(get_users_cmd, stdout=subprocess.PIPE,                               stderr=subprocess.PIPE)
+      out, err = osStat.communicate()
+    except:
+      raise Exception("Failed to get users.")
+    for user in out.split(os.linesep):
+      if user in users:
+        result = {}
+        result['name'] = user
+        result['status'] = "Available"
+        results.append(result)
+
+  def javaProcs(self, list):
+    try:
+      runner = shellRunner()
+      command_result = runner.run(["powershell",'-noProfile', '-NonInteractive',  '-nologo', "-Command", self.GET_JAVA_PROC_CMD])
+      if command_result["exitCode"] == 0:
+        splitted_output = command_result["output"].split(os.linesep)
+        for i in [index for index in range(0,len(splitted_output)) if (index % 3)==0]:
+          pid = splitted_output[i]
+          cmd = splitted_output[i+1]
+          user = splitted_output[i+2]
+          if not 'AmbariServer' in cmd:
+            if 'java' in cmd:
+              dict = {}
+              dict['pid'] = int(pid)
+              dict['hadoop'] = False
+              for filter in self.PROC_FILTER:
+                if filter in cmd:
+                  dict['hadoop'] = True
+              dict['command'] = cmd.strip()
+              dict['user'] = user
+              list.append(dict)
+    except Exception as e:
+      pass
+    pass
+
+  def getUMask(self):
+    if (self.current_umask == -1):
+      self.current_umask = os.umask(self.current_umask)
+      os.umask(self.current_umask)
+      return self.current_umask
+    else:
+      return self.current_umask
+
+  def checkIptables(self):
+    out = run_powershell_script(CHECK_FIREWALL_SCRIPT)
+    if out[0] != 0:
+      logger.warn("Unable to check firewall status:{0}".format(out[2]))
+      return False
+    profiles_status = [i for i in out[1].split("\n") if not i == ""]
+    if "1" in profiles_status:
+      return True
+    return False
+
+  """ Return various details about the host
+  componentsMapped: indicates if any components are mapped to this host
+  commandsInProgress: indicates if any commands are in progress
+  """
+  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+    dict['hostHealth'] = {}
+
+    java = []
+    self.javaProcs(java)
+    dict['hostHealth']['activeJavaProcs'] = java
+
+    liveSvcs = []
+    self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
+    dict['hostHealth']['liveServices'] = liveSvcs
+
+    dict['umask'] = str(self.getUMask())
+
+    dict['iptablesIsRunning'] = self.checkIptables()
+    dict['reverseLookup'] = self.checkReverseLookup()
+    # If commands are in progress or components are already mapped to this host
+    # Then do not perform certain expensive host checks
+    if componentsMapped or commandsInProgress:
+      dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
+      dict['installedPackages'] = []
+      dict['alternatives'] = []
+      dict['stackFoldersAndFiles'] = []
+      dict['existingUsers'] = []
+    else:
+      existingUsers = []
+      self.checkUsers(self.DEFAULT_USERS, existingUsers)
+      dict['existingUsers'] = existingUsers
+      #TODO check HDP stack and folders here
+      self.reportFileHandler.writeHostCheckFile(dict)
+      pass
+
+    # The time stamp must be recorded at the end
+    dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
+
+    pass
+
+  def checkReverseLookup(self):
+    """
+    Check if host fqdn resolves to current host ip
+    """
+    try:
+      host_name = socket.gethostname().lower()
+      host_ip = socket.gethostbyname(host_name)
+      host_fqdn = socket.getfqdn().lower()
+      fqdn_ip = socket.gethostbyname(host_fqdn)
+      return host_ip == fqdn_ip
+    except socket.error:
+      pass
+    return False
+
+def main(argv=None):
+  h = HostInfo()
+  struct = {}
+  h.register(struct)
+  print struct
+
+
+if __name__ == '__main__':
+  main()

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/NetUtil.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/NetUtil.py b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
index 3ce981a..ecccee2 100644
--- a/ambari-agent/src/main/python/ambari_agent/NetUtil.py
+++ b/ambari-agent/src/main/python/ambari_agent/NetUtil.py
@@ -15,18 +15,13 @@
 
 
 from urlparse import urlparse
-import time
 import logging
 import httplib
 from ssl import SSLError
-
-ERROR_SSL_WRONG_VERSION = "SSLError: Failed to connect. Please check openssl library versions. \n" +\
-              "Refer to: https://bugzilla.redhat.com/show_bug.cgi?id=1022468 for more details."
-LOG_REQUEST_MESSAGE = "GET %s -> %s, body: %s"
+import platform
 
 logger = logging.getLogger()
 
-
 class NetUtil:
 
   CONNECT_SERVER_RETRY_INTERVAL_SEC = 10
@@ -35,61 +30,84 @@ class NetUtil:
 
   # Url within server to request during status check. This url
   # should return HTTP code 200
-  SERVER_STATUS_REQUEST = "{0}/ca"
+  SERVER_STATUS_REQUEST = "{0}/cert/ca"
+
   # For testing purposes
   DEBUG_STOP_RETRIES_FLAG = False
 
+  # Stop implementation
+  # Typically, it waits for a certain time for the daemon/service to receive the stop signal.
+  # Received the number of seconds to wait as an argument
+  # Returns true if the application is stopping, false if continuing execution
+  stopCallback = None
+
+  def __init__(self, stop_callback=None):
+    if stop_callback is None:
+      IS_WINDOWS = platform.system() == "Windows"
+      if IS_WINDOWS:
+        from HeartbeatHandlers_windows import HeartbeatStopHandler
+      else:
+        from HeartbeatStopHandler_linux import HeartbeatStopHandler
+      stop_callback = HeartbeatStopHandler
+
+    self.stopCallback = stop_callback
+
   def checkURL(self, url):
     """Try to connect to a given url. Result is True if url returns HTTP code 200, in any other case
-    (like unreachable server or wrong HTTP code) result will be False.
-
-       Additionally returns body of request, if available
+    (like unreachable server or wrong HTTP code) result will be False
     """
-    logger.info("Connecting to " + url)
-    responseBody = ""
-
+    logger.info("Connecting to " + url);
+    
     try:
       parsedurl = urlparse(url)
       ca_connection = httplib.HTTPSConnection(parsedurl[1])
-      ca_connection.request("GET", parsedurl[2])
-      response = ca_connection.getresponse()
-      status = response.status
-
+      ca_connection.request("HEAD", parsedurl[2])
+      response = ca_connection.getresponse()  
+      status = response.status    
+      
+      requestLogMessage = "HEAD %s -> %s"
+      
       if status == 200:
-        responseBody = response.read()
-        logger.debug(LOG_REQUEST_MESSAGE, url, str(status), responseBody)
-        return True, responseBody
-      else:
-        logger.warning(LOG_REQUEST_MESSAGE, url, str(status), responseBody)
-        return False, responseBody
+        logger.debug(requestLogMessage, url, str(status) ) 
+        return True
+      else: 
+        logger.warning(requestLogMessage, url, str(status) )
+        return False
     except SSLError as slerror:
       logger.error(str(slerror))
-      logger.error(ERROR_SSL_WRONG_VERSION)
-      return False, responseBody
-
+      logger.error("SSLError: Failed to connect. Please check openssl library versions. \n" +
+                   "Refer to: https://bugzilla.redhat.com/show_bug.cgi?id=1022468 for more details.")
+      return False
+    
     except Exception, e:
       logger.warning("Failed to connect to " + str(url) + " due to " + str(e) + "  ")
-      return False, responseBody
+      return False
 
-  def try_to_connect(self, server_url, max_retries, logger=None):
+  def try_to_connect(self, server_url, max_retries, logger = None):
     """Try to connect to a given url, sleeping for CONNECT_SERVER_RETRY_INTERVAL_SEC seconds
     between retries. No more than max_retries is performed. If max_retries is -1, connection
     attempts will be repeated forever until server is not reachable
-
     Returns count of retries
     """
+    connected = False
     if logger is not None:
       logger.debug("Trying to connect to %s", server_url)
-
+      
     retries = 0
     while (max_retries == -1 or retries < max_retries) and not self.DEBUG_STOP_RETRIES_FLAG:
-      server_is_up, responseBody = self.checkURL(self.SERVER_STATUS_REQUEST.format(server_url))
+      server_is_up = self.checkURL(self.SERVER_STATUS_REQUEST.format(server_url))
       if server_is_up:
+        connected = True
         break
       else:
         if logger is not None:
           logger.warn('Server at {0} is not reachable, sleeping for {1} seconds...'.format(server_url,
             self.CONNECT_SERVER_RETRY_INTERVAL_SEC))
         retries += 1
-        time.sleep(self.CONNECT_SERVER_RETRY_INTERVAL_SEC)
-    return retries
+
+      if 0 == self.stopCallback.wait(self.CONNECT_SERVER_RETRY_INTERVAL_SEC):
+        #stop waiting
+        logger.info("Stop event received")
+        self.DEBUG_STOP_RETRIES_FLAG = True
+    return retries, connected
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py b/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
index bbd2877..94d8744 100644
--- a/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
+++ b/ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py
@@ -31,7 +31,7 @@ logger = logging.getLogger()
 class PackagesAnalyzer:
 
   # default timeout for async invoked processes
-  TIMEOUT_SECONDS = 20
+  TIMEOUT_SECONDS = 10
   event = threading.Event()
 
   def launch_subprocess(self, command):

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 874b70b..4b11e9d 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -23,10 +23,8 @@ import os
 import subprocess
 import pprint
 import threading
+import platform
 from threading import Thread
-import time
-from BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle 
-
 from Grep import Grep
 import shell, sys
 
@@ -39,6 +37,7 @@ class PythonExecutor:
   Warning: class maintains internal state. As a result, instances should not be
   used as a singleton for a concurrent execution of python scripts
   """
+
   NO_ERROR = "none"
   grep = Grep()
   event = threading.Event()
@@ -49,19 +48,8 @@ class PythonExecutor:
     self.config = config
     pass
 
-
-  def open_subporcess_files(self, tmpoutfile, tmperrfile, override_output_files):
-    if override_output_files: # Recreate files
-      tmpout =  open(tmpoutfile, 'w')
-      tmperr =  open(tmperrfile, 'w')
-    else: # Append to files
-      tmpout =  open(tmpoutfile, 'a')
-      tmperr =  open(tmperrfile, 'a')
-    return tmpout, tmperr
-    
-  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
-               timeout, tmpstructedoutfile, logger_level, callback, task_id,
-               override_output_files = True, handle = None):
+  def run_file(self, script, script_params, tmpoutfile, tmperrfile, timeout,
+               tmpstructedoutfile, logger_level, override_output_files = True):
     """
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
@@ -71,6 +59,13 @@ class PythonExecutor:
     override_output_files option defines whether stdout/stderr files will be
     recreated or appended
     """
+    if override_output_files: # Recreate files
+      tmpout =  open(tmpoutfile, 'w')
+      tmperr =  open(tmperrfile, 'w')
+    else: # Append to files
+      tmpout =  open(tmpoutfile, 'a')
+      tmperr =  open(tmperrfile, 'a')
+
     # need to remove this file for the following case:
     # status call 1 does not write to file; call 2 writes to file;
     # call 3 does not write to file, so contents are still call 2's result
@@ -79,75 +74,62 @@ class PythonExecutor:
     except OSError:
       pass # no error
 
-    script_params += [tmpstructedoutfile, logger_level, tmp_dir]
+    script_params += [tmpstructedoutfile, logger_level]
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
-    if(handle == None) :
-      tmpout, tmperr = self.open_subporcess_files(tmpoutfile, tmperrfile, override_output_files)
-      
-      process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
-      # map task_id to pid
-      callback(task_id, process.pid)
-      logger.debug("Launching watchdog thread")
-      self.event.clear()
-      self.python_process_has_been_killed = False
-      thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
-      thread.start()
-      # Waiting for the process to be either finished or killed
-      process.communicate()
-      self.event.set()
-      thread.join()
-      return self.prepare_process_result(process, tmpoutfile, tmperrfile, tmpstructedoutfile)
-    else:
-      holder = Holder(pythonCommand, tmpoutfile, tmperrfile, tmpstructedoutfile, handle)
-      
-      background = BackgroundThread(holder, self)
-      background.start()
-      return {"exitcode": 777}
-
-  def prepare_process_result (self, process, tmpoutfile, tmperrfile, tmpstructedoutfile):
-    out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)
+    process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+    logger.debug("Launching watchdog thread")
+    self.event.clear()
+    self.python_process_has_been_killed = False
+    thread = Thread(target =  self.python_watchdog_func, args = (process, timeout))
+    thread.start()
+    # Waiting for the process to be either finished or killed
+    process.communicate()
+    self.event.set()
+    thread.join()
     # Building results
+    error = self.NO_ERROR
     returncode = process.returncode
+    out = open(tmpoutfile, 'r').read()
+    error = open(tmperrfile, 'r').read()
 
-    if self.python_process_has_been_killed:
-      error = str(error) + "\n Python script has been killed due to timeout"
-      returncode = 999
-    result = self.condenseOutput(out, error, returncode, structured_out)
-    logger.info("Result: %s" % result)
-    return result
-  
-  def read_result_from_files(self, out_path, err_path, structured_out_path):
-    out = open(out_path, 'r').read()
-    error = open(err_path, 'r').read()
     try:
-      with open(structured_out_path, 'r') as fp:
+      with open(tmpstructedoutfile, 'r') as fp:
         structured_out = json.load(fp)
     except Exception:
-      if os.path.exists(structured_out_path):
-        errMsg = 'Unable to read structured output from ' + structured_out_path
+      if os.path.exists(tmpstructedoutfile):
+        errMsg = 'Unable to read structured output from ' + tmpstructedoutfile
         structured_out = {
           'msg' : errMsg
         }
         logger.warn(structured_out)
       else:
         structured_out = {}
-    return out, error, structured_out
-  
+
+    if self.python_process_has_been_killed:
+      error = str(error) + "\n Python script has been killed due to timeout"
+      returncode = 999
+    result = self.condenseOutput(out, error, returncode, structured_out)
+    logger.info("Result: %s" % result)
+    return result
+
+
   def launch_python_subprocess(self, command, tmpout, tmperr):
     """
     Creates subprocess with given parameters. This functionality was moved to separate method
     to make possible unit testing
     """
+    close_fds = None if platform.system() == "Windows" else True
     return subprocess.Popen(command,
       stdout=tmpout,
-      stderr=tmperr, close_fds=True)
-    
+      stderr=tmperr, close_fds=close_fds)
+
   def isSuccessfull(self, returncode):
     return not self.python_process_has_been_killed and returncode == 0
 
   def python_command(self, script, script_params):
-    python_binary = sys.executable
+    # On Windows the Python executable must be passed in explicitly (via PYTHON_EXE), because sys.executable returns the service wrapper
+    python_binary = os.environ['PYTHON_EXE'] if 'PYTHON_EXE' in os.environ else sys.executable
     python_command = [python_binary, script] + script_params
     return python_command
 
@@ -171,39 +153,3 @@ class PythonExecutor:
       shell.kill_process_with_children(python.pid)
       self.python_process_has_been_killed = True
     pass
-
-class Holder:
-  def __init__(self, command, out_file, err_file, structured_out_file, handle):
-    self.command = command
-    self.out_file = out_file
-    self.err_file = err_file
-    self.structured_out_file = structured_out_file
-    self.handle = handle
-    
-class BackgroundThread(threading.Thread):
-  def __init__(self, holder, pythonExecutor):
-    threading.Thread.__init__(self)
-    self.holder = holder
-    self.pythonExecutor = pythonExecutor
-  
-  def run(self):
-    process_out, process_err  = self.pythonExecutor.open_subporcess_files(self.holder.out_file, self.holder.err_file, True)
-    
-    logger.info("Starting process command %s" % self.holder.command)
-    process = self.pythonExecutor.launch_python_subprocess(self.holder.command, process_out, process_err)
-    
-    logger.info("Process has been started. Pid = %s" % process.pid)
-    
-    self.holder.handle.pid = process.pid
-    self.holder.handle.status = BackgroundCommandExecutionHandle.RUNNING_STATUS
-    self.holder.handle.on_background_command_started(self.holder.handle.command['taskId'], process.pid)
-    
-    process.communicate()
-    
-    self.holder.handle.exitCode = process.returncode
-    process_condenced_result = self.pythonExecutor.prepare_process_result(process, self.holder.out_file, self.holder.err_file, self.holder.structured_out_file)
-    logger.info("Calling callback with args %s" % process_condenced_result)
-    self.holder.handle.on_background_command_complete_callback(process_condenced_result, self.holder.handle)
-    logger.info("Exiting from thread for holder pid %s" % self.holder.handle.pid)
-    
-  

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/Register.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Register.py b/ambari-agent/src/main/python/ambari_agent/Register.py
index 21aa8ec..dbf0ef1 100644
--- a/ambari-agent/src/main/python/ambari_agent/Register.py
+++ b/ambari-agent/src/main/python/ambari_agent/Register.py
@@ -28,7 +28,7 @@ from HostInfo import HostInfo
 
 firstContact = True
 class Register:
-  """ Registering with the server. Get the hardware profile and
+  """ Registering with the server. Get the hardware profile and 
   declare success for now """
   def __init__(self, config):
     self.hardware = Hardware()
@@ -37,23 +37,22 @@ class Register:
   def build(self, id='-1'):
     global clusterId, clusterDefinitionRevision, firstContact
     timestamp = int(time.time()*1000)
-
+   
     hostInfo = HostInfo(self.config)
     agentEnv = { }
     hostInfo.register(agentEnv, False, False)
 
     version = self.read_agent_version()
     current_ping_port = self.config.get('agent','current_ping_port')
-
+    
     register = { 'responseId'        : int(id),
                  'timestamp'         : timestamp,
-                 'hostname'          : hostname.hostname(self.config),
+                 'hostname'          : hostname.hostname(),
                  'currentPingPort'   : int(current_ping_port),
-                 'publicHostname'    : hostname.public_hostname(self.config),
+                 'publicHostname'    : hostname.public_hostname(),
                  'hardwareProfile'   : self.hardware.get(),
                  'agentEnv'          : agentEnv,
-                 'agentVersion'      : version,
-                 'prefix'            : self.config.get('agent', 'prefix')
+                 'agentVersion'      : version
                }
     return register
 
@@ -64,3 +63,4 @@ class Register:
     version = f.read().strip()
     f.close()
     return version
+  

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/hostname.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/hostname.py b/ambari-agent/src/main/python/ambari_agent/hostname.py
index 9fbe145..bcf4075 100644
--- a/ambari-agent/src/main/python/ambari_agent/hostname.py
+++ b/ambari-agent/src/main/python/ambari_agent/hostname.py
@@ -30,33 +30,33 @@ logger = logging.getLogger()
 cached_hostname = None
 cached_public_hostname = None
 
-
-def hostname(config):
+def hostname():
   global cached_hostname
   if cached_hostname is not None:
     return cached_hostname
 
+  config = AmbariConfig.config
   try:
     scriptname = config.get('agent', 'hostname_script')
-    try:
+    try: 
       osStat = subprocess.Popen([scriptname], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
       out, err = osStat.communicate()
       if (0 == osStat.returncode and 0 != len(out.strip())):
         cached_hostname = out.strip()
       else:
-        cached_hostname = socket.getfqdn()
+        cached_hostname = socket.getfqdn().lower()
     except:
-      cached_hostname = socket.getfqdn()
+      cached_hostname = socket.getfqdn().lower()
   except:
-    cached_hostname = socket.getfqdn()
+    cached_hostname = socket.getfqdn().lower()
   return cached_hostname
 
-
-def public_hostname(config):
+def public_hostname():
   global cached_public_hostname
   if cached_public_hostname is not None:
     return cached_public_hostname
 
+  config = AmbariConfig.config
   out = ''
   err = ''
   try:
@@ -68,12 +68,12 @@ def public_hostname(config):
         cached_public_hostname = out.strip()
         return cached_public_hostname
   except:
-    #ignore for now.
+    #ignore for now. 
     trace_info = traceback.format_exc()
-    logger.info("Error using the scriptname:" +  trace_info
+    logger.info("Error using the scriptname:" +  trace_info 
                 + " :out " + out + " :err " + err)
     logger.info("Defaulting to fqdn.")
-
+    
   # future - do an agent entry for this too
   try:
     handle = urllib2.urlopen('http://169.254.169.254/latest/meta-data/public-hostname', '', 2)
@@ -81,7 +81,7 @@ def public_hostname(config):
     handle.close()
     cached_public_hostname = str
   except Exception, e:
-    cached_public_hostname = socket.getfqdn()
+    cached_public_hostname = socket.getfqdn().lower()
   return cached_public_hostname
 
 def main(argv=None):

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 5d33ca4..c7a8612 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -25,60 +25,41 @@ import sys
 import traceback
 import os
 import time
+import platform
 import ConfigParser
 import ProcessHelper
 from Controller import Controller
-from AmbariConfig import AmbariConfig
+import AmbariConfig
 from NetUtil import NetUtil
 from PingPortListener import PingPortListener
 import hostname
 from DataCleaner import DataCleaner
 import socket
-
 logger = logging.getLogger()
-formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
-agentPid = os.getpid()
-config = AmbariConfig()
-configFile = config.CONFIG_FILE
-two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY
 
-if 'AMBARI_LOG_DIR' in os.environ:
-  logfile = os.environ['AMBARI_LOG_DIR'] + "/ambari-agent.log"
-else:
-  logfile = "/var/log/ambari-agent/ambari-agent.log"
-
-def signal_handler(signum, frame):
-  #we want the handler to run only for the agent process and not
-  #for the children (e.g. namenode, etc.)
-  if os.getpid() != agentPid:
-    os._exit(0)
-  logger.info('signal received, exiting.')
-  ProcessHelper.stopAgent()
+IS_WINDOWS = platform.system() == "Windows"
 
-def debug(sig, frame):
-  """Interrupt running process, and provide a python prompt for
-  interactive debugging."""
-  d={'_frame':frame}         # Allow access to frame object.
-  d.update(frame.f_globals)  # Unless shadowed by global
-  d.update(frame.f_locals)
+if IS_WINDOWS:
+  from HeartbeatHandlers_windows import bind_signal_handlers
+else:
+  from HeartbeatStopHandler_linux import bind_signal_handlers
 
-  message  = "Signal received : entering python shell.\nTraceback:\n"
-  message += ''.join(traceback.format_stack(frame))
-  logger.info(message)
+formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
+agentPid = os.getpid()
 
 
 def setup_logging(verbose):
   formatter = logging.Formatter(formatstr)
-  rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 25)
+  rotateLog = logging.handlers.RotatingFileHandler(AmbariConfig.AmbariConfig.getLogFile(), "a", 10000000, 25)
   rotateLog.setFormatter(formatter)
   logger.addHandler(rotateLog)
 
   if verbose:
-    logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=logfile)
+    logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=AmbariConfig.AmbariConfig.getLogFile())
     logger.setLevel(logging.DEBUG)
     logger.info("loglevel=logging.DEBUG")
   else:
-    logging.basicConfig(format=formatstr, level=logging.INFO, filename=logfile)
+    logging.basicConfig(format=formatstr, level=logging.INFO, filename=AmbariConfig.AmbariConfig.getLogFile())
     logger.setLevel(logging.INFO)
     logger.info("loglevel=logging.INFO")
 
@@ -89,44 +70,37 @@ def update_log_level(config):
     loglevel = config.get('agent', 'loglevel')
     if loglevel is not None:
       if loglevel == 'DEBUG':
-        logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=logfile)
+        logging.basicConfig(format=formatstr, level=logging.DEBUG, filename=AmbariConfig.AmbariConfig.getLogFile())
         logger.setLevel(logging.DEBUG)
         logger.info("Newloglevel=logging.DEBUG")
       else:
-        logging.basicConfig(format=formatstr, level=logging.INFO, filename=logfile)
+        logging.basicConfig(format=formatstr, level=logging.INFO, filename=AmbariConfig.AmbariConfig.getLogFile())
         logger.setLevel(logging.INFO)
         logger.debug("Newloglevel=logging.INFO")
   except Exception, err:
     logger.info("Default loglevel=DEBUG")
 
 
-def bind_signal_handlers():
-  signal.signal(signal.SIGINT, signal_handler)
-  signal.signal(signal.SIGTERM, signal_handler)
-  signal.signal(signal.SIGUSR1, debug)
-
-
 #  ToDo: move that function inside AmbariConfig
 def resolve_ambari_config():
   global config
+  configPath = os.path.abspath(AmbariConfig.AmbariConfig.getConfigFile())
+
   try:
-    if os.path.exists(configFile):
-        config.read(configFile)
+    if os.path.exists(configPath):
+      config.read(configPath)
     else:
-      raise Exception("No config found, use default")
+      raise Exception("No config found at {0}, use default".format(configPath))
 
   except Exception, err:
     logger.warn(err)
-  return config
 
 
 def perform_prestart_checks(expected_hostname):
   # Check if current hostname is equal to expected one (got from the server
   # during bootstrap.
-  global config
-
   if expected_hostname is not None:
-    current_hostname = hostname.hostname(config)
+    current_hostname = hostname.hostname()
     if current_hostname != expected_hostname:
       print("Determined hostname does not match expected. Please check agent "
             "log for details")
@@ -137,16 +111,21 @@ def perform_prestart_checks(expected_hostname):
       logger.error(msg)
       sys.exit(1)
   # Check if there is another instance running
-  if os.path.isfile(ProcessHelper.pidfile):
-    print("%s already exists, exiting" % ProcessHelper.pidfile)
-    sys.exit(1)
-  # check if ambari prefix exists
-  elif not os.path.isdir(config.get("agent", "prefix")):
+  # if os.path.isfile(ProcessHelper.pidfile):
+  #   print("%s already exists, exiting" % ProcessHelper.pidfile)
+  #   sys.exit(1)
+  # check if ambari prefix exists
+  elif config.has_option('agent', 'prefix') and not os.path.isdir(os.path.abspath(config.get('agent', 'prefix'))):
     msg = "Ambari prefix dir %s does not exists, can't continue" \
           % config.get("agent", "prefix")
     logger.error(msg)
     print(msg)
     sys.exit(1)
+  elif not config.has_option('agent', 'prefix'):
+    msg = "Ambari prefix dir %s not configured, can't continue"
+    logger.error(msg)
+    print(msg)
+    sys.exit(1)
 
 
 def daemonize():
@@ -181,7 +160,9 @@ def stop_agent():
     os._exit(1)
 
 
-def main():
+# heartbeat_stop_callback - event that is passed to Controller and NetUtil so that their loops can be interrupted from outside the process.
+# We need this on Windows, where SIGTERM is not available.
+def main(heartbeat_stop_callback):
   global config
   parser = OptionParser()
   parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose log output", default=False)
@@ -193,32 +174,32 @@ def main():
 
   setup_logging(options.verbose)
 
-  default_cfg = {'agent': {'prefix': '/home/ambari'}}
-  config.load(default_cfg)
-
-  bind_signal_handlers()
+  default_cfg = { 'agent' : { 'prefix' : '/home/ambari' } }
+  config = ConfigParser.RawConfigParser(default_cfg)
 
-  if (len(sys.argv) > 1) and sys.argv[1] == 'stop':
+  if (len(sys.argv) >1) and sys.argv[1]=='stop':
     stop_agent()
 
   # Check for ambari configuration file.
-  config = resolve_ambari_config()
+  resolve_ambari_config()
 
   # Starting data cleanup daemon
   data_cleaner = None
-  if int(config.get('agent', 'data_cleanup_interval')) > 0:
+  if config.has_option('agent', 'data_cleanup_interval') and int(config.get('agent','data_cleanup_interval')) > 0:
     data_cleaner = DataCleaner(config)
     data_cleaner.start()
 
   perform_prestart_checks(expected_hostname)
-  daemonize()
+
+  if not IS_WINDOWS:
+    daemonize()
 
   # Starting ping port listener
   try:
     ping_port_listener = PingPortListener(config)
   except Exception as ex:
     err_message = "Failed to start ping port listener of: " + str(ex)
-    logger.error(err_message)
+    logger.error(err_message);
     sys.stderr.write(err_message)
     sys.exit(1)
   ping_port_listener.start()
@@ -226,7 +207,7 @@ def main():
   update_log_level(config)
 
   server_hostname = config.get('server', 'hostname')
-  server_url = config.get_api_url()
+  server_url = 'https://' + server_hostname + ':' + config.get('server', 'url_port')
 
   try:
     server_ip = socket.gethostbyname(server_hostname)
@@ -235,15 +216,18 @@ def main():
     logger.warn("Unable to determine the IP address of the Ambari server '%s'", server_hostname)
 
   # Wait until server is reachable
-  netutil = NetUtil()
-  netutil.try_to_connect(server_url, -1, logger)
-
-  # Launch Controller communication
-  controller = Controller(config)
-  controller.start()
-  controller.join()
-  stop_agent()
+  netutil = NetUtil(heartbeat_stop_callback)
+  retries, connected = netutil.try_to_connect(server_url, -1, logger)
+  # Start the Controller only if a connection was made; otherwise the agent was presumably stopped via the stop event
+  if connected:
+    # Launch Controller communication
+    controller = Controller(config, heartbeat_stop_callback)
+    controller.start()
+    controller.join()
+  #stop_agent()
   logger.info("finished")
 
 if __name__ == "__main__":
-  main()
+  heartbeat_stop_callback = bind_signal_handlers(agentPid)
+
+  main(heartbeat_stop_callback)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py
index bc101b8..dd9e8e1 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -27,46 +27,42 @@ import json
 import pprint
 import traceback
 import hostname
+import platform
 
 logger = logging.getLogger()
 
-GEN_AGENT_KEY = "openssl req -new -newkey rsa:1024 -nodes -keyout %(keysdir)s/%(hostname)s.key\
-  -subj /OU=%(hostname)s/\
-        -out %(keysdir)s/%(hostname)s.csr"
+GEN_AGENT_KEY='openssl req -new -newkey rsa:1024 -nodes -keyout "%(keysdir)s'+os.sep+'%(hostname)s.key" '\
+	'-subj /OU=%(hostname)s/ -out "%(keysdir)s'+os.sep+'%(hostname)s.csr"'
 
 
 class VerifiedHTTPSConnection(httplib.HTTPSConnection):
   """ Connecting using ssl wrapped sockets """
   def __init__(self, host, port=None, config=None):
     httplib.HTTPSConnection.__init__(self, host, port=port)
-    self.two_way_ssl_required = False
-    self.config = config
+    self.config=config
+    self.two_way_ssl_required=False
 
   def connect(self):
-    self.two_way_ssl_required = self.config.isTwoWaySSLConnection()
-    logger.debug("Server two-way SSL authentication required: %s" % str(self.two_way_ssl_required))
-    if self.two_way_ssl_required is True:
-      logger.info('Server require two-way SSL authentication. Use it instead of one-way...')
 
     if not self.two_way_ssl_required:
       try:
-        sock = self.create_connection()
+        sock=self.create_connection()
         self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE)
         logger.info('SSL connection established. Two-way SSL authentication is '
                     'turned off on the server.')
       except (ssl.SSLError, AttributeError):
-        self.two_way_ssl_required = True
+        self.two_way_ssl_required=True
         logger.info('Insecure connection to https://' + self.host + ':' + self.port +
                     '/ failed. Reconnecting using two-way SSL authentication..')
 
     if self.two_way_ssl_required:
-      self.certMan = CertificateManager(self.config)
+      self.certMan=CertificateManager(self.config)
       self.certMan.initSecurity()
       agent_key = self.certMan.getAgentKeyName()
       agent_crt = self.certMan.getAgentCrtName()
       server_crt = self.certMan.getSrvrCrtName()
 
-      sock = self.create_connection()
+      sock=self.create_connection()
 
       try:
         self.sock = ssl.wrap_socket(sock,
@@ -92,40 +88,41 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
       self.sock.close()
     logger.info("SSL Connect being called.. connecting to the server")
     sock = socket.create_connection((self.host, self.port), 60)
-    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+    sock.setsockopt( socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
     if self._tunnel_host:
       self.sock = sock
       self._tunnel()
 
     return sock
 
-
 class CachedHTTPSConnection:
   """ Caches a ssl socket and uses a single https connection to the server. """
-
+  
   def __init__(self, config):
-    self.connected = False
+    self.connected = False;
     self.config = config
     self.server = config.get('server', 'hostname')
     self.port = config.get('server', 'secured_url_port')
     self.connect()
-
+  
   def connect(self):
-    if not self.connected:
+    if  not self.connected:
       self.httpsconn = VerifiedHTTPSConnection(self.server, self.port, self.config)
       self.httpsconn.connect()
       self.connected = True
     # possible exceptions are caught and processed in Controller
 
+
+  
   def forceClear(self):
     self.httpsconn = VerifiedHTTPSConnection(self.server, self.port, self.config)
     self.connect()
-
-  def request(self, req):
+    
+  def request(self, req): 
     self.connect()
     try:
-      self.httpsconn.request(req.get_method(), req.get_full_url(),
-                             req.get_data(), req.headers)
+      self.httpsconn.request(req.get_method(), req.get_full_url(), 
+                                  req.get_data(), req.headers)
       response = self.httpsconn.getresponse()
       readResponse = response.read()
     except Exception as ex:
@@ -136,60 +133,59 @@ class CachedHTTPSConnection:
       self.connected = False
       raise IOError("Error occured during connecting to the server: " + str(ex))
     return readResponse
-
-
+  
 class CertificateManager():
   def __init__(self, config):
     self.config = config
-    self.keysdir = self.config.get('security', 'keysdir')
-    self.server_crt = self.config.get('security', 'server_crt')
+    self.keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
+    self.server_crt=self.config.get('security', 'server_crt')
     self.server_url = 'https://' + self.config.get('server', 'hostname') + ':' \
        + self.config.get('server', 'url_port')
-
+    
   def getAgentKeyName(self):
-    keysdir = self.config.get('security', 'keysdir')
-    return keysdir + os.sep + hostname.hostname(self.config) + ".key"
+    keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
+    return keysdir + os.sep + hostname.hostname() + ".key"
 
   def getAgentCrtName(self):
-    keysdir = self.config.get('security', 'keysdir')
-    return keysdir + os.sep + hostname.hostname(self.config) + ".crt"
+    keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
+    return keysdir + os.sep + hostname.hostname() + ".crt"
 
   def getAgentCrtReqName(self):
-    keysdir = self.config.get('security', 'keysdir')
-    return keysdir + os.sep + hostname.hostname(self.config) + ".csr"
+    keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
+    return keysdir + os.sep + hostname.hostname() + ".csr"
 
   def getSrvrCrtName(self):
-    keysdir = self.config.get('security', 'keysdir')
+    keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
     return keysdir + os.sep + "ca.crt"
-
+    
   def checkCertExists(self):
-
-    s = self.config.get('security', 'keysdir') + os.sep + "ca.crt"
+    
+    s = os.path.abspath(self.config.get('security', 'keysdir')) + os.sep + "ca.crt"
 
     server_crt_exists = os.path.exists(s)
-
+    
     if not server_crt_exists:
       logger.info("Server certicate not exists, downloading")
       self.loadSrvrCrt()
     else:
       logger.info("Server certicate exists, ok")
-
+      
     agent_key_exists = os.path.exists(self.getAgentKeyName())
-
+    
     if not agent_key_exists:
       logger.info("Agent key not exists, generating request")
       self.genAgentCrtReq()
     else:
       logger.info("Agent key exists, ok")
-
+      
     agent_crt_exists = os.path.exists(self.getAgentCrtName())
-
+    
     if not agent_crt_exists:
       logger.info("Agent certificate not exists, sending sign request")
       self.reqSignCrt()
     else:
       logger.info("Agent certificate exists, ok")
-
+            
   def loadSrvrCrt(self):
     get_ca_url = self.server_url + '/cert/ca/'
     logger.info("Downloading server cert from " + get_ca_url)
@@ -200,15 +196,15 @@ class CertificateManager():
     stream.close()
     srvr_crt_f = open(self.getSrvrCrtName(), 'w+')
     srvr_crt_f.write(response)
-
+      
   def reqSignCrt(self):
-    sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname(self.config)
+    sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname()
     agent_crt_req_f = open(self.getAgentCrtReqName())
     agent_crt_req_content = agent_crt_req_f.read()
     passphrase_env_var = self.config.get('security', 'passphrase_env_var_name')
     passphrase = os.environ[passphrase_env_var]
-    register_data = {'csr': agent_crt_req_content,
-                    'passphrase': passphrase}
+    register_data = {'csr'       : agent_crt_req_content,
+                    'passphrase' : passphrase}
     data = json.dumps(register_data)
     proxy_handler = urllib2.ProxyHandler({})
     opener = urllib2.build_opener(proxy_handler)
@@ -223,9 +219,9 @@ class CertificateManager():
     except Exception:
       logger.warn("Malformed response! data: " + str(data))
       data = {'result': 'ERROR'}
-    result = data['result']
+    result=data['result']
     if result == 'OK':
-      agentCrtContent = data['signedCa']
+      agentCrtContent=data['signedCa']
       agentCrtF = open(self.getAgentCrtName(), "w")
       agentCrtF.write(agentCrtContent)
     else:
@@ -239,11 +235,13 @@ class CertificateManager():
       raise ssl.SSLError
 
   def genAgentCrtReq(self):
-    generate_script = GEN_AGENT_KEY % {'hostname': hostname.hostname(self.config),
-                                     'keysdir': self.config.get('security', 'keysdir')}
+    generate_script = GEN_AGENT_KEY % {'hostname': hostname.hostname(),
+                                     'keysdir' : os.path.abspath(self.config.get('security', 'keysdir'))}
     logger.info(generate_script)
-    p = subprocess.Popen([generate_script], shell=True, stdout=subprocess.PIPE)
-    p.communicate()
-
+    if platform.system() == 'Windows':
+      subprocess.Popen(generate_script, stdout=subprocess.PIPE)
+    else:
+      subprocess.Popen([generate_script], shell=True, stdout=subprocess.PIPE)
+      
   def initSecurity(self):
     self.checkCertExists()

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/ambari_agent/shell.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/shell.py b/ambari-agent/src/main/python/ambari_agent/shell.py
index 4081bb0..201fee8 100644
--- a/ambari-agent/src/main/python/ambari_agent/shell.py
+++ b/ambari-agent/src/main/python/ambari_agent/shell.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+# !/usr/bin/env python
 
 '''
 Licensed to the Apache Software Foundation (ASF) under one
@@ -29,26 +29,79 @@ import time
 import traceback
 import AmbariConfig
 import pprint
+import platform
 
-try:
+if platform.system() != "Windows":
+  try:
     import pwd
-except ImportError:
+  except ImportError:
     import winpwd as pwd
 
-global serverTracker
-serverTracker = {}
 logger = logging.getLogger()
 
+shellRunner = None
 threadLocal = threading.local()
-gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL
-tempFiles = [] 
+gracefull_kill_delay = 5  # seconds between SIGTERM and SIGKILL
+
+tempFiles = []
+
+
 def noteTempFile(filename):
   tempFiles.append(filename)
 
+
 def getTempFiles():
   return tempFiles
 
-def kill_process_with_children(parent_pid):
+
+class _dict_to_object:
+  def __init__(self, entries):
+    self.__dict__.update(entries)
+
+  def __getitem__(self, item):
+    return self.__dict__[item]
+
+
+# windows specific code
+def _kill_process_with_children_windows(parent_pid):
+  shellRunner().run(["taskkill", "/T", "/PID", "{0}".format(parent_pid)])
+
+
+class shellRunnerWindows:
+  # Run any command
+  def run(self, script, user=None):
+    logger.warn("user argument ignored on windows")
+    code = 0
+    if not isinstance(script, list):
+      cmd = " "
+      cmd = cmd.join(script)
+    else:
+      cmd = script
+    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE, shell=False)
+    out, err = p.communicate()
+    code = p.wait()
+    logger.debug("Exitcode for %s is %d" % (cmd, code))
+    return {'exitCode': code, 'output': out, 'error': err}
+
+  def runPowershell(self, file=None, script_block=None, args=[]):
+    logger.warn("user argument ignored on windows")
+    code = 0
+    cmd = None
+    if file:
+      cmd = ['powershell', '-WindowStyle', 'Hidden', '-File', file] + args
+    elif script_block:
+      cmd = ['powershell', '-WindowStyle', 'Hidden', '-Command', script_block] + args
+    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE, shell=False)
+    out, err = p.communicate()
+    code = p.wait()
+    logger.debug("Exitcode for %s is %d" % (cmd, code))
+    return _dict_to_object({'exitCode': code, 'output': out, 'error': err})
+
+
+#linux specific code
+def _kill_process_with_children_linux(parent_pid):
   def kill_tree_function(pid, signal):
     '''
     Kills process tree starting from a given pid.
@@ -58,15 +111,17 @@ def kill_process_with_children(parent_pid):
     # a given PID and then passes list of "kill -<SIGNAL> PID" commands to 'sh'
     # shell.
     CMD = """ps xf | awk -v PID=""" + str(pid) + \
-        """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
-        """K=P } P && !/_/ { P="" }  END { print "kill -""" \
-        + str(signal) + """ "K }' | sh """
+          """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
+          """K=P } P && !/_/ { P="" }  END { print "kill -""" \
+          + str(signal) + """ "K }' | sh """
     process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, shell=True)
     process.communicate()
-  run_kill_function(kill_tree_function, parent_pid)
 
-def run_kill_function(kill_function, pid):
+  _run_kill_function(kill_tree_function, parent_pid)
+
+
+def _run_kill_function(kill_function, pid):
   try:
     kill_function(pid, signal.SIGTERM)
   except Exception, e:
@@ -81,17 +136,19 @@ def run_kill_function(kill_function, pid):
     logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
     logger.error("Reported error: " + repr(e))
 
-def changeUid():
+
+def _changeUid():
   try:
     os.setuid(threadLocal.uid)
   except Exception:
     logger.warn("can not switch user for running command.")
 
-class shellRunner:
+
+class shellRunnerLinux:
   # Run any command
   def run(self, script, user=None):
     try:
-      if user!=None:
+      if user != None:
         user = pwd.getpwnam(user)[2]
       else:
         user = os.getuid()
@@ -101,12 +158,22 @@ class shellRunner:
     code = 0
     cmd = " "
     cmd = cmd.join(script)
-    p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, 
+    p = subprocess.Popen(cmd, preexec_fn=_changeUid, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, shell=True, close_fds=True)
     out, err = p.communicate()
     code = p.wait()
-    logger.debug("Exitcode for %s is %d" % (cmd,code))
+    logger.debug("Exitcode for %s is %d" % (cmd, code))
     return {'exitCode': code, 'output': out, 'error': err}
 
-  def getServerTracker(self):
-    return serverTracker
\ No newline at end of file
+
+def kill_process_with_children(parent_pid):
+  if platform.system() == "Windows":
+    _kill_process_with_children_windows(parent_pid)
+  else:
+    _kill_process_with_children_linux(parent_pid)
+
+
+if platform.system() == "Windows":
+  shellRunner = shellRunnerWindows
+else:
+  shellRunner = shellRunnerLinux
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/__init__.py b/ambari-agent/src/main/python/resource_management/__init__.py
new file mode 100644
index 0000000..fee91fd
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/__init__.py
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.libraries import *
+from resource_management.core import *
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/core/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/core/__init__.py b/ambari-agent/src/main/python/resource_management/core/__init__.py
new file mode 100644
index 0000000..1af793b
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/core/__init__.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from resource_management.core.base import *
+from resource_management.core.environment import *
+from resource_management.core.exceptions import *
+from resource_management.core.providers import *
+from resource_management.core.resources import *
+from resource_management.core.source import *
+from resource_management.core.system import *
+from resource_management.core.shell import *
+from resource_management.core.logger import *
+
+__version__ = "0.4.1"

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/core/base.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/core/base.py b/ambari-agent/src/main/python/resource_management/core/base.py
new file mode 100644
index 0000000..52f1dff
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/core/base.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Resource", "ResourceArgument", "ForcedListArgument",
+           "BooleanArgument"]
+
+from resource_management.core.exceptions import Fail, InvalidArgument
+from resource_management.core.environment import Environment
+from resource_management.core.logger import Logger
+
+class ResourceArgument(object):
+  def __init__(self, default=None, required=False):
+    self.required = False # Prevents the initial validate from failing
+    if hasattr(default, '__call__'):
+      self.default = default
+    else:
+      self.default = self.validate(default)
+    self.required = required
+
+  def validate(self, value):
+    if self.required and value is None:
+      raise InvalidArgument("Required argument %s missing" % self.name)
+    return value
+
+
+class ForcedListArgument(ResourceArgument):
+  def validate(self, value):
+    value = super(ForcedListArgument, self).validate(value)
+    if not isinstance(value, (tuple, list)):
+      value = [value]
+    return value
+
+
+class BooleanArgument(ResourceArgument):
+  def validate(self, value):
+    value = super(BooleanArgument, self).validate(value)
+    if not value in (True, False):
+      raise InvalidArgument(
+        "Expected a boolean for %s received %r" % (self.name, value))
+    return value
+
+
+class Accessor(object):
+  def __init__(self, name):
+    self.name = name
+
+  def __get__(self, obj, cls):
+    try:
+      return obj.arguments[self.name]
+    except KeyError:
+      val = obj._arguments[self.name].default
+      if hasattr(val, '__call__'):
+        val = val(obj)
+      return val
+
+  def __set__(self, obj, value):
+    obj.arguments[self.name] = obj._arguments[self.name].validate(value)
+
+
+class ResourceMetaclass(type):
+  # def __new__(cls, name, bases, attrs):
+  #     super_new = super(ResourceMetaclass, cls).__new__
+  #     return super_new(cls, name, bases, attrs)
+
+  def __init__(mcs, _name, bases, attrs):
+    mcs._arguments = getattr(bases[0], '_arguments', {}).copy()
+    for key, value in list(attrs.items()):
+      if isinstance(value, ResourceArgument):
+        value.name = key
+        mcs._arguments[key] = value
+        setattr(mcs, key, Accessor(key))
+  
+  
+class Resource(object):
+  __metaclass__ = ResourceMetaclass
+
+  action = ForcedListArgument(default="nothing")
+  ignore_failures = BooleanArgument(default=False)
+  not_if = ResourceArgument() # pass command e.g. not_if = ('ls','/root/jdk')
+  only_if = ResourceArgument() # pass command
+  initial_wait = ResourceArgument() # in seconds
+
+  actions = ["nothing"]
+  
+  def __new__(cls, name, env=None, provider=None, **kwargs):
+    if isinstance(name, list):
+      while len(name) != 1:
+        cls(name.pop(0), env, provider, **kwargs)
+        
+      name = name[0]
+    
+    env = env or Environment.get_instance()
+    provider = provider or getattr(cls, 'provider', None)
+    
+    r_type = cls.__name__
+    if r_type not in env.resources:
+      env.resources[r_type] = {}
+
+    obj = super(Resource, cls).__new__(cls)
+    env.resources[r_type][name] = obj
+    env.resource_list.append(obj)
+    return obj
+
+  def __init__(self, name, env=None, provider=None, **kwargs):
+    if isinstance(name, list):
+      name = name.pop(0)
+    
+    if hasattr(self, 'name'):
+      return
+
+    self.env = env or Environment.get_instance()
+    self.name = name
+     
+    self.provider = provider or getattr(self, 'provider', None)
+
+    self.arguments = {}
+    for key, value in kwargs.items():
+      try:
+        arg = self._arguments[key]
+      except KeyError:
+        raise Fail("%s received unsupported argument %s" % (self, key))
+      else:
+        try:
+          self.arguments[key] = arg.validate(value)
+        except InvalidArgument, exc:
+          raise InvalidArgument("%s %s" % (self, exc))
+    
+    if not self.env.test_mode:
+      self.env.run()
+
+  def validate(self):
+    pass
+
+  def __repr__(self):
+    return "%s['%s']" % (self.__class__.__name__, self.name)
+
+  def __unicode__(self):
+    return u"%s['%s']" % (self.__class__.__name__, self.name)
+
+  def __getstate__(self):
+    return dict(
+      name=self.name,
+      provider=self.provider,
+      arguments=self.arguments,
+      env=self.env,
+    )
+
+  def __setstate__(self, state):
+    self.name = state['name']
+    self.provider = state['provider']
+    self.arguments = state['arguments']
+    self.env = state['env']
+
+    self.validate()

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/core/environment.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/core/environment.py b/ambari-agent/src/main/python/resource_management/core/environment.py
new file mode 100644
index 0000000..8f0ec27
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/core/environment.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Environment"]
+
+import os
+import shutil
+import time
+from datetime import datetime
+
+from resource_management.core import shell
+from resource_management.core.exceptions import Fail
+from resource_management.core.providers import find_provider
+from resource_management.core.utils import AttributeDictionary
+from resource_management.core.system import System
+from resource_management.core.logger import Logger
+
+
+class Environment(object):
+  _instances = []
+
+  def __init__(self, basedir=None, test_mode=False):
+    """
+    @param basedir: basedir/files, basedir/templates are the places where templates / static files
+    are looked up
+    @param test_mode: if this is enabled, resources won't be executed until manually running env.run().
+    """
+    self.reset(basedir, test_mode)
+
+  def reset(self, basedir, test_mode):
+    self.system = System.get_instance()
+    self.config = AttributeDictionary()
+    self.resources = {}
+    self.resource_list = []
+    self.delayed_actions = set()
+    self.test_mode = test_mode
+    self.update_config({
+      # current time
+      'date': datetime.now(),
+      # directory where files rewritten while executing a File resource are backed up
+      'backup.path': '/tmp/resource_management/backup',
+      # prefix for the names of these backup files
+      'backup.prefix': datetime.now().strftime("%Y%m%d%H%M%S"),
+      # dir where the templates and files dirs are
+      'basedir': basedir, 
+      # variables, which can be used in templates
+      'params': {},
+    })
+
+  def backup_file(self, path):
+    if self.config.backup:
+      if not os.path.exists(self.config.backup.path):
+        os.makedirs(self.config.backup.path, 0700)
+      new_name = self.config.backup.prefix + path.replace('/', '-')
+      backup_path = os.path.join(self.config.backup.path, new_name)
+      Logger.info("backing up %s to %s" % (path, backup_path))
+      shutil.copy(path, backup_path)
+
+  def update_config(self, attributes, overwrite=True):
+    for key, value in attributes.items():
+      attr = self.config
+      path = key.split('.')
+      for pth in path[:-1]:
+        if pth not in attr:
+          attr[pth] = AttributeDictionary()
+        attr = attr[pth]
+      if overwrite or path[-1] not in attr:
+        attr[path[-1]] = value
+        
+  def set_params(self, arg):
+    """
+    @param arg: is a dictionary of configurations, or a module with the configurations
+    """
+    if isinstance(arg, dict):
+      variables = arg
+    else:
+      variables = dict((var, getattr(arg, var)) for var in dir(arg))
+    
+    for variable, value in variables.iteritems():
+      # don't include system variables, methods, classes, modules
+      if not variable.startswith("__") and \
+          not hasattr(value, '__call__')and \
+          not hasattr(value, '__file__'):
+        self.config.params[variable] = value
+        
+  def run_action(self, resource, action):
+    Logger.debug("Performing action %s on %s" % (action, resource))
+
+    provider_class = find_provider(self, resource.__class__.__name__,
+                                   resource.provider)
+    provider = provider_class(resource)
+    try:
+      provider_action = getattr(provider, 'action_%s' % action)
+    except AttributeError:
+      raise Fail("%r does not implement action %s" % (provider, action))
+    provider_action()
+
+  def _check_condition(self, cond):
+    if hasattr(cond, '__call__'):
+      return cond()
+
+    if isinstance(cond, basestring):
+      ret, out = shell.call(cond)
+      return ret == 0
+
+    raise Exception("Unknown condition type %r" % cond) 
+    
+  def run(self):
+    with self:
+      # Run resource actions
+      while self.resource_list:
+        resource = self.resource_list.pop(0)
+        Logger.info_resource(resource)
+        
+        if resource.initial_wait:
+          time.sleep(resource.initial_wait)
+
+        if resource.not_if is not None and self._check_condition(
+          resource.not_if):
+          Logger.info("Skipping %s due to not_if" % resource)
+          continue
+
+        if resource.only_if is not None and not self._check_condition(
+          resource.only_if):
+          Logger.info("Skipping %s due to only_if" % resource)
+          continue
+
+        for action in resource.action:
+          if not resource.ignore_failures:
+            self.run_action(resource, action)
+          else:
+            try:
+              self.run_action(resource, action)
+            except Exception as ex:
+              Logger.info("Skipping failure of %s due to ignore_failures. Failure reason: %s" % (resource, str(ex)))
+              pass
+
+      # Run delayed actions
+      while self.delayed_actions:
+        action, resource = self.delayed_actions.pop()
+        self.run_action(resource, action)
+
+  @classmethod
+  def get_instance(cls):
+    return cls._instances[-1]
+  
+  @classmethod
+  def get_instance_copy(cls):
+    """
+    Copy only configurations, but not resources execution state
+    """
+    old_instance = cls.get_instance()
+    new_instance = Environment()
+    new_instance.config = old_instance.config.copy()
+    
+    return new_instance
+
+  def __enter__(self):
+    self.__class__._instances.append(self)
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.__class__._instances.pop()
+    return False
+
+  def __getstate__(self):
+    return dict(
+      config=self.config,
+      resources=self.resources,
+      resource_list=self.resource_list,
+      delayed_actions=self.delayed_actions,
+    )
+
+  def __setstate__(self, state):
+    self.__init__()
+    self.config = state['config']
+    self.resources = state['resources']
+    self.resource_list = state['resource_list']
+    self.delayed_actions = state['delayed_actions']

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/core/exceptions.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/core/exceptions.py b/ambari-agent/src/main/python/resource_management/core/exceptions.py
new file mode 100644
index 0000000..3c001cc
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/core/exceptions.py
@@ -0,0 +1,46 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+class Fail(Exception):
+  pass
+
+class ExecuteTimeoutException(Exception):
+  pass
+
+class InvalidArgument(Fail):
+  pass
+
+class ClientComponentHasNoStatus(Fail):
+  """
+  Thrown when status() method is called for a CLIENT component.
+  The only valid status for CLIENT component is installed,
+  that's why exception is thrown and later silently processed at script.py
+  """
+  pass
+
+class ComponentIsNotRunning(Fail):
+  """
+  Thrown when status() method is called for a component (only
+  in situations when component process is not running).
+  Later exception is silently processed at script.py
+  """
+  pass

http://git-wip-us.apache.org/repos/asf/ambari/blob/7e28d1e3/ambari-agent/src/main/python/resource_management/core/logger.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/resource_management/core/logger.py b/ambari-agent/src/main/python/resource_management/core/logger.py
new file mode 100644
index 0000000..eaf2187
--- /dev/null
+++ b/ambari-agent/src/main/python/resource_management/core/logger.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+__all__ = ["Logger"]
+import logging
+from resource_management.libraries.script.config_dictionary import UnknownConfiguration
+
+class Logger:
+  logger = logging.getLogger("resource_management")
+  
+  # unprotected_strings : protected_strings map
+  sensitive_strings = {}
+  
+  @staticmethod
+  def info(text):
+    Logger.logger.info(Logger.get_protected_text(text))
+  
+  @staticmethod  
+  def debug(text):
+    Logger.logger.debug(Logger.get_protected_text(text))
+
+  @staticmethod
+  def info_resource(resource):
+    Logger.info(Logger.get_protected_text(Logger._get_resource_repr(resource)))
+  
+  @staticmethod  
+  def debug_resource(resource):
+    Logger.debug(Logger.get_protected_text(Logger._get_resource_repr(resource)))
+    
+  @staticmethod
+  def get_protected_text(text):
+    """
+    Replace passwords with [PROTECTED]
+    """
+    for unprotected_string, protected_string in Logger.sensitive_strings.iteritems():
+      text = text.replace(unprotected_string, protected_string)
+      
+    return text
+    
+  @staticmethod  
+  def _get_resource_repr(resource):
+    MESSAGE_MAX_LEN = 256
+    logger_level = logging._levelNames[Logger.logger.level]
+    
+    arguments_str = ""
+    for x,y in resource.arguments.iteritems():
+      
+      # strip unicode 'u' sign
+      if isinstance(y, unicode):
+        # don't show long messages
+        if len(y) > MESSAGE_MAX_LEN:
+          y = '...'
+        val = repr(y).lstrip('u')
+      # don't show dicts of configurations
+      # usually too long  
+      elif logger_level != 'DEBUG' and isinstance(y, dict):
+        val = "..."
+      # for configs which didn't come
+      elif isinstance(y, UnknownConfiguration):
+        val = "[EMPTY]"
+      # correctly output 'mode' (as they are octal values like 0755)
+      elif y and x == 'mode':
+        try:
+          val = oct(y)
+        except:
+          val = repr(y)
+      else:
+        val = repr(y)
+      
+      
+      arguments_str += "'{0}': {1}, ".format(x, val)
+      
+    if arguments_str:  
+      arguments_str = arguments_str[:-2]
+    
+    return "{0} {{{1}}}".format(resource, arguments_str)
\ No newline at end of file


Mime
View raw message