From: jonathanhurley@apache.org
To: commits@ambari.apache.org
Date: Thu, 03 Aug 2017 12:27:56 -0000
Subject: [08/13] ambari git commit: AMBARI-21624. HDFS restart failed post Ambari upgrade when lzo compression was enabled

AMBARI-21624.
HDFS restart failed post Ambari upgrade when lzo compression was enabled

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8f051fc5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8f051fc5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8f051fc5

Branch: refs/heads/branch-2.6
Commit: 8f051fc5bb6aa5ca495372a800efabc5191662df
Parents: 963d7c6
Author: Attila Doroszlai
Authored: Tue Aug 1 23:12:17 2017 +0200
Committer: Attila Doroszlai
Committed: Wed Aug 2 15:06:55 2017 +0200

----------------------------------------------------------------------
 .../package/alerts/alert_checkpoint_time.py     | 255 +++++++++
 .../alerts/alert_datanode_unmounted_data_dir.py | 177 ++++++
 .../package/alerts/alert_ha_namenode_health.py  | 243 ++++++++
 .../package/alerts/alert_metrics_deviation.py   | 470 ++++++++++++++++
 .../package/alerts/alert_upgrade_finalized.py   | 179 ++++++
 .../services/HDFS/package/files/checkWebUI.py   |  86 +++
 .../services/HDFS/package/scripts/__init__.py   |  20 +
 .../scripts/balancer-emulator/hdfs-command.py   |  45 ++
 .../services/HDFS/package/scripts/datanode.py   | 174 ++++++
 .../HDFS/package/scripts/datanode_upgrade.py    | 156 +++++
 .../4.2.5/services/HDFS/package/scripts/hdfs.py | 173 ++++++
 .../HDFS/package/scripts/hdfs_client.py         | 123 ++++
 .../HDFS/package/scripts/hdfs_datanode.py       |  84 +++
 .../HDFS/package/scripts/hdfs_namenode.py       | 562 +++++++++++++++++++
 .../HDFS/package/scripts/hdfs_nfsgateway.py     |  75 +++
 .../HDFS/package/scripts/hdfs_rebalance.py      | 130 +++++
 .../HDFS/package/scripts/hdfs_snamenode.py      |  64 +++
 .../HDFS/package/scripts/install_params.py      |  38 ++
 .../HDFS/package/scripts/journalnode.py         | 198 +++++++
 .../HDFS/package/scripts/journalnode_upgrade.py | 152 +++++
 .../services/HDFS/package/scripts/namenode.py   | 420 ++++++++++++++
 .../HDFS/package/scripts/namenode_ha_state.py   | 219 ++++++++
 .../HDFS/package/scripts/namenode_upgrade.py    | 322 +++++++++++
 .../services/HDFS/package/scripts/nfsgateway.py | 147 +++++
 .../services/HDFS/package/scripts/params.py     |  30 +
 .../HDFS/package/scripts/params_linux.py        | 527 +++++++++++++++++
 .../HDFS/package/scripts/params_windows.py      |  76 +++
 .../HDFS/package/scripts/service_check.py       | 153 +++++
 .../HDFS/package/scripts/setup_ranger_hdfs.py   | 121 ++++
 .../services/HDFS/package/scripts/snamenode.py  | 152 +++++
 .../HDFS/package/scripts/status_params.py       |  58 ++
 .../services/HDFS/package/scripts/utils.py      | 385 +++++++++++++
 .../services/HDFS/package/scripts/zkfc_slave.py | 200 +++++++
 .../package/templates/exclude_hosts_list.j2     |  21 +
 .../HDFS/package/templates/hdfs.conf.j2         |  35 ++
 .../services/HDFS/package/templates/slaves.j2   |  21 +
 36 files changed, 6291 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_checkpoint_time.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_checkpoint_time.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_checkpoint_time.py
new file mode 100644
index 0000000..26127c3
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_checkpoint_time.py
@@ -0,0 +1,255 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation
(ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import time +import urllib2 +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. +import logging +import traceback + +from resource_management.libraries.functions.namenode_ha_utils import get_all_namenode_addresses +from resource_management.libraries.functions.curl_krb_request import curl_krb_request +from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS +from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER +from resource_management.core.environment import Environment + +LABEL = 'Last Checkpoint: [{h} hours, {m} minutes, {tx} transactions]' +HDFS_SITE_KEY = '{{hdfs-site}}' + +RESULT_STATE_UNKNOWN = 'UNKNOWN' +RESULT_STATE_SKIPPED = 'SKIPPED' + +NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' +NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' +NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' +NN_CHECKPOINT_TX_KEY = '{{hdfs-site/dfs.namenode.checkpoint.txns}}' +NN_CHECKPOINT_PERIOD_KEY = '{{hdfs-site/dfs.namenode.checkpoint.period}}' + +PERCENT_WARNING_KEY = 'checkpoint.time.warning.threshold' +PERCENT_WARNING_DEFAULT = 200 + +PERCENT_CRITICAL_KEY = 'checkpoint.time.critical.threshold' +PERCENT_CRITICAL_DEFAULT = 200 + +CHECKPOINT_TX_MULTIPLIER_WARNING_KEY = 'checkpoint.txns.multiplier.warning.threshold' +CHECKPOINT_TX_MULTIPLIER_WARNING_DEFAULT = 2 + +CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY = 'checkpoint.txns.multiplier.critical.threshold' +CHECKPOINT_TX_MULTIPLIER_CRITICAL_DEFAULT = 4 + +CHECKPOINT_TX_DEFAULT = 1000000 +CHECKPOINT_PERIOD_DEFAULT = 21600 + +CONNECTION_TIMEOUT_KEY = 'connection.timeout' +CONNECTION_TIMEOUT_DEFAULT = 5.0 + +KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' +KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' +SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' +SMOKEUSER_KEY = "{{cluster-env/smokeuser}}" +EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' + +logger = logging.getLogger('ambari_alerts') + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (HDFS_SITE_KEY, NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS, + NN_CHECKPOINT_TX_KEY, NN_CHECKPOINT_PERIOD_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations (dictionary): a mapping of configuration key to value + parameters (dictionary): a mapping of 
script parameter key to value + host_name (string): the name of this host where the alert is running + """ + + if configurations is None: + return (('UNKNOWN', ['There were no configurations supplied to the script.'])) + + uri = None + scheme = 'http' + http_uri = None + https_uri = None + http_policy = 'HTTP_ONLY' + checkpoint_tx = CHECKPOINT_TX_DEFAULT + checkpoint_period = CHECKPOINT_PERIOD_DEFAULT + + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]) + + if NN_HTTP_POLICY_KEY in configurations: + http_policy = configurations[NN_HTTP_POLICY_KEY] + + if NN_CHECKPOINT_TX_KEY in configurations: + checkpoint_tx = configurations[NN_CHECKPOINT_TX_KEY] + + if NN_CHECKPOINT_PERIOD_KEY in configurations: + checkpoint_period = configurations[NN_CHECKPOINT_PERIOD_KEY] + + if SMOKEUSER_KEY in configurations: + smokeuser = configurations[SMOKEUSER_KEY] + + executable_paths = None + if EXECUTABLE_SEARCH_PATHS in configurations: + executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] + + security_enabled = False + if SECURITY_ENABLED_KEY in configurations: + security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' + + kerberos_keytab = None + if KERBEROS_KEYTAB in configurations: + kerberos_keytab = configurations[KERBEROS_KEYTAB] + + kerberos_principal = None + if KERBEROS_PRINCIPAL in configurations: + kerberos_principal = configurations[KERBEROS_PRINCIPAL] + kerberos_principal = kerberos_principal.replace('_HOST', host_name) + + # parse script arguments + connection_timeout = CONNECTION_TIMEOUT_DEFAULT + if CONNECTION_TIMEOUT_KEY in parameters: + connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY]) + + percent_warning = PERCENT_WARNING_DEFAULT + if PERCENT_WARNING_KEY in parameters: + percent_warning = float(parameters[PERCENT_WARNING_KEY]) + + percent_critical = PERCENT_CRITICAL_DEFAULT + if PERCENT_CRITICAL_KEY in parameters: + percent_critical = float(parameters[PERCENT_CRITICAL_KEY]) + + checkpoint_txn_multiplier_warning = CHECKPOINT_TX_MULTIPLIER_WARNING_DEFAULT + if CHECKPOINT_TX_MULTIPLIER_WARNING_KEY in parameters: + checkpoint_txn_multiplier_warning = float(parameters[CHECKPOINT_TX_MULTIPLIER_WARNING_KEY]) + + checkpoint_txn_multiplier_critical = CHECKPOINT_TX_MULTIPLIER_CRITICAL_DEFAULT + if CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY in parameters: + checkpoint_txn_multiplier_critical = float(parameters[CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY]) + + kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS) + + # determine the right URI and whether to use SSL + hdfs_site = configurations[HDFS_SITE_KEY] + + scheme = "https" if http_policy == "HTTPS_ONLY" else "http" + + nn_addresses = get_all_namenode_addresses(hdfs_site) + for nn_address in nn_addresses: + if nn_address.startswith(host_name + ":"): + uri = nn_address + break + if not uri: + return (RESULT_STATE_SKIPPED, ['NameNode on host {0} not found (namenode adresses = {1})'.format(host_name, ', '.join(nn_addresses))]) + + current_time = int(round(time.time() * 1000)) + + last_checkpoint_time_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem".format(scheme,uri) + journal_transaction_info_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme,uri) + + # start out assuming an OK status + label = None + result_code = "OK" + + try: + if kerberos_principal is not None and kerberos_keytab is not None and 
security_enabled: + env = Environment.get_instance() + + # curl requires an integer timeout + curl_connection_timeout = int(connection_timeout) + + last_checkpoint_time_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab, + kerberos_principal, last_checkpoint_time_qry,"checkpoint_time_alert", executable_paths, False, + "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout, + kinit_timer_ms = kinit_timer_ms) + + last_checkpoint_time_response_json = json.loads(last_checkpoint_time_response) + last_checkpoint_time = int(last_checkpoint_time_response_json["beans"][0]["LastCheckpointTime"]) + + journal_transaction_info_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab, + kerberos_principal, journal_transaction_info_qry,"checkpoint_time_alert", executable_paths, + False, "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout, + kinit_timer_ms = kinit_timer_ms) + + journal_transaction_info_response_json = json.loads(journal_transaction_info_response) + journal_transaction_info = journal_transaction_info_response_json["beans"][0]["JournalTransactionInfo"] + else: + last_checkpoint_time = int(get_value_from_jmx(last_checkpoint_time_qry, + "LastCheckpointTime", connection_timeout)) + + journal_transaction_info = get_value_from_jmx(journal_transaction_info_qry, + "JournalTransactionInfo", connection_timeout) + + journal_transaction_info_dict = json.loads(journal_transaction_info) + + last_tx = int(journal_transaction_info_dict['LastAppliedOrWrittenTxId']) + most_recent_tx = int(journal_transaction_info_dict['MostRecentCheckpointTxId']) + transaction_difference = last_tx - most_recent_tx + + delta = (current_time - last_checkpoint_time)/1000 + + label = LABEL.format(h=get_time(delta)['h'], m=get_time(delta)['m'], tx=transaction_difference) + + is_checkpoint_txn_warning = transaction_difference > checkpoint_txn_multiplier_warning * int(checkpoint_tx) + is_checkpoint_txn_critical = transaction_difference > checkpoint_txn_multiplier_critical * int(checkpoint_tx) + + # Either too many uncommitted transactions or missed check-pointing for + # long time decided by the thresholds + if is_checkpoint_txn_critical or (float(delta) / int(checkpoint_period)*100 >= int(percent_critical)): + logger.debug('Raising critical alert: transaction_difference = {0}, checkpoint_tx = {1}'.format(transaction_difference, checkpoint_tx)) + result_code = 'CRITICAL' + elif is_checkpoint_txn_warning or (float(delta) / int(checkpoint_period)*100 >= int(percent_warning)): + logger.debug('Raising warning alert: transaction_difference = {0}, checkpoint_tx = {1}'.format(transaction_difference, checkpoint_tx)) + result_code = 'WARNING' + + except: + label = traceback.format_exc() + result_code = 'UNKNOWN' + + return ((result_code, [label])) + +def get_time(delta): + h = int(delta/3600) + m = int((delta % 3600)/60) + return {'h':h, 'm':m} + + +def get_value_from_jmx(query, jmx_property, connection_timeout): + response = None + + try: + response = urllib2.urlopen(query, timeout=connection_timeout) + data = response.read() + data_dict = json.loads(data) + return data_dict["beans"][0][jmx_property] + finally: + if response is not None: + try: + response.close() + except: + pass http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py ---------------------------------------------------------------------- 
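For reference, the checkpoint alert above reduces to two independent thresholds: how long ago the last checkpoint ran, expressed as a percentage of dfs.namenode.checkpoint.period, and how many transactions have accumulated since that checkpoint relative to a multiple of dfs.namenode.checkpoint.txns. A minimal standalone sketch of that decision logic follows; it is not part of this patch, and the function name evaluate_checkpoint and the sample values are illustrative only.

# Standalone sketch of the CRITICAL/WARNING/OK decision made by
# alert_checkpoint_time.py above. The real script reads these values from
# Ambari's alert configurations/parameters dictionaries and from the
# NameNode JMX endpoint; here they are plain function arguments.
import time

CHECKPOINT_TX_DEFAULT = 1000000       # dfs.namenode.checkpoint.txns
CHECKPOINT_PERIOD_DEFAULT = 21600     # dfs.namenode.checkpoint.period, seconds

def evaluate_checkpoint(last_checkpoint_time_ms, last_tx, most_recent_checkpoint_tx,
                        checkpoint_tx=CHECKPOINT_TX_DEFAULT,
                        checkpoint_period=CHECKPOINT_PERIOD_DEFAULT,
                        percent_warning=200, percent_critical=200,
                        tx_multiplier_warning=2, tx_multiplier_critical=4):
    now_ms = int(round(time.time() * 1000))
    delta = (now_ms - last_checkpoint_time_ms) / 1000      # seconds since last checkpoint
    tx_difference = last_tx - most_recent_checkpoint_tx    # transactions since last checkpoint
    period_pct = float(delta) / int(checkpoint_period) * 100

    if tx_difference > tx_multiplier_critical * checkpoint_tx or period_pct >= percent_critical:
        return 'CRITICAL'
    if tx_difference > tx_multiplier_warning * checkpoint_tx or period_pct >= percent_warning:
        return 'WARNING'
    return 'OK'

if __name__ == '__main__':
    # Simulate a checkpoint taken 13 hours ago with 2.5 million pending transactions.
    thirteen_hours_ago = int(round(time.time() * 1000)) - 13 * 3600 * 1000
    print(evaluate_checkpoint(thirteen_hours_ago, 3500000, 1000000))

Run as-is, this prints CRITICAL, because 13 hours is roughly 217% of the default 6-hour checkpoint period and so exceeds the 200% critical threshold.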
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py new file mode 100644 index 0000000..765831d --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os +import logging +import urlparse + +from resource_management.libraries.functions import file_system +from resource_management.libraries.functions import mounted_dirs_helper + +RESULT_STATE_OK = 'OK' +RESULT_STATE_WARNING = 'WARNING' +RESULT_STATE_CRITICAL = 'CRITICAL' +RESULT_STATE_UNKNOWN = 'UNKNOWN' + +DFS_DATA_DIR = '{{hdfs-site/dfs.datanode.data.dir}}' +DATA_STORAGE_TAGS = ['[DISK]','[SSD]','[RAM_DISK]','[ARCHIVE]'] +DATA_DIR_MOUNT_FILE = "/var/lib/ambari-agent/data/datanode/dfs_data_dir_mount.hist" + +logger = logging.getLogger() + + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (DFS_DATA_DIR, DATA_DIR_MOUNT_FILE) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations (dictionary): a mapping of configuration key to value + parameters (dictionary): a mapping of script parameter key to value + host_name (string): the name of this host where the alert is running + + DataNode directories can be of the following formats and each needs to be supported: + /grid/dn/archive0 + [SSD]/grid/dn/archive0 + [ARCHIVE]file:///grid/dn/archive0 + """ + warnings = [] + errors = [] + + if configurations is None: + return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) + + # Check required properties + if DFS_DATA_DIR not in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(DFS_DATA_DIR)]) + + dfs_data_dir = configurations[DFS_DATA_DIR] + + if dfs_data_dir is None: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script and the value is null'.format(DFS_DATA_DIR)]) + + # This follows symlinks and will return False for a broken link (even in the middle of the linked list) + data_dir_mount_file_exists = True + if not os.path.exists(DATA_DIR_MOUNT_FILE): + data_dir_mount_file_exists = False + warnings.append("{0} was not found.".format(DATA_DIR_MOUNT_FILE)) + + normalized_data_dirs = set() # data dirs that have been normalized + data_dirs_not_exist = set() # data dirs that do not exist + 
data_dirs_unknown = set() # data dirs for which could not determine mount + data_dirs_on_root = set() # set of data dirs that are on root mount + data_dirs_on_mount = set() # set of data dirs that are mounted on a device + data_dirs_unmounted = [] # list of data dirs that are known to have become unmounted + + # transform each data directory into something that we can use + for data_dir in dfs_data_dir.split(","): + if data_dir is None or data_dir.strip() == "": + continue + + data_dir = data_dir.strip() + + # filter out data storage tags + for tag in DATA_STORAGE_TAGS: + if data_dir.startswith(tag): + data_dir = data_dir.replace(tag, "") + continue + + # parse the path in case it contains a URI scheme + data_dir = urlparse.urlparse(data_dir).path + + normalized_data_dirs.add(data_dir) + + # Sort the data dirs, which is needed for deterministic behavior when running the unit tests. + normalized_data_dirs = sorted(normalized_data_dirs) + for data_dir in normalized_data_dirs: + # This follows symlinks and will return False for a broken link (even in the middle of the linked list) + if os.path.isdir(data_dir): + curr_mount_point = file_system.get_mount_point_for_dir(data_dir) + curr_mount_point = curr_mount_point.strip() if curr_mount_point else curr_mount_point + + if curr_mount_point is not None and curr_mount_point != "": + if curr_mount_point == "/": + data_dirs_on_root.add(data_dir) + else: + data_dirs_on_mount.add(data_dir) + else: + data_dirs_unknown.add(data_dir) + else: + data_dirs_not_exist.add(data_dir) + + # To keep the messages consistent for all hosts, sort the sets into lists + normalized_data_dirs = sorted(normalized_data_dirs) + data_dirs_not_exist = sorted(data_dirs_not_exist) + data_dirs_unknown = sorted(data_dirs_unknown) + data_dirs_on_root = sorted(data_dirs_on_root) + + if data_dirs_not_exist: + errors.append("The following data dir(s) were not found: {0}\n".format("\n".join(data_dirs_not_exist))) + + if data_dirs_unknown: + errors.append("Cannot find the mount point for the following data dir(s):\n{0}".format("\n".join(data_dirs_unknown))) + + if data_dir_mount_file_exists: + # This dictionary contains the expected values of + # Hence, we only need to analyze the data dirs that are currently on the root partition + # and report an error if they were expected to be on a mount. + # + # If one of the data dirs is not present in the file, it means that DataNode has not been restarted after + # the configuration was changed on the server, so we cannot make any assertions about it. + expected_data_dir_to_mount = mounted_dirs_helper.get_dir_to_mount_from_file(DATA_DIR_MOUNT_FILE) + for data_dir in data_dirs_on_root: + if data_dir in expected_data_dir_to_mount and expected_data_dir_to_mount[data_dir] != "/": + data_dirs_unmounted.append(data_dir) + + if len(data_dirs_unmounted) > 0: + errors.append("Detected data dir(s) that became unmounted and are now writing to the root partition:\n{0}".format("\n".join(data_dirs_unmounted))) + else: + # Couldn't make guarantees about the expected value of mount points, so rely on this strategy that is likely to work. + # It will report false positives (aka false alarms) if the user actually intended to have + # 1+ data dirs on a mount and 1+ data dirs on the root partition. 
+ if len(data_dirs_on_mount) >= 1 and len(data_dirs_on_root) >= 1: + errors.append("Detected at least one data dir on a mount point, but these are writing to the root partition:\n{0}".format("\n".join(data_dirs_on_root))) + + # Determine the status based on warnings and errors. + if len(errors) == 0: + status = RESULT_STATE_OK + messages = [] + + # Check for warnings + if len(warnings) > 0: + status = RESULT_STATE_WARNING + messages += warnings + + if len(normalized_data_dirs) > 0: + messages.append("The following data dir(s) are valid:\n{0}".format("\n".join(normalized_data_dirs))) + else: + messages.append("There are no data directories to analyze.") + + return (status, ["\n".join(messages)]) + else: + # Report errors + return (RESULT_STATE_CRITICAL, ["\n".join(errors)]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_ha_namenode_health.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_ha_namenode_health.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_ha_namenode_health.py new file mode 100644 index 0000000..28b3f22 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_ha_namenode_health.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import urllib2 +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
+import logging + +from resource_management.libraries.functions.curl_krb_request import curl_krb_request +from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS +from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER +from resource_management.core.environment import Environment + +RESULT_STATE_OK = 'OK' +RESULT_STATE_CRITICAL = 'CRITICAL' +RESULT_STATE_UNKNOWN = 'UNKNOWN' +RESULT_STATE_SKIPPED = 'SKIPPED' + +HDFS_NN_STATE_ACTIVE = 'active' +HDFS_NN_STATE_STANDBY = 'standby' + +HDFS_SITE_KEY = '{{hdfs-site}}' +NAMESERVICE_KEY = '{{hdfs-site/dfs.internal.nameservices}}' +NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' +NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' +DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' + +KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' +KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' +SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' +SMOKEUSER_KEY = '{{cluster-env/smokeuser}}' +EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' +INADDR_ANY = '0.0.0.0' +NAMENODE_HTTP_FRAGMENT = 'dfs.namenode.http-address.{0}.{1}' +NAMENODE_HTTPS_FRAGMENT = 'dfs.namenode.https-address.{0}.{1}' +NAMENODE_RPC_FRAGMENT = 'dfs.namenode.rpc-address.{0}.{1}' + +CONNECTION_TIMEOUT_KEY = 'connection.timeout' +CONNECTION_TIMEOUT_DEFAULT = 5.0 + +LOGGER_EXCEPTION_MESSAGE = "[Alert] NameNode High Availability Health on {0} fails:" +logger = logging.getLogger('ambari_alerts') + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (HDFS_SITE_KEY, NAMESERVICE_KEY, NN_HTTP_ADDRESS_KEY, EXECUTABLE_SEARCH_PATHS, + NN_HTTPS_ADDRESS_KEY, DFS_POLICY_KEY, SMOKEUSER_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations (dictionary): a mapping of configuration key to value + parameters (dictionary): a mapping of script parameter key to value + host_name (string): the name of this host where the alert is running + """ + if configurations is None: + return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) + + # if not in HA mode, then SKIP + if not NAMESERVICE_KEY in configurations: + return (RESULT_STATE_SKIPPED, ['NameNode HA is not enabled']) + + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]) + + if SMOKEUSER_KEY in configurations: + smokeuser = configurations[SMOKEUSER_KEY] + + executable_paths = None + if EXECUTABLE_SEARCH_PATHS in configurations: + executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] + + # parse script arguments + connection_timeout = CONNECTION_TIMEOUT_DEFAULT + if CONNECTION_TIMEOUT_KEY in parameters: + connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY]) + + security_enabled = False + if SECURITY_ENABLED_KEY in configurations: + security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' + + kerberos_keytab = None + if KERBEROS_KEYTAB in configurations: + kerberos_keytab = configurations[KERBEROS_KEYTAB] + + kerberos_principal = None + if KERBEROS_PRINCIPAL in configurations: + 
kerberos_principal = configurations[KERBEROS_PRINCIPAL] + kerberos_principal = kerberos_principal.replace('_HOST', host_name) + + kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS) + + # determine whether or not SSL is enabled + is_ssl_enabled = False + if DFS_POLICY_KEY in configurations: + dfs_policy = configurations[DFS_POLICY_KEY] + if dfs_policy == "HTTPS_ONLY": + is_ssl_enabled = True + + name_service = configurations[NAMESERVICE_KEY] + hdfs_site = configurations[HDFS_SITE_KEY] + + # look for dfs.ha.namenodes.foo + nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service + if not nn_unique_ids_key in hdfs_site: + return (RESULT_STATE_UNKNOWN, ['Unable to find unique namenode alias key {0}'.format(nn_unique_ids_key)]) + + namenode_http_fragment = NAMENODE_HTTP_FRAGMENT + jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*" + + if is_ssl_enabled: + namenode_http_fragment = NAMENODE_HTTPS_FRAGMENT + jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*" + + + active_namenodes = [] + standby_namenodes = [] + unknown_namenodes = [] + + # now we have something like 'nn1,nn2,nn3,nn4' + # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id] + # ie dfs.namenode.http-address.hacluster.nn1 + nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',') + for nn_unique_id in nn_unique_ids: + key = namenode_http_fragment.format(name_service,nn_unique_id) + rpc_key = NAMENODE_RPC_FRAGMENT.format(name_service,nn_unique_id) + + if key in hdfs_site: + # use str() to ensure that unicode strings do not have the u' in them + value = str(hdfs_site[key]) + if INADDR_ANY in value and rpc_key in hdfs_site: + rpc_value = str(hdfs_site[rpc_key]) + if INADDR_ANY not in rpc_value: + rpc_host = rpc_value.split(":")[0] + value = value.replace(INADDR_ANY, rpc_host) + + try: + jmx_uri = jmx_uri_fragment.format(value) + if kerberos_principal is not None and kerberos_keytab is not None and security_enabled: + env = Environment.get_instance() + + # curl requires an integer timeout + curl_connection_timeout = int(connection_timeout) + + state_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, + kerberos_keytab, kerberos_principal, jmx_uri,"ha_nn_health", executable_paths, False, + "NameNode High Availability Health", smokeuser, connection_timeout=curl_connection_timeout, + kinit_timer_ms = kinit_timer_ms) + + state = _get_ha_state_from_json(state_response) + else: + state_response = get_jmx(jmx_uri, connection_timeout) + state = _get_ha_state_from_json(state_response) + + if state == HDFS_NN_STATE_ACTIVE: + active_namenodes.append(value) + elif state == HDFS_NN_STATE_STANDBY: + standby_namenodes.append(value) + else: + unknown_namenodes.append(value) + except: + logger.exception(LOGGER_EXCEPTION_MESSAGE.format(host_name)) + unknown_namenodes.append(value) + + # there's only one scenario here; there is exactly 1 active and 1 standby + is_topology_healthy = len(active_namenodes) == 1 and len(standby_namenodes) == 1 + + result_label = 'Active{0}, Standby{1}, Unknown{2}'.format(str(active_namenodes), + str(standby_namenodes), str(unknown_namenodes)) + + if is_topology_healthy: + # if there is exactly 1 active and 1 standby NN + return (RESULT_STATE_OK, [result_label]) + else: + # other scenario + return (RESULT_STATE_CRITICAL, [result_label]) + + +def get_jmx(query, connection_timeout): + response = None + + try: + response = urllib2.urlopen(query, timeout=connection_timeout) + json_data = response.read() + return 
json_data + finally: + if response is not None: + try: + response.close() + except: + pass + + +def _get_ha_state_from_json(string_json): + """ + Searches through the specified JSON string looking for HA state + enumerations. + :param string_json: the string JSON + :return: the value of the HA state (active, standby, etc) + """ + json_data = json.loads(string_json) + jmx_beans = json_data["beans"] + + # look for NameNodeStatus-State first + for jmx_bean in jmx_beans: + if "name" not in jmx_bean: + continue + + jmx_bean_name = jmx_bean["name"] + if jmx_bean_name == "Hadoop:service=NameNode,name=NameNodeStatus" and "State" in jmx_bean: + return jmx_bean["State"] + + # look for FSNamesystem-tag.HAState last + for jmx_bean in jmx_beans: + if "name" not in jmx_bean: + continue + + jmx_bean_name = jmx_bean["name"] + if jmx_bean_name == "Hadoop:service=NameNode,name=FSNamesystem": + return jmx_bean["tag.HAState"] http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_metrics_deviation.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_metrics_deviation.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_metrics_deviation.py new file mode 100644 index 0000000..f0a1d5c --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_metrics_deviation.py @@ -0,0 +1,470 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import httplib +import locale +import json +import logging +import urllib +import time +import urllib2 + +from resource_management import Environment +from ambari_commons.aggregate_functions import sample_standard_deviation, mean + +from resource_management.libraries.functions.curl_krb_request import curl_krb_request +from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS +from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER + + +RESULT_STATE_OK = 'OK' +RESULT_STATE_CRITICAL = 'CRITICAL' +RESULT_STATE_WARNING = 'WARNING' +RESULT_STATE_UNKNOWN = 'UNKNOWN' +RESULT_STATE_SKIPPED = 'SKIPPED' + +HDFS_NN_STATE_ACTIVE = 'active' +HDFS_NN_STATE_STANDBY = 'standby' + +HDFS_SITE_KEY = '{{hdfs-site}}' +NAMESERVICE_KEY = '{{hdfs-site/dfs.internal.nameservices}}' +NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' +NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' +DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' + +KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' +KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' +SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' +SMOKEUSER_KEY = '{{cluster-env/smokeuser}}' +EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' + +METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}' +METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}' +METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}' + +CONNECTION_TIMEOUT_KEY = 'connection.timeout' +CONNECTION_TIMEOUT_DEFAULT = 5.0 + +MERGE_HA_METRICS_PARAM_KEY = 'mergeHaMetrics' +MERGE_HA_METRICS_PARAM_DEFAULT = False +METRIC_NAME_PARAM_KEY = 'metricName' +METRIC_NAME_PARAM_DEFAULT = '' +METRIC_UNITS_PARAM_KEY = 'metric.units' +METRIC_UNITS_DEFAULT = '' +APP_ID_PARAM_KEY = 'appId' +APP_ID_PARAM_DEFAULT = 'NAMENODE' + +# the interval to check the metric (should be cast to int but could be a float) +INTERVAL_PARAM_KEY = 'interval' +INTERVAL_PARAM_DEFAULT = 60 + +# the default threshold to trigger a CRITICAL (should be cast to int but could a float) +DEVIATION_CRITICAL_THRESHOLD_KEY = 'metric.deviation.critical.threshold' +DEVIATION_CRITICAL_THRESHOLD_DEFAULT = 10 + +# the default threshold to trigger a WARNING (should be cast to int but could be a float) +DEVIATION_WARNING_THRESHOLD_KEY = 'metric.deviation.warning.threshold' +DEVIATION_WARNING_THRESHOLD_DEFAULT = 5 +NAMENODE_SERVICE_RPC_PORT_KEY = '' + +MINIMUM_VALUE_THRESHOLD_KEY = 'minimumValue' + +AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s" + +# The variance for this alert is 27MB which is 27% of the 100MB average (20MB is the limit) +DEVIATION_THRESHOLD_MESSAGE = "The variance for this alert is {0}{1} which is {2:.0f}% of the {3}{4} average ({5}{6} is the limit)" + +# The variance for this alert is 15MB which is within 20% of the 904ms average (20MB is the limit) +DEVIATION_OK_MESSAGE = "The variance for this alert is {0}{1} which is within {2:.0f}% of the {3}{4} average ({5}{6} is the limit)" + +logger = logging.getLogger() + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return (HDFS_SITE_KEY, NAMESERVICE_KEY, NN_HTTP_ADDRESS_KEY, DFS_POLICY_KEY, + EXECUTABLE_SEARCH_PATHS, NN_HTTPS_ADDRESS_KEY, SMOKEUSER_KEY, + KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, + 
METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY, + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY) + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations : a mapping of configuration key to value + parameters : a mapping of script parameter key to value + host_name : the name of this host where the alert is running + + :type configurations dict + :type parameters dict + :type host_name str + """ + hostnames = host_name + current_time = int(time.time()) * 1000 + + # parse script arguments + connection_timeout = CONNECTION_TIMEOUT_DEFAULT + if CONNECTION_TIMEOUT_KEY in parameters: + connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY]) + + merge_ha_metrics = MERGE_HA_METRICS_PARAM_DEFAULT + if MERGE_HA_METRICS_PARAM_KEY in parameters: + merge_ha_metrics = parameters[MERGE_HA_METRICS_PARAM_KEY].lower() == 'true' + + metric_name = METRIC_NAME_PARAM_DEFAULT + if METRIC_NAME_PARAM_KEY in parameters: + metric_name = parameters[METRIC_NAME_PARAM_KEY] + + metric_units = METRIC_UNITS_DEFAULT + if METRIC_UNITS_PARAM_KEY in parameters: + metric_units = parameters[METRIC_UNITS_PARAM_KEY] + + app_id = APP_ID_PARAM_DEFAULT + if APP_ID_PARAM_KEY in parameters: + app_id = parameters[APP_ID_PARAM_KEY] + + interval = INTERVAL_PARAM_DEFAULT + if INTERVAL_PARAM_KEY in parameters: + interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY]) + + warning_threshold = DEVIATION_WARNING_THRESHOLD_DEFAULT + if DEVIATION_WARNING_THRESHOLD_KEY in parameters: + warning_threshold = _coerce_to_integer(parameters[DEVIATION_WARNING_THRESHOLD_KEY]) + + critical_threshold = DEVIATION_CRITICAL_THRESHOLD_DEFAULT + if DEVIATION_CRITICAL_THRESHOLD_KEY in parameters: + critical_threshold = _coerce_to_integer(parameters[DEVIATION_CRITICAL_THRESHOLD_KEY]) + + minimum_value_threshold = None + if MINIMUM_VALUE_THRESHOLD_KEY in parameters: + minimum_value_threshold = _coerce_to_integer(parameters[MINIMUM_VALUE_THRESHOLD_KEY]) + + #parse configuration + if configurations is None: + return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) + + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]) + + if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations: + collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY] + collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY]) + else: + # ams-site/timeline.metrics.service.webapp.address is required + if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)]) + else: + collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":") + if valid_collector_webapp_address(collector_webapp_address): + collector_host = collector_webapp_address[0] + collector_port = int(collector_webapp_address[1]) + else: + return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format( + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])]) + + namenode_service_rpc_address = None + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the 
script'.format(HDFS_SITE_KEY)]) + + hdfs_site = configurations[HDFS_SITE_KEY] + + if 'dfs.namenode.servicerpc-address' in hdfs_site: + namenode_service_rpc_address = hdfs_site['dfs.namenode.servicerpc-address'] + + # if namenode alert and HA mode + if NAMESERVICE_KEY in configurations and app_id.lower() == 'namenode': + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]) + + if SMOKEUSER_KEY in configurations: + smokeuser = configurations[SMOKEUSER_KEY] + + executable_paths = None + if EXECUTABLE_SEARCH_PATHS in configurations: + executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] + + # parse script arguments + security_enabled = False + if SECURITY_ENABLED_KEY in configurations: + security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' + + kerberos_keytab = None + if KERBEROS_KEYTAB in configurations: + kerberos_keytab = configurations[KERBEROS_KEYTAB] + + kerberos_principal = None + if KERBEROS_PRINCIPAL in configurations: + kerberos_principal = configurations[KERBEROS_PRINCIPAL] + kerberos_principal = kerberos_principal.replace('_HOST', host_name) + + # determine whether or not SSL is enabled + is_ssl_enabled = False + if DFS_POLICY_KEY in configurations: + dfs_policy = configurations[DFS_POLICY_KEY] + if dfs_policy == "HTTPS_ONLY": + is_ssl_enabled = True + + kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS) + + name_service = configurations[NAMESERVICE_KEY] + + # look for dfs.ha.namenodes.foo + nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service + if not nn_unique_ids_key in hdfs_site: + return (RESULT_STATE_UNKNOWN, ['Unable to find unique NameNode alias key {0}'.format(nn_unique_ids_key)]) + + namenode_http_fragment = 'dfs.namenode.http-address.{0}.{1}' + jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*" + + if is_ssl_enabled: + namenode_http_fragment = 'dfs.namenode.https-address.{0}.{1}' + jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*" + + # now we have something like 'nn1,nn2,nn3,nn4' + # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id] + # ie dfs.namenode.http-address.hacluster.nn1 + namenodes = [] + active_namenodes = [] + nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',') + for nn_unique_id in nn_unique_ids: + key = namenode_http_fragment.format(name_service, nn_unique_id) + + if key in hdfs_site: + # use str() to ensure that unicode strings do not have the u' in them + value = str(hdfs_site[key]) + namenode = str(hdfs_site[key]).split(":")[0] + + namenodes.append(namenode) + try: + jmx_uri = jmx_uri_fragment.format(value) + if kerberos_principal is not None and kerberos_keytab is not None and security_enabled: + env = Environment.get_instance() + + # curl requires an integer timeout + curl_connection_timeout = int(connection_timeout) + state_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, + kerberos_keytab, kerberos_principal, jmx_uri,"ha_nn_health", executable_paths, False, + "NameNode High Availability Health", smokeuser, connection_timeout=curl_connection_timeout, + kinit_timer_ms = kinit_timer_ms) + + state = _get_ha_state_from_json(state_response) + else: + state_response = get_jmx(jmx_uri, connection_timeout) + state = _get_ha_state_from_json(state_response) + + if state == HDFS_NN_STATE_ACTIVE: + active_namenodes.append(namenode) + + # Only check active NN + nn_service_rpc_address_key = 
'dfs.namenode.servicerpc-address.{0}.{1}'.format(name_service, nn_unique_id) + if nn_service_rpc_address_key in hdfs_site: + namenode_service_rpc_address = hdfs_site[nn_service_rpc_address_key] + pass + except: + logger.exception("Unable to determine the active NameNode") + pass + + if merge_ha_metrics: + hostnames = ",".join(namenodes) + # run only on active NN, no need to run the same requests from the standby + if host_name not in active_namenodes: + return (RESULT_STATE_SKIPPED, ['This alert will be reported by another host.']) + pass + + # Skip service rpc alert if port is not enabled + if not namenode_service_rpc_address and 'rpc.rpc.datanode' in metric_name: + return (RESULT_STATE_SKIPPED, ['Service RPC port is not enabled.']) + + get_metrics_parameters = { + "metricNames": metric_name, + "appId": app_id, + "hostname": hostnames, + "startTime": current_time - interval * 60 * 1000, + "endTime": current_time, + "grouped": "true", + } + + encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters) + + try: + conn = httplib.HTTPConnection(collector_host, int(collector_port), + timeout=connection_timeout) + conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters) + response = conn.getresponse() + data = response.read() + conn.close() + except Exception: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."]) + + if response.status != 200: + return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."]) + + data_json = json.loads(data) + metrics = [] + # will get large standard deviation for multiple hosts, + # if host1 reports small local values, but host2 reports large local values + for metrics_data in data_json["metrics"]: + metrics += metrics_data["metrics"].values() + pass + + if not metrics or len(metrics) < 2: + number_of_data_points = len(metrics) if metrics else 0 + return (RESULT_STATE_SKIPPED, ["There are not enough data points to calculate the standard deviation ({0} sampled)".format( + number_of_data_points)]) + + minimum_value_multiplier = 1 + if 'dfs.FSNamesystem.CapacityUsed' in metric_name: + minimum_value_multiplier = 1024 * 1024 # MB to bytes + elif 'rpc.rpc.datanode' in metric_name or 'rpc.rpc.client' in metric_name: + minimum_value_multiplier = 1000 # seconds to millis + + if minimum_value_threshold: + # Filter out points below min threshold + metrics = [metric for metric in metrics if metric > (minimum_value_threshold * minimum_value_multiplier)] + if len(metrics) < 2: + return (RESULT_STATE_OK, ['There were no data points above the minimum threshold of {0} seconds'.format(minimum_value_threshold)]) + + mean_value = mean(metrics) + stddev = sample_standard_deviation(metrics) + + try: + deviation_percent = stddev / float(mean_value) * 100 + except ZeroDivisionError: + # should not be a case for this alert + return (RESULT_STATE_SKIPPED, ["Unable to calculate the standard deviation because the mean value is 0"]) + + # log the AMS request + if logger.isEnabledFor(logging.DEBUG): + logger.debug(""" + AMS request parameters - {0} + AMS response - {1} + Mean - {2} + Standard deviation - {3} + Percentage standard deviation - {4} + """.format(encoded_get_metrics_parameters, data_json, mean_value, stddev, deviation_percent)) + + mean_value_localized = locale.format("%.0f", mean_value, grouping=True) + + variance_value = (deviation_percent / 100.0) * mean_value + variance_value_localized = locale.format("%.0f", variance_value, grouping=True) + + # check for CRITICAL status 
+ if deviation_percent > critical_threshold: + threshold_value = ((critical_threshold / 100.0) * mean_value) + threshold_value_localized = locale.format("%.0f", threshold_value, grouping=True) + + message = DEVIATION_THRESHOLD_MESSAGE.format(variance_value_localized, metric_units, deviation_percent, + mean_value_localized, metric_units, threshold_value_localized, metric_units) + + return (RESULT_STATE_CRITICAL,[message]) + + # check for WARNING status + if deviation_percent > warning_threshold: + threshold_value = ((warning_threshold / 100.0) * mean_value) + threshold_value_localized = locale.format("%.0f", threshold_value, grouping = True) + + message = DEVIATION_THRESHOLD_MESSAGE.format(variance_value_localized, metric_units, deviation_percent, + mean_value_localized, metric_units, threshold_value_localized, metric_units) + + return (RESULT_STATE_WARNING, [message]) + + # return OK status; use the warning threshold as the value to compare against + threshold_value = ((warning_threshold / 100.0) * mean_value) + threshold_value_localized = locale.format("%.0f", threshold_value, grouping = True) + + message = DEVIATION_OK_MESSAGE.format(variance_value_localized, metric_units, warning_threshold, + mean_value_localized, metric_units, threshold_value_localized, metric_units) + + return (RESULT_STATE_OK,[message]) + + +def valid_collector_webapp_address(webapp_address): + if len(webapp_address) == 2 \ + and webapp_address[0] != '127.0.0.1' \ + and webapp_address[0] != '0.0.0.0' \ + and webapp_address[1].isdigit(): + return True + + return False + + +def get_jmx(query, connection_timeout): + response = None + + try: + response = urllib2.urlopen(query, timeout=connection_timeout) + json_data = response.read() + return json_data + except Exception: + return {"beans": {}} + finally: + if response is not None: + try: + response.close() + except: + pass + +def _get_ha_state_from_json(string_json): + """ + Searches through the specified JSON string looking for HA state + enumerations. + :param string_json: the string JSON + :return: the value of the HA state (active, standby, etc) + """ + json_data = json.loads(string_json) + jmx_beans = json_data["beans"] + + # look for NameNodeStatus-State first + for jmx_bean in jmx_beans: + if "name" not in jmx_bean: + continue + + jmx_bean_name = jmx_bean["name"] + if jmx_bean_name == "Hadoop:service=NameNode,name=NameNodeStatus" and "State" in jmx_bean: + return jmx_bean["State"] + + # look for FSNamesystem-tag.HAState last + for jmx_bean in jmx_beans: + if "name" not in jmx_bean: + continue + + jmx_bean_name = jmx_bean["name"] + if jmx_bean_name == "Hadoop:service=NameNode,name=FSNamesystem": + return jmx_bean["tag.HAState"] + + +def _coerce_to_integer(value): + """ + Attempts to correctly coerce a value to an integer. For the case of an integer or a float, + this will essentially either NOOP or return a truncated value. If the parameter is a string, + then it will first attempt to be coerced from a integer, and failing that, a float. 
+ :param value: the value to coerce + :return: the coerced value as an integer + """ + try: + return int(value) + except ValueError: + return int(float(value)) http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_upgrade_finalized.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_upgrade_finalized.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_upgrade_finalized.py new file mode 100644 index 0000000..427f1d1 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/alerts/alert_upgrade_finalized.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import urllib2 +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
+import logging +import traceback + +from resource_management.libraries.functions.curl_krb_request import curl_krb_request +from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS +from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER +from resource_management.libraries.functions.curl_krb_request import CONNECTION_TIMEOUT_DEFAULT +from resource_management.core.environment import Environment +from resource_management.libraries.functions.namenode_ha_utils import get_all_namenode_addresses + +NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' +NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' +NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' + +HDFS_SITE_KEY = '{{hdfs-site}}' +KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' +KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' +SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' +SMOKEUSER_KEY = "{{cluster-env/smokeuser}}" +EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' +logger = logging.getLogger('ambari_alerts') + +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + + :rtype tuple + """ + return (HDFS_SITE_KEY, NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS, + KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY) + + +def execute(configurations={}, parameters={}, host_name=None): + """ + Returns a tuple containing the result code and a pre-formatted result label + + Keyword arguments: + configurations : a mapping of configuration key to value + parameters : a mapping of script parameter key to value + host_name : the name of this host where the alert is running + + :type configurations dict + :type parameters dict + :type host_name str + """ + + if configurations is None: + return (('UNKNOWN', ['There were no configurations supplied to the script.'])) + + uri = None + http_policy = 'HTTP_ONLY' + + # hdfs-site is required + if not HDFS_SITE_KEY in configurations: + return 'SKIPPED', ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)] + + if NN_HTTP_POLICY_KEY in configurations: + http_policy = configurations[NN_HTTP_POLICY_KEY] + + if SMOKEUSER_KEY in configurations: + smokeuser = configurations[SMOKEUSER_KEY] + + executable_paths = None + if EXECUTABLE_SEARCH_PATHS in configurations: + executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] + + security_enabled = False + if SECURITY_ENABLED_KEY in configurations: + security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' + + kerberos_keytab = None + if KERBEROS_KEYTAB in configurations: + kerberos_keytab = configurations[KERBEROS_KEYTAB] + + kerberos_principal = None + if KERBEROS_PRINCIPAL in configurations: + kerberos_principal = configurations[KERBEROS_PRINCIPAL] + kerberos_principal = kerberos_principal.replace('_HOST', host_name) + + kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS) + + # determine the right URI and whether to use SSL + hdfs_site = configurations[HDFS_SITE_KEY] + + scheme = "https" if http_policy == "HTTPS_ONLY" else "http" + + nn_addresses = get_all_namenode_addresses(hdfs_site) + for nn_address in nn_addresses: + if nn_address.startswith(host_name + ":") or nn_address == host_name: + uri = nn_address + break + if not uri: + 
    return 'SKIPPED', [
+      'NameNode on host {0} not found (namenode addresses = {1})'.format(host_name, ', '.join(nn_addresses))]
+
+  upgrade_finalized_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme, uri)
+
+  # start out assuming an OK status
+  label = None
+  result_code = "OK"
+
+  try:
+    if kerberos_principal is not None and kerberos_keytab is not None and security_enabled:
+      env = Environment.get_instance()
+
+      last_checkpoint_time_response, error_msg, time_millis = curl_krb_request(
+        env.tmp_dir, kerberos_keytab,
+        kerberos_principal, upgrade_finalized_qry, "upgrade_finalized_state", executable_paths, False,
+        "HDFS Upgrade Finalized State", smokeuser, kinit_timer_ms = kinit_timer_ms
+      )
+
+      upgrade_finalized_response_json = json.loads(last_checkpoint_time_response)
+      upgrade_finalized = bool(upgrade_finalized_response_json["beans"][0]["UpgradeFinalized"])
+
+    else:
+      upgrade_finalized = bool(get_value_from_jmx(upgrade_finalized_qry,
+                                                  "UpgradeFinalized"))
+
+    if upgrade_finalized:
+      label = "HDFS cluster is not in the upgrade state"
+      result_code = 'OK'
+    else:
+      label = "HDFS cluster is not finalized"
+      result_code = 'CRITICAL'
+
+  except:
+    label = traceback.format_exc()
+    result_code = 'UNKNOWN'
+
+  return ((result_code, [label]))
+
+def get_value_from_jmx(query, jmx_property):
+  """
+  Read a property from the JMX endpoint
+
+  :param query: jmx uri path
+  :param jmx_property: property name to read
+  :return: jmx property value
+
+  :type query str
+  :type jmx_property str
+  """
+  response = None
+
+  try:
+    response = urllib2.urlopen(query, timeout=int(CONNECTION_TIMEOUT_DEFAULT))
+    data = response.read()
+
+    data_dict = json.loads(data)
+    return data_dict["beans"][0][jmx_property]
+  finally:
+    if response is not None:
+      try:
+        response.close()
+      except:
+        pass


http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/files/checkWebUI.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/files/checkWebUI.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/files/checkWebUI.py
new file mode 100644
index 0000000..6e4b028
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/files/checkWebUI.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import optparse
+import httplib
+import socket
+import ssl
+
+class ForcedProtocolHTTPSConnection(httplib.HTTPSConnection):
+  """
+  Some Python implementations do not work correctly with SSLv3 but still try to use it, so we need to switch the
+  protocol to TLSv1.
+ """ + def __init__(self, host, port, force_protocol, **kwargs): + httplib.HTTPSConnection.__init__(self, host, port, **kwargs) + self.force_protocol = force_protocol + + def connect(self): + sock = socket.create_connection((self.host, self.port), self.timeout) + if getattr(self, '_tunnel_host', None): + self.sock = sock + self._tunnel() + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=getattr(ssl, self.force_protocol)) + +def make_connection(host, port, https, force_protocol=None): + try: + conn = httplib.HTTPConnection(host, port) if not https else httplib.HTTPSConnection(host, port) + conn.request("GET", "/") + return conn.getresponse().status + except ssl.SSLError: + # got ssl error, lets try to use TLS1 protocol, maybe it will work + try: + tls1_conn = ForcedProtocolHTTPSConnection(host, port, force_protocol) + tls1_conn.request("GET", "/") + return tls1_conn.getresponse().status + except Exception as e: + print e + finally: + tls1_conn.close() + except Exception as e: + print e + finally: + conn.close() +# +# Main. +# +def main(): + parser = optparse.OptionParser(usage="usage: %prog [options] component ") + parser.add_option("-m", "--hosts", dest="hosts", help="Comma separated hosts list for WEB UI to check it availability") + parser.add_option("-p", "--port", dest="port", help="Port of WEB UI to check it availability") + parser.add_option("-s", "--https", dest="https", help="\"True\" if value of dfs.http.policy is \"HTTPS_ONLY\"") + parser.add_option("-o", "--protocol", dest="protocol", help="Protocol to use when executing https request") + + (options, args) = parser.parse_args() + + hosts = options.hosts.split(',') + port = options.port + https = options.https + protocol = options.protocol + + for host in hosts: + httpCode = make_connection(host, port, https.lower() == "true", protocol) + + if httpCode != 200: + print "Cannot access WEB UI on: http://" + host + ":" + port if not https.lower() == "true" else "Cannot access WEB UI on: https://" + host + ":" + port + exit(1) + +if __name__ == "__main__": + main() http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/__init__.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/__init__.py new file mode 100644 index 0000000..35de4bb --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/__init__.py @@ -0,0 +1,20 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+
+Ambari Agent
+
+"""


http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py
new file mode 100644
index 0000000..88529b4
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import time
+import sys
+from threading import Thread
+
+
+def write_function(path, handle, interval):
+  with open(path) as f:
+    for line in f:
+      handle.write(line)
+      handle.flush()
+      time.sleep(interval)
+
+thread = Thread(target = write_function, args = ('balancer.out', sys.stdout, 1.5))
+thread.start()
+
+threaderr = Thread(target = write_function, args = ('balancer.err', sys.stderr, 1.5 * 0.023))
+threaderr.start()
+
+thread.join()
+
+
+def rebalancer_out():
+  write_function('balancer.out', sys.stdout, 1.5)
+
+def rebalancer_err():
+  write_function('balancer.err', sys.stderr, 1.5 * 0.023)
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode.py
new file mode 100644
index 0000000..79c32e4
--- /dev/null
+++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode.py
@@ -0,0 +1,174 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
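Returning to the balancer emulator above: a hedged usage sketch for the two replay helpers (it assumes pre-recorded balancer.out and balancer.err files in the working directory, which this patch does not show):

from threading import Thread

out_thread = Thread(target=rebalancer_out)   # replays balancer.out to stdout
err_thread = Thread(target=rebalancer_err)   # replays balancer.err to stderr
out_thread.start()
err_thread.start()
out_thread.join()
err_thread.join()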
+ +""" +import datanode_upgrade +from hdfs_datanode import datanode +from resource_management import * +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML +from hdfs import hdfs +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst +from utils import get_hdfs_binary + +class DataNode(Script): + + def get_component_name(self): + return "hadoop-hdfs-datanode" + + def get_hdfs_binary(self): + """ + Get the name or path to the hdfs binary depending on the component name. + """ + component_name = self.get_component_name() + return get_hdfs_binary(component_name) + + + def install(self, env): + import params + env.set_params(params) + self.install_packages(env) + + def configure(self, env): + import params + env.set_params(params) + hdfs("datanode") + datanode(action="configure") + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + self.configure(env) + datanode(action="start") + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + # pre-upgrade steps shutdown the datanode, so there's no need to call + + hdfs_binary = self.get_hdfs_binary() + if upgrade_type == "rolling": + stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary) + if not stopped: + datanode(action="stop") + else: + datanode(action="stop") + + def status(self, env): + import status_params + env.set_params(status_params) + datanode(action = "status") + + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class DataNodeDefault(DataNode): + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing DataNode Stack Upgrade pre-restart") + import params + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-datanode", params.version) + + def post_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing DataNode Stack Upgrade post-restart") + import params + env.set_params(params) + hdfs_binary = self.get_hdfs_binary() + # ensure the DataNode has started and rejoined the cluster + datanode_upgrade.post_upgrade_check(hdfs_binary) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.datanode.keytab.file', + 'dfs.datanode.kerberos.principal'] + props_read_check = ['dfs.datanode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, 
+ {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or + 'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.datanode.keytab.file'], + security_params['hdfs-site']['dfs.datanode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + def get_log_folder(self): + import params + return params.hdfs_log_dir + + def get_user(self): + import params + return params.hdfs_user + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class DataNodeWindows(DataNode): + def install(self, env): + import install_params + self.install_packages(env) + +if __name__ == "__main__": + DataNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode_upgrade.py new file mode 100644 index 0000000..b55237d --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/datanode_upgrade.py @@ -0,0 +1,156 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
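For context on the security_status() implementation above: the agent only reports what the method writes through put_structured_out(), and every path short of a successful kinit with the DataNode keytab ends in UNSECURED or ERROR. A condensed, illustrative summary (not the Ambari implementation):

def summarize_security_state(kerberos_enabled, validation_issues, kinit_ok):
  # Condensed view of the branches above; the real method also attaches
  # securityIssuesFound / securityStateErrorInfo details.
  if not kerberos_enabled or validation_issues:
    return "UNSECURED"
  return "SECURED_KERBEROS" if kinit_ok else "ERROR"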
+ +""" +import re + +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Execute +from resource_management.core import shell +from resource_management.libraries.functions import format +from resource_management.libraries.functions.decorator import retry +from resource_management.libraries.functions import check_process_status +from resource_management.core import ComponentIsNotRunning +from utils import get_dfsadmin_base_command + + +def pre_rolling_upgrade_shutdown(hdfs_binary): + """ + Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the + DataNode in preparation for an upgrade. This will then periodically check + "getDatanodeInfo" to ensure the DataNode has shutdown correctly. + This function will obtain the Kerberos ticket if security is enabled. + :param hdfs_binary: name/path of the HDFS binary to use + :return: Return True if ran ok (even with errors), and False if need to stop the datanode forcefully. + """ + import params + + Logger.info('DataNode executing "shutdownDatanode" command in preparation for upgrade...') + if params.security_enabled: + Execute(params.dn_kinit_cmd, user = params.hdfs_user) + + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = format('{dfsadmin_base_command} -shutdownDatanode {dfs_dn_ipc_address} upgrade') + + code, output = shell.call(command, user=params.hdfs_user) + if code == 0: + # verify that the datanode is down + _check_datanode_shutdown(hdfs_binary) + else: + # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it. + if output is not None and re.search("Shutdown already in progress", output): + Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command)) + return False + return True + + +def post_upgrade_check(hdfs_binary): + """ + Verifies that the DataNode has rejoined the cluster. This function will + obtain the Kerberos ticket if security is enabled. + :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + import params + + Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...") + if params.security_enabled: + Execute(params.dn_kinit_cmd, user=params.hdfs_user) + + # verify that the datanode has started and rejoined the HDFS cluster + _check_datanode_startup(hdfs_binary) + + +def is_datanode_process_running(): + import params + try: + check_process_status(params.datanode_pid_file) + return True + except ComponentIsNotRunning: + return False + +@retry(times=24, sleep_time=5, err_class=Fail) +def _check_datanode_shutdown(hdfs_binary): + """ + Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo" + several times, pausing in between runs. Once the DataNode stops responding + this method will return, otherwise it will raise a Fail(...) and retry + automatically. + The stack defaults for retrying for HDFS are also way too slow for this + command; they are set to wait about 45 seconds between client retries. As + a result, a single execution of dfsadmin will take 45 seconds to retry and + the DataNode may be marked as dead, causing problems with HBase. + https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the + times for ipc.client.connect.retry.interval. In the meantime, override them + here, but only for RU. 
+ :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + import params + + # override stock retry timeouts since after 30 seconds, the datanode is + # marked as dead and can affect HBase during RU + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}') + + try: + Execute(command, user=params.hdfs_user, tries=1) + except: + Logger.info("DataNode has successfully shutdown for upgrade.") + return + + Logger.info("DataNode has not shutdown.") + raise Fail('DataNode has not shutdown.') + + +@retry(times=30, sleep_time=30, err_class=Fail) # keep trying for 15 mins +def _check_datanode_startup(hdfs_binary): + """ + Checks that a DataNode process is running and DataNode is reported as being alive via the + "hdfs dfsadmin -fs {namenode_address} -report -live" command. Once the DataNode is found to be + alive this method will return, otherwise it will raise a Fail(...) and retry + automatically. + :param hdfs_binary: name/path of the HDFS binary to use + :return: + """ + + if not is_datanode_process_running(): + Logger.info("DataNode process is not running") + raise Fail("DataNode process is not running") + + import params + import socket + + try: + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + command = dfsadmin_base_command + ' -report -live' + return_code, hdfs_output = shell.call(command, user=params.hdfs_user) + except: + raise Fail('Unable to determine if the DataNode has started after upgrade.') + + if return_code == 0: + hostname = params.hostname.lower() + hostname_ip = socket.gethostbyname(params.hostname.lower()) + if hostname in hdfs_output.lower() or hostname_ip in hdfs_output.lower(): + Logger.info("DataNode {0} reports that it has rejoined the cluster.".format(params.hostname)) + return + else: + raise Fail("DataNode {0} was not found in the list of live DataNodes".format(params.hostname)) + + # return_code is not 0, fail + raise Fail("Unable to determine if the DataNode has started after upgrade (result code {0})".format(str(return_code))) http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs.py new file mode 100644 index 0000000..1264284 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs.py @@ -0,0 +1,173 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" + +from resource_management import * +import os +from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl +from ambari_commons import OSConst + +@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) +def hdfs(name=None): + import params + + if params.create_lib_snappy_symlinks: + install_snappy() + + # On some OS this folder could be not exists, so we will create it before pushing there files + Directory(params.limits_conf_dir, + create_parents = True, + owner='root', + group='root' + ) + + File(os.path.join(params.limits_conf_dir, 'hdfs.conf'), + owner='root', + group='root', + mode=0644, + content=Template("hdfs.conf.j2") + ) + + if params.security_enabled: + tc_mode = 0644 + tc_owner = "root" + else: + tc_mode = None + tc_owner = params.hdfs_user + + if "hadoop-policy" in params.config['configurations']: + XmlConfig("hadoop-policy.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hadoop-policy'], + configuration_attributes=params.config['configuration_attributes']['hadoop-policy'], + owner=params.hdfs_user, + group=params.user_group + ) + + if "ssl-client" in params.config['configurations']: + XmlConfig("ssl-client.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['ssl-client'], + configuration_attributes=params.config['configuration_attributes']['ssl-client'], + owner=params.hdfs_user, + group=params.user_group + ) + + Directory(params.hadoop_conf_secure_dir, + create_parents = True, + owner='root', + group=params.user_group, + cd_access='a', + ) + + XmlConfig("ssl-client.xml", + conf_dir=params.hadoop_conf_secure_dir, + configurations=params.config['configurations']['ssl-client'], + configuration_attributes=params.config['configuration_attributes']['ssl-client'], + owner=params.hdfs_user, + group=params.user_group + ) + + if "ssl-server" in params.config['configurations']: + XmlConfig("ssl-server.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['ssl-server'], + configuration_attributes=params.config['configuration_attributes']['ssl-server'], + owner=params.hdfs_user, + group=params.user_group + ) + + XmlConfig("hdfs-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hdfs-site'], + configuration_attributes=params.config['configuration_attributes']['hdfs-site'], + owner=params.hdfs_user, + group=params.user_group + ) + + XmlConfig("core-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['core-site'], + configuration_attributes=params.config['configuration_attributes']['core-site'], + owner=params.hdfs_user, + group=params.user_group, + mode=0644 + ) + + File(os.path.join(params.hadoop_conf_dir, 'slaves'), + owner=tc_owner, + content=Template("slaves.j2") + ) + + if params.lzo_enabled and len(params.lzo_packages) > 0: + Package(params.lzo_packages, + retry_on_repo_unavailability=params.agent_stack_retry_on_unavailability, + retry_count=params.agent_stack_retry_count) + +def install_snappy(): + import params + Directory([params.so_target_dir_x86, params.so_target_dir_x64], + create_parents = True, + ) + Link(params.so_target_x86, + to=params.so_src_x86, + ) + Link(params.so_target_x64, + to=params.so_src_x64, + ) + +@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) +def hdfs(component=None): + import params + if component == "namenode": + 
directories = params.dfs_name_dir.split(",") + Directory(directories, + owner=params.hdfs_user, + mode="(OI)(CI)F", + create_parents = True + ) + File(params.exclude_file_path, + content=Template("exclude_hosts_list.j2"), + owner=params.hdfs_user, + mode="f", + ) + if params.service_map.has_key(component): + service_name = params.service_map[component] + ServiceConfig(service_name, + action="change_user", + username=params.hdfs_user, + password=Script.get_password(params.hdfs_user)) + + if "hadoop-policy" in params.config['configurations']: + XmlConfig("hadoop-policy.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hadoop-policy'], + owner=params.hdfs_user, + mode="f", + configuration_attributes=params.config['configuration_attributes']['hadoop-policy'] + ) + + XmlConfig("hdfs-site.xml", + conf_dir=params.hadoop_conf_dir, + configurations=params.config['configurations']['hdfs-site'], + owner=params.hdfs_user, + mode="f", + configuration_attributes=params.config['configuration_attributes']['hdfs-site'] + )
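Both the Linux and Windows variants of hdfs() above render the *-site.xml files from the command's configuration dictionaries; a minimal sketch of the structure XmlConfig consumes (property names and host below are invented examples, not values from this patch):

config = {
  'configurations': {
    'core-site': {'fs.defaultFS': 'hdfs://nn1.example.com:8020'},
    'hdfs-site': {'dfs.namenode.http-address': 'nn1.example.com:50070'},
  },
  'configuration_attributes': {
    'core-site': {},
    'hdfs-site': {},
  },
}
# XmlConfig("hdfs-site.xml", conf_dir=..., configurations=config['configurations']['hdfs-site'],
#           configuration_attributes=config['configuration_attributes']['hdfs-site'], ...)
# serializes each key/value pair into a <property><name>...</name><value>...</value> entry.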