From: yhemanth@apache.org To: commits@atlas.incubator.apache.org Subject: incubator-atlas git commit: ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth) Date: Tue, 14 Jun 2016 09:13:20 +0000 (UTC) Repository: incubator-atlas Updated Branches: refs/heads/master f47cea3f8 -> 0a44790e7 ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0a44790e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0a44790e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0a44790e Branch: refs/heads/master Commit: 0a44790e7866e92b6591d442f140705a079fef30 Parents: f47cea3 Author: Hemanth Yamijala Authored: Tue Jun 14 14:42:58 2016 +0530 Committer: Hemanth Yamijala Committed: Tue Jun 14 14:42:58 2016 +0530 ---------------------------------------------------------------------- .../apache/atlas/utils/AuthenticationUtil.java | 19 +- distro/src/bin/atlas_client_cmdline.py | 10 + distro/src/bin/atlas_config.py | 14 + distro/src/bin/atlas_kafka_setup.py | 37 +++ distro/src/bin/atlas_kafka_setup_hook.py | 37 +++
distro/src/conf/atlas-application.properties | 8 +- .../src/main/assemblies/standalone-package.xml | 18 ++ docs/src/site/twiki/Configuration.twiki | 17 ++ docs/src/site/twiki/InstallationSteps.twiki | 10 + notification/pom.xml | 134 +++++++++ .../apache/atlas/hook/AtlasTopicCreator.java | 136 +++++++++ .../atlas/hook/AtlasTopicCreatorTest.java | 281 +++++++++++++++++++ pom.xml | 8 + release-log.txt | 1 + 14 files changed, 721 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java index 3dbab17..bf1175f 100644 --- a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java +++ b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java @@ -38,20 +38,23 @@ public final class AuthenticationUtil { public static boolean isKerberosAuthenticationEnabled() { boolean isKerberosAuthenticationEnabled = false; try { - Configuration atlasConf = ApplicationProperties.get(); - - if ("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos"))) { - isKerberosAuthenticationEnabled = true; - } else { - isKerberosAuthenticationEnabled = false; - } - + isKerberosAuthenticationEnabled = isKerberosAuthenticationEnabled(ApplicationProperties.get()); } catch (AtlasException e) { LOG.error("Error while isKerberosAuthenticationEnabled ", e); } return isKerberosAuthenticationEnabled; } + public static boolean isKerberosAuthenticationEnabled(Configuration atlasConf) { + boolean isKerberosAuthenticationEnabled; + if ("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos"))) { + isKerberosAuthenticationEnabled = true; + } else 
{ + isKerberosAuthenticationEnabled = false; + } + return isKerberosAuthenticationEnabled; + } + public static String[] getBasicAuthenticationInput() { String username = null; String password = null; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_client_cmdline.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_client_cmdline.py b/distro/src/bin/atlas_client_cmdline.py index f109ad3..f05a3c8 100644 --- a/distro/src/bin/atlas_client_cmdline.py +++ b/distro/src/bin/atlas_client_cmdline.py @@ -40,6 +40,16 @@ def get_atlas_classpath(confdir): atlas_classpath = mc.convertCygwinPath(atlas_classpath, True) return atlas_classpath +def get_atlas_hook_classpath(confdir): + atlas_home = mc.atlasDir() + kafka_topic_setup_dir = mc.kafkaTopicSetupDir(atlas_home) + p = os.pathsep + atlas_hook_classpath = confdir + p \ + + os.path.join(kafka_topic_setup_dir, "*") + if mc.isCygwin(): + atlas_hook_classpath = mc.convertCygwinPath(atlas_hook_classpath, True) + return atlas_hook_classpath + def setup_jvm_opts_list(confdir, log_name): atlas_home = mc.atlasDir() mc.executeEnvSh(confdir) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_config.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py index fab4046..c211823 100755 --- a/distro/src/bin/atlas_config.py +++ b/distro/src/bin/atlas_config.py @@ -64,6 +64,7 @@ HBASE_STORAGE_LOCAL_CONF_ENTRY="atlas.graph.storage.hostname\s*=\s*localhost" SOLR_INDEX_CONF_ENTRY="atlas.graph.index.search.backend\s*=\s*solr5" SOLR_INDEX_LOCAL_CONF_ENTRY="atlas.graph.index.search.solr.zookeeper-url\s*=\s*localhost" SOLR_INDEX_ZK_URL="atlas.graph.index.search.solr.zookeeper-url" +TOPICS_TO_CREATE="atlas.notification.topics" DEBUG = False @@ -121,6 +122,9 @@ def webAppDir(dir): webapp = os.path.join(dir, WEBAPP) return 
os.environ.get(ATLAS_WEBAPP, webapp) +def kafkaTopicSetupDir(homeDir): + return os.path.join(homeDir, "hook", "kafka-topic-setup") + def expandWebApp(dir): webappDir = webAppDir(dir) webAppMetadataDir = os.path.join(webappDir, "atlas") @@ -429,6 +433,16 @@ def get_solr_zk_url(confdir): confdir = os.path.join(confdir, CONF_FILE) return getConfig(confdir, SOLR_INDEX_ZK_URL) +def get_topics_to_create(confdir): + confdir = os.path.join(confdir, CONF_FILE) + topic_list = getConfig(confdir, TOPICS_TO_CREATE) + if topic_list is not None: + topics = topic_list.split(",") + else: + topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"] + return topics + + def run_solr(dir, action, zk_url = None, port = None, logdir = None, wait=True): solrScript = "solr" http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_kafka_setup.py b/distro/src/bin/atlas_kafka_setup.py new file mode 100644 index 0000000..146a7e5 --- /dev/null +++ b/distro/src/bin/atlas_kafka_setup.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import sys +import atlas_client_cmdline as cmdline +import atlas_config as mc + +def main(): + conf_dir = cmdline.setup_conf_dir() + jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup.log') + atlas_classpath = cmdline.get_atlas_classpath(conf_dir) + topics_array = mc.get_topics_to_create(conf_dir) + process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath, jvm_opts_list) + return process.wait() + +if __name__ == '__main__': + try: + returncode = main() + except Exception as e: + print "Exception in setting up Kafka topics for Atlas: %s" % str(e) + returncode = -1 + + sys.exit(returncode) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup_hook.py ---------------------------------------------------------------------- diff --git a/distro/src/bin/atlas_kafka_setup_hook.py b/distro/src/bin/atlas_kafka_setup_hook.py new file mode 100644 index 0000000..6fdff74 --- /dev/null +++ b/distro/src/bin/atlas_kafka_setup_hook.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import sys +import atlas_client_cmdline as cmdline +import atlas_config as mc + +def main(): + conf_dir = cmdline.setup_conf_dir() + jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup_hook.log') + atlas_classpath = cmdline.get_atlas_hook_classpath(conf_dir) + topics_array = mc.get_topics_to_create(conf_dir) + process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath, jvm_opts_list) + return process.wait() + +if __name__ == '__main__': + try: + returncode = main() + except Exception as e: + print "Exception in setting up Kafka topics for Atlas: %s" % str(e) + returncode = -1 + + sys.exit(returncode) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 5bd0f74..cc0e4c1 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -56,12 +56,18 @@ atlas.kafka.data=${sys:atlas.home}/data/kafka atlas.kafka.zookeeper.connect=localhost:9026 atlas.kafka.bootstrap.servers=localhost:9027 atlas.kafka.zookeeper.session.timeout.ms=400 +atlas.kafka.zookeeper.connection.timeout.ms=200 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 atlas.kafka.auto.offset.reset=smallest atlas.kafka.hook.group.id=atlas atlas.kafka.auto.commit.enable=false - +atlas.notification.create.topics=true +atlas.notification.replicas=1 +atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES +# Enable for Kerberized Kafka clusters +#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM +#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab ######### Hive Lineage Configs ######### ## Schema 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 1c7b2c5..26c24ea 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -55,6 +55,18 @@ + target/bin + hook-bin + + atlas_client_cmdline.py + atlas_config.py + atlas_kafka_setup_hook.py + + 0755 + 0755 + + + target/hbase hbase 0755 @@ -156,6 +168,12 @@ ../addons/storm-bridge/target/models models + + + + ../notification/target/dependency/hook + hook + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/Configuration.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki index 7150483..0e122fe 100644 --- a/docs/src/site/twiki/Configuration.twiki +++ b/docs/src/site/twiki/Configuration.twiki @@ -154,6 +154,23 @@ Note that Kafka group ids are specified for a specific topic. The Kafka group i atlas.kafka.entities.group.id= +These configuration parameters are useful for setting up Kafka topics via the Atlas-provided scripts, described in the +[[InstallationSteps][Installation Steps]] page. + + +# Whether to create the topics automatically, default is true. +# Comma-separated list of topics to be created, default is "ATLAS_HOOK,ATLAS_ENTITIES" +atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES +# Number of replicas for the Atlas topics, default is 1. Increase for higher resilience to Kafka failures. +atlas.notification.replicas=1 +# Enable the two properties below if Kafka is running in Kerberized mode.
+# Set this to the service principal representing the Kafka service +#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM +# Set this to the location of the keytab file for Kafka +#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab + + + ---++ Client Configs http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/InstallationSteps.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki index 518c380..1eac288 100644 --- a/docs/src/site/twiki/InstallationSteps.twiki +++ b/docs/src/site/twiki/InstallationSteps.twiki @@ -262,6 +262,16 @@ Pre-requisites for running Solr in cloud mode * !SolrCloud has support for replication and sharding. It is highly recommended to use !SolrCloud with at least two Solr nodes running on different servers with replication enabled. If using !SolrCloud, then you also need !ZooKeeper installed and configured with 3 or 5 !ZooKeeper nodes +*Configuring Kafka Topics* + +Atlas uses Kafka to ingest metadata from other components at runtime. This is described in more detail on the [[Architecture][Architecture page]]. +Depending on the configuration of Kafka, you might need to set up the topics explicitly before +using Atlas. To do so, Atlas provides a script, =bin/atlas_kafka_setup.py=, which can be run from the Atlas server. In some +environments, the hooks might be used before the Atlas server itself is set up. In such cases, the topics +can be created on the hosts where the hooks are installed, using a similar script, =hook-bin/atlas_kafka_setup_hook.py=. Both scripts +use the configuration in =atlas-application.properties= to set up the topics. Please refer to the [[Configuration][Configuration page]] +for details. + ---++++ Setting up Atlas There are a few steps that setup dependencies of Atlas.
One such example is setting up the Titan schema http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/pom.xml ---------------------------------------------------------------------- diff --git a/notification/pom.xml b/notification/pom.xml index b3738db..fc08115 100644 --- a/notification/pom.xml +++ b/notification/pom.xml @@ -90,5 +90,139 @@ org.mockito mockito-all + + + com.101tec + zkclient + ${zkclient.version} + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-hook-dependencies + package + + copy + + + ${project.build.directory}/dependency/hook/kafka-topic-setup + false + false + true + + + ${project.groupId} + ${project.artifactId} + ${project.version} + + + ${project.groupId} + atlas-common + ${project.version} + + + commons-logging + commons-logging + ${commons-logging.version} + + + commons-configuration + commons-configuration + ${commons-conf.version} + + + commons-collections + commons-collections + ${commons-collections.version} + + + commons-lang + commons-lang + ${commons-lang.version} + + + com.google.guava + guava + ${guava.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-auth + ${hadoop.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + slf4j-log4j12 + ${slf4j.version} + + + log4j + log4j + 1.2.17 + + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.scala-lang + scala-compiler + ${scala.version} + + + org.scala-lang + scala-reflect + ${scala.version} + + + org.scala-lang + scala-library + ${scala.version} + + + org.scala-lang + scalap + ${scala.version} + + + com.101tec + zkclient + ${zkclient.version} + + + org.apache.zookeeper + zookeeper + 3.4.6 + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java 
---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java new file mode 100644 index 0000000..7a1e07a --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hook; + +import com.google.common.annotations.VisibleForTesting; +import kafka.admin.AdminUtils; +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.IOException; +import java.util.Properties; + +/** + * A class to create Kafka topics used by Atlas components. 
+ * + * Use this class to create a Kafka topic with specific configuration like number of partitions, replicas, etc. + */ +public class AtlasTopicCreator { + + private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class); + + public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics"; + + /** + * Create an Atlas topic. + * + * The topic will get created based on following conditions: + * {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is set to true. + * The topic does not already exist. + * Note that despite this, there could be multiple topic creation calls that happen in parallel because hooks + * run in a distributed fashion. Exceptions are caught and logged by this method to prevent the startup of + * the hooks from failing. + * @param atlasProperties {@link Configuration} containing properties to be used for creating topics. + * @param topicNames list of topics to create + */ + public void createAtlasTopic(Configuration atlasProperties, String... 
topicNames) { + if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) { + if (!handleSecurity(atlasProperties)) { + return; + } + ZkUtils zkUtils = createZkUtils(atlasProperties); + for (String topicName : topicNames) { + try { + LOG.warn("Attempting to create topic {}", topicName); + if (!ifTopicExists(topicName, zkUtils)) { + createTopic(atlasProperties, topicName, zkUtils); + } else { + LOG.warn("Ignoring call to create topic {}, as it already exists.", topicName); + } + } catch (Throwable t) { + LOG.error("Failed while creating topic {}", topicName, t); + } + } + zkUtils.close(); + } else { + LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames, ","), + ATLAS_NOTIFICATION_CREATE_TOPICS_KEY); + } + } + + @VisibleForTesting + protected boolean handleSecurity(Configuration atlasProperties) { + if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) { + String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal"); + String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location"); + org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration(); + SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hadoopConf); + try { + String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal, (String) null); + UserGroupInformation.setConfiguration(hadoopConf); + UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab); + } catch (IOException e) { + LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab, e); + return false; + } + } + return true; + } + + @VisibleForTesting + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + return AdminUtils.topicExists(zkUtils, topicName); + } + + @VisibleForTesting + protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { + int numPartitions = 
atlasProperties.getInt("atlas.notification.hook.numthreads", 1); + int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1); + AdminUtils.createTopic(zkUtils, topicName, numPartitions, numReplicas, + new Properties()); + LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions, numReplicas); + } + + @VisibleForTesting + protected ZkUtils createZkUtils(Configuration atlasProperties) { + String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect"); + int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400); + int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200); + Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection( + zkConnect, sessionTimeout, connectionTimeout); + return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false); + } + + public static void main(String[] args) throws AtlasException { + Configuration configuration = ApplicationProperties.get(); + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator(); + atlasTopicCreator.createAtlasTopic(configuration, args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java new file mode 100644 index 0000000..8585898 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.hook; + +import kafka.utils.ZkUtils; +import org.apache.commons.configuration.Configuration; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class AtlasTopicCreatorTest { + + @Test + public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() { + + Configuration configuration = mock(Configuration.class); + when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)). 
+ thenReturn(false); + when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false"); + final boolean[] topicExistsCalled = new boolean[] {false}; + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { + @Override + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + topicExistsCalled[0] = true; + return false; + } + }; + atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + assertFalse(topicExistsCalled[0]); + } + + @Test + public void shouldNotCreateTopicIfItAlreadyExists() { + Configuration configuration = mock(Configuration.class); + when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)). + thenReturn(true); + when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false"); + final ZkUtils zookeeperUtils = mock(ZkUtils.class); + final boolean[] topicExistsCalled = new boolean[]{false}; + final boolean[] createTopicCalled = new boolean[]{false}; + + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { + @Override + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + topicExistsCalled[0] = true; + return true; + } + + @Override + protected ZkUtils createZkUtils(Configuration atlasProperties) { + return zookeeperUtils; + } + + @Override + protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { + createTopicCalled[0] = true; + } + }; + atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + assertTrue(topicExistsCalled[0]); + assertFalse(createTopicCalled[0]); + } + + @Test + public void shouldCreateTopicIfItDoesNotExist() { + Configuration configuration = mock(Configuration.class); + when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)). 
+ thenReturn(true); + when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false"); + final ZkUtils zookeeperUtils = mock(ZkUtils.class); + + final boolean[] createdTopic = new boolean[]{false}; + + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { + @Override + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + return false; + } + + @Override + protected ZkUtils createZkUtils(Configuration atlasProperties) { + return zookeeperUtils; + } + + @Override + protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { + createdTopic[0] = true; + } + }; + atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + assertTrue(createdTopic[0]); + } + + @Test + public void shouldNotFailIfExceptionOccursDuringCreatingTopic() { + Configuration configuration = mock(Configuration.class); + when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)). + thenReturn(true); + when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false"); + final ZkUtils zookeeperUtils = mock(ZkUtils.class); + final boolean[] createTopicCalled = new boolean[]{false}; + + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { + @Override + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + return false; + } + + @Override + protected ZkUtils createZkUtils(Configuration atlasProperties) { + return zookeeperUtils; + } + + @Override + protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { + createTopicCalled[0] = true; + throw new RuntimeException("Simulating failure during creating topic"); + } + }; + atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK"); + assertTrue(createTopicCalled[0]); + } + + @Test + public void shouldCreateMultipleTopics() { + Configuration configuration = mock(Configuration.class); + 
when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)). + thenReturn(true); + when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false"); + final ZkUtils zookeeperUtils = mock(ZkUtils.class); + + final Map<String, Boolean> createdTopics = new HashMap<>(); + createdTopics.put("ATLAS_HOOK", false); + createdTopics.put("ATLAS_ENTITIES", false); + + AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() { + + @Override + protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) { + return false; + } + + @Override + protected ZkUtils createZkUtils(Configuration atlasProperties) { + return zookeeperUtils; + } + + @Override + protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) { + createdTopics.put(topicName, true); + } + }; + atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES"); + assertTrue(createdTopics.get("ATLAS_HOOK")); + assertTrue(createdTopics.get("ATLAS_ENTITIES")); + } + + @Test + public void shouldCreateTopicEvenIfEarlierOneFails() { + Configuration configuration = mock(Configuration.class); + when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                if (topicName.equals("ATLAS_HOOK")) {
+                    throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic");
+                } else {
+                    createdTopics.put(topicName, true);
+                }
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+    }
+
+    @Test
+    public void shouldCloseResources() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+
+        verify(zookeeperUtils, times(1)).close();
+    }
+
+    @Test
+    public void shouldNotProcessTopicCreationIfSecurityFails() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_HOOK", false);
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createdTopics.put(topicName, true);
+            }
+
+            @Override
+            protected boolean handleSecurity(Configuration atlasProperties) {
+                return false;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+        assertFalse(createdTopics.get("ATLAS_HOOK"));
+        assertFalse(createdTopics.get("ATLAS_ENTITIES"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e13345e..6f75740 100755
--- a/pom.xml
+++ b/pom.xml
@@ -392,9 +392,11 @@
 1.10
 3.2.2
 1.1.3
+ 2.6
 1
 1.3.7
 2.3
+ 0.8
 64m
 512m
@@ -1266,6 +1268,12 @@
 3.4
+
+ commons-lang
+ commons-lang
+ ${commons-lang.version}
+
+
 org.apache.kafka

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 091e077..6000fc1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
 ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
 ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
 ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)