Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 484F218F00 for ; Thu, 24 Mar 2016 23:10:45 +0000 (UTC) Received: (qmail 19343 invoked by uid 500); 24 Mar 2016 23:10:45 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 19315 invoked by uid 500); 24 Mar 2016 23:10:45 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 19306 invoked by uid 99); 24 Mar 2016 23:10:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 23:10:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AFBFFDFB79; Thu, 24 Mar 2016 23:10:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jluniya@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-15526: Stack Featurize Kafka service (jluniya) Date: Thu, 24 Mar 2016 23:10:44 +0000 (UTC) Repository: ambari Updated Branches: refs/heads/trunk 4a64727fb -> 7cbf3f4b4 AMBARI-15526: Stack Featurize Kafka service (jluniya) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7cbf3f4b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7cbf3f4b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7cbf3f4b Branch: refs/heads/trunk Commit: 7cbf3f4b44beaca76d95035567ca38b23a92a741 Parents: 4a64727 Author: Jayush Luniya Authored: Thu Mar 24 16:10:36 2016 -0700 Committer: Jayush Luniya Committed: Thu Mar 24 16:10:36 2016 -0700 ---------------------------------------------------------------------- .../libraries/functions/constants.py | 3 +++ .../libraries/functions/stack_features.py | 18 ++++++++++++++- .../KAFKA/0.8.1.2.2/package/scripts/kafka.py | 10 +++++--- .../0.8.1.2.2/package/scripts/kafka_broker.py | 10 +++++--- .../KAFKA/0.8.1.2.2/package/scripts/params.py | 24 ++++++++++++-------- .../KAFKA/0.8.1.2.2/package/scripts/upgrade.py | 4 ++-- .../HDP/2.0.6/properties/stack_features.json | 18 ++++++++++++++- 7 files changed, 67 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py index 9ecb55b..f766a82 100644 --- a/ambari-common/src/main/python/resource_management/libraries/functions/constants.py +++ b/ambari-common/src/main/python/resource_management/libraries/functions/constants.py @@ -50,3 +50,6 @@ class StackFeature: SPARK_THRIFTSERVER = "spark_thriftserver" STORM_KERBEROS = "storm_kerberos" STORM_AMS = "storm_ams" + CREATE_KAFKA_BROKER_ID = "create_kafka_broker_id" + KAFKA_LISTENERS = "kafka_listeners" + KAFKA_KERBEROS = "kafka_kerberos" http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py index 31f9d25..2f0e6bf 100644 --- a/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py +++ b/ambari-common/src/main/python/resource_management/libraries/functions/stack_features.py @@ -82,6 +82,22 @@ _DEFAULT_STACK_FEATURES = { "name": "storm_ams", "description": "Storm AMS integration (AMBARI-10710)", "min_version": "2.2.0.0" + }, + { + "name": "create_kafka_broker_id", + "description": "Ambari should create Kafka Broker Id (AMBARI-12678)", + "min_version": "2.2.0.0", + "max_version": "2.3.0.0" + }, + { + "name": "kafka_listeners", + "description": "Kafka listeners (AMBARI-10984)", + "min_version": "2.3.0.0" + }, + { + "name": "kafka_kerberos", + "description": "Kafka Kerberos support (AMBARI-10984)", + "min_version": "2.3.0.0" } ] } @@ -111,4 +127,4 @@ def check_stack_feature(stack_feature, stack_version): return False return True - return False \ No newline at end of file + return False http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py index 43b318c..33275f9 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka.py @@ -20,12 +20,14 @@ limitations under the License. import collections import os -from resource_management.libraries.functions.version import format_stack_version, compare_versions +from resource_management.libraries.functions.version import format_stack_version from resource_management.libraries.resources.properties_file import PropertiesFile from resource_management.libraries.resources.template_config import TemplateConfig from resource_management.core.resources.system import Directory, Execute, File, Link from resource_management.core.source import StaticFile, Template, InlineTemplate from resource_management.libraries.functions import format +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions import StackFeature from resource_management.core.logger import Logger @@ -42,14 +44,16 @@ def kafka(upgrade_type=None): effective_version = params.stack_version_formatted if upgrade_type is None else format_stack_version(params.version) Logger.info(format("Effective stack version: {effective_version}")) - if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.2.0.0') >= 0 and compare_versions(effective_version, '2.3.0.0') < 0: + if effective_version is not None and effective_version != "" and \ + check_stack_feature(StackFeature.CREATE_KAFKA_BROKER_ID, effective_version): if len(params.kafka_hosts) > 0 and params.hostname in params.kafka_hosts: brokerid = str(sorted(params.kafka_hosts).index(params.hostname)) kafka_server_config['broker.id'] = brokerid Logger.info(format("Calculating broker.id as {brokerid}")) # listeners and advertised.listeners are only added in 2.3.0.0 onwards. - if effective_version is not None and effective_version != "" and compare_versions(effective_version, '2.3.0.0') >= 0: + if effective_version is not None and effective_version != "" and \ + check_stack_feature(StackFeature.KAFKA_LISTENERS, effective_version): listeners = kafka_server_config['listeners'].replace("localhost", params.hostname) Logger.info(format("Kafka listeners: {listeners}")) http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py index 314d702..2043cfa 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/kafka_broker.py @@ -25,6 +25,8 @@ from resource_management.libraries.functions import Direction from resource_management.libraries.functions.version import compare_versions, format_stack_version from resource_management.libraries.functions.format import format from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature from kafka import ensure_base_directories import upgrade @@ -34,7 +36,8 @@ from setup_ranger_kafka import setup_ranger_kafka class KafkaBroker(Script): def get_stack_to_component(self): - return {"HDP": "kafka-broker"} + import params + return {params.stack_name : "kafka-broker"} def install(self, env): self.install_packages(env) @@ -48,10 +51,10 @@ class KafkaBroker(Script): import params env.set_params(params) - if params.version and compare_versions(format_stack_version(params.version), '2.2.0.0') >= 0: + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): stack_select.select("kafka-broker", params.version) - if params.version and compare_versions(format_stack_version(params.version), '2.3.0.0') >= 0: + if params.version and check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version): conf_select.select(params.stack_name, "kafka", params.version) # This is extremely important since it should only be called if crossing the HDP 2.3.4.0 boundary. @@ -65,6 +68,7 @@ class KafkaBroker(Script): src_version = format_stack_version(params.version) dst_version = format_stack_version(params.downgrade_from_version) + # TODO: How to handle the case of crossing stack version boundary in a stack agnostic way? if compare_versions(src_version, '2.3.4.0') < 0 and compare_versions(dst_version, '2.3.4.0') >= 0: # Calling the acl migration script requires the configs to be present. self.configure(env, upgrade_type=upgrade_type) http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py index 4ac9401..270a5ac 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/params.py @@ -17,9 +17,12 @@ See the License for the specific language governing permissions and limitations under the License. """ +import os from resource_management.libraries.functions import format from resource_management.libraries.script.script import Script -from resource_management.libraries.functions.version import format_stack_version, compare_versions +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature from resource_management.libraries.functions.default import default from utils import get_bare_principal from resource_management.libraries.functions.get_stack_version import get_stack_version @@ -35,6 +38,7 @@ from resource_management.libraries.functions.get_not_managed_resources import ge # server configurations config = Script.get_config() tmp_dir = Script.get_tmp_dir() +stack_root = Script.get_stack_root() stack_name = default("/hostLevelParams/stack_name", None) retryAble = default("/commandParams/command_retry_enabled", False) @@ -57,7 +61,7 @@ downgrade_from_version = default("/commandParams/downgrade_from_version", None) hostname = config['hostname'] # default kafka parameters -kafka_home = '/usr/lib/kafka/' +kafka_home = '/usr/lib/kafka' kafka_bin = kafka_home+'/bin/kafka' conf_dir = "/etc/kafka/conf" limits_conf_dir = "/etc/security/limits.d" @@ -69,11 +73,10 @@ kafka_user_nofile_limit = config['configurations']['kafka-env']['kafka_user_nofi kafka_user_nproc_limit = config['configurations']['kafka-env']['kafka_user_nproc_limit'] # parameters for 2.2+ -if Script.is_stack_greater_or_equal("2.2"): - kafka_home = '/usr/hdp/current/kafka-broker/' - kafka_bin = kafka_home+'bin/kafka' - conf_dir = "/usr/hdp/current/kafka-broker/config" - +if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): + kafka_home = os.path.join(stack_root, "current", "kafka-broker") + kafka_bin = os.path.join(kafka_home, "bin", "kafka") + conf_dir = os.path.join(kafka_home, "config") kafka_user = config['configurations']['kafka-env']['kafka_user'] kafka_log_dir = config['configurations']['kafka-env']['kafka_log_dir'] @@ -139,7 +142,8 @@ security_enabled = config['configurations']['cluster-env']['security_enabled'] kafka_kerberos_enabled = ('security.inter.broker.protocol' in config['configurations']['kafka-broker'] and config['configurations']['kafka-broker']['security.inter.broker.protocol'] == "PLAINTEXTSASL") -if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] and compare_versions(stack_version_formatted, '2.3') >= 0: +if security_enabled and stack_version_formatted != "" and 'kafka_principal_name' in config['configurations']['kafka-env'] \ + and check_stack_feature(StackFeature.KAFKA_KERBEROS, stack_version_formatted): _hostname_lowercase = config['hostname'].lower() _kafka_principal_name = config['configurations']['kafka-env']['kafka_principal_name'] kafka_jaas_principal = _kafka_principal_name.replace('_HOST',_hostname_lowercase) @@ -239,7 +243,7 @@ if has_ranger_admin and is_supported_kafka_ranger: downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") driver_curl_source = format("{jdk_location}/{jdbc_symlink_name}") - driver_curl_target = format("{kafka_home}libs/{jdbc_jar_name}") + driver_curl_target = format("{kafka_home}/libs/{jdbc_jar_name}") ranger_audit_solr_urls = config['configurations']['ranger-admin-site']['ranger.audit.solr.urls'] xa_audit_db_is_enabled = config['configurations']['ranger-kafka-audit']['xasecure.audit.destination.db'] if xml_configurations_supported else None @@ -249,7 +253,7 @@ if has_ranger_admin and is_supported_kafka_ranger: credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if xml_configurations_supported else None stack_version = get_stack_version('kafka-broker') - setup_ranger_env_sh_source = format('/usr/hdp/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh') + setup_ranger_env_sh_source = format('{stack_root}/{stack_version}/ranger-kafka-plugin/install/conf.templates/enable/kafka-ranger-env.sh') setup_ranger_env_sh_target = format("{conf_dir}/kafka-ranger-env.sh") #For SQLA explicitly disable audit to DB for Ranger http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py index 457a10f..b6e4046 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1.2.2/package/scripts/upgrade.py @@ -56,10 +56,10 @@ def run_migration(env, upgrade_type): kafka_acls_script = None command_suffix = "" if params.upgrade_direction == Direction.UPGRADE: - kafka_acls_script = format("/usr/hdp/{version}/kafka/bin/kafka-acls.sh") + kafka_acls_script = format("{stack_root}/{version}/kafka/bin/kafka-acls.sh") command_suffix = "--upgradeAcls" elif params.upgrade_direction == Direction.DOWNGRADE: - kafka_acls_script = format("/usr/hdp/{downgrade_from_version}/kafka/bin/kafka-acls.sh") + kafka_acls_script = format("{stack_root}/{downgrade_from_version}/kafka/bin/kafka-acls.sh") command_suffix = "--downgradeAcls" if kafka_acls_script is not None: http://git-wip-us.apache.org/repos/asf/ambari/blob/7cbf3f4b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json index 5e5a23c..97bd19c 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/properties/stack_features.json @@ -56,6 +56,22 @@ "name": "storm_ams", "description": "Storm AMS integration (AMBARI-10710)", "min_version": "2.2.0.0" + }, + { + "name": "create_kafka_broker_id", + "description": "Ambari should create Kafka Broker Id (AMBARI-12678)", + "min_version": "2.2.0.0", + "max_version": "2.3.0.0" + }, + { + "name": "kafka_listeners", + "description": "Kafka listeners (AMBARI-10984)", + "min_version": "2.3.0.0" + }, + { + "name": "kafka_kerberos", + "description": "Kafka Kerberos support (AMBARI-10984)", + "min_version": "2.3.0.0" } ] -} \ No newline at end of file +}