ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject ambari git commit: AMBARI-21821, update Ambari Metrics service check to support HTTP SPNEGO authentication. (Qin Liu via eyang)
Date Sun, 10 Sep 2017 15:48:12 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk acde50281 -> 285cbafe3


AMBARI-21821, update Ambari Metrics service check to support
HTTP SPNEGO authentication. (Qin Liu via eyang)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/285cbafe
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/285cbafe
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/285cbafe

Branch: refs/heads/trunk
Commit: 285cbafe31df4dd574e6a9828934b49269b169c3
Parents: acde502
Author: Eric Yang <eyang@apache.org>
Authored: Sun Sep 10 08:46:49 2017 -0700
Committer: Eric Yang <eyang@apache.org>
Committed: Sun Sep 10 08:46:49 2017 -0700

----------------------------------------------------------------------
 .../0.1.0/package/scripts/params.py             |   2 +
 .../0.1.0/package/scripts/service_check.py      | 201 ++++++++++++++-----
 2 files changed, 151 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/285cbafe/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 6975bec..756da26 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -313,6 +313,8 @@ if security_enabled:
   _hostname_lowercase = config['hostname'].lower()
   client_jaas_config_file = format("{hbase_conf_dir}/hbase_client_jaas.conf")
   smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+  smoke_user_princ = config['configurations']['cluster-env']['smokeuser_principal_name']
+  smoke_user = config['configurations']['cluster-env']['smokeuser']
   hbase_user_keytab = config['configurations']['ams-hbase-env']['hbase_user_keytab']
 
   ams_collector_jaas_config_file = format("{hbase_conf_dir}/ams_collector_jaas.conf")

http://git-wip-us.apache.org/repos/asf/ambari/blob/285cbafe/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index 2b3dfa9..b31475a 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -22,6 +22,7 @@ from resource_management.core.logger import Logger
 from resource_management.core.base import Fail
 from resource_management import Script
 from resource_management import Template
+from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
@@ -65,16 +66,32 @@ class AMSServiceCheck(Script):
 
   def service_check_for_single_host(self, metric_collector_host, params):
     random_value1 = random.random()
-    headers = {"Content-type": "application/json"}
-    ca_certs = os.path.join(params.ams_monitor_conf_dir,
-                            params.metric_truststore_ca_certs)
 
     current_time = int(time.time()) * 1000
     metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
                            current_time=current_time).get_content()
     try:
-      post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
-                                metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT)
+      if is_spnego_enabled(params):
+        header= 'Content-Type: application/json'
+        method = 'POST'
+        tmp_dir = Script.get_tmp_dir()
+
+        protocol = "http"
+        if not callable(params.metric_collector_https_enabled):
+          if params.metric_collector_https_enabled:
+            protocol = "https"
+        port = str(params.metric_collector_port)
+        uri = '{0}://{1}:{2}{3}'.format(
+        protocol, metric_collector_host, port, self.AMS_METRICS_POST_URL)
+
+        call_curl_krb_request(tmp_dir, params.smoke_user_keytab, params.smoke_user_princ, uri, params.kinit_path_local, params.smoke_user,
+                              self.AMS_CONNECT_TIMEOUT, method, metric_json, header, tries = self.AMS_CONNECT_TRIES)
+      else :
+        headers = {"Content-type": "application/json"}
+        ca_certs = os.path.join(params.ams_monitor_conf_dir,
+                                params.metric_truststore_ca_certs)
+        post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
+                                  metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT)
 
       get_metrics_parameters = {
         "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
@@ -87,55 +104,63 @@ class AMSServiceCheck(Script):
       }
       encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
 
-      Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host,
-                                                   params.metric_collector_port,
-                                                   self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
-      for i in xrange(0, self.AMS_READ_TRIES):
-        conn = network.get_http_connection(
-          metric_collector_host,
-          int(params.metric_collector_port),
-          params.metric_collector_https_enabled,
-          ca_certs,
-          ssl_version=Script.get_force_https_protocol_value()
-        )
-        conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
-        response = conn.getresponse()
-        Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason))
-
-        data = response.read()
-        Logger.info("Http data: %s" % data)
-        conn.close()
-
-        if response.status == 200:
-          Logger.info("Metrics were retrieved from host %s" % metric_collector_host)
-        else:
-          raise Fail("Metrics were not retrieved from host %s. GET request status: %s %s \n%s" %
-                     (metric_collector_host, response.status, response.reason, data))
-        data_json = json.loads(data)
-
-        def floats_eq(f1, f2, delta):
-          return abs(f1-f2) < delta
-
-        values_are_present = False
-        for metrics_data in data_json["metrics"]:
-          if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
-              and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
-              and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
-            Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time))
-            values_are_present = True
-            break
-            pass
+      if is_spnego_enabled(params):
+        method = 'GET'
+        uri = '{0}://{1}:{2}{3}'.format(
+        protocol, metric_collector_host, port, self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
 
-        if not values_are_present:
-          if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
-            Logger.info("Values weren't stored yet. Retrying in %s seconds."
-                        % (self.AMS_READ_TIMEOUT))
-            time.sleep(self.AMS_READ_TIMEOUT)
+        call_curl_krb_request(tmp_dir, params.smoke_user_keytab, params.smoke_user_princ, uri, params.kinit_path_local, params.smoke_user,
+                              self.AMS_READ_TIMEOUT, method, tries = self.AMS_READ_TRIES, current_time = current_time, random_value = random_value1)
+      else:
+        Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host,
+                                                     params.metric_collector_port,
+                                                     self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
+        for i in xrange(0, self.AMS_READ_TRIES):
+          conn = network.get_http_connection(
+            metric_collector_host,
+            int(params.metric_collector_port),
+            params.metric_collector_https_enabled,
+            ca_certs,
+            ssl_version=Script.get_force_https_protocol_value()
+          )
+          conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
+          response = conn.getresponse()
+          Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason))
+
+          data = response.read()
+          Logger.info("Http data: %s" % data)
+          conn.close()
+
+          if response.status == 200:
+            Logger.info("Metrics were retrieved from host %s" % metric_collector_host)
           else:
-            raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
-        else:
-          break
-          pass
+            raise Fail("Metrics were not retrieved from host %s. GET request status: %s %s \n%s" %
+                       (metric_collector_host, response.status, response.reason, data))
+          data_json = json.loads(data)
+
+          def floats_eq(f1, f2, delta):
+            return abs(f1-f2) < delta
+
+          values_are_present = False
+          for metrics_data in data_json["metrics"]:
+            if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
+                and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
+                and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
+              Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time))
+              values_are_present = True
+              break
+              pass
+
+          if not values_are_present:
+            if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
+              Logger.info("Values weren't stored yet. Retrying in %s seconds."
+                          % (self.AMS_READ_TIMEOUT))
+              time.sleep(self.AMS_READ_TIMEOUT)
+            else:
+              raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
+          else:
+            break
+            pass
     except Fail as ex:
       Logger.warning("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
       raise Fail("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
@@ -158,6 +183,78 @@ class AMSServiceCheck(Script):
           Logger.warning(results[host].result)
     raise Fail("All metrics collectors are unavailable.")
 
+def is_spnego_enabled(params):
+  return params.security_enabled \
+      and 'core-site' in params.config['configurations'] \
+      and 'hadoop.http.authentication.type' in params.config['configurations']['core-site'] \
+      and params.config['configurations']['core-site']['hadoop.http.authentication.type'] == "kerberos" \
+      and 'hadoop.http.filter.initializers' in params.config['configurations']['core-site'] \
+      and params.config['configurations']['core-site']['hadoop.http.filter.initializers'] == "org.apache.hadoop.security.AuthenticationFilterInitializer"
+
+def call_curl_krb_request(tmp_dir, user_keytab, user_princ, uri, kinit_path, user,
+                          connection_timeout, method='GET', metric_json='', header='', tries = 1, current_time = 0, random_value = 0):
+  if method == 'POST':
+    Logger.info("Generated metrics for %s:\n%s" % (uri, metric_json))
+
+  for i in xrange(0, tries):
+    try:
+      Logger.info("Connecting (%s) to %s" % (method, uri));
+
+      response = None
+      errmsg = None
+      time_millis = 0
+
+      response, errmsg, time_millis = curl_krb_request(tmp_dir, user_keytab, user_princ, uri, 'ams_service_check',
+                                                       kinit_path, False, "AMS Service Check", user,
+                                                       connection_timeout=connection_timeout, kinit_timer_ms=0,
+                                                       method=method, body=metric_json, header=header)
+    except Exception, exception:
+      if i < tries - 1:  #range/xrange returns items from start to end-1
+        time.sleep(connection_timeout)
+        Logger.info("Connection failed for %s. Next retry in %s seconds."
+                    % (uri, connection_timeout))
+        continue
+      else:
+        raise Fail("Unable to {0} metrics on: {1}. Exception: {2}".format(method, uri, str(exception)))
+    finally:
+      if not response:
+        Logger.error("Unable to {0} metrics on: {1}.  Error: {2}".format(method, uri, errmsg))
+      else:
+        Logger.info("%s response from %s: %s, errmsg: %s" % (method, uri, response, errmsg));
+        try:
+          response.close()
+        except:
+          Logger.debug("Unable to close {0} connection to {1}".format(method, uri))
+
+    if method == 'GET':
+      data_json = json.loads(response)
+
+      def floats_eq(f1, f2, delta):
+        return abs(f1-f2) < delta
+
+      values_are_present = False
+      for metrics_data in data_json["metrics"]:
+        if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
+            and floats_eq(metrics_data["metrics"][str(current_time)], random_value, 0.0000001)
+            and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
+          Logger.info("Values %s and %s were found in the response from %s." % (uri, random_value, current_time))
+          values_are_present = True
+          break
+          pass
+
+      if not values_are_present:
+        if i < tries - 1:  #range/xrange returns items from start to end-1
+          Logger.info("Values weren't stored yet. Retrying in %s seconds."
+                      % (tries))
+          time.sleep(connection_timeout)
+        else:
+          raise Fail("Values %s and %s were not found in the response." % (random_value, current_time))
+      else:
+        break
+        pass
+    else:
+      break
+
 def post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, metric_collector_port, metric_collector_https_enabled,
                               metric_json, headers, ca_certs, tries = 1, connect_timeout = 10):
   for i in xrange(0, tries):


Mime
View raw message