ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alejan...@apache.org
Subject ambari git commit: AMBARI-9717. Kafka & Spark service checks fail intermittently on kerberized cluster (alejandro)
Date Sat, 21 Feb 2015 00:35:31 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 225afe35c -> 6e23af6a4


AMBARI-9717. Kafka & Spark service checks fail intermittently on kerberized cluster (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6e23af6a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6e23af6a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6e23af6a

Branch: refs/heads/trunk
Commit: 6e23af6a443de04f148cfff0e7da572497ee9d9e
Parents: 225afe3
Author: Alejandro Fernandez <afernandez@hortonworks.com>
Authored: Thu Feb 19 14:18:17 2015 -0800
Committer: Alejandro Fernandez <afernandez@hortonworks.com>
Committed: Fri Feb 20 15:38:59 2015 -0800

----------------------------------------------------------------------
 .../libraries/functions/validate.py             | 36 +++++++++++++++++
 .../KAFKA/0.8.1.2.2/package/scripts/params.py   |  9 ++++-
 .../0.8.1.2.2/package/scripts/service_check.py  | 42 +++++++++++---------
 .../1.2.0.2.2/package/scripts/service_check.py  | 35 ++++++----------
 .../package/scripts/zookeeper_server.py         | 22 ++--------
 5 files changed, 82 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/validate.py b/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
new file mode 100644
index 0000000..e56ec85
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+from resource_management.libraries.functions.decorator import retry
+from resource_management.core.shell import call
+from resource_management.core.exceptions import Fail
+
+
+@retry(times=10, sleep_time=2)
+def call_and_match_output(command, regex_expression, err_message):
+  """
+  Call the command and performs a regex match on the output for the specified expression.
+  :param command: Command to call
+  :param regex_expression: Regex expression to search in the output
+  """
+  code, out = call(command, logoutput=True)
+  if not (out and re.search(regex_expression, out, re.IGNORECASE)):
+    raise Fail(err_message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 80b91b6..419639e 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -17,10 +17,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
-
+from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
 from resource_management.libraries.functions.default import default
-from resource_management import *
+from resource_management.core.logger import Logger
+
 import status_params
 
 # server configurations
@@ -89,3 +90,7 @@ if has_metric_collector:
       kafka_metrics_reporters = kafka_metrics_reporters + ','
 
   kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
+
+
+# Security-related params
+security_enabled = config['configurations']['cluster-env']['security_enabled']

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
index b10b602..ef6f62f 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/service_check.py
@@ -17,34 +17,38 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
-from resource_management import *
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.validate import call_and_match_output
+from resource_management.libraries.functions.format import format
+from resource_management.core.logger import Logger
 
 class ServiceCheck(Script):
   def service_check(self, env):
-      import params
-      env.set_params(params)
-      
-      kafka_config = self.read_kafka_config()
-      
-      create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
-      create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
-      
-      source_cmd = format("source {conf_dir}/kafka-env.sh")
-      create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic ambari_kafka_service_check --partitions 1 --replication-factor 1")
-      
-      print "Running kafka create topic command"
-      Execute(format("{source_cmd} ; {create_topic_cmd} | grep '{create_topic_cmd_created_output}\|{create_topic_cmd_exists_output}'"),
-              logoutput=True,
-      )
+    import params
+    env.set_params(params)
+
+    # TODO, Kafka was introduced in HDP 2.2 but will not support running in a kerberized cluster until HDP 2.3 (tentatively)
+    # Kafka uses its own Zookeeper instance and it does not yet have the capability of running in a secure mode.
+    kafka_config = self.read_kafka_config()
+
+    create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"."
+    create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists."
+
+    source_cmd = format("source {conf_dir}/kafka-env.sh")
+    create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic ambari_kafka_service_check --partitions 1 --replication-factor 1")
+    command = source_cmd + " ; " + create_topic_cmd
+
+    Logger.info("Running kafka create topic command: %s" % command)
+    call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists")
 
   def read_kafka_config(self):
     import params
     
     kafka_config = {}
-    with open(params.conf_dir+"/server.properties","r") as conf_file:
+    with open(params.conf_dir+"/server.properties", "r") as conf_file:
       for line in conf_file:
-          key,value = line.split("=")
-          kafka_config[key] = value.replace("\n","")
+        key,value = line.split("=")
+        kafka_config[key] = value.replace("\n","")
     
     return kafka_config
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py
index d00ae40..b548b4d 100644
--- a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py
@@ -15,24 +15,23 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
-
-from resource_management import *
 import subprocess
 import time
 
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.format import format
+from resource_management.core.resources.system import Execute
+from resource_management.core.logger import Logger
+
 class SparkServiceCheck(Script):
   def service_check(self, env):
     import params
 
     env.set_params(params)
 
-    # smoke_cmd = params.spark_service_check_cmd
-    # code, output = shell.call(smoke_cmd, timeout=100)
-    # if code == 0:
-    #   Logger.info('Spark-on-Yarn Job submitted successfully')
-    # else:
-    #   Logger.info('Spark-on-Yarn Job cannot be submitted')
-    #   raise ComponentIsNotRunning()
+    if params.security_enabled:
+      spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ")
+      Execute(spark_kinit_cmd, user=params.spark_user)
 
     command = "curl"
     httpGssnegotiate = "--negotiate"
@@ -46,31 +45,21 @@ class SparkServiceCheck(Script):
     command_with_flags = [command, silent, out, head, httpGssnegotiate, userpswd, insecure, url]
 
     is_running = False
-    for i in range(0,10):
+    for i in range(1,11):
       proc = subprocess.Popen(command_with_flags, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      Logger.info("Try %d, command: %s" % (i, " ".join(command_with_flags)))
       (stdout, stderr) = proc.communicate()
       response = stdout
       if '200' in response:
         is_running = True
         Logger.info('Spark Job History Server up and running')
         break
+      Logger.info("Response: %s" % str(response))
       time.sleep(5)
 
     if is_running == False :
       Logger.info('Spark Job History Server not running.')
-      raise ComponentIsNotRunning()  
-
-
-
-    #command_with_flags = [command, silent, out, head, httpGssnegotiate, userpswd, insecure, url]
-    # proc = subprocess.Popen(command_with_flags, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    # (stdout, stderr) = proc.communicate()
-    # response = stdout
-    # if '200' in response:
-    #   Logger.info('Spark Job History Server up and running')
-    # else:
-    #   Logger.info('Spark Job History Server not running.')
-    #   raise ComponentIsNotRunning()
+      raise ComponentIsNotRunning()
 
 if __name__ == "__main__":
   SparkServiceCheck().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
index e402eaa..eb97974 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
@@ -18,35 +18,21 @@ limitations under the License.
 Ambari Agent
 
 """
+import random
 
-import re
-
-from resource_management import *
+from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions import get_unique_id_and_date
-from resource_management.libraries.functions.decorator import retry
 from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
 from resource_management.libraries.functions.security_commons import build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
   FILE_TYPE_JAAS_CONF
 from resource_management.libraries.functions.format import format
 from resource_management.core.shell import call
+from resource_management.libraries.functions.validate import call_and_match_output
+from resource_management.core.logger import Logger
 
 from zookeeper import zookeeper
 from zookeeper_service import zookeeper_service
-import random
-
-@retry(times=10, sleep_time=2, err_class=Fail)
-def call_and_match_output(command, regex_expression, err_message):
-  """
-  Call the command and performs a regex match on the output for the specified expression.
-  :param command: Command to call
-  :param regex_expression: Regex expression to search in the output
-  """
-  # TODO Rolling Upgrade, does this work in Ubuntu? If it doesn't see dynamic_variable_interpretation.py to see how stdout was redirected
-  # to a temporary file, which was then read.
-  code, out = call(command)
-  if not (out and re.search(regex_expression, out, re.IGNORECASE)):
-    raise Fail(err_message)
 
 
 class ZookeeperServer(Script):


Mime
View raw message