From: alejandro@apache.org
To: commits@ambari.apache.org
Reply-To: ambari-dev@ambari.apache.org
Date: Wed, 28 Jun 2017 00:24:07 -0000
Message-Id: <30f52f2efe6c4124ae0ec8cc8832261f@git.apache.org>
In-Reply-To: <452d25062466449ebd977f81b4bcc9cc@git.apache.org>
References: <452d25062466449ebd977f81b4bcc9cc@git.apache.org>
Subject: [09/51] [partial] ambari git commit: AMBARI-21349. Create BigInsights Stack Skeleton in Ambari 2.5 (alejandro)

http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_22.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_22.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_22.py
new file mode 100755
index 0000000..6848635
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_22.py
@@ -0,0 +1,1713 @@
+#!/usr/bin/env ambari-python-wrap
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" + +import math +from math import floor +from urlparse import urlparse +import os +import fnmatch +import socket +import re +import xml.etree.ElementTree as ET + +from resource_management.core.logger import Logger + +try: + from stack_advisor_21 import * +except ImportError: + #Ignore ImportError + print("stack_advisor_21 not found") + +class HDP22StackAdvisor(HDP21StackAdvisor): + + def getServiceConfigurationRecommenderDict(self): + parentRecommendConfDict = super(HDP22StackAdvisor, self).getServiceConfigurationRecommenderDict() + childRecommendConfDict = { + "HDFS": self.recommendHDFSConfigurations, + "HIVE": self.recommendHIVEConfigurations, + "HBASE": self.recommendHBASEConfigurations, + "MAPREDUCE2": self.recommendMapReduce2Configurations, + "TEZ": self.recommendTezConfigurations, + "AMBARI_METRICS": self.recommendAmsConfigurations, + "YARN": self.recommendYARNConfigurations, + "STORM": self.recommendStormConfigurations, + "KNOX": self.recommendKnoxConfigurations, + "RANGER": self.recommendRangerConfigurations, + "LOGSEARCH" : self.recommendLogsearchConfigurations, + "SPARK": self.recommendSparkConfigurations, + } + parentRecommendConfDict.update(childRecommendConfDict) + return parentRecommendConfDict + + + def recommendSparkConfigurations(self, configurations, clusterData, services, hosts): + """ + :type configurations dict + :type clusterData dict + :type services dict + :type hosts dict + """ + putSparkProperty = self.putProperty(configurations, "spark-defaults", services) + + spark_queue = self.recommendYarnQueue(services, "spark-defaults", "spark.yarn.queue") + if spark_queue is not None: + putSparkProperty("spark.yarn.queue", spark_queue) + + # add only if spark supports this config + if "configurations" in services and "spark-thrift-sparkconf" in services["configurations"]: + putSparkThriftSparkConf = self.putProperty(configurations, "spark-thrift-sparkconf", services) + recommended_spark_queue = self.recommendYarnQueue(services, "spark-thrift-sparkconf", "spark.yarn.queue") + if recommended_spark_queue is not None: + putSparkThriftSparkConf("spark.yarn.queue", recommended_spark_queue) + + + def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): + super(HDP22StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts) + putYarnProperty = self.putProperty(configurations, "yarn-site", services) + putYarnProperty('yarn.nodemanager.resource.cpu-vcores', clusterData['cpu']) + putYarnProperty('yarn.scheduler.minimum-allocation-vcores', 1) + putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"]) + # Property Attributes + putYarnPropertyAttribute = self.putPropertyAttribute(configurations, "yarn-site") + nodeManagerHost = self.getHostWithComponent("YARN", "NODEMANAGER", services, hosts) + if (nodeManagerHost is not None): + cpuPercentageLimit = 0.8 + if "yarn.nodemanager.resource.percentage-physical-cpu-limit" in configurations["yarn-site"]["properties"]: + cpuPercentageLimit = float(configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.percentage-physical-cpu-limit"]) + cpuLimit = max(1, int(floor(nodeManagerHost["Hosts"]["cpu_count"] * cpuPercentageLimit))) + putYarnProperty('yarn.nodemanager.resource.cpu-vcores', str(cpuLimit)) + putYarnProperty('yarn.scheduler.maximum-allocation-vcores', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"]) + 
+      putYarnPropertyAttribute('yarn.nodemanager.resource.memory-mb', 'maximum', int(nodeManagerHost["Hosts"]["total_mem"] / 1024)) # total_mem in kb
+      putYarnPropertyAttribute('yarn.nodemanager.resource.cpu-vcores', 'maximum', nodeManagerHost["Hosts"]["cpu_count"] * 2)
+      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+      putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+      putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', configurations["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+
+      # Above is the default calculated 'maximum' values derived purely from hosts.
+      # However, there are 'maximum' and other attributes that actually change based on the values
+      # of other configs. We need to update those values.
+      if ("yarn-site" in services["configurations"]):
+        if ("yarn.nodemanager.resource.memory-mb" in services["configurations"]["yarn-site"]["properties"]):
+          putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-mb', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+          putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-mb', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.memory-mb"])
+        if ("yarn.nodemanager.resource.cpu-vcores" in services["configurations"]["yarn-site"]["properties"]):
+          putYarnPropertyAttribute('yarn.scheduler.maximum-allocation-vcores', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+          putYarnPropertyAttribute('yarn.scheduler.minimum-allocation-vcores', 'maximum', services["configurations"]["yarn-site"]["properties"]["yarn.nodemanager.resource.cpu-vcores"])
+
+      kerberos_authentication_enabled = self.isSecurityEnabled(services)
+      if kerberos_authentication_enabled:
+        putYarnProperty('yarn.nodemanager.container-executor.class',
+                        'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+
+      if "yarn-env" in services["configurations"] and "yarn_cgroups_enabled" in services["configurations"]["yarn-env"]["properties"]:
+        yarn_cgroups_enabled = services["configurations"]["yarn-env"]["properties"]["yarn_cgroups_enabled"].lower() == "true"
+        if yarn_cgroups_enabled:
+          putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.group', 'hadoop')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.resources-handler.class', 'org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', '/yarn')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount', 'true')
+          putYarnProperty('yarn.nodemanager.linux-container-executor.cgroups.mount-path', '/cgroup')
+        else:
+          if not kerberos_authentication_enabled:
+            putYarnProperty('yarn.nodemanager.container-executor.class', 'org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.resources-handler.class', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.hierarchy', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount', 'delete', 'true')
+          putYarnPropertyAttribute('yarn.nodemanager.linux-container-executor.cgroups.mount-path', 'delete', 'true')
+    # recommend hadoop.registry.rm.enabled based on SLIDER in services
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if "SLIDER" in servicesList:
+      putYarnProperty('hadoop.registry.rm.enabled', 'true')
+    else:
+      putYarnProperty('hadoop.registry.rm.enabled', 'false')
+
+  def recommendHDFSConfigurations(self, configurations, clusterData, services, hosts):
+    super(HDP22StackAdvisor, self).recommendHDFSConfigurations(configurations, clusterData, services, hosts)
+    putHdfsSiteProperty = self.putProperty(configurations, "hdfs-site", services)
+    putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
+    putHdfsSiteProperty("dfs.datanode.max.transfer.threads", 16384 if clusterData["hBaseInstalled"] else 4096)
+
+    dataDirsCount = 1
+    # Use users 'dfs.datanode.data.dir' first
+    if "hdfs-site" in services["configurations"] and "dfs.datanode.data.dir" in services["configurations"]["hdfs-site"]["properties"]:
+      dataDirsCount = len(str(services["configurations"]["hdfs-site"]["properties"]["dfs.datanode.data.dir"]).split(","))
+    elif "dfs.datanode.data.dir" in configurations["hdfs-site"]["properties"]:
+      dataDirsCount = len(str(configurations["hdfs-site"]["properties"]["dfs.datanode.data.dir"]).split(","))
+    if dataDirsCount <= 2:
+      failedVolumesTolerated = 0
+    elif dataDirsCount <= 4:
+      failedVolumesTolerated = 1
+    else:
+      failedVolumesTolerated = 2
+    putHdfsSiteProperty("dfs.datanode.failed.volumes.tolerated", failedVolumesTolerated)
+
+    namenodeHosts = self.getHostsWithComponent("HDFS", "NAMENODE", services, hosts)
+
+    # 25 * # of cores on NameNode
+    nameNodeCores = 4
+    if namenodeHosts is not None and len(namenodeHosts):
+      nameNodeCores = int(namenodeHosts[0]['Hosts']['cpu_count'])
+    putHdfsSiteProperty("dfs.namenode.handler.count", 25 * nameNodeCores)
+    if 25 * nameNodeCores > 200:
+      putHdfsSitePropertyAttribute("dfs.namenode.handler.count", "maximum", 25 * nameNodeCores)
+
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if ('ranger-hdfs-plugin-properties' in services['configurations']) and ('ranger-hdfs-plugin-enabled' in services['configurations']['ranger-hdfs-plugin-properties']['properties']):
+      rangerPluginEnabled = services['configurations']['ranger-hdfs-plugin-properties']['properties']['ranger-hdfs-plugin-enabled']
+      if ("RANGER" in servicesList) and (rangerPluginEnabled.lower() == 'Yes'.lower()):
+        putHdfsSiteProperty("dfs.permissions.enabled", 'true')
+
+    putHdfsSiteProperty("dfs.namenode.safemode.threshold-pct", "0.999" if len(namenodeHosts) > 1 else "1.000")
+
+    putHdfsEnvProperty = self.putProperty(configurations, "hadoop-env", services)
+    putHdfsEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hadoop-env")
+
+    putHdfsEnvProperty('namenode_heapsize', max(int(clusterData['totalAvailableRam'] / 2), 1024))
+
+    nn_heapsize_limit = None
+    if (namenodeHosts is not None and len(namenodeHosts) > 0):
+      if len(namenodeHosts) > 1:
+        nn_max_heapsize = min(int(namenodeHosts[0]["Hosts"]["total_mem"]), int(namenodeHosts[1]["Hosts"]["total_mem"])) / 1024
+        masters_at_host = max(self.getHostComponentsByCategories(namenodeHosts[0]["Hosts"]["host_name"], ["MASTER"], services, hosts),
+                              self.getHostComponentsByCategories(namenodeHosts[1]["Hosts"]["host_name"], ["MASTER"], services, hosts))
+      else:
+        nn_max_heapsize = int(namenodeHosts[0]["Hosts"]["total_mem"] / 1024) # total_mem in kb
+        masters_at_host = self.getHostComponentsByCategories(namenodeHosts[0]["Hosts"]["host_name"], ["MASTER"], services, hosts)
+
+      putHdfsEnvPropertyAttribute('namenode_heapsize', 'maximum', max(nn_max_heapsize, 1024))
+
+      nn_heapsize_limit = nn_max_heapsize
+      nn_heapsize_limit -= clusterData["reservedRam"]
+      if len(masters_at_host) > 1:
+        nn_heapsize_limit = int(nn_heapsize_limit/2)
+
+      putHdfsEnvProperty('namenode_heapsize', max(nn_heapsize_limit, 1024))
+
+
+    datanodeHosts = self.getHostsWithComponent("HDFS", "DATANODE", services, hosts)
+    if datanodeHosts is not None and len(datanodeHosts) > 0:
+      min_datanode_ram_kb = 1073741824 # 1 TB
+      for datanode in datanodeHosts:
+        ram_kb = datanode['Hosts']['total_mem']
+        min_datanode_ram_kb = min(min_datanode_ram_kb, ram_kb)
+
+      datanodeFilesM = len(datanodeHosts)*dataDirsCount/10 # in millions, # of files = # of disks * 100'000
+      nn_memory_configs = [
+        {'nn_heap':1024, 'nn_opt':128},
+        {'nn_heap':3072, 'nn_opt':512},
+        {'nn_heap':5376, 'nn_opt':768},
+        {'nn_heap':9984, 'nn_opt':1280},
+        {'nn_heap':14848, 'nn_opt':2048},
+        {'nn_heap':19456, 'nn_opt':2560},
+        {'nn_heap':24320, 'nn_opt':3072},
+        {'nn_heap':33536, 'nn_opt':4352},
+        {'nn_heap':47872, 'nn_opt':6144},
+        {'nn_heap':59648, 'nn_opt':7680},
+        {'nn_heap':71424, 'nn_opt':8960},
+        {'nn_heap':94976, 'nn_opt':8960}
+      ]
+      index = {
+        datanodeFilesM < 1 : 0,
+        1 <= datanodeFilesM < 5 : 1,
+        5 <= datanodeFilesM < 10 : 2,
+        10 <= datanodeFilesM < 20 : 3,
+        20 <= datanodeFilesM < 30 : 4,
+        30 <= datanodeFilesM < 40 : 5,
+        40 <= datanodeFilesM < 50 : 6,
+        50 <= datanodeFilesM < 70 : 7,
+        70 <= datanodeFilesM < 100 : 8,
+        100 <= datanodeFilesM < 125 : 9,
+        125 <= datanodeFilesM < 150 : 10,
+        150 <= datanodeFilesM : 11
+      }[1]
+
+      nn_memory_config = nn_memory_configs[index]
+
+      #override with new values if applicable
+      if nn_heapsize_limit is not None and nn_memory_config['nn_heap'] <= nn_heapsize_limit:
+        putHdfsEnvProperty('namenode_heapsize', nn_memory_config['nn_heap'])
+
+      putHdfsEnvPropertyAttribute('dtnode_heapsize', 'maximum', int(min_datanode_ram_kb/1024))
+
+    nn_heapsize = int(configurations["hadoop-env"]["properties"]["namenode_heapsize"])
+    putHdfsEnvProperty('namenode_opt_newsize', max(int(nn_heapsize / 8), 128))
+    putHdfsEnvProperty('namenode_opt_maxnewsize', max(int(nn_heapsize / 8), 128))
+
+    putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
+    putHdfsSitePropertyAttribute('dfs.datanode.failed.volumes.tolerated', 'maximum', dataDirsCount)
+
+    keyserverHostsString = None
+    keyserverPortString = None
+    if "hadoop-env" in services["configurations"] and "keyserver_host" in services["configurations"]["hadoop-env"]["properties"] and "keyserver_port" in services["configurations"]["hadoop-env"]["properties"]:
+      keyserverHostsString = services["configurations"]["hadoop-env"]["properties"]["keyserver_host"]
+      keyserverPortString = services["configurations"]["hadoop-env"]["properties"]["keyserver_port"]
+
+    # Irrespective of what hadoop-env has, if Ranger-KMS is installed, we use its values.
+    rangerKMSServerHosts = self.getHostsWithComponent("RANGER_KMS", "RANGER_KMS_SERVER", services, hosts)
+    if rangerKMSServerHosts is not None and len(rangerKMSServerHosts) > 0:
+      rangerKMSServerHostsArray = []
+      for rangeKMSServerHost in rangerKMSServerHosts:
+        rangerKMSServerHostsArray.append(rangeKMSServerHost["Hosts"]["host_name"])
+      keyserverHostsString = ";".join(rangerKMSServerHostsArray)
+      if "kms-env" in services["configurations"] and "kms_port" in services["configurations"]["kms-env"]["properties"]:
+        keyserverPortString = services["configurations"]["kms-env"]["properties"]["kms_port"]
+
+    if keyserverHostsString is not None and len(keyserverHostsString.strip()) > 0:
+      urlScheme = "http"
+      if "ranger-kms-site" in services["configurations"] and \
+          "ranger.service.https.attrib.ssl.enabled" in services["configurations"]["ranger-kms-site"]["properties"] and \
+          services["configurations"]["ranger-kms-site"]["properties"]["ranger.service.https.attrib.ssl.enabled"].lower() == "true":
+        urlScheme = "https"
+
+      if keyserverPortString is None or len(keyserverPortString.strip()) < 1:
+        keyserverPortString = ":9292"
+      else:
+        keyserverPortString = ":" + keyserverPortString.strip()
+      putCoreSiteProperty = self.putProperty(configurations, "core-site", services)
+      kmsPath = "kms://" + urlScheme + "@" + keyserverHostsString.strip() + keyserverPortString + "/kms"
+      putCoreSiteProperty("hadoop.security.key.provider.path", kmsPath)
+      putHdfsSiteProperty("dfs.encryption.key.provider.uri", kmsPath)
+
+    if "ranger-env" in services["configurations"] and "ranger-hdfs-plugin-properties" in services["configurations"] and \
+        "ranger-hdfs-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      putHdfsRangerPluginProperty = self.putProperty(configurations, "ranger-hdfs-plugin-properties", services)
+      rangerEnvHdfsPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hdfs-plugin-enabled"]
+      putHdfsRangerPluginProperty("ranger-hdfs-plugin-enabled", rangerEnvHdfsPluginProperty)
+
+    putHdfsSitePropertyAttribute = self.putPropertyAttribute(configurations, "hdfs-site")
+    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, "core-site")
+    if not "RANGER_KMS" in servicesList:
+      putCoreSitePropertyAttribute('hadoop.security.key.provider.path', 'delete', 'true')
+      putHdfsSitePropertyAttribute('dfs.encryption.key.provider.uri', 'delete', 'true')
+
+  def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts):
+    super(HDP22StackAdvisor, self).recommendHiveConfigurations(configurations, clusterData, services, hosts)
+
+    putHiveServerProperty = self.putProperty(configurations, "hiveserver2-site", services)
+    putHiveEnvProperty = self.putProperty(configurations, "hive-env", services)
+    putHiveSiteProperty = self.putProperty(configurations, "hive-site", services)
+    putWebhcatSiteProperty = self.putProperty(configurations, "webhcat-site", services)
+    putHiveSitePropertyAttribute = self.putPropertyAttribute(configurations, "hive-site")
+    putHiveEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hive-env")
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+
+    # Storage
+    putHiveEnvProperty("hive_exec_orc_storage_strategy", "SPEED")
+    putHiveSiteProperty("hive.exec.orc.encoding.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"])
+    putHiveSiteProperty("hive.exec.orc.compression.strategy", configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"])
configurations["hive-env"]["properties"]["hive_exec_orc_storage_strategy"]) + + putHiveSiteProperty("hive.exec.orc.default.stripe.size", "67108864") + putHiveSiteProperty("hive.exec.orc.default.compress", "ZLIB") + putHiveSiteProperty("hive.optimize.index.filter", "true") + putHiveSiteProperty("hive.optimize.sort.dynamic.partition", "false") + + # Vectorization + putHiveSiteProperty("hive.vectorized.execution.enabled", "true") + putHiveSiteProperty("hive.vectorized.execution.reduce.enabled", "false") + + # Transactions + putHiveEnvProperty("hive_txn_acid", "off") + if str(configurations["hive-env"]["properties"]["hive_txn_acid"]).lower() == "on": + putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") + putHiveSiteProperty("hive.support.concurrency", "true") + putHiveSiteProperty("hive.compactor.initiator.on", "true") + putHiveSiteProperty("hive.compactor.worker.threads", "1") + putHiveSiteProperty("hive.enforce.bucketing", "true") + putHiveSiteProperty("hive.exec.dynamic.partition.mode", "nonstrict") + else: + putHiveSiteProperty("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager") + putHiveSiteProperty("hive.support.concurrency", "false") + putHiveSiteProperty("hive.compactor.initiator.on", "false") + putHiveSiteProperty("hive.compactor.worker.threads", "0") + putHiveSiteProperty("hive.enforce.bucketing", "false") + putHiveSiteProperty("hive.exec.dynamic.partition.mode", "strict") + + hiveMetastoreHost = self.getHostWithComponent("HIVE", "HIVE_METASTORE", services, hosts) + if hiveMetastoreHost is not None and len(hiveMetastoreHost) > 0: + putHiveSiteProperty("hive.metastore.uris", "thrift://" + hiveMetastoreHost["Hosts"]["host_name"] + ":9083") + + # ATS + putHiveEnvProperty("hive_timeline_logging_enabled", "true") + + hooks_properties = ["hive.exec.pre.hooks", "hive.exec.post.hooks", "hive.exec.failure.hooks"] + include_ats_hook = str(configurations["hive-env"]["properties"]["hive_timeline_logging_enabled"]).lower() == "true" + + ats_hook_class = "org.apache.hadoop.hive.ql.hooks.ATSHook" + for hooks_property in hooks_properties: + if hooks_property in configurations["hive-site"]["properties"]: + hooks_value = configurations["hive-site"]["properties"][hooks_property] + else: + hooks_value = " " + if include_ats_hook and ats_hook_class not in hooks_value: + if hooks_value == " ": + hooks_value = ats_hook_class + else: + hooks_value = hooks_value + "," + ats_hook_class + if not include_ats_hook and ats_hook_class in hooks_value: + hooks_classes = [] + for hook_class in hooks_value.split(","): + if hook_class != ats_hook_class and hook_class != " ": + hooks_classes.append(hook_class) + if hooks_classes: + hooks_value = ",".join(hooks_classes) + else: + hooks_value = " " + + putHiveSiteProperty(hooks_property, hooks_value) + + # Tez Engine + if "TEZ" in servicesList: + putHiveSiteProperty("hive.execution.engine", "tez") + else: + putHiveSiteProperty("hive.execution.engine", "mr") + + container_size = "512" + + if not "yarn-site" in configurations: + self.recommendYARNConfigurations(configurations, clusterData, services, hosts) + #properties below should be always present as they are provided in HDP206 stack advisor at least + yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"])) + #duplicate tez task resource calc logic, direct dependency doesn't look good here (in case of Hive 
+    container_size = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
+    container_size = min(clusterData['containers'] * clusterData['ramPerContainer'], container_size, yarnMaxAllocationSize)
+
+    if "TEZ" in servicesList:
+      putHiveSiteProperty("hive.tez.container.size", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), container_size))
+
+      putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
+      putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
+
+      if "yarn-site" in services["configurations"]:
+        if "yarn.scheduler.minimum-allocation-mb" in services["configurations"]["yarn-site"]["properties"]:
+          putHiveSitePropertyAttribute("hive.tez.container.size", "minimum", int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))
+        if "yarn.scheduler.maximum-allocation-mb" in services["configurations"]["yarn-site"]["properties"]:
+          putHiveSitePropertyAttribute("hive.tez.container.size", "maximum", int(services["configurations"]["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
+
+    putHiveSiteProperty("hive.prewarm.enabled", "false")
+    putHiveSiteProperty("hive.prewarm.numcontainers", "3")
+    putHiveSiteProperty("hive.tez.auto.reducer.parallelism", "true")
+    putHiveSiteProperty("hive.tez.dynamic.partition.pruning", "true")
+    putHiveSiteProperty("hive.server2.tez.initialize.default.sessions", "false")
+    putHiveSiteProperty("hive.server2.tez.sessions.per.default.queue", "1")
+
+    container_size = configurations["hive-site"]["properties"]["hive.tez.container.size"]
+
+    container_size_bytes = int(int(container_size)*0.8*1024*1024) # Xmx == 80% of container
+    # Memory
+    # set noconditionaltask.size only if TEZ is in the service list. This value is too high for non tez case
+    if "TEZ" in servicesList:
+      putHiveSiteProperty("hive.auto.convert.join.noconditionaltask.size", int(round(container_size_bytes/3)))
+      putHiveSitePropertyAttribute("hive.auto.convert.join.noconditionaltask.size", "maximum", container_size_bytes)
+      putHiveSiteProperty("hive.exec.reducers.bytes.per.reducer", "67108864")
+
+
+    # CBO
+    if "hive-site" in services["configurations"] and "hive.cbo.enable" in services["configurations"]["hive-site"]["properties"]:
+      hive_cbo_enable = services["configurations"]["hive-site"]["properties"]["hive.cbo.enable"]
+      putHiveSiteProperty("hive.stats.fetch.partition.stats", hive_cbo_enable)
+      putHiveSiteProperty("hive.stats.fetch.column.stats", hive_cbo_enable)
+
+    putHiveSiteProperty("hive.compute.query.using.stats", "true")
+    putHiveSiteProperty("hive.server2.enable.doAs", "true")
+
+    yarn_queues = "default"
+    capacitySchedulerProperties = {}
+    if "capacity-scheduler" in services['configurations']:
+      if "capacity-scheduler" in services['configurations']["capacity-scheduler"]["properties"]:
+        properties = str(services['configurations']["capacity-scheduler"]["properties"]["capacity-scheduler"]).split('\n')
+        for property in properties:
+          key,sep,value = property.partition("=")
+          capacitySchedulerProperties[key] = value
+      if "yarn.scheduler.capacity.root.queues" in capacitySchedulerProperties:
+        yarn_queues = str(capacitySchedulerProperties["yarn.scheduler.capacity.root.queues"])
+      elif "yarn.scheduler.capacity.root.queues" in services['configurations']["capacity-scheduler"]["properties"]:
+        yarn_queues = services['configurations']["capacity-scheduler"]["properties"]["yarn.scheduler.capacity.root.queues"]
+    # Interactive Queues property attributes
+    putHiveServerPropertyAttribute = self.putPropertyAttribute(configurations, "hiveserver2-site")
+    toProcessQueues = yarn_queues.split(",")
+    leafQueueNames = set() # Remove duplicates
+    while len(toProcessQueues) > 0:
+      queue = toProcessQueues.pop()
+      queueKey = "yarn.scheduler.capacity.root." + queue + ".queues"
+      if queueKey in capacitySchedulerProperties:
+        # This is a parent queue - need to add children
+        subQueues = capacitySchedulerProperties[queueKey].split(",")
+        for subQueue in subQueues:
+          toProcessQueues.append(queue + "." + subQueue)
+      else:
+        # This is a leaf queue
+        queueName = queue.split(".")[-1] # Fully qualified queue name does not work, we should use only leaf name
+        leafQueueNames.add(queueName)
+    leafQueues = [{"label": str(queueName) + " queue", "value": queueName} for queueName in leafQueueNames]
+    leafQueues = sorted(leafQueues, key=lambda q:q['value'])
+    putHiveSitePropertyAttribute("hive.server2.tez.default.queues", "entries", leafQueues)
+    putHiveSiteProperty("hive.server2.tez.default.queues", ",".join([leafQueue['value'] for leafQueue in leafQueues]))
+
+    webhcat_queue = self.recommendYarnQueue(services, "webhcat-site", "templeton.hadoop.queue.name")
+    if webhcat_queue is not None:
+      putWebhcatSiteProperty("templeton.hadoop.queue.name", webhcat_queue)
+
+
+    # Recommend Ranger Hive authorization as per Ranger Hive plugin property
+    if "ranger-env" in services["configurations"] and "hive-env" in services["configurations"] and \
+        "ranger-hive-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      rangerEnvHivePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hive-plugin-enabled"]
+      if (rangerEnvHivePluginProperty.lower() == "yes"):
+        putHiveEnvProperty("hive_security_authorization", "RANGER")
+
+    # Security
+    if ("configurations" not in services) or ("hive-env" not in services["configurations"]) or \
+        ("properties" not in services["configurations"]["hive-env"]) or \
+        ("hive_security_authorization" not in services["configurations"]["hive-env"]["properties"]) or \
+        str(services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none":
+      putHiveEnvProperty("hive_security_authorization", "None")
+    else:
+      putHiveEnvProperty("hive_security_authorization", services["configurations"]["hive-env"]["properties"]["hive_security_authorization"])
+
+
+    # Recommend Ranger Hive authorization as per Ranger Hive plugin property
+    if "ranger-env" in services["configurations"] and "hive-env" in services["configurations"] and \
+        "ranger-hive-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      rangerEnvHivePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hive-plugin-enabled"]
+      rangerEnvHiveAuthProperty = services["configurations"]["hive-env"]["properties"]["hive_security_authorization"]
+      if (rangerEnvHivePluginProperty.lower() == "yes"):
+        putHiveEnvProperty("hive_security_authorization", "Ranger")
+      elif (rangerEnvHiveAuthProperty.lower() == "ranger"):
+        putHiveEnvProperty("hive_security_authorization", "None")
+
+    # hive_security_authorization == 'none'
+    # this property is unrelated to Kerberos
+    if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "none":
+      putHiveSiteProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory")
+      if ("hive.security.authorization.manager" in configurations["hiveserver2-site"]["properties"]) or \
+          ("hiveserver2-site" not in services["configurations"]) or \
+          ("hiveserver2-site" in services["configurations"] and "hive.security.authorization.manager" in services["configurations"]["hiveserver2-site"]["properties"]):
+        putHiveServerPropertyAttribute("hive.security.authorization.manager", "delete", "true")
+      if ("hive.security.authenticator.manager" in configurations["hiveserver2-site"]["properties"]) or \
+          ("hiveserver2-site" not in services["configurations"]) or \
+          ("hiveserver2-site" in services["configurations"] and "hive.security.authenticator.manager" in services["configurations"]["hiveserver2-site"]["properties"]):
services["configurations"] and "hive.security.authenticator.manager" in services["configurations"]["hiveserver2-site"]["properties"]): + putHiveServerPropertyAttribute("hive.security.authenticator.manager", "delete", "true") + if ("hive.conf.restricted.list" in configurations["hiveserver2-site"]["properties"]) or \ + ("hiveserver2-site" not in services["configurations"]) or \ + ("hiveserver2-site" in services["configurations"] and "hive.conf.restricted.list" in services["configurations"]["hiveserver2-site"]["properties"]): + putHiveServerPropertyAttribute("hive.conf.restricted.list", "delete", "true") + if "KERBEROS" not in servicesList: # Kerberos security depends on this property + putHiveSiteProperty("hive.security.authorization.enabled", "false") + else: + putHiveSiteProperty("hive.security.authorization.enabled", "true") + + try: + auth_manager_value = str(configurations["hive-env"]["properties"]["hive.security.metastore.authorization.manager"]) + except KeyError: + auth_manager_value = 'org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider' + pass + auth_manager_values = auth_manager_value.split(",") + sqlstdauth_class = "org.apache.hadoop.hive.ql.security.authorization.MetaStoreAuthzAPIAuthorizerEmbedOnly" + + putHiveSiteProperty("hive.server2.enable.doAs", "true") + + # hive_security_authorization == 'sqlstdauth' + if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "sqlstdauth": + putHiveSiteProperty("hive.server2.enable.doAs", "false") + putHiveServerProperty("hive.security.authorization.enabled", "true") + putHiveServerProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory") + putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator") + putHiveServerProperty("hive.conf.restricted.list", "hive.security.authenticator.manager,hive.security.authorization.manager,hive.users.in.admin.role") + putHiveSiteProperty("hive.security.authorization.manager", "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory") + if sqlstdauth_class not in auth_manager_values: + auth_manager_values.append(sqlstdauth_class) + elif sqlstdauth_class in auth_manager_values: + #remove item from csv + auth_manager_values = [x for x in auth_manager_values if x != sqlstdauth_class] + pass + putHiveSiteProperty("hive.security.metastore.authorization.manager", ",".join(auth_manager_values)) + + # hive_security_authorization == 'ranger' + if str(configurations["hive-env"]["properties"]["hive_security_authorization"]).lower() == "ranger": + putHiveSiteProperty("hive.server2.enable.doAs", "false") + putHiveServerProperty("hive.security.authorization.enabled", "true") + putHiveServerProperty("hive.security.authorization.manager", "com.xasecure.authorization.hive.authorizer.XaSecureHiveAuthorizerFactory") + putHiveServerProperty("hive.security.authenticator.manager", "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator") + putHiveServerProperty("hive.conf.restricted.list", "hive.security.authorization.enabled,hive.security.authorization.manager,hive.security.authenticator.manager") + + putHiveSiteProperty("hive.server2.use.SSL", "false") + + #Hive authentication + hive_server2_auth = None + if "hive-site" in services["configurations"] and "hive.server2.authentication" in services["configurations"]["hive-site"]["properties"]: + hive_server2_auth = 
str(services["configurations"]["hive-site"]["properties"]["hive.server2.authentication"]).lower() + elif "hive.server2.authentication" in configurations["hive-site"]["properties"]: + hive_server2_auth = str(configurations["hive-site"]["properties"]["hive.server2.authentication"]).lower() + + if hive_server2_auth == "ldap": + putHiveSiteProperty("hive.server2.authentication.ldap.url", "") + else: + if ("hive.server2.authentication.ldap.url" in configurations["hive-site"]["properties"]) or \ + ("hive-site" not in services["configurations"]) or \ + ("hive-site" in services["configurations"] and "hive.server2.authentication.ldap.url" in services["configurations"]["hive-site"]["properties"]): + putHiveSitePropertyAttribute("hive.server2.authentication.ldap.url", "delete", "true") + + if hive_server2_auth == "kerberos": + if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" not in services["configurations"]["hive-site"]["properties"]: + putHiveSiteProperty("hive.server2.authentication.kerberos.keytab", "") + if "hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" not in services["configurations"]["hive-site"]["properties"]: + putHiveSiteProperty("hive.server2.authentication.kerberos.principal", "") + elif "KERBEROS" not in servicesList: # Since 'hive_server2_auth' cannot be relied on within the default, empty recommendations request + if ("hive.server2.authentication.kerberos.keytab" in configurations["hive-site"]["properties"]) or \ + ("hive-site" not in services["configurations"]) or \ + ("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.keytab" in services["configurations"]["hive-site"]["properties"]): + putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.keytab", "delete", "true") + if ("hive.server2.authentication.kerberos.principal" in configurations["hive-site"]["properties"]) or \ + ("hive-site" not in services["configurations"]) or \ + ("hive-site" in services["configurations"] and "hive.server2.authentication.kerberos.principal" in services["configurations"]["hive-site"]["properties"]): + putHiveSitePropertyAttribute("hive.server2.authentication.kerberos.principal", "delete", "true") + + if hive_server2_auth == "pam": + putHiveSiteProperty("hive.server2.authentication.pam.services", "") + else: + if ("hive.server2.authentication.pam.services" in configurations["hive-site"]["properties"]) or \ + ("hive-site" not in services["configurations"]) or \ + ("hive-site" in services["configurations"] and "hive.server2.authentication.pam.services" in services["configurations"]["hive-site"]["properties"]): + putHiveSitePropertyAttribute("hive.server2.authentication.pam.services", "delete", "true") + + if hive_server2_auth == "custom": + putHiveSiteProperty("hive.server2.custom.authentication.class", "") + else: + if ("hive.server2.authentication" in configurations["hive-site"]["properties"]) or \ + ("hive-site" not in services["configurations"]) or \ + ("hive-site" in services["configurations"] and "hive.server2.custom.authentication.class" in services["configurations"]["hive-site"]["properties"]): + putHiveSitePropertyAttribute("hive.server2.custom.authentication.class", "delete", "true") + + # HiveServer, Client, Metastore heapsize + hs_heapsize_multiplier = 3.0/8 + hm_heapsize_multiplier = 1.0/8 + # HiveServer2 and HiveMetastore located on the same host + hive_server_hosts = self.getHostsWithComponent("HIVE", "HIVE_SERVER", services, hosts) + hive_client_hosts = 
self.getHostsWithComponent("HIVE", "HIVE_CLIENT", services, hosts) + + if hive_server_hosts is not None and len(hive_server_hosts): + hs_host_ram = hive_server_hosts[0]["Hosts"]["total_mem"]/1024 + putHiveEnvProperty("hive.metastore.heapsize", max(512, int(hs_host_ram*hm_heapsize_multiplier))) + putHiveEnvProperty("hive.heapsize", max(512, int(hs_host_ram*hs_heapsize_multiplier))) + putHiveEnvPropertyAttributes("hive.metastore.heapsize", "maximum", max(1024, hs_host_ram)) + putHiveEnvPropertyAttributes("hive.heapsize", "maximum", max(1024, hs_host_ram)) + + if hive_client_hosts is not None and len(hive_client_hosts): + putHiveEnvProperty("hive.client.heapsize", 1024) + putHiveEnvPropertyAttributes("hive.client.heapsize", "maximum", max(1024, int(hive_client_hosts[0]["Hosts"]["total_mem"]/1024))) + + + def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts): + super(HDP22StackAdvisor, self).recommendHbaseConfigurations(configurations, clusterData, services, hosts) + putHbaseEnvPropertyAttributes = self.putPropertyAttribute(configurations, "hbase-env") + + hmaster_host = self.getHostWithComponent("HBASE", "HBASE_MASTER", services, hosts) + if hmaster_host is not None: + host_ram = hmaster_host["Hosts"]["total_mem"] + putHbaseEnvPropertyAttributes('hbase_master_heapsize', 'maximum', max(1024, int(host_ram/1024))) + + rs_hosts = self.getHostsWithComponent("HBASE", "HBASE_REGIONSERVER", services, hosts) + if rs_hosts is not None and len(rs_hosts) > 0: + min_ram = rs_hosts[0]["Hosts"]["total_mem"] + for host in rs_hosts: + host_ram = host["Hosts"]["total_mem"] + min_ram = min(min_ram, host_ram) + + putHbaseEnvPropertyAttributes('hbase_regionserver_heapsize', 'maximum', max(1024, int(min_ram*0.8/1024))) + + putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) + putHbaseSitePropertyAttributes = self.putPropertyAttribute(configurations, "hbase-site") + putHbaseSiteProperty("hbase.regionserver.global.memstore.size", '0.4') + + if 'hbase-env' in services['configurations'] and 'phoenix_sql_enabled' in services['configurations']['hbase-env']['properties'] and \ + 'true' == services['configurations']['hbase-env']['properties']['phoenix_sql_enabled'].lower(): + putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec') + putHbaseSiteProperty("phoenix.functions.allowUserDefinedFunctions", 'true') + else: + putHbaseSiteProperty("hbase.regionserver.wal.codec", 'org.apache.hadoop.hbase.regionserver.wal.WALCellCodec') + if ('hbase.rpc.controllerfactory.class' in configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'hbase.rpc.controllerfactory.class' in services['configurations']["hbase-site"]["properties"]): + putHbaseSitePropertyAttributes('hbase.rpc.controllerfactory.class', 'delete', 'true') + if ('phoenix.functions.allowUserDefinedFunctions' in configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'phoenix.functions.allowUserDefinedFunctions' in services['configurations']["hbase-site"]["properties"]): + putHbaseSitePropertyAttributes('phoenix.functions.allowUserDefinedFunctions', 'delete', 'true') + + if "ranger-env" in services["configurations"] and "ranger-hbase-plugin-properties" in services["configurations"] and \ + "ranger-hbase-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putHbaseRangerPluginProperty = self.putProperty(configurations, 
"ranger-hbase-plugin-properties", services) + rangerEnvHbasePluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-hbase-plugin-enabled"] + putHbaseRangerPluginProperty("ranger-hbase-plugin-enabled", rangerEnvHbasePluginProperty) + if "cluster-env" in services["configurations"] and "smokeuser" in services["configurations"]["cluster-env"]["properties"]: + smoke_user = services["configurations"]["cluster-env"]["properties"]["smokeuser"] + putHbaseRangerPluginProperty("policy_user", smoke_user) + rangerPluginEnabled = '' + if 'ranger-hbase-plugin-properties' in configurations and 'ranger-hbase-plugin-enabled' in configurations['ranger-hbase-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-hbase-plugin-properties']['properties']['ranger-hbase-plugin-enabled'] + elif 'ranger-hbase-plugin-properties' in services['configurations'] and 'ranger-hbase-plugin-enabled' in services['configurations']['ranger-hbase-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-hbase-plugin-properties']['properties']['ranger-hbase-plugin-enabled'] + + if rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower(): + putHbaseSiteProperty('hbase.security.authorization','true') + + # Recommend configs for bucket cache + threshold = 23 # 2 Gb is reserved for other offheap memory + mb = 1024 + if (int(clusterData["hbaseRam"]) > threshold): + # To enable cache - calculate values + regionserver_total_ram = int(clusterData["hbaseRam"]) * mb + regionserver_heap_size = 20480 + regionserver_max_direct_memory_size = regionserver_total_ram - regionserver_heap_size + hfile_block_cache_size = '0.4' + block_cache_heap = 8192 # int(regionserver_heap_size * hfile_block_cache_size) + hbase_regionserver_global_memstore_size = '0.4' + reserved_offheap_memory = 2048 + bucketcache_offheap_memory = regionserver_max_direct_memory_size - reserved_offheap_memory + hbase_bucketcache_size = bucketcache_offheap_memory + hbase_bucketcache_percentage_in_combinedcache = float(bucketcache_offheap_memory) / hbase_bucketcache_size + hbase_bucketcache_percentage_in_combinedcache_str = "{0:.4f}".format(math.ceil(hbase_bucketcache_percentage_in_combinedcache * 10000) / 10000.0) + + # Set values in hbase-site + putHbaseSiteProperty('hfile.block.cache.size', hfile_block_cache_size) + putHbaseSiteProperty('hbase.regionserver.global.memstore.size', hbase_regionserver_global_memstore_size) + putHbaseSiteProperty('hbase.bucketcache.ioengine', 'offheap') + putHbaseSiteProperty('hbase.bucketcache.size', hbase_bucketcache_size) + putHbaseSiteProperty('hbase.bucketcache.percentage.in.combinedcache', hbase_bucketcache_percentage_in_combinedcache_str) + + # Enable in hbase-env + putHbaseEnvProperty = self.putProperty(configurations, "hbase-env", services) + putHbaseEnvProperty('hbase_max_direct_memory_size', regionserver_max_direct_memory_size) + putHbaseEnvProperty('hbase_regionserver_heapsize', regionserver_heap_size) + else: + # Disable + if ('hbase.bucketcache.ioengine' in configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'hbase.bucketcache.ioengine' in services['configurations']["hbase-site"]["properties"]): + putHbaseSitePropertyAttributes('hbase.bucketcache.ioengine', 'delete', 'true') + if ('hbase.bucketcache.size' in configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'hbase.bucketcache.size' in services['configurations']["hbase-site"]["properties"]): + 
+        putHbaseSitePropertyAttributes('hbase.bucketcache.size', 'delete', 'true')
+      if ('hbase.bucketcache.percentage.in.combinedcache' in configurations["hbase-site"]["properties"]) or \
+          ('hbase-site' in services['configurations'] and 'hbase.bucketcache.percentage.in.combinedcache' in services['configurations']["hbase-site"]["properties"]):
+        putHbaseSitePropertyAttributes('hbase.bucketcache.percentage.in.combinedcache', 'delete', 'true')
+      if ('hbase_max_direct_memory_size' in configurations["hbase-env"]["properties"]) or \
+          ('hbase-env' in services['configurations'] and 'hbase_max_direct_memory_size' in services['configurations']["hbase-env"]["properties"]):
+        putHbaseEnvPropertyAttributes('hbase_max_direct_memory_size', 'delete', 'true')
+
+    # Authorization
+    hbaseCoProcessorConfigs = {
+      'hbase.coprocessor.region.classes': [],
+      'hbase.coprocessor.regionserver.classes': [],
+      'hbase.coprocessor.master.classes': []
+    }
+    for key in hbaseCoProcessorConfigs:
+      hbase_coprocessor_classes = None
+      if key in configurations["hbase-site"]["properties"]:
+        hbase_coprocessor_classes = configurations["hbase-site"]["properties"][key].strip()
+      elif 'hbase-site' in services['configurations'] and key in services['configurations']["hbase-site"]["properties"]:
+        hbase_coprocessor_classes = services['configurations']["hbase-site"]["properties"][key].strip()
+      if hbase_coprocessor_classes:
+        hbaseCoProcessorConfigs[key] = hbase_coprocessor_classes.split(',')
+
+    # If configurations has it - it has priority as it is calculated. Then, the service's configurations will be used.
+    hbase_security_authorization = None
+    if 'hbase-site' in configurations and 'hbase.security.authorization' in configurations['hbase-site']['properties']:
+      hbase_security_authorization = configurations['hbase-site']['properties']['hbase.security.authorization']
+    elif 'hbase-site' in services['configurations'] and 'hbase.security.authorization' in services['configurations']['hbase-site']['properties']:
+      hbase_security_authorization = services['configurations']['hbase-site']['properties']['hbase.security.authorization']
+    if hbase_security_authorization:
+      if 'true' == hbase_security_authorization.lower():
+        hbaseCoProcessorConfigs['hbase.coprocessor.master.classes'].append('org.apache.hadoop.hbase.security.access.AccessController')
+        hbaseCoProcessorConfigs['hbase.coprocessor.regionserver.classes'].append('org.apache.hadoop.hbase.security.access.AccessController')
+        # regional classes when hbase authorization is enabled
+        authRegionClasses = ['org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint', 'org.apache.hadoop.hbase.security.access.AccessController']
+        for item in range(len(authRegionClasses)):
+          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append(authRegionClasses[item])
+      else:
+        if 'org.apache.hadoop.hbase.security.access.AccessController' in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']:
+          hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].remove('org.apache.hadoop.hbase.security.access.AccessController')
+        if 'org.apache.hadoop.hbase.security.access.AccessController' in hbaseCoProcessorConfigs['hbase.coprocessor.master.classes']:
+          hbaseCoProcessorConfigs['hbase.coprocessor.master.classes'].remove('org.apache.hadoop.hbase.security.access.AccessController')
+
+        hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")
+        if ('hbase.coprocessor.regionserver.classes' in configurations["hbase-site"]["properties"]) or \
configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'hbase.coprocessor.regionserver.classes' in services['configurations']["hbase-site"]["properties"]): + putHbaseSitePropertyAttributes('hbase.coprocessor.regionserver.classes', 'delete', 'true') + else: + hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint") + if ('hbase.coprocessor.regionserver.classes' in configurations["hbase-site"]["properties"]) or \ + ('hbase-site' in services['configurations'] and 'hbase.coprocessor.regionserver.classes' in services['configurations']["hbase-site"]["properties"]): + putHbaseSitePropertyAttributes('hbase.coprocessor.regionserver.classes', 'delete', 'true') + + # Authentication + if 'hbase-site' in services['configurations'] and 'hbase.security.authentication' in services['configurations']['hbase-site']['properties']: + if 'kerberos' == services['configurations']['hbase-site']['properties']['hbase.security.authentication'].lower(): + if 'org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint' not in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']: + hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append('org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint') + if 'org.apache.hadoop.hbase.security.token.TokenProvider' not in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']: + hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].append('org.apache.hadoop.hbase.security.token.TokenProvider') + else: + if 'org.apache.hadoop.hbase.security.token.TokenProvider' in hbaseCoProcessorConfigs['hbase.coprocessor.region.classes']: + hbaseCoProcessorConfigs['hbase.coprocessor.region.classes'].remove('org.apache.hadoop.hbase.security.token.TokenProvider') + + #Remove duplicates + for key in hbaseCoProcessorConfigs: + uniqueCoprocessorRegionClassList = [] + [uniqueCoprocessorRegionClassList.append(i) + for i in hbaseCoProcessorConfigs[key] if + not i in uniqueCoprocessorRegionClassList + and (i.strip() not in ['{{hbase_coprocessor_region_classes}}', '{{hbase_coprocessor_master_classes}}', '{{hbase_coprocessor_regionserver_classes}}'])] + putHbaseSiteProperty(key, ','.join(set(uniqueCoprocessorRegionClassList))) + + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + rangerServiceVersion='' + if 'RANGER' in servicesList: + rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0] + + if rangerServiceVersion and rangerServiceVersion == '0.4.0': + rangerClass = 'com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor' + else: + rangerClass = 'org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor' + + nonRangerClass = 'org.apache.hadoop.hbase.security.access.AccessController' + hbaseClassConfigs = hbaseCoProcessorConfigs.keys() + + for item in range(len(hbaseClassConfigs)): + if 'hbase-site' in services['configurations']: + if hbaseClassConfigs[item] in services['configurations']['hbase-site']['properties']: + if 'hbase-site' in configurations and hbaseClassConfigs[item] in configurations['hbase-site']['properties']: + coprocessorConfig = configurations['hbase-site']['properties'][hbaseClassConfigs[item]] + else: + coprocessorConfig = services['configurations']['hbase-site']['properties'][hbaseClassConfigs[item]] + coprocessorClasses = coprocessorConfig.split(",") + 
+          coprocessorClasses = filter(None, coprocessorClasses) # Removes empty string elements from array
+          if rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower():
+            if nonRangerClass in coprocessorClasses:
+              coprocessorClasses.remove(nonRangerClass)
+            if not rangerClass in coprocessorClasses:
+              coprocessorClasses.append(rangerClass)
+            putHbaseSiteProperty(hbaseClassConfigs[item], ','.join(coprocessorClasses))
+          elif rangerPluginEnabled and rangerPluginEnabled.lower() == 'No'.lower():
+            if rangerClass in coprocessorClasses:
+              coprocessorClasses.remove(rangerClass)
+            if not nonRangerClass in coprocessorClasses:
+              coprocessorClasses.append(nonRangerClass)
+            putHbaseSiteProperty(hbaseClassConfigs[item], ','.join(coprocessorClasses))
+        elif rangerPluginEnabled and rangerPluginEnabled.lower() == 'Yes'.lower():
+          putHbaseSiteProperty(hbaseClassConfigs[item], rangerClass)
+
+
+  def recommendTezConfigurations(self, configurations, clusterData, services, hosts):
+    if not "yarn-site" in configurations:
+      self.recommendYARNConfigurations(configurations, clusterData, services, hosts)
+    #properties below should be always present as they are provided in HDP206 stack advisor
+    yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
+
+    putTezProperty = self.putProperty(configurations, "tez-site", services)
+    putTezProperty("tez.am.resource.memory.mb", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), int(clusterData['amMemory']) * 2 if int(clusterData['amMemory']) < 3072 else int(clusterData['amMemory'])))
+
+    taskResourceMemory = clusterData['mapMemory'] if clusterData['mapMemory'] > 2048 else int(clusterData['reduceMemory'])
+    taskResourceMemory = min(clusterData['containers'] * clusterData['ramPerContainer'], taskResourceMemory, yarnMaxAllocationSize)
+    putTezProperty("tez.task.resource.memory.mb", min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), taskResourceMemory))
+    taskResourceMemory = int(configurations["tez-site"]["properties"]["tez.task.resource.memory.mb"])
+    putTezProperty("tez.runtime.io.sort.mb", min(int(taskResourceMemory * 0.4), 2047))
+    putTezProperty("tez.runtime.unordered.output.buffer.size-mb", int(taskResourceMemory * 0.075))
+    putTezProperty("tez.session.am.dag.submit.timeout.secs", "600")
+
+    tez_queue = self.recommendYarnQueue(services, "tez-site", "tez.queue.name")
+    if tez_queue is not None:
+      putTezProperty("tez.queue.name", tez_queue)
+
+    serverProperties = services["ambari-server-properties"]
+    latest_tez_jar_version = None
+
+    server_host = socket.getfqdn()
+    for host in hosts["items"]:
+      if server_host == host["Hosts"]["host_name"]:
+        server_host = host["Hosts"]["public_host_name"]
+    server_port = '8080'
+    server_protocol = 'http'
+    views_dir = '/var/lib/ambari-server/resources/views/'
+
+    if serverProperties:
+      if 'client.api.port' in serverProperties:
+        server_port = serverProperties['client.api.port']
+      if 'views.dir' in serverProperties:
+        views_dir = serverProperties['views.dir']
+      if 'api.ssl' in serverProperties:
+        if serverProperties['api.ssl'].lower() == 'true':
+          server_protocol = 'https'
+
+    views_work_dir = os.path.join(views_dir, 'work')
+
+    if os.path.exists(views_work_dir) and os.path.isdir(views_work_dir):
+      last_version = '0.0.0'
+      for file in os.listdir(views_work_dir):
+        if fnmatch.fnmatch(file, 'TEZ{*}'):
+          current_version = file.lstrip("TEZ{").rstrip("}")  # E.g.: TEZ{0.7.0.2.3.0.0-2154}
+          if self.versionCompare(current_version.replace("-", "."), last_version.replace("-", ".")) >= 0:
+            latest_tez_jar_version = current_version
+            last_version = current_version
+
+    if latest_tez_jar_version:
+      tez_url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(server_protocol, server_host, server_port, latest_tez_jar_version)
+      putTezProperty("tez.tez-ui.history-url.base", tez_url)
+
+  def recommendStormConfigurations(self, configurations, clusterData, services, hosts):
+    super(HDP22StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts)
+    putStormSiteProperty = self.putProperty(configurations, "storm-site", services)
+    putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site")
+    storm_site = getServicesSiteProperties(services, "storm-site")
+    security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site)
+    if "ranger-env" in services["configurations"] and "ranger-storm-plugin-properties" in services["configurations"] and \
+       "ranger-storm-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
+      putStormRangerPluginProperty = self.putProperty(configurations, "ranger-storm-plugin-properties", services)
+      rangerEnvStormPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-storm-plugin-enabled"]
+      putStormRangerPluginProperty("ranger-storm-plugin-enabled", rangerEnvStormPluginProperty)
+
+    rangerPluginEnabled = ''
+    if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']:
+      rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled']
+    elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']:
+      rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled']
+
+    nonRangerClass = 'backtype.storm.security.auth.authorizer.SimpleACLAuthorizer'
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    rangerServiceVersion = ''
+    if 'RANGER' in servicesList:
+      rangerServiceVersion = [service['StackServices']['service_version'] for service in services["services"] if service['StackServices']['service_name'] == 'RANGER'][0]
+
+    if rangerServiceVersion and rangerServiceVersion == '0.4.0':
+      rangerClass = 'com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer'
+    else:
+      rangerClass = 'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer'
+    # The cluster is Kerberized
+    if security_enabled:
+      if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()):
+        putStormSiteProperty('nimbus.authorizer', rangerClass)
+      elif (services["configurations"]["storm-site"]["properties"]["nimbus.authorizer"] == rangerClass):
+        putStormSiteProperty('nimbus.authorizer', nonRangerClass)
+    else:
+      putStormSiteAttributes('nimbus.authorizer', 'delete', 'true')
+
+  def recommendKnoxConfigurations(self, configurations, clusterData, services, hosts):
+    if "ranger-env" in services["configurations"] and "ranger-knox-plugin-properties" in services["configurations"] and \
+       "ranger-knox-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]:
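+      # Mirror ranger-env/ranger-knox-plugin-enabled into the Knox plugin properties.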
services["configurations"]["ranger-env"]["properties"]: + putKnoxRangerPluginProperty = self.putProperty(configurations, "ranger-knox-plugin-properties", services) + rangerEnvKnoxPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-knox-plugin-enabled"] + putKnoxRangerPluginProperty("ranger-knox-plugin-enabled", rangerEnvKnoxPluginProperty) + + if 'topology' in services["configurations"] and 'content' in services["configurations"]["topology"]["properties"]: + putKnoxTopologyContent = self.putProperty(configurations, "topology", services) + rangerPluginEnabled = '' + if 'ranger-knox-plugin-properties' in configurations and 'ranger-knox-plugin-enabled' in configurations['ranger-knox-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-knox-plugin-properties']['properties']['ranger-knox-plugin-enabled'] + elif 'ranger-knox-plugin-properties' in services['configurations'] and 'ranger-knox-plugin-enabled' in services['configurations']['ranger-knox-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-knox-plugin-properties']['properties']['ranger-knox-plugin-enabled'] + + # check if authorization provider already added + topologyContent = services["configurations"]["topology"]["properties"]["content"] + authorizationProviderExists = False + authNameChanged = False + root = ET.fromstring(topologyContent) + if root is not None: + gateway = root.find("gateway") + if gateway is not None: + for provider in gateway.findall('provider'): + role = provider.find('role') + if role is not None and role.text and role.text.lower() == "authorization": + authorizationProviderExists = True + + name = provider.find('name') + if name is not None and name.text == "AclsAuthz" and rangerPluginEnabled \ + and rangerPluginEnabled.lower() == "Yes".lower(): + newAuthName = "XASecurePDPKnox" + authNameChanged = True + elif name is not None and (((not rangerPluginEnabled) or rangerPluginEnabled.lower() != "Yes".lower()) \ + and name.text == 'XASecurePDPKnox'): + newAuthName = "AclsAuthz" + authNameChanged = True + + if authNameChanged: + name.text = newAuthName + putKnoxTopologyContent('content', ET.tostring(root)) + + if authorizationProviderExists: + break + + if not authorizationProviderExists: + if root is not None: + gateway = root.find("gateway") + if gateway is not None: + provider = ET.SubElement(gateway, 'provider') + + role = ET.SubElement(provider, 'role') + role.text = "authorization" + + name = ET.SubElement(provider, 'name') + if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower(): + name.text = "XASecurePDPKnox" + else: + name.text = "AclsAuthz" + + enabled = ET.SubElement(provider, 'enabled') + enabled.text = "true" + + #TODO add pretty format for newly added provider + putKnoxTopologyContent('content', ET.tostring(root)) + + + + def recommendRangerConfigurations(self, configurations, clusterData, services, hosts): + super(HDP22StackAdvisor, self).recommendRangerConfigurations(configurations, clusterData, services, hosts) + putRangerEnvProperty = self.putProperty(configurations, "ranger-env") + cluster_env = getServicesSiteProperties(services, "cluster-env") + security_enabled = cluster_env is not None and "security_enabled" in cluster_env and \ + cluster_env["security_enabled"].lower() == "true" + if "ranger-env" in configurations and not security_enabled: + putRangerEnvProperty("ranger-storm-plugin-enabled", "No") + + def getServiceConfigurationValidators(self): + parentValidators = 
+    childValidators = {
+      "HDFS": {"hdfs-site": self.validateHDFSConfigurations,
+               "hadoop-env": self.validateHDFSConfigurationsEnv,
+               "ranger-hdfs-plugin-properties": self.validateHDFSRangerPluginConfigurations},
+      "YARN": {"yarn-env": self.validateYARNEnvConfigurations,
+               "ranger-yarn-plugin-properties": self.validateYARNRangerPluginConfigurations},
+      "HIVE": {"hiveserver2-site": self.validateHiveServer2Configurations,
+               "hive-site": self.validateHiveConfigurations,
+               "hive-env": self.validateHiveConfigurationsEnv,
+               "webhcat-site": self.validateWebhcatConfigurations},
+      "HBASE": {"hbase-site": self.validateHBASEConfigurations,
+                "hbase-env": self.validateHBASEEnvConfigurations,
+                "ranger-hbase-plugin-properties": self.validateHBASERangerPluginConfigurations},
+      "KNOX": {"ranger-knox-plugin-properties": self.validateKnoxRangerPluginConfigurations},
+      "KAFKA": {"ranger-kafka-plugin-properties": self.validateKafkaRangerPluginConfigurations},
+      "STORM": {"ranger-storm-plugin-properties": self.validateStormRangerPluginConfigurations},
+      "MAPREDUCE2": {"mapred-site": self.validateMapReduce2Configurations},
+      "TEZ": {"tez-site": self.validateTezConfigurations},
+      "RANGER": {"ranger-env": self.validateRangerConfigurationsEnv},
+      "SPARK": {"spark-defaults": self.validateSparkDefaults,
+                "spark-thrift-sparkconf": self.validateSparkThriftSparkConf}
+    }
+    self.mergeValidators(parentValidators, childValidators)
+    return parentValidators
+
+  def recommendLogsearchConfigurations(self, configurations, clusterData, services, hosts):
+    putLogsearchProperty = self.putProperty(configurations, "logsearch-properties", services)
+    infraSolrHosts = self.getComponentHostNames(services, "AMBARI_INFRA", "INFRA_SOLR")
+
+    if infraSolrHosts is not None and len(infraSolrHosts) > 0 \
+       and "logsearch-properties" in services["configurations"]:
+      recommendedMinShards = len(infraSolrHosts)
+      recommendedShards = 2 * len(infraSolrHosts)
+      recommendedMaxShards = 3 * len(infraSolrHosts)
+      # Recommend the number of shards
+      putLogsearchAttribute = self.putPropertyAttribute(configurations, "logsearch-properties")
+      putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'minimum', recommendedMinShards)
+      putLogsearchAttribute('logsearch.collection.service.logs.numshards', 'maximum', recommendedMaxShards)
+      putLogsearchProperty("logsearch.collection.service.logs.numshards", recommendedShards)
+
+      putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'minimum', recommendedMinShards)
+      putLogsearchAttribute('logsearch.collection.audit.logs.numshards', 'maximum', recommendedMaxShards)
+      putLogsearchProperty("logsearch.collection.audit.logs.numshards", recommendedShards)
+      # Recommend the replication factor
+      replicationRecommendFloat = math.log(len(infraSolrHosts), 5)
+      recommendedReplicationFactor = int(1 + math.floor(replicationRecommendFloat))
+      putLogsearchProperty("logsearch.collection.service.logs.replication.factor", recommendedReplicationFactor)
+      putLogsearchProperty("logsearch.collection.audit.logs.replication.factor", recommendedReplicationFactor)
+
+  def validateTezConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [ {"config-name": 'tez.am.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.am.resource.memory.mb')},
+                        {"config-name": 'tez.task.resource.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.task.resource.memory.mb')},
+                        {"config-name": 'tez.runtime.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.runtime.io.sort.mb')},
+                        {"config-name": 'tez.runtime.unordered.output.buffer.size-mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'tez.runtime.unordered.output.buffer.size-mb')},
+                        {"config-name": 'tez.queue.name', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'tez.queue.name', services)} ]
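+    # tez.am.resource.memory.mb and tez.task.resource.memory.mb must also fit within
+    # the YARN container limits; this is checked against yarn-site further below.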
+    if "tez.tez-ui.history-url.base" in recommendedDefaults:
+      validationItems.append({"config-name": 'tez.tez-ui.history-url.base', "item": self.validatorEqualsToRecommendedItem(properties, recommendedDefaults, 'tez.tez-ui.history-url.base')})
+
+    tez_site = properties
+    prop_name1 = 'tez.am.resource.memory.mb'
+    prop_name2 = 'tez.task.resource.memory.mb'
+    yarnSiteProperties = getSiteProperties(configurations, "yarn-site")
+    if yarnSiteProperties:
+      yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
+      if int(tez_site[prop_name1]) > yarnMaxAllocationSize:
+        validationItems.append({"config-name": prop_name1,
+                                "item": self.getWarnItem(
+                                  "{0} should be less than YARN max allocation size ({1})".format(prop_name1, yarnMaxAllocationSize))})
+      if int(tez_site[prop_name2]) > yarnMaxAllocationSize:
+        validationItems.append({"config-name": prop_name2,
+                                "item": self.getWarnItem(
+                                  "{0} should be less than YARN max allocation size ({1})".format(prop_name2, yarnMaxAllocationSize))})
+
+    return self.toConfigurationValidationProblems(validationItems, "tez-site")
+
+  def recommendMapReduce2Configurations(self, configurations, clusterData, services, hosts):
+    self.recommendYARNConfigurations(configurations, clusterData, services, hosts)
+    putMapredProperty = self.putProperty(configurations, "mapred-site", services)
+    nodemanagerMinRam = 1048576  # 1 TB, expressed in MB
+    if "referenceNodeManagerHost" in clusterData:
+      nodemanagerMinRam = min(clusterData["referenceNodeManagerHost"]["total_mem"] / 1024, nodemanagerMinRam)
+    putMapredProperty('yarn.app.mapreduce.am.resource.mb', configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
+    putMapredProperty('yarn.app.mapreduce.am.command-opts', "-Xmx" + str(int(0.8 * int(configurations["mapred-site"]["properties"]["yarn.app.mapreduce.am.resource.mb"]))) + "m" + " -Dhdp.version=${hdp.version}")
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    min_mapreduce_map_memory_mb = 0
+    min_mapreduce_reduce_memory_mb = 0
+    min_mapreduce_map_java_opts = 0
+    if ("PIG" in servicesList) and clusterData["totalAvailableRam"] >= 4096:
+      min_mapreduce_map_memory_mb = 1536
+      min_mapreduce_reduce_memory_mb = 1536
+      min_mapreduce_map_java_opts = 1024
+    putMapredProperty('mapreduce.map.memory.mb', min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), max(min_mapreduce_map_memory_mb, int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]))))
+    putMapredProperty('mapreduce.reduce.memory.mb', min(int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]), max(min_mapreduce_reduce_memory_mb, min(2 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(nodemanagerMinRam)))))
+    mapredMapXmx = int(0.8 * int(configurations["mapred-site"]["properties"]["mapreduce.map.memory.mb"]))
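+    # Heap sizes (-Xmx) are set to roughly 80% of the corresponding container memory.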
+    putMapredProperty('mapreduce.map.java.opts', "-Xmx" + str(max(min_mapreduce_map_java_opts, mapredMapXmx)) + "m")
+    putMapredProperty('mapreduce.reduce.java.opts', "-Xmx" + str(int(0.8 * int(configurations["mapred-site"]["properties"]["mapreduce.reduce.memory.mb"]))) + "m")
+    putMapredProperty('mapreduce.task.io.sort.mb', str(min(int(0.7 * mapredMapXmx), 2047)))
+    # Property attributes
+    putMapredPropertyAttribute = self.putPropertyAttribute(configurations, "mapred-site")
+    yarnMinAllocationSize = int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"])
+    yarnMaxAllocationSize = min(30 * int(configurations["yarn-site"]["properties"]["yarn.scheduler.minimum-allocation-mb"]), int(configurations["yarn-site"]["properties"]["yarn.scheduler.maximum-allocation-mb"]))
+    putMapredPropertyAttribute("mapreduce.map.memory.mb", "maximum", yarnMaxAllocationSize)
+    putMapredPropertyAttribute("mapreduce.map.memory.mb", "minimum", yarnMinAllocationSize)
+    putMapredPropertyAttribute("mapreduce.reduce.memory.mb", "maximum", yarnMaxAllocationSize)
+    putMapredPropertyAttribute("mapreduce.reduce.memory.mb", "minimum", yarnMinAllocationSize)
+    putMapredPropertyAttribute("yarn.app.mapreduce.am.resource.mb", "maximum", yarnMaxAllocationSize)
+    putMapredPropertyAttribute("yarn.app.mapreduce.am.resource.mb", "minimum", yarnMinAllocationSize)
+    # Hadoop MR limitation
+    putMapredPropertyAttribute("mapreduce.task.io.sort.mb", "maximum", "2047")
+
+    mr_queue = self.recommendYarnQueue(services, "mapred-site", "mapreduce.job.queuename")
+    if mr_queue is not None:
+      putMapredProperty("mapreduce.job.queuename", mr_queue)
+
+  def validateMapReduce2Configurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [ {"config-name": 'mapreduce.map.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.map.java.opts')},
+                        {"config-name": 'mapreduce.reduce.java.opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'mapreduce.reduce.java.opts')},
+                        {"config-name": 'mapreduce.task.io.sort.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.task.io.sort.mb')},
+                        {"config-name": 'mapreduce.map.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.map.memory.mb')},
+                        {"config-name": 'mapreduce.reduce.memory.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'mapreduce.reduce.memory.mb')},
+                        {"config-name": 'yarn.app.mapreduce.am.resource.mb', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.resource.mb')},
+                        {"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.validateXmxValue(properties, recommendedDefaults, 'yarn.app.mapreduce.am.command-opts')},
+                        {"config-name": 'mapreduce.job.queuename', "item": self.validatorYarnQueue(properties, recommendedDefaults, 'mapreduce.job.queuename', services)} ]
+
+    if 'mapreduce.map.java.opts' in properties and \
+       checkXmxValueFormat(properties['mapreduce.map.java.opts']):
+      mapreduceMapJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.map.java.opts'])) / (1024.0 * 1024)
+      mapreduceMapMemoryMb = to_number(properties['mapreduce.map.memory.mb'])
+      if mapreduceMapJavaOpts > mapreduceMapMemoryMb:
+        validationItems.append({"config-name": 'mapreduce.map.java.opts', "item": self.getWarnItem("mapreduce.map.java.opts Xmx should be less than mapreduce.map.memory.mb ({0})".format(mapreduceMapMemoryMb))})
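+    # The same Xmx-versus-container-size check is repeated below for the reducer and the MR ApplicationMaster.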
+
+    if 'mapreduce.reduce.java.opts' in properties and \
+       checkXmxValueFormat(properties['mapreduce.reduce.java.opts']):
+      mapreduceReduceJavaOpts = formatXmxSizeToBytes(getXmxSize(properties['mapreduce.reduce.java.opts'])) / (1024.0 * 1024)
+      mapreduceReduceMemoryMb = to_number(properties['mapreduce.reduce.memory.mb'])
+      if mapreduceReduceJavaOpts > mapreduceReduceMemoryMb:
+        validationItems.append({"config-name": 'mapreduce.reduce.java.opts', "item": self.getWarnItem("mapreduce.reduce.java.opts Xmx should be less than mapreduce.reduce.memory.mb ({0})".format(mapreduceReduceMemoryMb))})
+
+    if 'yarn.app.mapreduce.am.command-opts' in properties and \
+       checkXmxValueFormat(properties['yarn.app.mapreduce.am.command-opts']):
+      yarnAppMapreduceAmCommandOpts = formatXmxSizeToBytes(getXmxSize(properties['yarn.app.mapreduce.am.command-opts'])) / (1024.0 * 1024)
+      yarnAppMapreduceAmResourceMb = to_number(properties['yarn.app.mapreduce.am.resource.mb'])
+      if yarnAppMapreduceAmCommandOpts > yarnAppMapreduceAmResourceMb:
+        validationItems.append({"config-name": 'yarn.app.mapreduce.am.command-opts', "item": self.getWarnItem("yarn.app.mapreduce.am.command-opts Xmx should be less than yarn.app.mapreduce.am.resource.mb ({0})".format(yarnAppMapreduceAmResourceMb))})
+
+    return self.toConfigurationValidationProblems(validationItems, "mapred-site")
+
+  def validateHDFSConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [ {"config-name": 'namenode_heapsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_heapsize')},
+                        {"config-name": 'namenode_opt_newsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_newsize')},
+                        {"config-name": 'namenode_opt_maxnewsize', "item": self.validatorLessThenDefaultValue(properties, recommendedDefaults, 'namenode_opt_maxnewsize')} ]
+    return self.toConfigurationValidationProblems(validationItems, "hadoop-env")
+
+  def validateHDFSRangerPluginConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = []
+    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
+    if (ranger_plugin_enabled.lower() == 'yes'):
+      # ranger-hdfs-plugin must also be enabled in ranger-env
+      ranger_env = getServicesSiteProperties(services, 'ranger-env')
+      if not ranger_env or not 'ranger-hdfs-plugin-enabled' in ranger_env or \
+         ranger_env['ranger-hdfs-plugin-enabled'].lower() != 'yes':
+        validationItems.append({"config-name": 'ranger-hdfs-plugin-enabled',
+                                "item": self.getWarnItem(
+                                  "ranger-hdfs-plugin-properties/ranger-hdfs-plugin-enabled must correspond to ranger-env/ranger-hdfs-plugin-enabled")})
+    return self.toConfigurationValidationProblems(validationItems, "ranger-hdfs-plugin-properties")
+
+  def validateHDFSConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    parentValidationProblems = super(HDP22StackAdvisor, self).validateHDFSConfigurations(properties, recommendedDefaults, configurations, services, hosts)
+    # We can not access the hadoop.security.authentication property from the other
+    # config file (core-site) here, which is why a different heuristic is used.
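+    # 'properties' holds the hdfs-site values under validation; core-site is read
+    # separately for the Kerberos authentication/authorization flags.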
+    hdfs_site = properties
+    core_site = getSiteProperties(configurations, "core-site")
+
+    dfs_encrypt_data_transfer = 'dfs.encrypt.data.transfer'  # Hadoop wire encryption
+    try:
+      wire_encryption_enabled = hdfs_site[dfs_encrypt_data_transfer] == "true"
+    except KeyError:
+      wire_encryption_enabled = False
+
+    HTTP_ONLY = 'HTTP_ONLY'
+    HTTPS_ONLY = 'HTTPS_ONLY'
+    HTTP_AND_HTTPS = 'HTTP_AND_HTTPS'
+
+    VALID_HTTP_POLICY_VALUES = [HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS]
+    VALID_TRANSFER_PROTECTION_VALUES = ['authentication', 'integrity', 'privacy']
+
+    validationItems = []
+    address_properties = [
+      # "dfs.datanode.address",
+      # "dfs.datanode.http.address",
+      # "dfs.datanode.https.address",
+      # "dfs.datanode.ipc.address",
+      # "dfs.journalnode.http-address",
+      # "dfs.journalnode.https-address",
+      # "dfs.namenode.rpc-address",
+      # "dfs.namenode.secondary.http-address",
+      "dfs.namenode.http-address",
+      "dfs.namenode.https-address",
+    ]
+    # Validate that the *address properties contain well-formed values
+    for address_property in address_properties:
+      if address_property in hdfs_site:
+        value = hdfs_site[address_property]
+        if not is_valid_host_port_authority(value):
+          validationItems.append({"config-name": address_property, "item":
+            self.getErrorItem(address_property + " does not contain a valid host:port authority: " + value)})
+
+    # Ranger plugin logic
+    ranger_plugin_properties = getSiteProperties(configurations, "ranger-hdfs-plugin-properties")
+    ranger_plugin_enabled = ranger_plugin_properties['ranger-hdfs-plugin-enabled'] if ranger_plugin_properties else 'No'
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()):
+      if 'dfs.permissions.enabled' in hdfs_site and \
+         hdfs_site['dfs.permissions.enabled'] != 'true':
+        validationItems.append({"config-name": 'dfs.permissions.enabled',
+                                "item": self.getWarnItem(
+                                  "dfs.permissions.enabled needs to be set to true if the Ranger HDFS plugin is enabled.")})
+
+    if (not wire_encryption_enabled and  # If wire encryption is enabled in Hadoop, it disables all of these checks
+        'hadoop.security.authentication' in core_site and
+        core_site['hadoop.security.authentication'] == 'kerberos' and
+        'hadoop.security.authorization' in core_site and
+        core_site['hadoop.security.authorization'] == 'true'):
+      # Security is enabled
+      dfs_http_policy = 'dfs.http.policy'
+      dfs_datanode_address = 'dfs.datanode.address'
+      datanode_http_address = 'dfs.datanode.http.address'
+      datanode_https_address = 'dfs.datanode.https.address'
+      data_transfer_protection = 'dfs.data.transfer.protection'
+
+      try:  # These params may be absent
+        privileged_dfs_dn_port = isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+      except KeyError:
+        privileged_dfs_dn_port = False
+      try:
+        privileged_dfs_http_port = isSecurePort(getPort(hdfs_site[datanode_http_address]))
+      except KeyError:
+        privileged_dfs_http_port = False
+      try:
+        privileged_dfs_https_port = isSecurePort(getPort(hdfs_site[datanode_https_address]))
+      except KeyError:
+        privileged_dfs_https_port = False
+      try:
+        dfs_http_policy_value = hdfs_site[dfs_http_policy]
+      except KeyError:
+        dfs_http_policy_value = HTTP_ONLY  # Default
+      try:
+        data_transfer_protection_value = hdfs_site[data_transfer_protection]
+      except KeyError:
+        data_transfer_protection_value = None
+
+      if dfs_http_policy_value not in VALID_HTTP_POLICY_VALUES:
+        validationItems.append({"config-name": dfs_http_policy,
+                                "item": self.getWarnItem(
+                                  "Invalid property value: {0}. Valid values are {1}".format(
+                                    dfs_http_policy_value, VALID_HTTP_POLICY_VALUES))})
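+      # In a Kerberized cluster the DataNode generally must either bind privileged
+      # ports or rely on SASL (dfs.data.transfer.protection); the warnings below
+      # flag inconsistent combinations of ports and dfs.http.policy.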
+
+      # Determine whether secure ports are used
+      address_properties_with_warnings = []
+      if dfs_http_policy_value == HTTPS_ONLY:
+        if not privileged_dfs_dn_port and (privileged_dfs_https_port or datanode_https_address not in hdfs_site):
+          important_properties = [dfs_datanode_address, datanode_https_address]
+          message = "You set up datanode to use some non-secure ports. " \
+                    "If you want to run Datanode under non-root user in a secure cluster, " \
+                    "you should set all these properties {2} " \
+                    "to use non-secure ports (if property {3} does not exist, " \
+                    "just add it). You may also set up property {4} ('{5}' is a good default value). " \
+                    "Also, set up WebHDFS with SSL as " \
+                    "described in manual in order to be able to " \
+                    "use HTTPS.".format(dfs_http_policy, dfs_http_policy_value, important_properties,
+                                        datanode_https_address, data_transfer_protection,
+                                        VALID_TRANSFER_PROTECTION_VALUES[0])
+          address_properties_with_warnings.extend(important_properties)
+      else:  # dfs_http_policy_value is HTTP_AND_HTTPS or HTTP_ONLY
+        # We don't enforce privileged ports for datanode_https_address here
+        any_nonprivileged_ports_are_in_use = not privileged_dfs_dn_port or not privileged_dfs_http_port
+        if any_nonprivileged_ports_are_in_use:
+          important_properties = [dfs_datanode_address, datanode_http_address]
+          message = "You have set up datanode to use some non-secure ports, but {0} is set to {1}. " \
+                    "In a secure cluster, Datanode forbids using non-secure ports " \
+                    "if {0} is not set to {3}. " \
+                    "Please make sure that properties {2} use secure ports.".format(
+                      dfs_http_policy, dfs_http_policy_value, important_properties, HTTPS_ONLY)
+          address_properties_with_warnings.extend(important_properties)
+
+      # Generate port-related warnings, if any
+      for prop in address_properties_with_warnings:
+        validationItems.append({"config-name": prop,
+                                "item": self.getWarnItem(message)})
+
+      # Check whether it is appropriate to use dfs.data.transfer.protection
+      if data_transfer_protection_value is not None:
+        if dfs_http_policy_value in [HTTP_ONLY, HTTP_AND_HTTPS]:
+          validationItems.append({"config-name": data_transfer_protection,
+                                  "item": self.getWarnItem(
+                                    "{0} property can not be used when {1} is set to any "
+                                    "value other than {2}. Tip: When the {1} property is not defined, it defaults to {3}".format(
+                                      data_transfer_protection, dfs_http_policy, HTTPS_ONLY, HTTP_ONLY))})
+        elif data_transfer_protection_value not in VALID_TRANSFER_PROTECTION_VALUES:
+          validationItems.append({"config-name": data_transfer_protection,
+                                  "item": self.getWarnItem(
+                                    "Invalid property value: {0}. Valid values are {1}.".format(d