Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2473719372 for ; Wed, 27 Apr 2016 12:51:35 +0000 (UTC) Received: (qmail 66828 invoked by uid 500); 27 Apr 2016 12:51:35 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 66802 invoked by uid 500); 27 Apr 2016 12:51:34 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 66793 invoked by uid 99); 27 Apr 2016 12:51:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Apr 2016 12:51:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9F603DFD7B; Wed, 27 Apr 2016 12:51:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aonishuk@apache.org To: commits@ambari.apache.org Message-Id: <1d0de66d5e17466b92d5913477f681ab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-16120. export_ams_script should save configs into the files (aonishuk) Date: Wed, 27 Apr 2016 12:51:34 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/trunk d0d86edd5 -> e8a79db74 AMBARI-16120. export_ams_script should save configs into the files (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e8a79db7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e8a79db7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e8a79db7 Branch: refs/heads/trunk Commit: e8a79db74252a01cd530ce7dc796863eaa733945 Parents: d0d86ed Author: Andrew Onishuk Authored: Wed Apr 27 15:50:22 2016 +0300 Committer: Andrew Onishuk Committed: Wed Apr 27 15:50:22 2016 +0300 ---------------------------------------------------------------------- .../resources/scripts/export_ams_metrics.py | 273 ++++++++++++++----- 1 file changed, 206 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e8a79db7/ambari-server/src/main/resources/scripts/export_ams_metrics.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/scripts/export_ams_metrics.py b/ambari-server/src/main/resources/scripts/export_ams_metrics.py index e9ef307..31fc07b 100644 --- a/ambari-server/src/main/resources/scripts/export_ams_metrics.py +++ b/ambari-server/src/main/resources/scripts/export_ams_metrics.py @@ -27,6 +27,7 @@ import datetime import time import re import copy +from optparse import OptionGroup from flask import Flask, Response, jsonify, request, abort from flask.ext.cors import CORS from flask_restful import Resource, Api, reqparse @@ -35,6 +36,7 @@ from flask_restful import Resource, Api, reqparse class Params: + ACTION = None AMS_HOSTNAME = 'localhost' AMS_PORT = '6188' AMS_APP_ID = None @@ -51,6 +53,9 @@ class Params: FLASK_SERVER_NAME = None METRICS_FOR_HOSTS = {} HOSTS_WITH_COMPONENTS = {} + INPUT_DIR = None + VERBOSE = None + AGGREGATE = None @staticmethod def get_collector_uri(metricNames, hostname=None): @@ -116,43 +121,117 @@ class Utils: else: return -1 + @staticmethod + def read_json_file(filename): + with open(filename) as f: + return json.load(f) + + @staticmethod + def get_configs(): + conf_file = None + if Params.INPUT_DIR: + for metrics_dir in AmsMetricsProcessor.get_metrics_dirs(Params.INPUT_DIR): + for dir_item in os.listdir(metrics_dir): + dir_item_path = os.path.join(Params.INPUT_DIR, metrics_dir, dir_item) + if dir_item == "configs": + conf_file = dir_item_path + break + if conf_file: + break + + if os.path.exists(conf_file): + json = Utils.read_json_file(conf_file) + Params.AMS_APP_ID = json['APP_ID'] + Params.START_TIME = json['START_TIME'] + Params.END_TIME = json['END_TIME'] + Params.AGGREGATE = json['AGGREGATE'] + else: + logger.warn('Not found config file in {0}'.format(os.path.join(Params.INPUT_DIR), "configs")) + logger.info('Aborting...') + sys.exit(1) + + @staticmethod + def set_configs(): + conf_file = os.path.join(Params.OUT_DIR, "configs") + aggregate = True if not Params.HOSTS else False + properties = {"APP_ID" : Params.AMS_APP_ID, "START_TIME" : Params.START_TIME, "END_TIME" : Params.END_TIME, "AGGREGATE" : aggregate} + + with open(conf_file, 'w') as file: + file.write(json.dumps(properties)) class AmsMetricsProcessor: @staticmethod - def get_metrics_for_host(metrics, host=None): - metrics_result = {} + def write_metrics_to_file(metrics, host=None): + for metric in metrics: uri = Params.get_collector_uri(metric, host) - logger.debug('Request URI: %s' % str(uri)) - metrics_dict = Utils.get_data_from_url(uri) - if metrics_dict and "metrics" in metrics_dict and metrics_dict["metrics"]: - metrics_result[metric] = metrics_dict - else: - logger.debug("No found metric {0} on host {1}".format(metric, host)) - - return metrics_result + logger.info('Request URI: %s' % str(uri)) + metrics_json = Utils.get_data_from_url(uri) + if metrics_json: + if host: + path = os.path.join(Params.OUT_DIR, host, metric) + else: + path = os.path.join(Params.OUT_DIR, metric) + logger.info('Writing metric file: %s' % path) + with open(path, 'w') as file: + file.write(json.dumps(metrics_json)) @staticmethod def get_metrics_metadata(): - app_metrics_metadata = [] for metric in Params.METRICS: - app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "true", "type": "UNDEFINED"}) - logger.debug("Adding {0} to metadata".format(metric)) + if not Params.AGGREGATE: + app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "false", "type": "UNDEFINED"}) + else: + app_metrics_metadata.append({"metricname": metric, "seriesStartTime": Params.START_TIME, "supportsAggregation": "false"}) + logger.debug("Adding {0} to metadata".format(metric)) return {Params.AMS_APP_ID : app_metrics_metadata} @staticmethod def get_hosts_with_components(): hosts_with_components = {} - for host in Params.HOSTS: - hosts_with_components[host] = [Params.AMS_APP_ID] - return hosts_with_components + if Params.AGGREGATE: + return {"fakehostname" : [Params.AMS_APP_ID]} + else: + for host in Params.HOSTS: + hosts_with_components[host] = [Params.AMS_APP_ID] + return hosts_with_components @staticmethod - def export_ams_metrics(): + def get_metrics_dirs(d): + for o in os.listdir(d): + if 'ambari_metrics_export_' in o and os.path.isdir(os.path.join(d, o)): + yield os.path.join(d, o) + + @staticmethod + def ger_metrics_from_input_dir(): + metrics_for_hosts = {} + + for metrics_dir in AmsMetricsProcessor.get_metrics_dirs(Params.INPUT_DIR): + for dir_item in os.listdir(metrics_dir): + dir_item_path = os.path.join(metrics_dir, dir_item) + if os.path.isdir(dir_item_path): + if dir_item not in Params.HOSTS: + Params.HOSTS.append(os.path.basename(dir_item)) + metrics_for_hosts[dir_item] = {} + for metric in os.listdir(dir_item_path): + if metric not in Params.METRICS: + Params.METRICS.append(os.path.basename(metric)) + metric_file = os.path.join(dir_item_path, metric) + metrics_for_hosts[dir_item][metric] = Utils.read_json_file(metric_file) + elif os.path.isfile(dir_item_path): + if dir_item not in Params.METRICS and dir_item != "configs": + Params.METRICS.append(os.path.basename(dir_item)) + metric_file = os.path.join(Params.INPUT_DIR, dir_item_path) + metrics_for_hosts[dir_item] = Utils.read_json_file(metric_file) + + return metrics_for_hosts + + @staticmethod + def export_ams_metrics(): if not os.path.exists(Params.METRICS_FILE): logger.error('Metrics file is required.') sys.exit(1) @@ -162,24 +241,31 @@ class AmsMetricsProcessor: Params.METRICS.append(line.strip()) logger.info('Reading hosts file.') + logger.info('Reading hosts file.') if Params.HOSTS_FILE and os.path.exists(Params.HOSTS_FILE): with open(Params.HOSTS_FILE, 'r') as file: for line in file: Params.HOSTS.append(line.strip()) else: logger.info('No hosts file found, aggregate metrics will be exported.') - hosts_metrics = {} + if Params.HOSTS: for host in Params.HOSTS: - hosts_metrics[host] = AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, host) - return hosts_metrics + os.makedirs(os.path.join(Params.OUT_DIR, host)) # create host dir + AmsMetricsProcessor.write_metrics_to_file(Params.METRICS, host) else: - return AmsMetricsProcessor.get_metrics_for_host(Params.METRICS, None) + os.makedirs(os.path.join(Params.OUT_DIR)) + AmsMetricsProcessor.write_metrics_to_file(Params.METRICS, None) def process(self): - self.metrics_for_hosts = self.export_ams_metrics() - self.metrics_metadata = self.get_metrics_metadata() - self.hosts_with_components = self.get_hosts_with_components() + if Params.ACTION == "export": + self.export_ams_metrics() + Utils.set_configs() + else: + Utils.get_configs() + self.metrics_for_hosts = self.ger_metrics_from_input_dir() + self.metrics_metadata = self.get_metrics_metadata() + self.hosts_with_components = self.get_hosts_with_components() class FlaskServer(): @@ -195,7 +281,7 @@ class FlaskServer(): logger.info("Start Flask server. Server URL = " + Params.FLASK_SERVER_NAME + ":5000") - app.run(debug=True, + app.run(debug=Params.VERBOSE, host=Params.FLASK_SERVER_NAME, port=5000) @@ -237,18 +323,26 @@ class MetricsResource(Resource): metric_dict = {"metrics" : []} for host_name in host_names: if metric_name in self.ams_metrics_processor.metrics_for_hosts[host_name]: - metric_dict["metrics"].append(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"][0]) + if len(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"]) > 0: + metric_dict["metrics"].append(self.ams_metrics_processor.metrics_for_hosts[host_name][metric_name]["metrics"][0]) else: continue + + elif Params.AGGREGATE: + for metric in self.ams_metrics_processor.metrics_for_hosts: + if metric_name == metric: + metric_dict = self.ams_metrics_processor.metrics_for_hosts[metric_name] + break + else: for host in self.ams_metrics_processor.metrics_for_hosts: for metric in self.ams_metrics_processor.metrics_for_hosts[host]: - if metric_name == metric: - metric_dict = self.ams_metrics_processor.metrics_for_hosts[host][metric_name] - break + if metric_name == metric and len(self.ams_metrics_processor.metrics_for_hosts[host][metric_name]["metrics"]) > 0: + metric_dict = self.ams_metrics_processor.metrics_for_hosts[host][metric_name] + break if metric_dict: - metrics_json_new = copy.copy(metric_dict) + metrics_json_new = copy.deepcopy(metric_dict) for i in range (0, len(metrics_json_new["metrics"])): metrics_json_new["metrics"][i]["metricname"] += separator + operation return jsonify(metrics_json_new) @@ -266,78 +360,123 @@ def main(): 'from Ambari Metrics Service to a output dir. ' 'The metrics will be exported to a file with name of ' 'the metric and in a directory with the name as the ' - 'hostname under the output dir.') + 'hostname under the output dir. ' + 'Also this python program is a thin REST server ' + 'that implements a subset of the Ambari Metrics Service metrics server interfaces. ' + 'You can use it to visualize information exported by the AMS thin client') d = datetime.datetime.now() time_suffix = '{0}-{1}-{2}-{3}-{4}-{5}'.format(d.year, d.month, d.day, d.hour, d.minute, d.second) print 'Time: %s' % time_suffix + logfile = os.path.join('/tmp', 'ambari_metrics_export.out') - parser.add_option("-v", "--verbose", dest="verbose", action="store_false", + output_dir = os.path.join('/tmp', 'ambari_metrics_export_' + time_suffix) + + parser.add_option("-a", "--action", dest="action", default="set_action", help="Use action 'export' for exporting AMS metrics. " + "Use action 'run' for run REST server") + parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="output verbosity.") - parser.add_option("-s", "--host", dest="server_hostname", + parser.add_option("-l", "--logfile", dest="log_file", default=logfile, + metavar='FILE', help="Log file. [default: %s]" % logfile) + + export_options_group = OptionGroup(parser, "Required options for action 'export'") + #export metrics ----------------------------------------------------- + export_options_group.add_option("-s", "--host", dest="server_hostname", help="AMS host name.") - parser.add_option("-p", "--port", dest="server_port", + export_options_group.add_option("-p", "--port", dest="server_port", default="6188", help="AMS port. [default: 6188]") - parser.add_option("-a", "--app-id", dest="app_id", + export_options_group.add_option("-c", "--app-id", dest="app_id", help="AMS app id.") - parser.add_option("-m", "--metrics-file", dest="metrics_file", + export_options_group.add_option("-m", "--metrics-file", dest="metrics_file", help="Metrics file with metric names to query. New line separated.") - parser.add_option("-f", "--host-file", dest="hosts_file", + export_options_group.add_option("-f", "--host-file", dest="hosts_file", help="Host file with hostnames to query. New line separated.") - parser.add_option("-l", "--logfile", dest="log_file", default=logfile, - metavar='FILE', help="Log file. [default: %s]" % logfile) - parser.add_option("-r", "--precision", dest="precision", + export_options_group.add_option("-r", "--precision", dest="precision", default='minutes', help="AMS API precision, default = minutes.") - parser.add_option("-b", "--start_time", dest="start_time", + export_options_group.add_option("-b", "--start_time", dest="start_time", help="Start time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.") - parser.add_option("-e", "--end_time", dest="end_time", + export_options_group.add_option("-e", "--end_time", dest="end_time", help="End time in milliseconds since epoch or UTC timestamp in YYYY-MM-DDTHH:mm:ssZ format.") - parser.add_option("-n", "--flask-server_name", dest="server_name", + export_options_group.add_option("-o", "--output-dir", dest="output_dir", default=output_dir, + help="Output dir. [default: %s]" % output_dir) + parser.add_option_group(export_options_group) + #start Flask server ----------------------------------------------------- + + run_server_option_group = OptionGroup(parser, "Required options for action 'run'") + + run_server_option_group.add_option("-i", "--input-dir", dest="input_dir", + default='/tmp', help="Input directory for AMS metrics exports. [default: /tmp]") + run_server_option_group.add_option("-n", "--flask-server_name", dest="server_name", help="Flask server name, default = 127.0.0.1", default="127.0.0.1") + parser.add_option_group(run_server_option_group) (options, args) = parser.parse_args() - Params.AMS_HOSTNAME = options.server_hostname - Params.AMS_PORT = options.server_port + #export metrics ----------------------------------------------------- + Params.ACTION = options.action - Params.AMS_APP_ID = options.app_id + Params.VERBOSE = options.verbose - if Params.AMS_APP_ID != "HOST": - Params.AMS_APP_ID = Params.AMS_APP_ID.lower() + Utils.setup_logger(options.verbose, options.log_file) - Params.METRICS_FILE = options.metrics_file + if Params.ACTION == "export": - Params.HOSTS_FILE = options.hosts_file + Params.AMS_HOSTNAME = options.server_hostname - Params.PRECISION = options.precision + Params.AMS_PORT = options.server_port - Params.FLASK_SERVER_NAME = options.server_name + Params.AMS_APP_ID = options.app_id - Utils.setup_logger(options.verbose, options.log_file) + if Params.AMS_APP_ID != "HOST": + Params.AMS_APP_ID = Params.AMS_APP_ID.lower() - Params.START_TIME = Utils.get_epoch(options.start_time) + Params.METRICS_FILE = options.metrics_file - if Params.START_TIME == -1: - logger.warn('No start time provided, or it is in the wrong format. Please ' - 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format') - logger.info('Aborting...') - sys.exit(1) + Params.HOSTS_FILE = options.hosts_file - Params.END_TIME = Utils.get_epoch(options.end_time) + Params.PRECISION = options.precision - if Params.END_TIME == -1: - logger.warn('No end time provided, or it is in the wrong format. Please ' - 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format') + Params.OUT_DIR = options.output_dir + + if Params.START_TIME == -1: + logger.warn('No start time provided, or it is in the wrong format. Please ' + 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format') + logger.info('Aborting...') + sys.exit(1) + + Params.END_TIME = Utils.get_epoch(options.end_time) + + if Params.END_TIME == -1: + logger.warn('No end time provided, or it is in the wrong format. Please ' + 'provide milliseconds since epoch or a value in YYYY-MM-DDTHH:mm:ssZ format') + logger.info('Aborting...') + sys.exit(1) + + Params.START_TIME = Utils.get_epoch(options.start_time) + + ams_metrics_processor = AmsMetricsProcessor() + ams_metrics_processor.process() + + + elif Params.ACTION == "run": + #start Flask server ----------------------------------------------------- + Params.INPUT_DIR = options.input_dir + + Params.FLASK_SERVER_NAME = options.server_name + + ams_metrics_processor = AmsMetricsProcessor() + ams_metrics_processor.process() + FlaskServer(ams_metrics_processor) + + else: + logger.warn('Action \'{0}\' not supported. Please use action \'export\' for exporting AMS metrics ' + 'or use action \'run\' for starting REST server'.format(Params.ACTION)) logger.info('Aborting...') sys.exit(1) - ams_metrics_processor = AmsMetricsProcessor() - ams_metrics_processor.process() - FlaskServer(ams_metrics_processor) - if __name__ == "__main__": try: