ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nc...@apache.org
Subject [15/49] ambari git commit: AMBARI-19098. HDP 3.0 TP - create Service Advisor for YARN/MR (alejandro)
Date Fri, 27 Jan 2017 18:17:26 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 55f3d30..af4539d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -197,7 +197,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     if "referenceNodeManagerHost" in clusterData:
       nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"]/1024, nodemanagerMinRam)
 
-    callContext = getCallContext(services)
+    callContext = self.getCallContext(services)
     putYarnProperty('yarn.nodemanager.resource.memory-mb', int(round(min(clusterData['containers'] * clusterData['ramPerContainer'], nodemanagerMinRam))))
     # read from the supplied config
     #if 'recommendConfigurations' != callContext and \
@@ -248,7 +248,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
             putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.hosts".format(old_ambari_user), 'delete', 'true')
             putYarnPropertyAttribute("yarn.timeline-service.http-authentication.proxyuser.{0}.groups".format(old_ambari_user), 'delete', 'true')
 
-
   def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts):
     putMapredProperty = self.putProperty(configurations, "mapred-site", services)
     putMapredProperty('yarn.app.mapreduce.am.resource.mb', int(clusterData['amMemory']))
@@ -269,27 +268,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     if mr_queue is not None:
       putMapredProperty("mapreduce.job.queuename", mr_queue)
 
-  def getAmbariUser(self, services):
-    ambari_user = services['ambari-server-properties']['ambari-server.user']
-    if "cluster-env" in services["configurations"] \
-          and "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"] \
-                and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
-                    and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
-      ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
-      ambari_user = ambari_user.split('@')[0]
-    return ambari_user
-
-  def getOldAmbariUser(self, services):
-    ambari_user = None
-    if "cluster-env" in services["configurations"]:
-      if "security_enabled" in services["configurations"]["cluster-env"]["properties"] \
-              and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true":
-         ambari_user = services['ambari-server-properties']['ambari-server.user']
-      elif "ambari_principal_name" in services["configurations"]["cluster-env"]["properties"]:
-         ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
-         ambari_user = ambari_user.split('@')[0]
-    return ambari_user
-
   def getAmbariProxyUsersForHDFSValidationItems(self, properties, services):
     validationItems = []
     servicesList = self.get_services_list(services)
@@ -828,148 +806,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
       zookeeper_port = services['configurations']['zoo.cfg']['properties']['clientPort']
     return zookeeper_port
 
