From: alejandro@apache.org
To: commits@ambari.apache.org
Message-Id: <21935104bfc84abaa610eb7dfefa32ed@git.apache.org>
Subject: ambari git commit: AMBARI-9717. Kafka & Spark service checks fail intermittently on kerberized cluster (alejandro)
Date: Sat, 21 Feb 2015 00:35:31 +0000 (UTC)

Repository: ambari
Updated Branches:
  refs/heads/trunk 225afe35c -> 6e23af6a4


AMBARI-9717. Kafka & Spark service checks fail intermittently on kerberized cluster (alejandro)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6e23af6a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6e23af6a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6e23af6a

Branch: refs/heads/trunk
Commit: 6e23af6a443de04f148cfff0e7da572497ee9d9e
Parents: 225afe3
Author: Alejandro Fernandez
Authored: Thu Feb 19 14:18:17 2015 -0800
Committer: Alejandro Fernandez
Committed: Fri Feb 20 15:38:59 2015 -0800

----------------------------------------------------------------------
 .../libraries/functions/validate.py            | 36 +++++++++++++++++
 .../KAFKA/0.8.1.2.2/package/scripts/params.py  |  9 ++++-
 .../0.8.1.2.2/package/scripts/service_check.py | 42 +++++++++++---------
 .../1.2.0.2.2/package/scripts/service_check.py | 35 ++++++----------
 .../package/scripts/zookeeper_server.py        | 22 ++--------
 5 files changed, 82 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/validate.py b/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
new file mode 100644
index 0000000..e56ec85
--- /dev/null
+++ b/ambari-common/src/main/python/resource_management/libraries/functions/validate.py
@@ -0,0 +1,36 @@
+#!/usr/bin/env python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+import re
+
+from resource_management.libraries.functions.decorator import retry
+from resource_management.core.shell import call
+from resource_management.core.exceptions import Fail
+
+
+@retry(times=10, sleep_time=2)
+def call_and_match_output(command, regex_expression, err_message):
+  """
+  Call the command and performs a regex match on the output for the specified expression.
+  :param command: Command to call
+  :param regex_expression: Regex expression to search in the output
+  """
+  code, out = call(command, logoutput=True)
+  if not (out and re.search(regex_expression, out, re.IGNORECASE)):
+    raise Fail(err_message)
\ No newline at end of file
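
Note on the new helper: it shells out through resource_management's call(), regex-searches the captured output, and relies on the @retry decorator to repeat the whole call up to 10 times with a 2-second sleep before letting Fail propagate. A minimal usage sketch follows; the command, pattern, and error text are illustrative only (not from the commit) and assume the ambari-common resource_management package is importable.

```python
from resource_management.libraries.functions.validate import call_and_match_output
from resource_management.core.exceptions import Fail

# Illustrative call site (not taken from the commit): run a hypothetical
# health-check command and require that its output contains "imok".
# Failed attempts are retried by the @retry decorator; once the retries are
# exhausted, the Fail raised inside the helper reaches this caller.
try:
  call_and_match_output("echo ruok | nc localhost 2181",    # hypothetical command
                        "imok",                             # regex, searched case-insensitively
                        "ZooKeeper did not answer 'imok'")  # message for the final Fail
except Fail as err:
  print("Validation failed: %s" % err)
```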

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
index 80b91b6..419639e 100644
--- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py
@@ -17,10 +17,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 """
-
+from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions.version import format_hdp_stack_version, compare_versions
 from resource_management.libraries.functions.default import default
-from resource_management import *
+from resource_management.core.logger import Logger
+
 import status_params
 
 # server configurations
@@ -89,3 +90,7 @@ if has_metric_collector:
     kafka_metrics_reporters = kafka_metrics_reporters + ','
   kafka_metrics_reporters = kafka_metrics_reporters + "org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter"
 
+
+
+# Security-related params
+security_enabled = config['configurations']['cluster-env']['security_enabled']
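
The new security_enabled param mirrors the cluster-wide Kerberos toggle that Ambari keeps in cluster-env; the Spark check below branches on the equivalent flag in its own params to decide whether to kinit first. A short sketch of the usual Ambari params.py pattern this relies on, assuming the standard `config = Script.get_config()` boilerplate that sits earlier in the file:

```python
from resource_management.libraries.script.script import Script

# Standard Ambari params.py pattern: the command JSON delivered to the agent
# carries the cluster configurations; 'security_enabled' under cluster-env is
# the cluster-wide "Kerberos is on" switch that service scripts branch on.
config = Script.get_config()
security_enabled = config['configurations']['cluster-env']['security_enabled']
```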
""" -from resource_management import * +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.validate import call_and_match_output +from resource_management.libraries.functions.format import format +from resource_management.core.logger import Logger class ServiceCheck(Script): def service_check(self, env): - import params - env.set_params(params) - - kafka_config = self.read_kafka_config() - - create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"." - create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists." - - source_cmd = format("source {conf_dir}/kafka-env.sh") - create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic ambari_kafka_service_check --partitions 1 --replication-factor 1") - - print "Running kafka create topic command" - Execute(format("{source_cmd} ; {create_topic_cmd} | grep '{create_topic_cmd_created_output}\|{create_topic_cmd_exists_output}'"), - logoutput=True, - ) + import params + env.set_params(params) + + # TODO, Kafka was introduced in HDP 2.2 but will not support running in a kerberized cluster until HDP 2.3 (tentatively) + # Kafka uses its own Zookeeper instance and it does not yet have the capability of running in a secure mode. + kafka_config = self.read_kafka_config() + + create_topic_cmd_created_output = "Created topic \"ambari_kafka_service_check\"." + create_topic_cmd_exists_output = "Topic \"ambari_kafka_service_check\" already exists." + + source_cmd = format("source {conf_dir}/kafka-env.sh") + create_topic_cmd = format("{kafka_home}/bin/kafka-topics.sh --zookeeper {kafka_config[zookeeper.connect]} --create --topic ambari_kafka_service_check --partitions 1 --replication-factor 1") + command = source_cmd + " ; " + create_topic_cmd + + Logger.info("Running kafka create topic command: %s" % command) + call_and_match_output(command, format("({create_topic_cmd_created_output})|({create_topic_cmd_exists_output})"), "Failed to check that topic exists") def read_kafka_config(self): import params kafka_config = {} - with open(params.conf_dir+"/server.properties","r") as conf_file: + with open(params.conf_dir+"/server.properties", "r") as conf_file: for line in conf_file: - key,value = line.split("=") - kafka_config[key] = value.replace("\n","") + key,value = line.split("=") + kafka_config[key] = value.replace("\n","") return kafka_config http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py index d00ae40..b548b4d 100644 --- a/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py +++ b/ambari-server/src/main/resources/common-services/SPARK/1.2.0.2.2/package/scripts/service_check.py @@ -15,24 +15,23 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" - -from resource_management import * import subprocess import time +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.format import format +from resource_management.core.resources.system import Execute +from resource_management.core.logger import Logger + class SparkServiceCheck(Script): def service_check(self, env): import params env.set_params(params) - # smoke_cmd = params.spark_service_check_cmd - # code, output = shell.call(smoke_cmd, timeout=100) - # if code == 0: - # Logger.info('Spark-on-Yarn Job submitted successfully') - # else: - # Logger.info('Spark-on-Yarn Job cannot be submitted') - # raise ComponentIsNotRunning() + if params.security_enabled: + spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ") + Execute(spark_kinit_cmd, user=params.spark_user) command = "curl" httpGssnegotiate = "--negotiate" @@ -46,31 +45,21 @@ class SparkServiceCheck(Script): command_with_flags = [command, silent, out, head, httpGssnegotiate, userpswd, insecure, url] is_running = False - for i in range(0,10): + for i in range(1,11): proc = subprocess.Popen(command_with_flags, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + Logger.info("Try %d, command: %s" % (i, " ".join(command_with_flags))) (stdout, stderr) = proc.communicate() response = stdout if '200' in response: is_running = True Logger.info('Spark Job History Server up and running') break + Logger.info("Response: %s" % str(response)) time.sleep(5) if is_running == False : Logger.info('Spark Job History Server not running.') - raise ComponentIsNotRunning() - - - - #command_with_flags = [command, silent, out, head, httpGssnegotiate, userpswd, insecure, url] - # proc = subprocess.Popen(command_with_flags, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - # (stdout, stderr) = proc.communicate() - # response = stdout - # if '200' in response: - # Logger.info('Spark Job History Server up and running') - # else: - # Logger.info('Spark Job History Server not running.') - # raise ComponentIsNotRunning() + raise ComponentIsNotRunning() if __name__ == "__main__": SparkServiceCheck().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py index e402eaa..eb97974 100644 --- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py +++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py @@ -18,35 +18,21 @@ limitations under the License. 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6e23af6a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
index e402eaa..eb97974 100644
--- a/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
+++ b/ambari-server/src/main/resources/common-services/ZOOKEEPER/3.4.5.2.0/package/scripts/zookeeper_server.py
@@ -18,35 +18,21 @@ limitations under the License.
 
 Ambari Agent
 
 """
+import random
 
-import re
-
-from resource_management import *
+from resource_management.libraries.script.script import Script
 from resource_management.libraries.functions import get_unique_id_and_date
-from resource_management.libraries.functions.decorator import retry
 from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version
 from resource_management.libraries.functions.security_commons import build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
   FILE_TYPE_JAAS_CONF
 from resource_management.libraries.functions.format import format
 from resource_management.core.shell import call
+from resource_management.libraries.functions.validate import call_and_match_output
+from resource_management.core.logger import Logger
 
 from zookeeper import zookeeper
 from zookeeper_service import zookeeper_service
-import random
-
-@retry(times=10, sleep_time=2, err_class=Fail)
-def call_and_match_output(command, regex_expression, err_message):
-  """
-  Call the command and performs a regex match on the output for the specified expression.
-  :param command: Command to call
-  :param regex_expression: Regex expression to search in the output
-  """
-  # TODO Rolling Upgrade, does this work in Ubuntu? If it doesn't see dynamic_variable_interpretation.py to see how stdout was redirected
-  # to a temporary file, which was then read.
-  code, out = call(command)
-  if not (out and re.search(regex_expression, out, re.IGNORECASE)):
-    raise Fail(err_message)
 
 class ZookeeperServer(Script):