From: jluniya@apache.org
To: commits@ambari.apache.org
Date: Mon, 09 Oct 2017 06:01:32 -0000
Subject: [34/50] [abbrv] ambari git commit: AMBARI-22124. Refactor AMS logic in stack advisors to service advisors. (vbrodetskyi)
AMBARI-22124. Refactor AMS logic in stack advisors to service advisors. (vbrodetskyi)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0f32765d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0f32765d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0f32765d

Branch: refs/heads/branch-feature-AMBARI-14714
Commit: 0f32765dc2250044c7925f4e68e6f61b7a77d8f8
Parents: 9adfcdc
Author: Vitaly Brodetskyi
Authored: Fri Oct 6 10:40:33 2017 +0300
Committer: Vitaly Brodetskyi
Committed: Fri Oct 6 10:40:33 2017 +0300

----------------------------------------------------------------------
 .../AMBARI_METRICS/0.1.0/service_advisor.py    | 787 +++++++++++++++++++
 .../ATLAS/0.7.0.3.0/service_advisor.py         |   5 +-
 .../stacks/HDP/2.0.6/services/stack_advisor.py | 542 +------------
 .../stacks/HDP/2.2/services/stack_advisor.py   |   1 -
 .../AMBARI_METRICS/test_service_advisor.py     | 596 ++++++++++++++
 .../stacks/2.0.6/common/test_stack_advisor.py  | 576 --------------
 .../stacks/2.2/common/test_stack_advisor.py    | 511 ------------
 7 files changed, 1388 insertions(+), 1630 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
new file mode 100644
index 0000000..eae98bf
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/service_advisor.py
@@ -0,0 +1,787 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" + +# Python imports +import imp +import re +import os +import sys +import socket +import traceback +from math import ceil, floor, log + + +from resource_management.core.logger import Logger + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +STACKS_DIR = os.path.join(SCRIPT_DIR, '../../../stacks/') +PARENT_FILE = os.path.join(STACKS_DIR, 'service_advisor.py') + +#split points +metricsDir = os.path.join(SCRIPT_DIR, 'package') +print "METRICS_DIR=>" + str(metricsDir) +serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics') +customServiceMetricsDir = os.path.join(SCRIPT_DIR, '../../../dashboards/service-metrics') +sys.path.append(os.path.join(metricsDir, 'scripts')) + +from split_points import FindSplitPointsForAMSRegions + +try: + with open(PARENT_FILE, 'rb') as fp: + service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE)) +except Exception as e: + traceback.print_exc() + print "Failed to load parent" + +class AMBARI_METRICSServiceAdvisor(service_advisor.ServiceAdvisor): + + def __init__(self, *args, **kwargs): + self.as_super = super(AMBARI_METRICSServiceAdvisor, self) + self.as_super.__init__(*args, **kwargs) + + # Always call these methods + self.modifyMastersWithMultipleInstances() + self.modifyCardinalitiesDict() + self.modifyHeapSizeProperties() + self.modifyNotValuableComponents() + self.modifyComponentsNotPreferableOnServer() + self.modifyComponentLayoutSchemes() + + def modifyMastersWithMultipleInstances(self): + """ + Modify the set of masters with multiple instances. + Must be overriden in child class. + """ + # Nothing to do + pass + + def modifyCardinalitiesDict(self): + """ + Modify the dictionary of cardinalities. + Must be overriden in child class. + """ + min_val = 1 + + self.cardinalitiesDict.update( + { + 'METRICS_COLLECTOR': {"min": min_val} + } + ) + + def modifyHeapSizeProperties(self): + """ + Modify the dictionary of heap size properties. + Must be overriden in child class. + """ + self.heap_size_properties = {"METRICS_COLLECTOR": + [{"config-name": "ams-hbase-env", + "property": "hbase_master_heapsize", + "default": "1024m"}, + {"config-name": "ams-hbase-env", + "property": "hbase_regionserver_heapsize", + "default": "1024m"}, + {"config-name": "ams-env", + "property": "metrics_collector_heapsize", + "default": "512m"}]} + + + def modifyNotValuableComponents(self): + """ + Modify the set of components whose host assignment is based on other services. + Must be overriden in child class. + """ + self.notValuableComponents |= set(['METRICS_MONITOR']) + + def modifyComponentsNotPreferableOnServer(self): + """ + Modify the set of components that are not preferable on the server. + Must be overriden in child class. + """ + self.notPreferableOnServerComponents |= set(['METRICS_COLLECTOR']) + + + def modifyComponentLayoutSchemes(self): + """ + Modify layout scheme dictionaries for components. + The scheme dictionary basically maps the number of hosts to + host index where component should exist. + Must be overriden in child class. + """ + self.componentLayoutSchemes.update({'METRICS_COLLECTOR': {3: 2, 6: 2, 31: 3, "else": 5}}) + + + def getServiceComponentLayoutValidations(self, services, hosts): + """ + Get a list of errors. + Must be overriden in child class. 
+ """ + + return [] + + def getAmsMemoryRecommendation(self, services, hosts): + # MB per sink in hbase heapsize + HEAP_PER_MASTER_COMPONENT = 50 + HEAP_PER_SLAVE_COMPONENT = 10 + + schMemoryMap = { + "HDFS": { + "NAMENODE": HEAP_PER_MASTER_COMPONENT, + "SECONDARY_NAMENODE": HEAP_PER_MASTER_COMPONENT, + "DATANODE": HEAP_PER_SLAVE_COMPONENT + }, + "YARN": { + "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT, + "NODEMANAGER": HEAP_PER_SLAVE_COMPONENT, + "HISTORYSERVER" : HEAP_PER_MASTER_COMPONENT, + "APP_TIMELINE_SERVER": HEAP_PER_MASTER_COMPONENT + }, + "HBASE": { + "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT, + "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT + }, + "HIVE": { + "HIVE_METASTORE": HEAP_PER_MASTER_COMPONENT, + "HIVE_SERVER": HEAP_PER_MASTER_COMPONENT + }, + "KAFKA": { + "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT + }, + "FLUME": { + "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT + }, + "STORM": { + "NIMBUS": HEAP_PER_MASTER_COMPONENT, + }, + "AMBARI_METRICS": { + "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT, + "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT + }, + "ACCUMULO": { + "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT, + "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT + }, + "LOGSEARCH": { + "LOGSEARCH_LOGFEEDER" : HEAP_PER_SLAVE_COMPONENT + } + } + total_sinks_count = 0 + # minimum heap size + hbase_heapsize = 500 + for serviceName, componentsDict in schMemoryMap.items(): + for componentName, multiplier in componentsDict.items(): + schCount = len( + self.getHostsWithComponent(serviceName, componentName, services, + hosts)) + hbase_heapsize += int((schCount * multiplier)) + total_sinks_count += schCount + collector_heapsize = int(hbase_heapsize/3 if hbase_heapsize > 2048 else 512) + hbase_heapsize = min(hbase_heapsize, 32768) + + return self.round_to_n(collector_heapsize), self.round_to_n(hbase_heapsize), total_sinks_count + + + def round_to_n(self, mem_size, n=128): + return int(round(float(mem_size) / float(n))) * int(n) + + + def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts): + """ + Entry point. + Must be overriden in child class. + """ + #Logger.info("Class: %s, Method: %s. Recommending Service Configurations." % + # (self.__class__.__name__, inspect.stack()[0][3])) + + recommender = AMBARI_METRICSRecommender() + recommender.recommendAmsConfigurationsFromHDP206(configurations, clusterData, services, hosts) + + + + + def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts): + """ + Entry point. + Validate configurations for the service. Return a list of errors. + The code for this function should be the same for each Service Advisor. + """ + #Logger.info("Class: %s, Method: %s. Validating Configurations." % + # (self.__class__.__name__, inspect.stack()[0][3])) + + validator = self.getAMBARI_METRICSValidator() + # Calls the methods of the validator using arguments, + # method(siteProperties, siteRecommendations, configurations, services, hosts) + return validator.validateListOfConfigUsingMethod(configurations, recommendedDefaults, services, hosts, validator.validators) + + + def getAMBARI_METRICSValidator(self): + return AMBARI_METRICSValidator() + +class AMBARI_METRICSRecommender(service_advisor.ServiceAdvisor): + """ + AMS Recommender suggests properties when adding the service for the first time or modifying configs via the UI. 
+ """ + + def __init__(self, *args, **kwargs): + self.as_super = super(AMBARI_METRICSRecommender, self) + self.as_super.__init__(*args, **kwargs) + + + + def getPreferredMountPoints(self, hostInfo): + + # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points + undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts", + "/etc/hostname", "/tmp"] + undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"] + mountPoints = [] + if hostInfo and "disk_info" in hostInfo: + mountPointsDict = {} + for mountpoint in hostInfo["disk_info"]: + if not (mountpoint["mountpoint"] in undesirableMountPoints or + mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or + mountpoint["type"] in undesirableFsTypes or + mountpoint["available"] == str(0)): + mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"]) + if mountPointsDict: + mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True) + mountPoints.append("/") + return mountPoints + + def recommendAmsConfigurationsFromHDP206(self, configurations, clusterData, services, hosts): + putAmsEnvProperty = self.putProperty(configurations, "ams-env", services) + putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services) + putAmsSiteProperty = self.putProperty(configurations, "ams-site", services) + putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services) + putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services) + putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env") + + amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") + + serviceAdvisor = AMBARI_METRICSServiceAdvisor() + + # TODO set "timeline.metrics.service.webapp.address" to 0.0.0.0:port in upgrade catalog + timeline_metrics_service_webapp_address = '0.0.0.0' + + putAmsSiteProperty("timeline.metrics.service.webapp.address", str(timeline_metrics_service_webapp_address) + ":6188") + + log_dir = "/var/log/ambari-metrics-collector" + if "ams-env" in services["configurations"]: + if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]: + log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"] + putHbaseEnvProperty("hbase_log_dir", log_dir) + + defaultFs = 'file:///' + if "core-site" in services["configurations"] and \ + "fs.defaultFS" in services["configurations"]["core-site"]["properties"]: + defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"] + + operatingMode = "embedded" + if "ams-site" in services["configurations"]: + if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]: + operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"] + + if len(amsCollectorHosts) > 1 : + operatingMode = "distributed" + putAmsSiteProperty("timeline.metrics.service.operation.mode", operatingMode) + + if operatingMode == "distributed": + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true') + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true') + else: + putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false') + putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false') + + rootDir = "file:///var/lib/ambari-metrics-collector/hbase" + tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp" + zk_port_default = [] + if "ams-hbase-site" in 
services["configurations"]: + if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]: + rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"] + if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]: + tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"] + if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]: + zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"] + + # Skip recommendation item if default value is present + if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default: + zkPort = self.getZKPort(services) + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort) + elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default: + putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181") + + mountpoints = ["/"] + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + mountpoints = self.getPreferredMountPoints(host["Hosts"]) + break + isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/")) + if isLocalRootDir: + rootDir = re.sub("^file:///|/", "", rootDir, count=1) + rootDir = "file://" + os.path.join(mountpoints[0], rootDir) + tmpDir = re.sub("^file:///|/", "", tmpDir, count=1) + if len(mountpoints) > 1 and isLocalRootDir: + tmpDir = os.path.join(mountpoints[1], tmpDir) + else: + tmpDir = os.path.join(mountpoints[0], tmpDir) + putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir) + + if operatingMode == "distributed": + putAmsHbaseSiteProperty("hbase.rootdir", "/user/ams/hbase") + + if operatingMode == "embedded": + if isLocalRootDir: + putAmsHbaseSiteProperty("hbase.rootdir", rootDir) + else: + putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase") + + collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts) + + putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize) + + putAmsSiteProperty("timeline.metrics.cache.size", max(100, int(log(total_sinks_count)) * 100)) + putAmsSiteProperty("timeline.metrics.cache.commit.interval", min(10, max(12 - int(log(total_sinks_count)), 2))) + + # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 + putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) + + if len(amsCollectorHosts) > 1: + pass + else: + # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 + if total_sinks_count >= 2000: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) + putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) + putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) + 
putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) + putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) + elif total_sinks_count >= 1000: + putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) + putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) + putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) + putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) + putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) + else: + putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) + pass + + metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100))) + putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers) + + serviceAdvisor = AMBARI_METRICSServiceAdvisor() + + # Distributed mode heap size + if operatingMode == "distributed": + hbase_heapsize = max(hbase_heapsize, 1024) + putHbaseEnvProperty("hbase_master_heapsize", "512") + putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% of 512 heap size + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize) + putHbaseEnvProperty("regionserver_xmn_size", serviceAdvisor.round_to_n(0.15 * hbase_heapsize,64)) + else: + # Embedded mode heap size : master + regionserver + hbase_rs_heapsize = 512 + putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize) + putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize) + putHbaseEnvProperty("hbase_master_xmn_size", serviceAdvisor.round_to_n(0.15*(hbase_heapsize + hbase_rs_heapsize),64)) + + # If no local DN in distributed mode + if operatingMode == "distributed": + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + # call by Kerberos wizard sends only the service being affected + # so it is possible for dn_hosts to be None but not amsCollectorHosts + if dn_hosts and len(dn_hosts) > 0: + if set(amsCollectorHosts).intersection(dn_hosts): + collector_cohosted_with_dn = "true" + else: + collector_cohosted_with_dn = "false" + putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn) + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + + ams_hbase_site = None + ams_hbase_env = None + + # Overriden properties form the UI + if "ams-hbase-site" in services["configurations"]: + ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"] + if "ams-hbase-env" in services["configurations"]: + ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"] + + # Recommendations + if not ams_hbase_site: + ams_hbase_site = configurations["ams-hbase-site"]["properties"] + if not ams_hbase_env: + ams_hbase_env = configurations["ams-hbase-env"]["properties"] + + split_point_finder = FindSplitPointsForAMSRegions( + ams_hbase_site, ams_hbase_env, serviceMetricsDir, customServiceMetricsDir, operatingMode, servicesList) + + result = split_point_finder.get_split_points() + precision_splits = ' ' + aggregate_splits = ' ' + if result.precision: + precision_splits = result.precision + if result.aggregate: + aggregate_splits = result.aggregate + putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits)) + putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits)) + + component_grafana_exists = 
+    component_grafana_exists = False
+    for service in services['services']:
+      if 'components' in service:
+        for component in service['components']:
+          if 'StackServiceComponents' in component:
+            # If Grafana is installed the hostnames would indicate its location
+            if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\
+              len(component['StackServiceComponents']['hostnames']) != 0:
+              component_grafana_exists = True
+              break
+    pass
+
+    if not component_grafana_exists:
+      putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false")
+
+    pass
+
+
+class AMBARI_METRICSValidator(service_advisor.ServiceAdvisor):
+  """
+  AMS Validator checks the correctness of properties whenever the service is first added or the user attempts to
+  change configs via the UI.
+  """
+
+  def __init__(self, *args, **kwargs):
+    self.as_super = super(AMBARI_METRICSValidator, self)
+    self.as_super.__init__(*args, **kwargs)
+
+    self.validators = [("ams-hbase-site", self.validateAmsHbaseSiteConfigurationsFromHDP206),
+                       ("ams-hbase-env", self.validateAmsHbaseEnvConfigurationsFromHDP206),
+                       ("ams-site", self.validateAmsSiteConfigurationsFromHDP206),
+                       ("ams-env", self.validateAmsEnvConfigurationsFromHDP206),
+                       ("ams-grafana-env", self.validateGrafanaEnvConfigurationsFromHDP206)]
+
+  def getPreferredMountPoints(self, hostInfo):
+
+    # '/etc/resolv.conf', '/etc/hostname', '/etc/hosts' are docker specific mount points
+    undesirableMountPoints = ["/", "/home", "/etc/resolv.conf", "/etc/hosts",
+                              "/etc/hostname", "/tmp"]
+    undesirableFsTypes = ["devtmpfs", "tmpfs", "vboxsf", "CDFS"]
+    mountPoints = []
+    if hostInfo and "disk_info" in hostInfo:
+      mountPointsDict = {}
+      for mountpoint in hostInfo["disk_info"]:
+        if not (mountpoint["mountpoint"] in undesirableMountPoints or
+                mountpoint["mountpoint"].startswith(("/boot", "/mnt")) or
+                mountpoint["type"] in undesirableFsTypes or
+                mountpoint["available"] == str(0)):
+          mountPointsDict[mountpoint["mountpoint"]] = self.to_number(mountpoint["available"])
+      if mountPointsDict:
+        mountPoints = sorted(mountPointsDict, key=mountPointsDict.get, reverse=True)
+    mountPoints.append("/")
+    return mountPoints
+
+  def validateAmsHbaseSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+
+    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+    ams_site = self.getSiteProperties(configurations, "ams-site")
+    core_site = self.getSiteProperties(configurations, "core-site")
+
+    serviceAdvisor = AMBARI_METRICSServiceAdvisor()
+
+    collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
+    recommendedDiskSpace = 10485760
+    # TODO validate configuration for multiple AMBARI_METRICS collectors
+    if len(amsCollectorHosts) > 1:
+      pass
+    else:
+      if total_sinks_count > 2000:
+        recommendedDiskSpace = 104857600  # * 1k == 100 Gb
+      elif total_sinks_count > 500:
+        recommendedDiskSpace = 52428800   # * 1k == 50 Gb
+      elif total_sinks_count > 250:
+        recommendedDiskSpace = 20971520   # * 1k == 20 Gb
+
+    validationItems = []
hbase_rootdir.startswith("/")) + + if op_mode == "distributed" and is_local_root_dir: + rootdir_item = self.getWarnItem("In distributed mode hbase.rootdir should point to HDFS.") + elif op_mode == "embedded": + if distributed.lower() == "false" and hbase_rootdir.startswith('/') or hbase_rootdir.startswith("hdfs://"): + rootdir_item = self.getWarnItem("In embedded mode hbase.rootdir cannot point to schemaless values or HDFS, " + "Example - file:// for localFS") + pass + + distributed_item = None + if op_mode == "distributed" and not distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to true for " + "distributed mode") + if op_mode == "embedded" and distributed.lower() == "true": + distributed_item = self.getErrorItem("hbase.cluster.distributed property should be set to false for embedded mode") + + hbase_zk_client_port = properties.get("hbase.zookeeper.property.clientPort") + zkPort = self.getZKPort(services) + hbase_zk_client_port_item = None + if distributed.lower() == "true" and op_mode == "distributed" and \ + hbase_zk_client_port != zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS distributed mode, hbase.zookeeper.property.clientPort " + "should be the cluster zookeeper server port : {0}".format(zkPort)) + + if distributed.lower() == "false" and op_mode == "embedded" and \ + hbase_zk_client_port == zkPort and hbase_zk_client_port != "{{zookeeper_clientPort}}": + hbase_zk_client_port_item = self.getErrorItem("In AMS embedded mode, hbase.zookeeper.property.clientPort " + "should be a different port than cluster zookeeper port." + "(default:61181)") + + validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item }, + {"config-name":'hbase.cluster.distributed', "item": distributed_item }, + {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }]) + + for collectorHostName in amsCollectorHosts: + for host in hosts["items"]: + if host["Hosts"]["host_name"] == collectorHostName: + if op_mode == 'embedded' or is_local_root_dir: + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}]) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}]) + validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}]) + + dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") + if is_local_root_dir: + mountPoints = [] + for mountPoint in host["Hosts"]["disk_info"]: + mountPoints.append(mountPoint["mountpoint"]) + hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints) + hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints) + preferred_mountpoints = self.getPreferredMountPoints(host['Hosts']) + # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition + # if multiple preferred_mountpoints exist + if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \ + len(preferred_mountpoints) > 1: + item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. 
" + "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint)) + validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}]) + + # if METRICS_COLLECTOR is co-hosted with DATANODE + # cross-check dfs.datanode.data.dir and hbase.rootdir + # they shouldn't share same disk partition IO + hdfs_site = self.getSiteProperties(configurations, "hdfs-site") + dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else [] + if dn_hosts and collectorHostName in dn_hosts and ams_site and \ + dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs): + for dfs_datadir in dfs_datadirs: + dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints) + if dfs_datadir_mountpoint == hbase_rootdir_mountpoint: + item = self.getWarnItem("Consider not using {0} partition for storing metrics data. " + "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint)) + validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}]) + break + # If no local DN in distributed mode + elif collectorHostName not in dn_hosts and distributed.lower() == "true": + item = self.getWarnItem("It's recommended to install Datanode component on {0} " + "to speed up IO operations between HDFS and Metrics " + "Collector in distributed mode ".format(collectorHostName)) + validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}]) + # Short circuit read should be enabled in distibuted mode + # if local DN installed + else: + validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}]) + + return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site") + + + def validateAmsHbaseEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts): + + ams_env = self.getSiteProperties(configurations, "ams-env") + amsHbaseSite = self.getSiteProperties(configurations, "ams-hbase-site") + validationItems = [] + mb = 1024 * 1024 + gb = 1024 * mb + + regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added + if regionServerItem: + validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}]) + + hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize") + if hbaseMasterHeapsizeItem: + validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}]) + + logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir") + if logDirItem: + validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}]) + + hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"]) + hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"]) + hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"]) + hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"]) + + # Validate Xmn settings. 
+    masterXmnItem = None
+    regionServerXmnItem = None
+    is_hbase_distributed = amsHbaseSite.get("hbase.cluster.distributed").lower() == 'true'
+
+    if is_hbase_distributed:
+
+      if not regionServerItem and hbase_regionserver_heapsize > 32768:
+        regionServerItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
+        validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}])
+
+      minMasterXmn = 0.12 * hbase_master_heapsize
+      maxMasterXmn = 0.2 * hbase_master_heapsize
+      if hbase_master_xmn_size < minMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                         "(12% of hbase_master_heapsize)".format(int(ceil(minMasterXmn))))
+
+      if hbase_master_xmn_size > maxMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                         "(20% of hbase_master_heapsize)".format(int(floor(maxMasterXmn))))
+
+      minRegionServerXmn = 0.12 * hbase_regionserver_heapsize
+      maxRegionServerXmn = 0.2 * hbase_regionserver_heapsize
+      if hbase_regionserver_xmn_size < minRegionServerXmn:
+        regionServerXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                               "(12% of hbase_regionserver_heapsize)"
+                                               .format(int(ceil(minRegionServerXmn))))
+
+      if hbase_regionserver_xmn_size > maxRegionServerXmn:
+        regionServerXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                               "(20% of hbase_regionserver_heapsize)"
+                                               .format(int(floor(maxRegionServerXmn))))
+    else:
+
+      if not hbaseMasterHeapsizeItem and (hbase_master_heapsize + hbase_regionserver_heapsize) > 32768:
+        hbaseMasterHeapsizeItem = self.getWarnItem("Value of Master + Regionserver heapsize is more than the recommended maximum heap size of 32G.")
+        validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+
+      minMasterXmn = 0.12 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+      maxMasterXmn = 0.2 * (hbase_master_heapsize + hbase_regionserver_heapsize)
+      if hbase_master_xmn_size < minMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is less than the recommended minimum Xmn size of {0} "
+                                         "(12% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+                                         .format(int(ceil(minMasterXmn))))
+
+      if hbase_master_xmn_size > maxMasterXmn:
+        masterXmnItem = self.getWarnItem("Value is greater than the recommended maximum Xmn size of {0} "
+                                         "(20% of hbase_master_heapsize + hbase_regionserver_heapsize)"
+                                         .format(int(floor(maxMasterXmn))))
+    if masterXmnItem:
+      validationItems.extend([{"config-name": "hbase_master_xmn_size", "item": masterXmnItem}])
+
+    if regionServerXmnItem:
+      validationItems.extend([{"config-name": "regionserver_xmn_size", "item": regionServerXmnItem}])
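The Xmn window being enforced above is 12%-20% of the relevant heap (the region server heap in distributed mode, master + region server combined in embedded mode). Worked numbers for a 1024 MB distributed-mode region server heap:

    from math import ceil, floor
    heap = 1024
    min_xmn = 0.12 * heap    # 122.88 -> warn below ceil() = 123
    max_xmn = 0.2 * heap     # 204.8  -> warn above floor() = 204
    assert int(ceil(min_xmn)) == 123
    assert int(floor(max_xmn)) == 204

This also explains the recommender's hbase_master_xmn_size default of "102" in distributed mode: 20% of the 512 MB master heap.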
+    if hbaseMasterHeapsizeItem is None:
+      hostMasterComponents = {}
+
+      for service in services["services"]:
+        for component in service["components"]:
+          if component["StackServiceComponents"]["hostnames"] is not None:
+            for hostName in component["StackServiceComponents"]["hostnames"]:
+              if self.isMasterComponent(component):
+                if hostName not in hostMasterComponents.keys():
+                  hostMasterComponents[hostName] = []
+                hostMasterComponents[hostName].append(component["StackServiceComponents"]["component_name"])
+
+      amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
+      for collectorHostName in amsCollectorHosts:
+        for host in hosts["items"]:
+          if host["Hosts"]["host_name"] == collectorHostName:
+            # AMS Collector co-hosted with other master components in bigger clusters
+            if len(hosts['items']) > 31 and \
+                len(hostMasterComponents[collectorHostName]) > 2 and \
+                host["Hosts"]["total_mem"] < 32*mb:  # < 32Gb (total_mem is in kB)
+              masterHostMessage = "Host {0} is used by multiple master components ({1}). " \
+                                  "It is recommended to use a separate host for the " \
+                                  "Ambari Metrics Collector component and ensure " \
+                                  "the host has sufficient memory available."
+
+              hbaseMasterHeapsizeItem = self.getWarnItem(masterHostMessage.format(
+                collectorHostName, str(", ".join(hostMasterComponents[collectorHostName]))))
+              if hbaseMasterHeapsizeItem:
+                validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}])
+      pass
+
+    return self.toConfigurationValidationProblems(validationItems, "ams-hbase-env")
+
+  def validateAmsSiteConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = []
+
+    serviceAdvisor = AMBARI_METRICSServiceAdvisor()
+
+    op_mode = properties.get("timeline.metrics.service.operation.mode")
+    correct_op_mode_item = None
+    if op_mode not in ("embedded", "distributed"):
+      correct_op_mode_item = self.getErrorItem("Correct value should be set.")
+      pass
+    elif len(self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")) > 1 and op_mode != 'distributed':
+      correct_op_mode_item = self.getErrorItem("Correct value should be 'distributed' for clusters with more than 1 Metrics collector")
+    elif op_mode == 'embedded':
+      collector_heapsize, hbase_heapsize, total_sinks_count = serviceAdvisor.getAmsMemoryRecommendation(services, hosts)
+      if total_sinks_count > 1000:
+        correct_op_mode_item = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. "
+                                                "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.")
+
+    validationItems.extend([{"config-name": 'timeline.metrics.service.operation.mode', "item": correct_op_mode_item}])
+    return self.toConfigurationValidationProblems(validationItems, "ams-site")
+
+  def validateAmsEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+
+    validationItems = []
+    collectorHeapsizeDefaultItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "metrics_collector_heapsize")
+    validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeDefaultItem}])
+
+    ams_env = self.getSiteProperties(configurations, "ams-env")
+    collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
+    if collector_heapsize > 32768:
+      collectorHeapsizeMaxItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
+      validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeMaxItem}])
+
+    return self.toConfigurationValidationProblems(validationItems, "ams-env")
+
+  def validateGrafanaEnvConfigurationsFromHDP206(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = []
+
+    grafana_pwd = properties.get("metrics_grafana_password")
+    grafana_pwd_length_item = None
+    if len(grafana_pwd) < 4:
+      grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.")
+      pass
+    validationItems.extend([{"config-name": 'metrics_grafana_password', "item": grafana_pwd_length_item}])
+    return self.toConfigurationValidationProblems(validationItems, "ams-site")
\ No newline at end of file
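Before the next file, it is worth noting how the validator above is wired in: AMBARI_METRICSValidator.validators is a list of (config-type, method) pairs, and the service advisor entry point hands that list to validateListOfConfigUsingMethod in the shared base class. A sketch of the dispatch shape (the loop and signatures here are assumptions about the base class, shown only to make the pairing readable):

    # Hypothetical restatement of the base-class dispatch, not the real code.
    def validate_list_of_configs(validators, configurations, recommended, services, hosts):
        items = []
        for config_type, method in validators:
            # Each method validates the properties of its own config type.
            properties = configurations.get(config_type, {}).get("properties", {})
            items.extend(method(properties, recommended, configurations, services, hosts))
        return items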
""" - pass + self.heap_size_properties = {"ATLAS_SERVER": + [{"config-name": "atlas-env", + "property": "atlas_server_xmx", + "default": "2048m"}]} def modifyNotValuableComponents(self): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index 5307176..a194332 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -172,7 +172,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): "HDFS": self.recommendHDFSConfigurations, "HBASE": self.recommendHbaseConfigurations, "STORM": self.recommendStormConfigurations, - "AMBARI_METRICS": self.recommendAmsConfigurations, "RANGER": self.recommendRangerConfigurations, "ZOOKEEPER": self.recommendZookeeperConfigurations, "OOZIE": self.recommendOozieConfigurations @@ -509,66 +508,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): putRangerAuditProperty(item['target_configname'], rangerAuditProperty) - def getAmsMemoryRecommendation(self, services, hosts): - # MB per sink in hbase heapsize - HEAP_PER_MASTER_COMPONENT = 50 - HEAP_PER_SLAVE_COMPONENT = 10 - - schMemoryMap = { - "HDFS": { - "NAMENODE": HEAP_PER_MASTER_COMPONENT, - "SECONDARY_NAMENODE": HEAP_PER_MASTER_COMPONENT, - "DATANODE": HEAP_PER_SLAVE_COMPONENT - }, - "YARN": { - "RESOURCEMANAGER": HEAP_PER_MASTER_COMPONENT, - "NODEMANAGER": HEAP_PER_SLAVE_COMPONENT, - "HISTORYSERVER" : HEAP_PER_MASTER_COMPONENT, - "APP_TIMELINE_SERVER": HEAP_PER_MASTER_COMPONENT - }, - "HBASE": { - "HBASE_MASTER": HEAP_PER_MASTER_COMPONENT, - "HBASE_REGIONSERVER": HEAP_PER_SLAVE_COMPONENT - }, - "HIVE": { - "HIVE_METASTORE": HEAP_PER_MASTER_COMPONENT, - "HIVE_SERVER": HEAP_PER_MASTER_COMPONENT - }, - "KAFKA": { - "KAFKA_BROKER": HEAP_PER_MASTER_COMPONENT - }, - "FLUME": { - "FLUME_HANDLER": HEAP_PER_SLAVE_COMPONENT - }, - "STORM": { - "NIMBUS": HEAP_PER_MASTER_COMPONENT, - }, - "AMBARI_METRICS": { - "METRICS_COLLECTOR": HEAP_PER_MASTER_COMPONENT, - "METRICS_MONITOR": HEAP_PER_SLAVE_COMPONENT - }, - "ACCUMULO": { - "ACCUMULO_MASTER": HEAP_PER_MASTER_COMPONENT, - "ACCUMULO_TSERVER": HEAP_PER_SLAVE_COMPONENT - }, - "LOGSEARCH": { - "LOGSEARCH_LOGFEEDER" : HEAP_PER_SLAVE_COMPONENT - } - } - total_sinks_count = 0 - # minimum heap size - hbase_heapsize = 500 - for serviceName, componentsDict in schMemoryMap.items(): - for componentName, multiplier in componentsDict.items(): - schCount = len( - self.getHostsWithComponent(serviceName, componentName, services, - hosts)) - hbase_heapsize += int((schCount * multiplier)) - total_sinks_count += schCount - collector_heapsize = int(hbase_heapsize/3 if hbase_heapsize > 2048 else 512) - hbase_heapsize = min(hbase_heapsize, 32768) - - return round_to_n(collector_heapsize), round_to_n(hbase_heapsize), total_sinks_count def recommendStormConfigurations(self, configurations, clusterData, services, hosts): putStormSiteProperty = self.putProperty(configurations, "storm-site", services) @@ -577,216 +516,6 @@ class HDP206StackAdvisor(DefaultStackAdvisor): if 'AMBARI_METRICS' in servicesList: putStormSiteProperty('metrics.reporter.register', 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter') - def recommendAmsConfigurations(self, 
-  def recommendAmsConfigurations(self, configurations, clusterData, services, hosts):
-    putAmsEnvProperty = self.putProperty(configurations, "ams-env", services)
-    putAmsHbaseSiteProperty = self.putProperty(configurations, "ams-hbase-site", services)
-    putAmsSiteProperty = self.putProperty(configurations, "ams-site", services)
-    putHbaseEnvProperty = self.putProperty(configurations, "ams-hbase-env", services)
-    putGrafanaProperty = self.putProperty(configurations, "ams-grafana-env", services)
-    putGrafanaPropertyAttribute = self.putPropertyAttribute(configurations, "ams-grafana-env")
-
-    amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")
-
-    # TODO set "timeline.metrics.service.webapp.address" to 0.0.0.0:port in upgrade catalog
-    timeline_metrics_service_webapp_address = '0.0.0.0'
-
-    putAmsSiteProperty("timeline.metrics.service.webapp.address", str(timeline_metrics_service_webapp_address) + ":6188")
-
-    log_dir = "/var/log/ambari-metrics-collector"
-    if "ams-env" in services["configurations"]:
-      if "metrics_collector_log_dir" in services["configurations"]["ams-env"]["properties"]:
-        log_dir = services["configurations"]["ams-env"]["properties"]["metrics_collector_log_dir"]
-      putHbaseEnvProperty("hbase_log_dir", log_dir)
-
-    defaultFs = 'file:///'
-    if "core-site" in services["configurations"] and \
-      "fs.defaultFS" in services["configurations"]["core-site"]["properties"]:
-      defaultFs = services["configurations"]["core-site"]["properties"]["fs.defaultFS"]
-
-    operatingMode = "embedded"
-    if "ams-site" in services["configurations"]:
-      if "timeline.metrics.service.operation.mode" in services["configurations"]["ams-site"]["properties"]:
-        operatingMode = services["configurations"]["ams-site"]["properties"]["timeline.metrics.service.operation.mode"]
-
-    if len(amsCollectorHosts) > 1:
-      operatingMode = "distributed"
-      putAmsSiteProperty("timeline.metrics.service.operation.mode", operatingMode)
-
-    if operatingMode == "distributed":
-      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'true')
-      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'true')
-    else:
-      putAmsSiteProperty("timeline.metrics.service.watcher.disabled", 'false')
-      putAmsHbaseSiteProperty("hbase.cluster.distributed", 'false')
-
-    rootDir = "file:///var/lib/ambari-metrics-collector/hbase"
-    tmpDir = "/var/lib/ambari-metrics-collector/hbase-tmp"
-    zk_port_default = []
-    if "ams-hbase-site" in services["configurations"]:
-      if "hbase.rootdir" in services["configurations"]["ams-hbase-site"]["properties"]:
-        rootDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.rootdir"]
-      if "hbase.tmp.dir" in services["configurations"]["ams-hbase-site"]["properties"]:
-        tmpDir = services["configurations"]["ams-hbase-site"]["properties"]["hbase.tmp.dir"]
-      if "hbase.zookeeper.property.clientPort" in services["configurations"]["ams-hbase-site"]["properties"]:
-        zk_port_default = services["configurations"]["ams-hbase-site"]["properties"]["hbase.zookeeper.property.clientPort"]
-
-    # Skip recommendation item if default value is present
-    if operatingMode == "distributed" and not "{{zookeeper_clientPort}}" in zk_port_default:
-      zkPort = self.getZKPort(services)
-      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", zkPort)
-    elif operatingMode == "embedded" and not "{{zookeeper_clientPort}}" in zk_port_default:
-      putAmsHbaseSiteProperty("hbase.zookeeper.property.clientPort", "61181")
-
-    mountpoints = ["/"]
-    for collectorHostName in amsCollectorHosts:
-      for host in hosts["items"]:
host["Hosts"]["host_name"] == collectorHostName: - mountpoints = self.getPreferredMountPoints(host["Hosts"]) - break - isLocalRootDir = rootDir.startswith("file://") or (defaultFs.startswith("file://") and rootDir.startswith("/")) - if isLocalRootDir: - rootDir = re.sub("^file:///|/", "", rootDir, count=1) - rootDir = "file://" + os.path.join(mountpoints[0], rootDir) - tmpDir = re.sub("^file:///|/", "", tmpDir, count=1) - if len(mountpoints) > 1 and isLocalRootDir: - tmpDir = os.path.join(mountpoints[1], tmpDir) - else: - tmpDir = os.path.join(mountpoints[0], tmpDir) - putAmsHbaseSiteProperty("hbase.tmp.dir", tmpDir) - - if operatingMode == "distributed": - putAmsHbaseSiteProperty("hbase.rootdir", "/user/ams/hbase") - - if operatingMode == "embedded": - if isLocalRootDir: - putAmsHbaseSiteProperty("hbase.rootdir", rootDir) - else: - putAmsHbaseSiteProperty("hbase.rootdir", "file:///var/lib/ambari-metrics-collector/hbase") - - collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) - - putAmsEnvProperty("metrics_collector_heapsize", collector_heapsize) - - putAmsSiteProperty("timeline.metrics.cache.size", max(100, int(log(total_sinks_count)) * 100)) - putAmsSiteProperty("timeline.metrics.cache.commit.interval", min(10, max(12 - int(log(total_sinks_count)), 2))) - - # blockCache = 0.3, memstore = 0.35, phoenix-server = 0.15, phoenix-client = 0.25 - putAmsHbaseSiteProperty("hfile.block.cache.size", 0.3) - putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 134217728) - putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.35) - putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.3) - - if len(amsCollectorHosts) > 1: - pass - else: - # blockCache = 0.3, memstore = 0.3, phoenix-server = 0.2, phoenix-client = 0.3 - if total_sinks_count >= 2000: - putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) - putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) - putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) - putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) - putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.upperLimit", 0.3) - putAmsHbaseSiteProperty("hbase.regionserver.global.memstore.lowerLimit", 0.25) - putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20) - putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000) - putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30) - putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000) - elif total_sinks_count >= 1000: - putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60) - putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728) - putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64) - putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456) - putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000) - putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000) - else: - putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000) - pass - - metrics_api_handlers = min(50, max(20, int(total_sinks_count / 100))) - putAmsSiteProperty("timeline.metrics.service.handler.thread.count", metrics_api_handlers) - - # Distributed mode heap size - if operatingMode == "distributed": - hbase_heapsize = max(hbase_heapsize, 1024) - putHbaseEnvProperty("hbase_master_heapsize", "512") - putHbaseEnvProperty("hbase_master_xmn_size", "102") #20% 
-    # Distributed mode heap size
-    if operatingMode == "distributed":
-      hbase_heapsize = max(hbase_heapsize, 1024)
-      putHbaseEnvProperty("hbase_master_heapsize", "512")
-      putHbaseEnvProperty("hbase_master_xmn_size", "102")  # 20% of 512 heap size
-      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_heapsize)
-      putHbaseEnvProperty("regionserver_xmn_size", round_to_n(0.15 * hbase_heapsize, 64))
-    else:
-      # Embedded mode heap size : master + regionserver
-      hbase_rs_heapsize = 512
-      putHbaseEnvProperty("hbase_regionserver_heapsize", hbase_rs_heapsize)
-      putHbaseEnvProperty("hbase_master_heapsize", hbase_heapsize)
-      putHbaseEnvProperty("hbase_master_xmn_size", round_to_n(0.15*(hbase_heapsize + hbase_rs_heapsize), 64))
-
-    # If no local DN in distributed mode
-    if operatingMode == "distributed":
-      dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE")
-      # A call from the Kerberos wizard sends only the service being affected,
-      # so it is possible for dn_hosts to be None but not amsCollectorHosts
-      if dn_hosts and len(dn_hosts) > 0:
-        if set(amsCollectorHosts).intersection(dn_hosts):
-          collector_cohosted_with_dn = "true"
-        else:
-          collector_cohosted_with_dn = "false"
-        putAmsHbaseSiteProperty("dfs.client.read.shortcircuit", collector_cohosted_with_dn)
-
-    # split points
-    scriptDir = os.path.dirname(os.path.abspath(__file__))
-    metricsDir = os.path.join(scriptDir, '../../../../common-services/AMBARI_METRICS/0.1.0/package')
-    serviceMetricsDir = os.path.join(metricsDir, 'files', 'service-metrics')
-    customServiceMetricsDir = os.path.join(scriptDir, '../../../../dashboards/service-metrics')
-    sys.path.append(os.path.join(metricsDir, 'scripts'))
-    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
-
-    from split_points import FindSplitPointsForAMSRegions
-
-    ams_hbase_site = None
-    ams_hbase_env = None
-
-    # Overridden properties from the UI
-    if "ams-hbase-site" in services["configurations"]:
-      ams_hbase_site = services["configurations"]["ams-hbase-site"]["properties"]
-    if "ams-hbase-env" in services["configurations"]:
-      ams_hbase_env = services["configurations"]["ams-hbase-env"]["properties"]
-
-    # Recommendations
-    if not ams_hbase_site:
-      ams_hbase_site = configurations["ams-hbase-site"]["properties"]
-    if not ams_hbase_env:
-      ams_hbase_env = configurations["ams-hbase-env"]["properties"]
-
-    split_point_finder = FindSplitPointsForAMSRegions(
-      ams_hbase_site, ams_hbase_env, serviceMetricsDir, customServiceMetricsDir, operatingMode, servicesList)
-
-    result = split_point_finder.get_split_points()
-    precision_splits = ' '
-    aggregate_splits = ' '
-    if result.precision:
-      precision_splits = result.precision
-    if result.aggregate:
-      aggregate_splits = result.aggregate
-    putAmsSiteProperty("timeline.metrics.host.aggregate.splitpoints", ','.join(precision_splits))
-    putAmsSiteProperty("timeline.metrics.cluster.aggregate.splitpoints", ','.join(aggregate_splits))
-
-    component_grafana_exists = False
-    for service in services['services']:
-      if 'components' in service:
-        for component in service['components']:
-          if 'StackServiceComponents' in component:
-            # If Grafana is installed the hostnames would indicate its location
-            if 'METRICS_GRAFANA' in component['StackServiceComponents']['component_name'] and\
-              len(component['StackServiceComponents']['hostnames']) != 0:
-              component_grafana_exists = True
-              break
-    pass
-
-    if not component_grafana_exists:
-      putGrafanaPropertyAttribute("metrics_grafana_password", "visible", "false")
-
-    pass
-
 
   def getServiceConfigurationValidators(self):
     return {
{"hbase-env": self.validateHbaseEnvConfigurations}, - "STORM": {"storm-site": self.validateStormConfigurations}, - "AMBARI_METRICS": {"ams-hbase-site": self.validateAmsHbaseSiteConfigurations, - "ams-hbase-env": self.validateAmsHbaseEnvConfigurations, - "ams-site": self.validateAmsSiteConfigurations, - "ams-env": self.validateAmsEnvConfigurations, - "ams-grafana-env": self.validateGrafanaEnvConfigurations} + "STORM": {"storm-site": self.validateStormConfigurations} } def validateMinMax(self, items, recommendedDefaults, configurations): @@ -834,148 +558,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor): items.extend(self.toConfigurationValidationProblems(validationItems, configName)) pass - def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - validationItems = [] - op_mode = properties.get("timeline.metrics.service.operation.mode") - correct_op_mode_item = None - if op_mode not in ("embedded", "distributed"): - correct_op_mode_item = self.getErrorItem("Correct value should be set.") - pass - elif len(self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")) > 1 and op_mode != 'distributed': - correct_op_mode_item = self.getErrorItem("Correct value should be 'distributed' for clusters with more then 1 Metrics collector") - elif op_mode == 'embedded': - collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) - if total_sinks_count > 1000: - correct_op_mode_item = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. " - "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.") - - validationItems.extend([{"config-name":'timeline.metrics.service.operation.mode', "item": correct_op_mode_item }]) - return self.toConfigurationValidationProblems(validationItems, "ams-site") - - def validateGrafanaEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - validationItems = [] - - grafana_pwd = properties.get("metrics_grafana_password") - grafana_pwd_length_item = None - if len(grafana_pwd) < 4: - grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.") - pass - validationItems.extend([{"config-name":'metrics_grafana_password', "item": grafana_pwd_length_item }]) - return self.toConfigurationValidationProblems(validationItems, "ams-site") - - def validateAmsHbaseSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - - amsCollectorHosts = self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR") - ams_site = getSiteProperties(configurations, "ams-site") - core_site = getSiteProperties(configurations, "core-site") - - collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts) - recommendedDiskSpace = 10485760 - # TODO validate configuration for multiple AMBARI_METRICS collectors - if len(amsCollectorHosts) > 1: - pass - else: - if total_sinks_count > 2000: - recommendedDiskSpace = 104857600 # * 1k == 100 Gb - elif total_sinks_count > 500: - recommendedDiskSpace = 52428800 # * 1k == 50 Gb - elif total_sinks_count > 250: - recommendedDiskSpace = 20971520 # * 1k == 20 Gb - - validationItems = [] - - rootdir_item = None - op_mode = ams_site.get("timeline.metrics.service.operation.mode") - default_fs = core_site.get("fs.defaultFS") if core_site else "file:///" - hbase_rootdir = properties.get("hbase.rootdir") - hbase_tmpdir = 
@@ -834,148 +558,7 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
     items.extend(self.toConfigurationValidationProblems(validationItems, configName))
     pass

-  def validateAmsSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-    validationItems = []
-    op_mode = properties.get("timeline.metrics.service.operation.mode")
-    correct_op_mode_item = None
-    if op_mode not in ("embedded", "distributed"):
-      correct_op_mode_item = self.getErrorItem("Correct value should be set.")
-      pass
-    elif len(self.getComponentHostNames(services, "AMBARI_METRICS", "METRICS_COLLECTOR")) > 1 and op_mode != 'distributed':
-      correct_op_mode_item = self.getErrorItem("Correct value should be 'distributed' for clusters with more than 1 Metrics Collector")
-    elif op_mode == 'embedded':
-      collector_heapsize, hbase_heapsize, total_sinks_count = self.getAmsMemoryRecommendation(services, hosts)
-      if total_sinks_count > 1000:
-        correct_op_mode_item = self.getWarnItem("Number of sinks writing metrics to collector is expected to be more than 1000. "
-                                                "'Embedded' mode AMS might not be able to handle the load. Consider moving to distributed mode.")
-
-    validationItems.extend([{"config-name": 'timeline.metrics.service.operation.mode', "item": correct_op_mode_item}])
-    return self.toConfigurationValidationProblems(validationItems, "ams-site")
-
-  def validateGrafanaEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-    validationItems = []
-
-    grafana_pwd = properties.get("metrics_grafana_password")
-    grafana_pwd_length_item = None
-    if len(grafana_pwd) < 4:
-      grafana_pwd_length_item = self.getErrorItem("Grafana password length should be at least 4.")
-      pass
-    validationItems.extend([{"config-name": 'metrics_grafana_password', "item": grafana_pwd_length_item}])
-    return self.toConfigurationValidationProblems(validationItems, "ams-site")
-
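validateAmsSiteConfigurations, removed above, reduces the operation-mode rules to three cases. A condensed, self-contained sketch, where collector_count and total_sinks_count stand in for values Ambari derives from the cluster topology:

  def validate_operation_mode(op_mode, collector_count, total_sinks_count):
    if op_mode not in ("embedded", "distributed"):
      return "ERROR: Correct value should be set."
    if collector_count > 1 and op_mode != "distributed":
      return ("ERROR: Correct value should be 'distributed' for clusters "
              "with more than 1 Metrics Collector")
    if op_mode == "embedded" and total_sinks_count > 1000:
      return ("WARN: 'Embedded' mode AMS might not be able to handle the "
              "load. Consider moving to distributed mode.")
    return None  # no validation problem

  assert validate_operation_mode("distributed", 2, 500) is None
  assert validate_operation_mode("embedded", 2, 500).startswith("ERROR")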
- "(default:61181)") - - validationItems.extend([{"config-name":'hbase.rootdir', "item": rootdir_item }, - {"config-name":'hbase.cluster.distributed', "item": distributed_item }, - {"config-name":'hbase.zookeeper.property.clientPort', "item": hbase_zk_client_port_item }]) - - for collectorHostName in amsCollectorHosts: - for host in hosts["items"]: - if host["Hosts"]["host_name"] == collectorHostName: - if op_mode == 'embedded' or is_local_root_dir: - validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorEnoughDiskSpace(properties, 'hbase.rootdir', host["Hosts"], recommendedDiskSpace)}]) - validationItems.extend([{"config-name": 'hbase.rootdir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.rootdir', host["Hosts"])}]) - validationItems.extend([{"config-name": 'hbase.tmp.dir', "item": self.validatorNotRootFs(properties, recommendedDefaults, 'hbase.tmp.dir', host["Hosts"])}]) - - dn_hosts = self.getComponentHostNames(services, "HDFS", "DATANODE") - if is_local_root_dir: - mountPoints = [] - for mountPoint in host["Hosts"]["disk_info"]: - mountPoints.append(mountPoint["mountpoint"]) - hbase_rootdir_mountpoint = self.getMountPointForDir(hbase_rootdir, mountPoints) - hbase_tmpdir_mountpoint = self.getMountPointForDir(hbase_tmpdir, mountPoints) - preferred_mountpoints = self.getPreferredMountPoints(host['Hosts']) - # hbase.rootdir and hbase.tmp.dir shouldn't point to the same partition - # if multiple preferred_mountpoints exist - if hbase_rootdir_mountpoint == hbase_tmpdir_mountpoint and \ - len(preferred_mountpoints) > 1: - item = self.getWarnItem("Consider not using {0} partition for storing metrics temporary data. " - "{0} partition is already used as hbase.rootdir to store metrics data".format(hbase_tmpdir_mountpoint)) - validationItems.extend([{"config-name":'hbase.tmp.dir', "item": item}]) - - # if METRICS_COLLECTOR is co-hosted with DATANODE - # cross-check dfs.datanode.data.dir and hbase.rootdir - # they shouldn't share same disk partition IO - hdfs_site = getSiteProperties(configurations, "hdfs-site") - dfs_datadirs = hdfs_site.get("dfs.datanode.data.dir").split(",") if hdfs_site and "dfs.datanode.data.dir" in hdfs_site else [] - if dn_hosts and collectorHostName in dn_hosts and ams_site and \ - dfs_datadirs and len(preferred_mountpoints) > len(dfs_datadirs): - for dfs_datadir in dfs_datadirs: - dfs_datadir_mountpoint = self.getMountPointForDir(dfs_datadir, mountPoints) - if dfs_datadir_mountpoint == hbase_rootdir_mountpoint: - item = self.getWarnItem("Consider not using {0} partition for storing metrics data. 
" - "{0} is already used by datanode to store HDFS data".format(hbase_rootdir_mountpoint)) - validationItems.extend([{"config-name": 'hbase.rootdir', "item": item}]) - break - # If no local DN in distributed mode - elif collectorHostName not in dn_hosts and distributed.lower() == "true": - item = self.getWarnItem("It's recommended to install Datanode component on {0} " - "to speed up IO operations between HDFS and Metrics " - "Collector in distributed mode ".format(collectorHostName)) - validationItems.extend([{"config-name": "hbase.cluster.distributed", "item": item}]) - # Short circuit read should be enabled in distibuted mode - # if local DN installed - else: - validationItems.extend([{"config-name": "dfs.client.read.shortcircuit", "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, "dfs.client.read.shortcircuit")}]) - - return self.toConfigurationValidationProblems(validationItems, "ams-hbase-site") def validateStormConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): validationItems = [] @@ -990,132 +573,9 @@ class HDP206StackAdvisor(DefaultStackAdvisor): return self.toConfigurationValidationProblems(validationItems, "storm-site") - def validateAmsHbaseEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): - - ams_env = getSiteProperties(configurations, "ams-env") - amsHbaseSite = getSiteProperties(configurations, "ams-hbase-site") - validationItems = [] - mb = 1024 * 1024 - gb = 1024 * mb - - regionServerItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_regionserver_heapsize") ## FIXME if new service added - if regionServerItem: - validationItems.extend([{"config-name": "hbase_regionserver_heapsize", "item": regionServerItem}]) - - hbaseMasterHeapsizeItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "hbase_master_heapsize") - if hbaseMasterHeapsizeItem: - validationItems.extend([{"config-name": "hbase_master_heapsize", "item": hbaseMasterHeapsizeItem}]) - - logDirItem = self.validatorEqualsPropertyItem(properties, "hbase_log_dir", ams_env, "metrics_collector_log_dir") - if logDirItem: - validationItems.extend([{"config-name": "hbase_log_dir", "item": logDirItem}]) - - hbase_master_heapsize = self.to_number(properties["hbase_master_heapsize"]) - hbase_master_xmn_size = self.to_number(properties["hbase_master_xmn_size"]) - hbase_regionserver_heapsize = self.to_number(properties["hbase_regionserver_heapsize"]) - hbase_regionserver_xmn_size = self.to_number(properties["regionserver_xmn_size"]) - - # Validate Xmn settings. 
-  def validateAmsEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
-
-    validationItems = []
-    collectorHeapsizeDefaultItem = self.validatorLessThenDefaultValue(properties, recommendedDefaults, "metrics_collector_heapsize")
-    validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeDefaultItem}])
-    ams_env = getSiteProperties(configurations, "ams-env")
-    collector_heapsize = self.to_number(ams_env.get("metrics_collector_heapsize"))
-    if collector_heapsize > 32768:
-      collectorHeapsizeMaxItem = self.getWarnItem("Value is more than the recommended maximum heap size of 32G.")
-      validationItems.extend([{"config-name": "metrics_collector_heapsize", "item": collectorHeapsizeMaxItem}])
-    return self.toConfigurationValidationProblems(validationItems, "ams-env")

   def getMemorySizeRequired(self, services, components, configurations):
     totalMemoryRequired = 512*1024*1024  # 512Mb for OS needs
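The ams-env check removed above is a single numeric guard on metrics_collector_heapsize. A sketch, with to_number standing in for the advisor helper that coerces a property value (possibly carrying a unit suffix) to an integer; this is illustrative, not the exact helper:

  def to_number(value):
    # Keep the digits of a value like "40960m" and coerce to int.
    try:
      return int("".join(ch for ch in str(value) if ch.isdigit()))
    except ValueError:
      return None

  collector_heapsize = to_number("40960m")
  if collector_heapsize > 32768:
    print("WARN: Value is more than the recommended maximum heap size of 32G.")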
http://git-wip-us.apache.org/repos/asf/ambari/blob/0f32765d/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
index 2dc1738..4cb0d9e 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.2/services/stack_advisor.py
@@ -91,7 +91,6 @@ class HDP22StackAdvisor(HDP21StackAdvisor):
       "HBASE": self.recommendHBASEConfigurations,
       "MAPREDUCE2": self.recommendMapReduce2Configurations,
       "TEZ": self.recommendTezConfigurations,
-      "AMBARI_METRICS": self.recommendAmsConfigurations,
       "YARN": self.recommendYARNConfigurations,
       "STORM": self.recommendStormConfigurations,
       "KNOX": self.recommendKnoxConfigurations,
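Taken together, the hunks above strip the AMS recommendation and validation logic out of the HDP stack advisors; the same logic now lives behind the per-service advisor for AMBARI_METRICS. A stub showing the shape of that destination (method names follow Ambari's service-advisor convention; the bodies here are illustrative, not the committed implementation):

  class AMBARI_METRICSServiceAdvisor(object):
    """Illustrative stub; the real class extends Ambari's ServiceAdvisor base."""

    def getServiceConfigurationRecommendations(self, configurations, clusterData, services, hosts):
      # Heap sizing, split points and Grafana visibility are computed here
      # instead of in HDP206StackAdvisor.recommendAmsConfigurations.
      pass

    def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
      # ams-site / ams-hbase-site / ams-hbase-env / ams-env / ams-grafana-env
      # validators now dispatch from here rather than from the stack advisor.
      return []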