-  def getConfigurationClusterSummary(self, servicesList, hosts, components, services):
-
-    hBaseInstalled = False
-    if 'HBASE' in servicesList:
-      hBaseInstalled = True
-
-    cluster = {
-      "cpu": 0,
-      "disk": 0,
-      "ram": 0,
-      "hBaseInstalled": hBaseInstalled,
-      "components": components
-    }
-
-    if len(hosts["items"]) > 0:
-      nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts)
-      # NodeManager host with least memory is generally used in calculations as it will work in larger hosts.
-      if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
-        nodeManagerHost = nodeManagerHosts[0];
-        for nmHost in nodeManagerHosts:
-          if nmHost["Hosts"]["total_mem"] < nodeManagerHost["Hosts"]["total_mem"]:
-            nodeManagerHost = nmHost
-        host = nodeManagerHost["Hosts"]
-        cluster["referenceNodeManagerHost"] = host
-      else:
-        host = hosts["items"][0]["Hosts"]
-      cluster["referenceHost"] = host
-      cluster["cpu"] = host["cpu_count"]
-      cluster["disk"] = len(host["disk_info"])
-      cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
-
-    ramRecommendations = [
-      {"os":1, "hbase":1},
-      {"os":2, "hbase":1},
-      {"os":2, "hbase":2},
-      {"os":4, "hbase":4},
-      {"os":6, "hbase":8},
-      {"os":8, "hbase":8},
-      {"os":8, "hbase":8},
-      {"os":12, "hbase":16},
-      {"os":24, "hbase":24},
-      {"os":32, "hbase":32},
-      {"os":64, "hbase":32}
-    ]
-    index = {
-      cluster["ram"] <= 4: 0,
-      4 < cluster["ram"] <= 8: 1,
-      8 < cluster["ram"] <= 16: 2,
-      16 < cluster["ram"] <= 24: 3,
-      24 < cluster["ram"] <= 48: 4,
-      48 < cluster["ram"] <= 64: 5,
-      64 < cluster["ram"] <= 72: 6,
-      72 < cluster["ram"] <= 96: 7,
-      96 < cluster["ram"] <= 128: 8,
-      128 < cluster["ram"] <= 256: 9,
-      256 < cluster["ram"]: 10
-    }[1]
-
-
-    cluster["reservedRam"] = ramRecommendations[index]["os"]
-    cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
-
-
-    cluster["minContainerSize"] = {
-      cluster["ram"] <= 3: 128,
-      3 < cluster["ram"] <= 4: 256,
-      4 < cluster["ram"] <= 8: 512,
-      8 < cluster["ram"] <= 24: 1024,
-      24 < cluster["ram"]: 2048
-    }[1]
-
-    totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
-    if cluster["hBaseInstalled"]:
-      totalAvailableRam -= cluster["hbaseRam"]
-    cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
-    Logger.info("Memory for YARN apps - cluster[totalAvailableRam]: " + str(cluster["totalAvailableRam"]))
-
-    suggestedMinContainerRam = 1024   # new smaller value for YARN min container
-    callContext = getCallContext(services)
-
-    operation = getUserOperationContext(services, DefaultStackAdvisor.OPERATION)
-    if operation:
-      Logger.info("user operation context : " + str(operation))
-
-    if services:  # its never None but some unit tests pass it as None
-      # If min container value is changed (user is changing it)
-      # if its a validation call - just used what ever value is set
-      # If its not a cluster create or add yarn service (TBD)
-      if (getOldValue(self, services, "yarn-site", "yarn.scheduler.minimum-allocation-mb") or \
-              'recommendConfigurations' != callContext) and operation != DefaultStackAdvisor.CLUSTER_CREATE_OPERATION:
-        '''yarn.scheduler.minimum-allocation-mb has changed - then pick this value up'''
-        if "yarn-site" in services["configurations"] and \
-                "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"] and \
-                str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]).isdigit():
-          Logger.info("Using user provided yarn.scheduler.minimum-allocation-mb = " +
-                      str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
-          cluster["yarnMinContainerSize"] = int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
-          Logger.info("Minimum ram per container due to user input - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
-          if cluster["yarnMinContainerSize"] > cluster["totalAvailableRam"]:
-            cluster["yarnMinContainerSize"] = cluster["totalAvailableRam"]
-            Logger.info("Minimum ram per container after checking against limit - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
-            pass
-          cluster["minContainerSize"] = cluster["yarnMinContainerSize"]    # set to what user has suggested as YARN min container size
-          suggestedMinContainerRam = cluster["yarnMinContainerSize"]
-          pass
-        pass
-      pass
-
-
-    '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
-    cluster["containers"] = int(round(max(3,
-                                min(2 * cluster["cpu"],
-                                    min(ceil(1.8 * cluster["disk"]),
-                                            cluster["totalAvailableRam"] / cluster["minContainerSize"])))))
-    Logger.info("Containers per node - cluster[containers]: " + str(cluster["containers"]))
-
-    if cluster["containers"] * cluster["minContainerSize"] > cluster["totalAvailableRam"]:
-      cluster["containers"] = ceil(cluster["totalAvailableRam"] / cluster["minContainerSize"])
-      Logger.info("Modified number of containers based on provided value for yarn.scheduler.minimum-allocation-mb")
-      pass
-
-    cluster["ramPerContainer"] = int(abs(cluster["totalAvailableRam"] / cluster["containers"]))
-    cluster["yarnMinContainerSize"] = min(suggestedMinContainerRam, cluster["ramPerContainer"])
-    Logger.info("Ram per containers before normalization - cluster[ramPerContainer]: " + str(cluster["ramPerContainer"]))
-
-    '''If greater than cluster["yarnMinContainerSize"], value will be in multiples of cluster["yarnMinContainerSize"]'''
-    if cluster["ramPerContainer"] > cluster["yarnMinContainerSize"]:
-      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / cluster["yarnMinContainerSize"]) * cluster["yarnMinContainerSize"]
-
-
-    cluster["mapMemory"] = int(cluster["ramPerContainer"])
-    cluster["reduceMemory"] = cluster["ramPerContainer"]
-    cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
-
-    Logger.info("Min container size - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
-    Logger.info("Available memory for map - cluster[mapMemory]: " + str(cluster["mapMemory"]))
-    Logger.info("Available memory for reduce - cluster[reduceMemory]: " + str(cluster["reduceMemory"]))
-    Logger.info("Available memory for am - cluster[amMemory]: " + str(cluster["amMemory"]))
-
-
-    return cluster
-
   def getServiceConfigurationValidators(self):
     return {
       "HDFS": { "hdfs-site": self.validateHDFSConfigurations,
@@ -1399,61 +1235,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     mountPoints.append("/")
     return mountPoints
 
-  # TODO, move to YARN Service Advisor
-  def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services):
-    if propertyName not in properties:
-      return self.getErrorItem("Value should be set")
-
-    capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services)
-    leaf_queue_names = self.getAllYarnLeafQueues(capacity_scheduler_properties)
-    queue_name = properties[propertyName]
-
-    if len(leaf_queue_names) == 0:
-      return None
-    elif queue_name not in leaf_queue_names:
-      return self.getErrorItem("Queue is not exist or not corresponds to existing YARN leaf queue")
-
-    return None
-
-  # TODO, move to YARN Service Advisor
-  def recommendYarnQueue(self, services, catalog_name=None, queue_property=None):
-    old_queue_name = None
-
-    if services and 'configurations' in services:
-        configurations = services["configurations"]
-        if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]:
-          old_queue_name = configurations[catalog_name]["properties"][queue_property]
-
-        capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services)
-        leaf_queues = sorted(self.getAllYarnLeafQueues(capacity_scheduler_properties))
-
-        if leaf_queues and (old_queue_name is None or old_queue_name not in leaf_queues):
-          return leaf_queues.pop()
-        elif old_queue_name and old_queue_name in leaf_queues:
-          return None
-
-    return "default"
-
-  def validateXmxValue(self, properties, recommendedDefaults, propertyName):
-    if not propertyName in properties:
-      return self.getErrorItem("Value should be set")
-    value = properties[propertyName]
-    defaultValue = recommendedDefaults[propertyName]
-    if defaultValue is None:
-      return self.getErrorItem("Config's default value can't be null or undefined")
-    if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue):
-      # Xmx is in the default-value but not the value, should be an error
-      return self.getErrorItem('Invalid value format')
-    if not self.checkXmxValueFormat(defaultValue):
-      # if default value does not contain Xmx, then there is no point in validating existing value
-      return None
-    valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value))
-    defaultValueXmx = self.getXmxSize(defaultValue)
-    defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx)
-    if valueInt < defaultValueInt:
-      return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx)
-    return None
-
   def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts):
     validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')},
                         {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')},
@@ -1568,40 +1349,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
         return dataNodeHosts
     return []
 
-  def get_system_min_uid(self):
-    login_defs = '/etc/login.defs'
-    uid_min_tag = 'UID_MIN'
-    comment_tag = '#'
-    uid_min = uid_default = '1000'
-    uid = None
-
-    if os.path.exists(login_defs):
-      with open(login_defs, 'r') as f:
-        data = f.read().split('\n')
-        # look for uid_min_tag in file
-        uid = filter(lambda x: uid_min_tag in x, data)
-        # filter all lines, where uid_min_tag was found in comments
-        uid = filter(lambda x: x.find(comment_tag) > x.find(uid_min_tag) or x.find(comment_tag) == -1, uid)
-
-      if uid is not None and len(uid) > 0:
-        uid = uid[0]
-        comment = uid.find(comment_tag)
-        tag = uid.find(uid_min_tag)
-        if comment == -1:
-          uid_tag = tag + len(uid_min_tag)
-          uid_min = uid[uid_tag:].strip()
-        elif comment > tag:
-          uid_tag = tag + len(uid_min_tag)
-          uid_min = uid[uid_tag:comment].strip()
-
-    # check result for value
-    try:
-      int(uid_min)
-    except ValueError:
-      return uid_default
-
-    return uid_min
-
   def mergeValidators(self, parentValidators, childValidators):
     for service, configsDict in childValidators.iteritems():
       if service not in parentValidators:
@@ -1622,77 +1369,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
         return False
     return True
 
-  """
-  Returns the dictionary of configs for 'capacity-scheduler'.
-  """
-  def getCapacitySchedulerProperties(self, services):
-    capacity_scheduler_properties = dict()
-    received_as_key_value_pair = True
-    if "capacity-scheduler" in services['configurations']:
-      if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]:
-        cap_sched_props_as_str = services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"]
-        if cap_sched_props_as_str:
-          cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n')
-          if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null':
-            # Received confgs as one "\n" separated string
-            for property in cap_sched_props_as_str:
-              key, sep, value = property.partition("=")
-              capacity_scheduler_properties[key] = value
-            Logger.info("'capacity-scheduler' configs is passed-in as a single '\\n' separated string. "
-                        "count(services['configurations']['capacity-scheduler']['properties']['capacity-scheduler']) = "
-                        "{0}".format(len(capacity_scheduler_properties)))
-            received_as_key_value_pair = False
-          else:
-            Logger.info("Passed-in services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'] is 'null'.")
-        else:
-          Logger.info("'capacity-schdeuler' configs not passed-in as single '\\n' string in "
-                      "services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].")
-      if not capacity_scheduler_properties:
-        # Received configs as a dictionary (Generally on 1st invocation).
-        capacity_scheduler_properties = services['configurations']["capacity-scheduler"]["properties"]
-        Logger.info("'capacity-scheduler' configs is passed-in as a dictionary. "
-                    "count(services['configurations']['capacity-scheduler']['properties']) = {0}".format(len(capacity_scheduler_properties)))
-    else:
-      Logger.error("Couldn't retrieve 'capacity-scheduler' from services.")
-
-    Logger.info("Retrieved 'capacity-scheduler' received as dictionary : '{0}'. configs : {1}" \
-                .format(received_as_key_value_pair, capacity_scheduler_properties.items()))
-    return capacity_scheduler_properties, received_as_key_value_pair
-
-  """
-  Gets all YARN leaf queues.
-  """
-  def getAllYarnLeafQueues(self, capacitySchedulerProperties):
-    config_list = capacitySchedulerProperties.keys()
-    yarn_queues = None
-    leafQueueNames = set()
-    if 'yarn.scheduler.capacity.root.queues' in config_list:
-      yarn_queues = capacitySchedulerProperties.get('yarn.scheduler.capacity.root.queues')
-
-    if yarn_queues:
-      toProcessQueues = yarn_queues.split(",")
-      while len(toProcessQueues) > 0:
-        queue = toProcessQueues.pop()
-        queueKey = "yarn.scheduler.capacity.root." + queue + ".queues"
-        if queueKey in capacitySchedulerProperties:
-          # If parent queue, add children
-          subQueues = capacitySchedulerProperties[queueKey].split(",")
-          for subQueue in subQueues:
-            toProcessQueues.append(queue + "." + subQueue)
-        else:
-          # Leaf queues
-          # We only take the leaf queue name instead of the complete path, as leaf queue names are unique in YARN.
-          # Eg: If YARN queues are like :
-          #     (1). 'yarn.scheduler.capacity.root.a1.b1.c1.d1',
-          #     (2). 'yarn.scheduler.capacity.root.a1.b1.c2',
-          #     (3). 'yarn.scheduler.capacity.root.default,
-          # Added leaf queues names are as : d1, c2 and default for the 3 leaf queues.
-          leafQueuePathSplits = queue.split(".")
-          if leafQueuePathSplits > 0:
-            leafQueueName = leafQueuePathSplits[-1]
-            leafQueueNames.add(leafQueueName)
-    return leafQueueNames
-
   def get_service_component_meta(self, service, component, services):
     """
     Function retrieve service component meta information as dict from services.json
@@ -1773,23 +1449,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     service_meta = service_meta[0]
     return [item[__stack_service_components]["component_name"] for item in service_meta["components"]]
 
-def getCallContext(services):
-  if services:
-    if DefaultStackAdvisor.ADVISOR_CONTEXT in services:
-      Logger.info("call type context : " + str(services[DefaultStackAdvisor.ADVISOR_CONTEXT]))
-      return services[DefaultStackAdvisor.ADVISOR_CONTEXT][DefaultStackAdvisor.CALL_TYPE]
-  return ""
-
-
-def getOldValue(self, services, configType, propertyName):
-  if services:
-    if 'changed-configurations' in services.keys():
-      changedConfigs = services["changed-configurations"]
-      for changedConfig in changedConfigs:
-        if changedConfig["type"] == configType and changedConfig["name"]== propertyName and "old_value" in changedConfig:
-          return changedConfig["old_value"]
-  return None
-
 def getUserOperationContext(services, contextName):
   if services:
     if 'user-context' in services.keys():

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
index 4822732..81c9b72 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/stack_advisor.py
@@ -17,6 +17,9 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 
+# Python imports
+import socket
+
 # Local Imports
 from resource_management.core.logger import Logger
 
@@ -118,8 +121,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
   def recommendOozieConfigurations(self, configurations, clusterData, services, hosts):
     super(HDP21StackAdvisor, self).recommendOozieConfigurations(configurations, clusterData, services, hosts)
 
-    oozieSiteProperties = getSiteProperties(services['configurations'], 'oozie-site')
-    oozieEnvProperties = getSiteProperties(services['configurations'], 'oozie-env')
+    oozieSiteProperties = self.getSiteProperties(services['configurations'], 'oozie-site')
+    oozieEnvProperties = self.getSiteProperties(services['configurations'], 'oozie-env')
     putOozieProperty = self.putProperty(configurations, "oozie-site", services)
     putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services)
 
@@ -131,7 +134,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
         if falconUser is not None:
           putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.groups".format(falconUser) , "*")
           putOozieSiteProperty("oozie.service.ProxyUserService.proxyuser.{0}.hosts".format(falconUser) , "*")
-        falconUserOldValue = getOldValue(self, services, "falcon-env", "falcon_user")
+        falconUserOldValue = self.getOldValue(services, "falcon-env", "falcon_user")
         if falconUserOldValue is not None:
           if 'forced-configurations' not in services:
             services["forced-configurations"] = []
@@ -155,7 +158,7 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
       oozieServerHost = self.getHostWithComponent('OOZIE', 'OOZIE_SERVER', services, hosts)
       oozieDBConnectionURL = oozieSiteProperties['oozie.service.JPAService.jdbc.url']
       protocol = self.getProtocol(oozieEnvProperties['oozie_database'])
-      oldSchemaName = getOldValue(self, services, "oozie-site", "oozie.db.schema.name")
+      oldSchemaName = self.getOldValue(services, "oozie-site", "oozie.db.schema.name")
       # under these if constructions we are checking if oozie server hostname available,
       # if schema name was changed or if protocol according to current db type differs with protocol in db connection url(db type was changed)
       if oozieServerHost is not None:
@@ -164,8 +167,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
           putOozieProperty('oozie.service.JPAService.jdbc.url', dbConnection)
 
   def recommendHiveConfigurations(self, configurations, clusterData, services, hosts):
-    hiveSiteProperties = getSiteProperties(services['configurations'], 'hive-site')
-    hiveEnvProperties = getSiteProperties(services['configurations'], 'hive-env')
+    hiveSiteProperties = self.getSiteProperties(services['configurations'], 'hive-site')
+    hiveEnvProperties = self.getSiteProperties(services['configurations'], 'hive-env')
     containerSize = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
     containerSize = min(clusterData['containers'] * clusterData['ramPerContainer'], containerSize)
     container_size_bytes = int(containerSize)*1024*1024
@@ -185,8 +188,8 @@ class HDP21StackAdvisor(HDP206StackAdvisor):
       hiveServerHost = self.getHostWithComponent('HIVE', 'HIVE_SERVER', services, hosts)
       hiveDBConnectionURL = hiveSiteProperties['javax.jdo.option.ConnectionURL']
       protocol = self.getProtocol(hiveEnvProperties['hive_database'])
-      oldSchemaName = getOldValue(self, services, "hive-site", "ambari.hive.db.schema.name")
-      oldDBType = getOldValue(self, services, "hive-env", "hive_database")
+      oldSchemaName = self.getOldValue(services, "hive-site", "ambari.hive.db.schema.name")
+      oldDBType = self.getOldValue(services, "hive-env", "hive_database")
       # under these if constructions we are checking if hive server hostname available,
       # if it's default db connection url with "localhost" or if schema name was changed or if db type was changed (only for db type change from default mysql to existing mysql)
       # or if protocol according to current db type differs with protocol in db connection url(other db types changes)

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index 0014b7c..cba611c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -131,7 +131,6 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
       if recommended_spark_queue is not None:
         putSparkThriftSparkConf("spark.yarn.queue", recommended_spark_queue)
 
-
   def recommendYARNConfigurations(self, configurations, clusterData, services, hosts):
     super(HDP22StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts)
     putYarnProperty = self.putProperty(configurations, "yarn-site", services)

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 1425abc..30cbc7c 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -423,7 +423,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     putRangerKmsDbksProperty = self.putProperty(configurations, "dbks-site", services)
     putRangerKmsProperty = self.putProperty(configurations, "kms-properties", services)
-    kmsEnvProperties = getSiteProperties(services['configurations'], 'kms-env')
+    kmsEnvProperties = self.getSiteProperties(services['configurations'], 'kms-env')
     putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
     putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
     putRangerKmsAuditProperty = self.putProperty(configurations, "ranger-kms-audit", services)
@@ -459,7 +459,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
 
     if kmsEnvProperties and self.checkSiteProperties(kmsEnvProperties, 'kms_user') and 'KERBEROS' in servicesList:
       kmsUser = kmsEnvProperties['kms_user']
-      kmsUserOld = getOldValue(self, services, 'kms-env', 'kms_user')
+      kmsUserOld = self.getOldValue(services, 'kms-env', 'kms_user')
       self.put_proxyuser_value(kmsUser, '*', is_groups=True, services=services, configurations=configurations, put_function=putCoreSiteProperty)
       if kmsUserOld is not None and kmsUser != kmsUserOld:
         putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(kmsUserOld), 'delete', 'true')
@@ -517,7 +517,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
       if service in servicesList:
         if config_type in services['configurations'] and property_name in services['configurations'][config_type]['properties']:
           service_user = services['configurations'][config_type]['properties'][property_name]
-          service_old_user = getOldValue(self, services, config_type, property_name)
+          service_old_user = self.getOldValue(services, config_type, property_name)
 
           if 'groups' in proxy_category:
             putRangerKmsSiteProperty('hadoop.kms.proxyuser.{0}.groups'.format(service_user), '*')
@@ -913,7 +913,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
     # other config (core-site). That's why we are using another heuristics here
     hdfs_site = properties
     validationItems = [] #Adding Ranger Plugin logic here
-    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
     ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
@@ -931,7 +931,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
   def validateHiveConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
     parentValidationProblems = super(HDP23StackAdvisor, self).validateHiveConfigurations(properties, recommendedDefaults, configurations, services, hosts)
     hive_site = properties
-    hive_env_properties = getSiteProperties(configurations, "hive-env")
+    hive_env_properties = self.getSiteProperties(configurations, "hive-env")
     validationItems = []
     sqla_db_used = "hive_database" in hive_env_properties and \
                    hive_env_properties['hive_database'] == 'Existing SQL Anywhere Database'
@@ -958,8 +958,8 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
     hive_server2 = properties
     validationItems = []
     #Adding Ranger Plugin logic here
-    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hive-plugin-properties")
-    hive_env_properties = getSiteProperties(configurations, "hive-env")
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hive-plugin-properties")
+    hive_env_properties = self.getSiteProperties(configurations, "hive-env")
     ranger_plugin_enabled = 'hive_security_authorization' in hive_env_properties and hive_env_properties['hive_security_authorization'].lower() == 'ranger'
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
     ##Add stack validations only if Ranger is enabled.
@@ -1030,7 +1030,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
     validationItems = []
 
     #Adding Ranger Plugin logic here
-    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hbase-plugin-properties")
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-hbase-plugin-properties")
     ranger_plugin_enabled = ranger_plugin_properties['ranger-hbase-plugin-enabled'] if ranger_plugin_properties else 'No'
     prop_name = 'hbase.security.authorization'
     prop_val = "true"
@@ -1071,7 +1071,7 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
     servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
  
     #Adding Ranger Plugin logic here
-    ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties")
+    ranger_plugin_properties = self.getSiteProperties(configurations, "ranger-kafka-plugin-properties")
     ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No'
     prop_name = 'authorizer.class.name'
     prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
index 70da914..8e377da 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.5/services/stack_advisor.py
@@ -439,9 +439,9 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
     if spark_queue is not None:
       putSparkProperty("spark.yarn.queue", spark_queue)
 
-    spart_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue")
-    if spart_thrift_queue is not None:
-      putSparkThriftSparkConf("spark.yarn.queue", spart_thrift_queue)
+    spark_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue")
+    if spark_thrift_queue is not None:
+      putSparkThriftSparkConf("spark.yarn.queue", spark_thrift_queue)
 
   def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
     super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts)
@@ -731,10 +731,10 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
     timeline_plugin_classes_values = []
     timeline_plugin_classpath_values = []
 
-    if self.__isServiceDeployed(services, "TEZ"):
+    if self.isServiceDeployed(services, "TEZ"):
       timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
 
-    if self.__isServiceDeployed(services, "SPARK"):
+    if self.isServiceDeployed(services, "SPARK"):
       timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin')
       timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*")
 
@@ -1217,39 +1217,6 @@ class HDP25StackAdvisor(HDP24StackAdvisor):
     putHiveInteractiveEnvProperty('llap_heap_size', 0)
     putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size)
 
-  def isConfigPropertiesChanged(self, services, config_type, config_names, all_exists=True):
-    """
-    Checks for the presence of passed-in configuration properties in a given config, if they are changed.
-    Reads from services["changed-configurations"].
-
-    :argument services: Configuration information for the cluster
-    :argument config_type: Type of the configuration
-    :argument config_names: Set of configuration properties to be checked if they are changed.
-    :argument all_exists: If True: returns True only if all properties mentioned in 'config_names_set' we found
-                            in services["changed-configurations"].
-                            Otherwise, returns False.
-                          If False: return True, if any of the properties mentioned in config_names_set we found in
-                            services["changed-configurations"].
-                            Otherwise, returns False.
-
-
-    :type services: dict
-    :type config_type: str
-    :type config_names: list|set
-    :type all_exists: bool
-    """
-    changedConfigs = services["changed-configurations"]
-    changed_config_names_set = set([changedConfig['name'] for changedConfig in changedConfigs if changedConfig['type'] == config_type])
-    config_names_set = set(config_names)
-
-    configs_intersection = changed_config_names_set & config_names_set
-    if all_exists and configs_intersection == config_names_set:
-        return True
-    elif not all_exists and len(configs_intersection) > 0:
-        return True
-
-    return False
-
   def get_num_llap_nodes(self, services, configurations):
     """
     Returns current value of number of LLAP nodes in cluster (num_llap_nodes)
@@ -1998,9 +1965,5 @@ yarn.scheduler.capacity.root.{0}.maximum-am-resource-percent=1""".format(llap_qu
 
     return self.toConfigurationValidationProblems(validationItems, "ranger-tagsync-site")
 
-  def __isServiceDeployed(self, services, serviceName):
-    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
-    return serviceName in servicesList
-
   def isComponentUsingCardinalityForLayout(self, componentName):
     return super(HDP25StackAdvisor, self).isComponentUsingCardinalityForLayout (componentName) or  componentName in ['SPARK2_THRIFTSERVER', 'LIVY2_SERVER', 'LIVY_SERVER']

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json b/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json
index ee7a892..576910f 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/role_command_order.json
@@ -132,14 +132,37 @@
   },
   "_comment" : "Dependencies that are used when GLUSTERFS is not present in cluster",
   "optional_no_glusterfs": {
+    "AMBARI_METRICS_SERVICE_CHECK-SERVICE_CHECK": ["METRICS_COLLECTOR-START", "HDFS_SERVICE_CHECK-SERVICE_CHECK"],
     "APP_TIMELINE_SERVER-START": ["NAMENODE-START", "DATANODE-START"],
     "DATANODE-START" : ["RANGER_USERSYNC-START"],
-    "NAMENODE-START" : ["RANGER_USERSYNC-START"],
+    "DATANODE-STOP": ["RESOURCEMANAGER-STOP", "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP"],
     "FALCON_SERVER-START": ["NAMENODE-START", "DATANODE-START"],
     "FALCON_SERVICE_CHECK-SERVICE_CHECK": ["FALCON_SERVER-START"],
+    "HBASE_MASTER-START": ["NAMENODE-START", "DATANODE-START"],
+    "HDFS_SERVICE_CHECK-SERVICE_CHECK": ["NAMENODE-START", "DATANODE-START", "SECONDARY_NAMENODE-START"],
+    "HISTORYSERVER-START": ["NAMENODE-START", "DATANODE-START"],
+    "HISTORYSERVER-RESTART": ["NAMENODE-RESTART"],
+    "HIVE_SERVER-START": ["DATANODE-START"],
+    "MAPREDUCE2_SERVICE_CHECK-SERVICE_CHECK": ["NODEMANAGER-START", "RESOURCEMANAGER-START", "HISTORYSERVER-START", "YARN_SERVICE_CHECK-SERVICE_CHECK"],
+    "METRICS_COLLECTOR-START": ["NAMENODE-START", "DATANODE-START", "SECONDARY_NAMENODE-START", "ZOOKEEPER_SERVER-START"],
+    "METRICS_COLLECTOR-STOP": ["METRICS_GRAFANA-STOP"],
+    "METRICS_GRAFANA-START": ["METRICS_COLLECTOR-START"],
+    "NAMENODE-START" : ["RANGER_USERSYNC-START"],
+    "NAMENODE-STOP": ["RESOURCEMANAGER-STOP", "NODEMANAGER-STOP", "HISTORYSERVER-STOP", "HBASE_MASTER-STOP", "METRICS_COLLECTOR-STOP"],
+    "NODEMANAGER-RESTART": ["NAMENODE-RESTART"],
+    "NODEMANAGER-START": ["NAMENODE-START", "DATANODE-START", "RESOURCEMANAGER-START"],
+    "OOZIE_SERVER-RESTART": ["NAMENODE-RESTART"],
+    "PIG_SERVICE_CHECK-SERVICE_CHECK": ["RESOURCEMANAGER-START", "NODEMANAGER-START"],
+    "RESOURCEMANAGER-RESTART": ["NAMENODE-RESTART"],
+    "RESOURCEMANAGER-START": ["NAMENODE-START", "DATANODE-START"],
+    "RESOURCEMANAGER_SERVICE_CHECK-SERVICE_CHECK": ["RESOURCEMANAGER-START"],
+    "SECONDARY_NAMENODE-RESTART": ["NAMENODE-RESTART"],
+    "SECONDARY_NAMENODE-START": ["NAMENODE-START"],
     "SPARK_JOBHISTORYSERVER-START" : ["NAMENODE-START"],
     "SPARK2_JOBHISTORYSERVER-START" : ["NAMENODE-START", "DATANODE-START"],
-    "SPARK2_THRIFTSERVER-START" : ["NAMENODE-START", "DATANODE-START", "HIVE_SERVER-START"]
+    "SPARK2_THRIFTSERVER-START" : ["NAMENODE-START", "DATANODE-START", "HIVE_SERVER-START"],
+    "WEBHCAT_SERVER-START": ["DATANODE-START"],
+    "YARN_SERVICE_CHECK-SERVICE_CHECK": ["NODEMANAGER-START", "RESOURCEMANAGER-START"]
   },
   "_comment" : "GLUSTERFS-specific dependencies",
   "optional_glusterfs": {

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml b/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml
index 096f205..41fe13d 100644
--- a/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml
+++ b/ambari-server/src/main/resources/stacks/HDP/3.0/services/YARN/metainfo.xml
@@ -55,7 +55,8 @@
     <service>
       <name>MAPREDUCE2</name>
       <displayName>MapReduce2</displayName>
-      <version>2.7.1.3.0</version>
+      <version>3.0.0.3.0</version>
+      <extends>common-services/MAPREDUCE2/3.0.0.3.0</extends>
 
       <osSpecifics>
         <osSpecific>

http://git-wip-us.apache.org/repos/asf/ambari/blob/539a4149/ambari-server/src/main/resources/stacks/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py b/ambari-server/src/main/resources/stacks/stack_advisor.py
index 215e807..9eb3973 100644
--- a/ambari-server/src/main/resources/stacks/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/stack_advisor.py
@@ -1007,7 +1007,8 @@ class DefaultStackAdvisor(StackAdvisor):
 
 
     cluster["minContainerSize"] = {
-      cluster["ram"] <= 4: 256,
+      cluster["ram"] <= 3: 128,
+      3 < cluster["ram"] <= 4: 256,
       4 < cluster["ram"] <= 8: 512,
       8 < cluster["ram"] <= 24: 1024,
       24 < cluster["ram"]: 2048
@@ -1017,24 +1018,122 @@ class DefaultStackAdvisor(StackAdvisor):
     if cluster["hBaseInstalled"]:
       totalAvailableRam -= cluster["hbaseRam"]
     cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
+    Logger.info("Memory for YARN apps - cluster[totalAvailableRam]: " + str(cluster["totalAvailableRam"]))
+
+    suggestedMinContainerRam = 1024   # new smaller value for YARN min container
+    callContext = self.getCallContext(services)
+
+    operation = self.getUserOperationContext(services, DefaultStackAdvisor.OPERATION)
+    if operation:
+      Logger.info("user operation context : " + str(operation))
+
+    if services:  # it's never None, but some unit tests pass it as None
+      # If the min container value was changed (the user is changing it),
+      # or if it's a validation call - just use whatever value is set,
+      # provided it's not a cluster create or add YARN service (TBD)
+      if (self.getOldValue(services, "yarn-site", "yarn.scheduler.minimum-allocation-mb") or \
+                'recommendConfigurations' != callContext) and operation != DefaultStackAdvisor.CLUSTER_CREATE_OPERATION:
+        '''yarn.scheduler.minimum-allocation-mb has changed - then pick this value up'''
+        if "yarn-site" in services["configurations"] and \
+                "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"] and \
+            str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]).isdigit():
+          Logger.info("Using user provided yarn.scheduler.minimum-allocation-mb = " +
+                      str(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
+          cluster["yarnMinContainerSize"] = int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
+          Logger.info("Minimum ram per container due to user input - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
+          if cluster["yarnMinContainerSize"] > cluster["totalAvailableRam"]:
+            cluster["yarnMinContainerSize"] = cluster["totalAvailableRam"]
+            Logger.info("Minimum ram per container after checking against limit - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
+            pass
+          cluster["minContainerSize"] = cluster["yarnMinContainerSize"]    # set to what user has suggested as YARN min container size
+          suggestedMinContainerRam = cluster["yarnMinContainerSize"]
+          pass
+        pass
+      pass
+
+
     '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / MIN_CONTAINER_SIZE))))'''
-    cluster["containers"] = round(max(3,
-                                      min(2 * cluster["cpu"],
-                                          min(ceil(1.8 * cluster["disk"]),
-                                              cluster["totalAvailableRam"] / cluster["minContainerSize"]))))
+    cluster["containers"] = int(round(max(3,
+                                          min(2 * cluster["cpu"],
+                                              min(ceil(1.8 * cluster["disk"]),
+                                                  cluster["totalAvailableRam"] / cluster["minContainerSize"])))))
+    Logger.info("Containers per node - cluster[containers]: " + str(cluster["containers"]))
+
+    if cluster["containers"] * cluster["minContainerSize"] > cluster["totalAvailableRam"]:
+      cluster["containers"] = ceil(cluster["totalAvailableRam"] / cluster["minContainerSize"])
+      Logger.info("Modified number of containers based on provided value for yarn.scheduler.minimum-allocation-mb")
+      pass
+
+    cluster["ramPerContainer"] = int(abs(cluster["totalAvailableRam"] / cluster["containers"]))
+    cluster["yarnMinContainerSize"] = min(suggestedMinContainerRam, cluster["ramPerContainer"])
+    Logger.info("Ram per containers before normalization - cluster[ramPerContainer]: " + str(cluster["ramPerContainer"]))
+
+    '''If greater than cluster["yarnMinContainerSize"], value will be in multiples of cluster["yarnMinContainerSize"]'''
+    if cluster["ramPerContainer"] > cluster["yarnMinContainerSize"]:
+      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / cluster["yarnMinContainerSize"]) * cluster["yarnMinContainerSize"]
 
-    '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / containers'''
-    cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / cluster["containers"])
-    '''If greater than 1GB, value will be in multiples of 512.'''
-    if cluster["ramPerContainer"] > 1024:
-      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512
 
     cluster["mapMemory"] = int(cluster["ramPerContainer"])
     cluster["reduceMemory"] = cluster["ramPerContainer"]
     cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
 
+    Logger.info("Min container size - cluster[yarnMinContainerSize]: " + str(cluster["yarnMinContainerSize"]))
+    Logger.info("Available memory for map - cluster[mapMemory]: " + str(cluster["mapMemory"]))
+    Logger.info("Available memory for reduce - cluster[reduceMemory]: " + str(cluster["reduceMemory"]))
+    Logger.info("Available memory for am - cluster[amMemory]: " + str(cluster["amMemory"]))
+
+
     return cluster
 
+  def getCallContext(self, services):
+    if services:
+      if 'context' in services:
+        Logger.info("context : " + str (services['context']))
+        return services['context']['call_type']
+    return ""
+
+  def getUserOperationContext(self, services, contextName):
+    if services:
+      if 'user-context' in services.keys():
+        userContext = services["user-context"]
+        if contextName in userContext:
+          return userContext[contextName]
+    return None
+
+  def get_system_min_uid(self):
+    login_defs = '/etc/login.defs'
+    uid_min_tag = 'UID_MIN'
+    comment_tag = '#'
+    uid_min = uid_default = '1000'
+    uid = None
+
+    if os.path.exists(login_defs):
+      with open(login_defs, 'r') as f:
+        data = f.read().split('\n')
+        # look for uid_min_tag in file
+        uid = filter(lambda x: uid_min_tag in x, data)
+        # keep only lines where uid_min_tag is not inside a comment
+        uid = filter(lambda x: x.find(comment_tag) > x.find(uid_min_tag) or x.find(comment_tag) == -1, uid)
+
+      if uid is not None and len(uid) > 0:
+        uid = uid[0]
+        comment = uid.find(comment_tag)
+        tag = uid.find(uid_min_tag)
+        if comment == -1:
+          uid_tag = tag + len(uid_min_tag)
+          uid_min = uid[uid_tag:].strip()
+        elif comment > tag:
+          uid_tag = tag + len(uid_min_tag)
+          uid_min = uid[uid_tag:comment].strip()
+
+    # check result for value
+    try:
+      int(uid_min)
+    except ValueError:
+      return uid_default
+
+    return uid_min
+
   def validateClusterConfigurations(self, configurations, services, hosts):
     validationItems = []
 
@@ -1347,6 +1446,10 @@ class DefaultStackAdvisor(StackAdvisor):
     else:
       return {"min": 1, "max": 1}
 
+  def isServiceDeployed(self, services, serviceName):
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    return serviceName in servicesList
+
   def getHostForComponent(self, component, hostsList):
     if len(hostsList) == 0:
       return None
@@ -1922,6 +2025,7 @@ class DefaultStackAdvisor(StackAdvisor):
 
     return [service["StackServices"]["service_name"] for service in services["services"]]
 
+  #region HDFS
   def getHadoopProxyUsersValidationItems(self, properties, services, hosts, configurations):
     validationItems = []
     users = self.getHadoopProxyUsers(services, hosts, configurations)
@@ -2107,7 +2211,9 @@ class DefaultStackAdvisor(StackAdvisor):
         ambari_user = services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
         ambari_user = ambari_user.split('@')[0]
     return ambari_user
+  #endregion
 
+  #region Generic
   def put_proxyuser_value(self, user_name, value, is_groups=False, services=None, configurations=None, put_function=None):
     is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, services, configurations, is_groups)
     result_value = "*"
@@ -2195,6 +2301,28 @@ class DefaultStackAdvisor(StackAdvisor):
       return int(m.group(2))
     else:
       return None
+  #endregion
+
+  #region Validators
+  def validateXmxValue(self, properties, recommendedDefaults, propertyName):
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    value = properties[propertyName]
+    defaultValue = recommendedDefaults[propertyName]
+    if defaultValue is None:
+      return self.getErrorItem("Config's default value can't be null or undefined")
+    if not self.checkXmxValueFormat(value) and self.checkXmxValueFormat(defaultValue):
+      # Xmx is in the default-value but not the value, should be an error
+      return self.getErrorItem('Invalid value format')
+    if not self.checkXmxValueFormat(defaultValue):
+      # if default value does not contain Xmx, then there is no point in validating existing value
+      return None
+    valueInt = self.formatXmxSizeToBytes(self.getXmxSize(value))
+    defaultValueXmx = self.getXmxSize(defaultValue)
+    defaultValueInt = self.formatXmxSizeToBytes(defaultValueXmx)
+    if valueInt < defaultValueInt:
+      return self.getWarnItem("Value is less than the recommended default of -Xmx" + defaultValueXmx)
+    return None
 
   def validatorLessThenDefaultValue(self, properties, recommendedDefaults, propertyName):
     if propertyName not in recommendedDefaults:
@@ -2324,6 +2452,145 @@ class DefaultStackAdvisor(StackAdvisor):
     except ValueError:
       pass
     return False
+  #endregion
+
+  #region YARN and MAPREDUCE
+  def validatorYarnQueue(self, properties, recommendedDefaults, propertyName, services):
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set")
+
+    capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services)
+    leaf_queue_names = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+    queue_name = properties[propertyName]
+
+    if len(leaf_queue_names) == 0:
+      return None
+    elif queue_name not in leaf_queue_names:
+      return self.getErrorItem("Queue is not exist or not corresponds to existing YARN leaf queue")
+
+    return None
+
+  def recommendYarnQueue(self, services, catalog_name=None, queue_property=None):
+    old_queue_name = None
+
+    if services and 'configurations' in services:
+        configurations = services["configurations"]
+        if catalog_name in configurations and queue_property in configurations[catalog_name]["properties"]:
+          old_queue_name = configurations[catalog_name]["properties"][queue_property]
+
+        capacity_scheduler_properties, _ = self.getCapacitySchedulerProperties(services)
+        leaf_queues = sorted(self.getAllYarnLeafQueues(capacity_scheduler_properties))
+
+        if leaf_queues and (old_queue_name is None or old_queue_name not in leaf_queues):
+          return leaf_queues.pop()
+        elif old_queue_name and old_queue_name in leaf_queues:
+          return None
+    return "default"
+
+  def isConfigPropertiesChanged(self, services, config_type, config_names, all_exists=True):
+    """
+    Checks for the presence of passed-in configuration properties in a given config, if they are changed.
+    Reads from services["changed-configurations"].
+
+    :argument services: Configuration information for the cluster
+    :argument config_type: Type of the configuration
+    :argument config_names: Set of configuration properties to be checked if they are changed.
+    :argument all_exists: If True: returns True only if all properties mentioned in 'config_names' were found
+                            in services["changed-configurations"].
+                            Otherwise, returns False.
+                          If False: returns True if any of the properties mentioned in 'config_names' were found in
+                            services["changed-configurations"].
+                            Otherwise, returns False.
+
+
+    :type services: dict
+    :type config_type: str
+    :type config_names: list|set
+    :type all_exists: bool
+    """
+    changedConfigs = services["changed-configurations"]
+    changed_config_names_set = set([changedConfig['name'] for changedConfig in changedConfigs if changedConfig['type'] == config_type])
+    config_names_set = set(config_names)
+
+    configs_intersection = changed_config_names_set & config_names_set
+    if all_exists and configs_intersection == config_names_set:
+      return True
+    elif not all_exists and len(configs_intersection) > 0:
+      return True
+
+    return False
+
+  def getCapacitySchedulerProperties(self, services):
+    """
+    Returns the dictionary of configs for 'capacity-scheduler'.
+    """
+    capacity_scheduler_properties = dict()
+    received_as_key_value_pair = True
+    if "capacity-scheduler" in services['configurations']:
+      if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]:
+        cap_sched_props_as_str = services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"]
+        if cap_sched_props_as_str:
+          cap_sched_props_as_str = str(cap_sched_props_as_str).split('\n')
+          if len(cap_sched_props_as_str) > 0 and cap_sched_props_as_str[0] != 'null':
+            # Received configs as one "\n" separated string
+            for property in cap_sched_props_as_str:
+              key, sep, value = property.partition("=")
+              capacity_scheduler_properties[key] = value
+            Logger.info("'capacity-scheduler' configs is passed-in as a single '\\n' separated string. "
+                        "count(services['configurations']['capacity-scheduler']['properties']['capacity-scheduler']) = "
+                        "{0}".format(len(capacity_scheduler_properties)))
+            received_as_key_value_pair = False
+          else:
+            Logger.info("Passed-in services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'] is 'null'.")
+        else:
+          Logger.info("'capacity-scheduler' configs not passed-in as single '\\n' string in "
+                      "services['configurations']['capacity-scheduler']['properties']['capacity-scheduler'].")
+      if not capacity_scheduler_properties:
+        # Received configs as a dictionary (Generally on 1st invocation).
+        capacity_scheduler_properties = services['configurations']["capacity-scheduler"]["properties"]
+        Logger.info("'capacity-scheduler' configs is passed-in as a dictionary. "
+                    "count(services['configurations']['capacity-scheduler']['properties']) = {0}".format(len(capacity_scheduler_properties)))
+    else:
+      Logger.error("Couldn't retrieve 'capacity-scheduler' from services.")
+
+    Logger.info("Retrieved 'capacity-scheduler' received as dictionary : '{0}'. configs : {1}" \
+                .format(received_as_key_value_pair, capacity_scheduler_properties.items()))
+    return capacity_scheduler_properties, received_as_key_value_pair
+
+  def getAllYarnLeafQueues(self, capacitySchedulerProperties):
+    """
+    Gets all YARN leaf queues.
+    """
+    config_list = capacitySchedulerProperties.keys()
+    yarn_queues = None
+    leafQueueNames = set()
+    if 'yarn.scheduler.capacity.root.queues' in config_list:
+      yarn_queues = capacitySchedulerProperties.get('yarn.scheduler.capacity.root.queues')
+
+    if yarn_queues:
+      toProcessQueues = yarn_queues.split(",")
+      while len(toProcessQueues) > 0:
+        queue = toProcessQueues.pop()
+        queueKey = "yarn.scheduler.capacity.root." + queue + ".queues"
+        if queueKey in capacitySchedulerProperties:
+          # If parent queue, add children
+          subQueues = capacitySchedulerProperties[queueKey].split(",")
+          for subQueue in subQueues:
+            toProcessQueues.append(queue + "." + subQueue)
+        else:
+          # Leaf queues
+          # We only take the leaf queue name instead of the complete path, as leaf queue names are unique in YARN.
+          # Eg: If YARN queues are like :
+          #     (1). 'yarn.scheduler.capacity.root.a1.b1.c1.d1',
+          #     (2). 'yarn.scheduler.capacity.root.a1.b1.c2',
+          #     (3). 'yarn.scheduler.capacity.root.default,
+          # Added leaf queues names are as : d1, c2 and default for the 3 leaf queues.
+          leafQueuePathSplits = queue.split(".")
+          if leafQueuePathSplits > 0:
+            leafQueueName = leafQueuePathSplits[-1]
+            leafQueueNames.add(leafQueueName)
+    return leafQueueNames
+  #endregion
 
   @classmethod
   def getMountPointForDir(cls, dir, mountPoints):


Mime
View raw message