atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject incubator-atlas git commit: ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
Date Tue, 14 Jun 2016 09:13:20 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master f47cea3f8 -> 0a44790e7


ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0a44790e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0a44790e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0a44790e

Branch: refs/heads/master
Commit: 0a44790e7866e92b6591d442f140705a079fef30
Parents: f47cea3
Author: Hemanth Yamijala <hyamijala@hortonworks.com>
Authored: Tue Jun 14 14:42:58 2016 +0530
Committer: Hemanth Yamijala <hyamijala@hortonworks.com>
Committed: Tue Jun 14 14:42:58 2016 +0530

----------------------------------------------------------------------
 .../apache/atlas/utils/AuthenticationUtil.java  |  19 +-
 distro/src/bin/atlas_client_cmdline.py          |  10 +
 distro/src/bin/atlas_config.py                  |  14 +
 distro/src/bin/atlas_kafka_setup.py             |  37 +++
 distro/src/bin/atlas_kafka_setup_hook.py        |  37 +++
 distro/src/conf/atlas-application.properties    |   8 +-
 .../src/main/assemblies/standalone-package.xml  |  18 ++
 docs/src/site/twiki/Configuration.twiki         |  17 ++
 docs/src/site/twiki/InstallationSteps.twiki     |  10 +
 notification/pom.xml                            | 134 +++++++++
 .../apache/atlas/hook/AtlasTopicCreator.java    | 136 +++++++++
 .../atlas/hook/AtlasTopicCreatorTest.java       | 281 +++++++++++++++++++
 pom.xml                                         |   8 +
 release-log.txt                                 |   1 +
 14 files changed, 721 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
index 3dbab17..bf1175f 100644
--- a/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
+++ b/common/src/main/java/org/apache/atlas/utils/AuthenticationUtil.java
@@ -38,20 +38,23 @@ public final class AuthenticationUtil {
     public static boolean isKerberosAuthenticationEnabled() {
         boolean isKerberosAuthenticationEnabled = false;
         try {
-            Configuration atlasConf = ApplicationProperties.get();
-
-            if ("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos")))
{
-                isKerberosAuthenticationEnabled = true;
-            } else {
-                isKerberosAuthenticationEnabled = false;
-            }
-
+            isKerberosAuthenticationEnabled = isKerberosAuthenticationEnabled(ApplicationProperties.get());
         } catch (AtlasException e) {
             LOG.error("Error while isKerberosAuthenticationEnabled ", e);
         }
         return isKerberosAuthenticationEnabled;
     }
 
+    public static boolean isKerberosAuthenticationEnabled(Configuration atlasConf) {
+        boolean isKerberosAuthenticationEnabled;
+        if ("true".equalsIgnoreCase(atlasConf.getString("atlas.authentication.method.kerberos")))
{
+            isKerberosAuthenticationEnabled = true;
+        } else {
+            isKerberosAuthenticationEnabled = false;
+        }
+        return isKerberosAuthenticationEnabled;
+    }
+
     public static String[] getBasicAuthenticationInput() {
         String username = null;
         String password = null;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_client_cmdline.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_client_cmdline.py b/distro/src/bin/atlas_client_cmdline.py
index f109ad3..f05a3c8 100644
--- a/distro/src/bin/atlas_client_cmdline.py
+++ b/distro/src/bin/atlas_client_cmdline.py
@@ -40,6 +40,16 @@ def get_atlas_classpath(confdir):
         atlas_classpath = mc.convertCygwinPath(atlas_classpath, True)
     return atlas_classpath
 
+def get_atlas_hook_classpath(confdir):
+    atlas_home = mc.atlasDir()
+    kafka_topic_setup_dir = mc.kafkaTopicSetupDir(atlas_home)
+    p = os.pathsep
+    atlas_hook_classpath = confdir + p \
+                            + os.path.join(kafka_topic_setup_dir, "*")
+    if mc.isCygwin():
+        atlas_hook_classpath = mc.convertCygwinPath(atlas_hook_classpath, True)
+    return atlas_hook_classpath
+
 def setup_jvm_opts_list(confdir, log_name):
     atlas_home = mc.atlasDir()
     mc.executeEnvSh(confdir)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_config.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_config.py b/distro/src/bin/atlas_config.py
index fab4046..c211823 100755
--- a/distro/src/bin/atlas_config.py
+++ b/distro/src/bin/atlas_config.py
@@ -64,6 +64,7 @@ HBASE_STORAGE_LOCAL_CONF_ENTRY="atlas.graph.storage.hostname\s*=\s*localhost"
 SOLR_INDEX_CONF_ENTRY="atlas.graph.index.search.backend\s*=\s*solr5"
 SOLR_INDEX_LOCAL_CONF_ENTRY="atlas.graph.index.search.solr.zookeeper-url\s*=\s*localhost"
 SOLR_INDEX_ZK_URL="atlas.graph.index.search.solr.zookeeper-url"
+TOPICS_TO_CREATE="atlas.notification.topics"
 
 DEBUG = False
 
@@ -121,6 +122,9 @@ def webAppDir(dir):
     webapp = os.path.join(dir, WEBAPP)
     return os.environ.get(ATLAS_WEBAPP, webapp)
 
+def kafkaTopicSetupDir(homeDir):
+    return os.path.join(homeDir, "hook", "kafka-topic-setup")
+
 def expandWebApp(dir):
     webappDir = webAppDir(dir)
     webAppMetadataDir = os.path.join(webappDir, "atlas")
@@ -429,6 +433,16 @@ def get_solr_zk_url(confdir):
     confdir = os.path.join(confdir, CONF_FILE)
     return getConfig(confdir, SOLR_INDEX_ZK_URL)
 
+def get_topics_to_create(confdir):
+    confdir = os.path.join(confdir, CONF_FILE)
+    topic_list = getConfig(confdir, TOPICS_TO_CREATE)
+    if topic_list is not None:
+        topics = topic_list.split(",")
+    else:
+        topics = ["ATLAS_HOOK", "ATLAS_ENTITIES"]
+    return topics
+
+
 def run_solr(dir, action, zk_url = None, port = None, logdir = None, wait=True):
 
     solrScript = "solr"

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_kafka_setup.py b/distro/src/bin/atlas_kafka_setup.py
new file mode 100644
index 0000000..146a7e5
--- /dev/null
+++ b/distro/src/bin/atlas_kafka_setup.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import atlas_client_cmdline as cmdline
+import atlas_config as mc
+
+def main():
+    conf_dir = cmdline.setup_conf_dir()
+    jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup.log')
+    atlas_classpath = cmdline.get_atlas_classpath(conf_dir)
+    topics_array = mc.get_topics_to_create(conf_dir)
+    process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath,
jvm_opts_list)
+    return process.wait()
+
+if __name__ == '__main__':
+    try:
+        returncode = main()
+    except Exception as e:
+        print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
+        returncode = -1
+
+    sys.exit(returncode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/bin/atlas_kafka_setup_hook.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_kafka_setup_hook.py b/distro/src/bin/atlas_kafka_setup_hook.py
new file mode 100644
index 0000000..6fdff74
--- /dev/null
+++ b/distro/src/bin/atlas_kafka_setup_hook.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+import atlas_client_cmdline as cmdline
+import atlas_config as mc
+
+def main():
+    conf_dir = cmdline.setup_conf_dir()
+    jvm_opts_list = cmdline.setup_jvm_opts_list(conf_dir, 'atlas_kafka_setup_hook.log')
+    atlas_classpath = cmdline.get_atlas_hook_classpath(conf_dir)
+    topics_array = mc.get_topics_to_create(conf_dir)
+    process = mc.java("org.apache.atlas.hook.AtlasTopicCreator", topics_array, atlas_classpath,
jvm_opts_list)
+    return process.wait()
+
+if __name__ == '__main__':
+    try:
+        returncode = main()
+    except Exception as e:
+        print "Exception in setting up Kafka topics for Atlas: %s" % str(e)
+        returncode = -1
+
+    sys.exit(returncode)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 5bd0f74..cc0e4c1 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -56,12 +56,18 @@ atlas.kafka.data=${sys:atlas.home}/data/kafka
 atlas.kafka.zookeeper.connect=localhost:9026
 atlas.kafka.bootstrap.servers=localhost:9027
 atlas.kafka.zookeeper.session.timeout.ms=400
+atlas.kafka.zookeeper.connection.timeout.ms=200
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
 atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.auto.commit.enable=false
-
+atlas.notification.create.topics=true
+atlas.notification.replicas=1
+atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+# Enable for Kerberized Kafka clusters
+#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
+#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
 
 #########  Hive Lineage Configs  #########
 ## Schema

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/distro/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml
index 1c7b2c5..26c24ea 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -55,6 +55,18 @@
         </fileSet>
 
         <fileSet>
+            <directory>target/bin</directory>
+            <outputDirectory>hook-bin</outputDirectory>
+            <includes>
+                <include>atlas_client_cmdline.py</include>
+                <include>atlas_config.py</include>
+                <include>atlas_kafka_setup_hook.py</include>
+            </includes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>target/hbase</directory>
             <outputDirectory>hbase</outputDirectory>
             <fileMode>0755</fileMode>
@@ -156,6 +168,12 @@
             <directory>../addons/storm-bridge/target/models</directory>
             <outputDirectory>models</outputDirectory>
         </fileSet>
+
+        <!-- for kafka topic setup -->
+        <fileSet>
+            <directory>../notification/target/dependency/hook</directory>
+            <outputDirectory>hook</outputDirectory>
+        </fileSet>
     </fileSets>
 
     <files>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 7150483..0e122fe 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -154,6 +154,23 @@ Note that Kafka group ids are specified for a specific topic.  The Kafka
group i
 atlas.kafka.entities.group.id=<consumer id>
 </verbatim>
 
+These configuration parameters are useful for setting up Kafka topics via Atlas-provided scripts, described in the
+[[InstallationSteps][Installation Steps]] page.
+
+<verbatim>
+# Whether to create the topics automatically, default is true.
+# Comma separated list of topics to be created, default is "ATLAS_HOOK,ATLAS_ENTITIES"
+atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES
+# Number of replicas for the Atlas topics, default is 1. Increase for higher resilience to
Kafka failures.
+atlas.notification.replicas=1
+# Enable the below two properties if Kafka is running in Kerberized mode.
+# Set this to the service principal representing the Kafka service
+#atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
+# Set this to the location of the keytab file for Kafka
+#atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
+
+</verbatim>
+
 
 ---++ Client Configs
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 518c380..1eac288 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -262,6 +262,16 @@ Pre-requisites for running Solr in cloud mode
   * !SolrCloud has support for replication and sharding. It is highly recommended to use
!SolrCloud with at least two Solr nodes running on different servers with replication enabled.
     If using !SolrCloud, then you also need !ZooKeeper installed and configured with 3 or
5 !ZooKeeper nodes
 
+*Configuring Kafka Topics*
+
+Atlas uses Kafka to ingest metadata from other components at runtime. This is described in the [[Architecture][Architecture page]]
+in more detail. Depending on the configuration of Kafka, sometimes you might need to set up the topics explicitly before
+using Atlas. To do so, Atlas provides a script =bin/atlas_kafka_setup.py= which can be run from the Atlas server. In some
+environments, the hooks might start getting used before the Atlas server itself is set up. In such cases, the topics
+can be created on the hosts where hooks are installed using a similar script =hook-bin/atlas_kafka_setup_hook.py=. Both these
+scripts use configuration in =atlas-application.properties= for setting up the topics. Please refer to the [[Configuration][Configuration page]]
+for these details.
+
 ---++++ Setting up Atlas
 
 There are a few steps that setup dependencies of Atlas. One such example is setting up the
Titan schema

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
index b3738db..fc08115 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -90,5 +90,139 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>${zkclient.version}</version>
+        </dependency>
+
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-hook-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/dependency/hook/kafka-topic-setup</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>${project.artifactId}</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>${project.groupId}</groupId>
+                                    <artifactId>atlas-common</artifactId>
+                                    <version>${project.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-logging</groupId>
+                                    <artifactId>commons-logging</artifactId>
+                                    <version>${commons-logging.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-configuration</groupId>
+                                    <artifactId>commons-configuration</artifactId>
+                                    <version>${commons-conf.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-collections</groupId>
+                                    <artifactId>commons-collections</artifactId>
+                                    <version>${commons-collections.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-lang</groupId>
+                                    <artifactId>commons-lang</artifactId>
+                                    <version>${commons-lang.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>com.google.guava</groupId>
+                                    <artifactId>guava</artifactId>
+                                    <version>${guava.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.hadoop</groupId>
+                                    <artifactId>hadoop-common</artifactId>
+                                    <version>${hadoop.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.hadoop</groupId>
+                                    <artifactId>hadoop-auth</artifactId>
+                                    <version>${hadoop.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.slf4j</groupId>
+                                    <artifactId>slf4j-api</artifactId>
+                                    <version>${slf4j.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.slf4j</groupId>
+                                    <artifactId>slf4j-log4j12</artifactId>
+                                    <version>${slf4j.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>log4j</groupId>
+                                    <artifactId>log4j</artifactId>
+                                    <version>1.2.17</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.kafka</groupId>
+                                    <artifactId>kafka_${scala.binary.version}</artifactId>
+                                    <version>${kafka.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.kafka</groupId>
+                                    <artifactId>kafka-clients</artifactId>
+                                    <version>${kafka.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-compiler</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-reflect</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scala-library</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.scala-lang</groupId>
+                                    <artifactId>scalap</artifactId>
+                                    <version>${scala.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>com.101tec</groupId>
+                                    <artifactId>zkclient</artifactId>
+                                    <version>${zkclient.version}</version>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>org.apache.zookeeper</groupId>
+                                    <artifactId>zookeeper</artifactId>
+                                    <version>3.4.6</version>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
new file mode 100644
index 0000000..7a1e07a
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasTopicCreator.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+import com.google.common.annotations.VisibleForTesting;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * A class to create Kafka topics used by Atlas components.
+ *
+ * Use this class to create a Kafka topic with specific configuration like number of partitions,
replicas, etc.
+ */
+public class AtlasTopicCreator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);
+
+    public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";
+
+    /**
+     * Create an Atlas topic.
+     *
+     * The topic will get created based on following conditions:
+     * {@link #ATLAS_NOTIFICATION_CREATE_TOPICS_KEY} is set to true.
+     * The topic does not already exist.
+     * Note that despite this, there could be multiple topic creation calls that happen in
parallel because hooks
+     * run in a distributed fashion. Exceptions are caught and logged by this method to prevent
the startup of
+     * the hooks from failing.
+     * @param atlasProperties {@link Configuration} containing properties to be used for
creating topics.
+     * @param topicNames list of topics to create
+     */
+    public void createAtlasTopic(Configuration atlasProperties, String... topicNames) {
+        if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) {
+            if (!handleSecurity(atlasProperties)) {
+                return;
+            }
+            ZkUtils zkUtils = createZkUtils(atlasProperties);
+            for (String topicName : topicNames) {
+                try {
+                    LOG.warn("Attempting to create topic {}", topicName);
+                    if (!ifTopicExists(topicName, zkUtils)) {
+                        createTopic(atlasProperties, topicName, zkUtils);
+                    } else {
+                        LOG.warn("Ignoring call to create topic {}, as it already exists.",
topicName);
+                    }
+                } catch (Throwable t) {
+                    LOG.error("Failed while creating topic {}", topicName, t);
+                }
+            }
+            zkUtils.close();
+        } else {
+            LOG.info("Not creating topics {} as {} is false", StringUtils.join(topicNames,
","),
+                    ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
+        }
+    }
+
+    @VisibleForTesting
+    protected boolean handleSecurity(Configuration atlasProperties) {
+        if (AuthenticationUtil.isKerberosAuthenticationEnabled(atlasProperties)) {
+            String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
+            String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
+            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+            SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS,
hadoopConf);
+            try {
+                String serverPrincipal = SecurityUtil.getServerPrincipal(kafkaPrincipal,
(String) null);
+                UserGroupInformation.setConfiguration(hadoopConf);
+                UserGroupInformation.loginUserFromKeytab(serverPrincipal, kafkaKeyTab);
+            } catch (IOException e) {
+                LOG.warn("Could not login as {} from keytab file {}", kafkaPrincipal, kafkaKeyTab,
e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+        return AdminUtils.topicExists(zkUtils, topicName);
+    }
+
+    @VisibleForTesting
+    protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils)
{
+        int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads",
1);
+        int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
+        AdminUtils.createTopic(zkUtils, topicName,  numPartitions, numReplicas,
+                new Properties());
+        LOG.warn("Created topic {} with partitions {} and replicas {}", topicName, numPartitions,
numReplicas);
+    }
+
+    @VisibleForTesting
+    protected ZkUtils createZkUtils(Configuration atlasProperties) {
+        String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
+        int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms",
400);
+        int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms",
200);
+        Tuple2<ZkClient, ZkConnection> zkClientAndConnection = ZkUtils.createZkClientAndConnection(
+                zkConnect, sessionTimeout, connectionTimeout);
+        return new ZkUtils(zkClientAndConnection._1(), zkClientAndConnection._2(), false);
+    }
+
+    public static void main(String[] args) throws AtlasException {
+        Configuration configuration = ApplicationProperties.get();
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
+        atlasTopicCreator.createAtlasTopic(configuration, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
new file mode 100644
index 0000000..8585898
--- /dev/null
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasTopicCreatorTest.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+import kafka.utils.ZkUtils;
+import org.apache.commons.configuration.Configuration;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class AtlasTopicCreatorTest {
+
+    @Test
+    public void shouldNotCreateAtlasTopicIfNotConfiguredToDoSo() {
+
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(false);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final boolean[] topicExistsCalled = new boolean[] {false};
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                topicExistsCalled[0] = true;
+                return false;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertFalse(topicExistsCalled[0]);
+    }
+
+    @Test
+    public void shouldNotCreateTopicIfItAlreadyExists() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final boolean[] topicExistsCalled = new boolean[]{false};
+        final boolean[] createTopicCalled = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                topicExistsCalled[0] = true;
+                return true;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createTopicCalled[0] = true;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(topicExistsCalled[0]);
+        assertFalse(createTopicCalled[0]);
+    }
+
+    @Test
+    public void shouldCreateTopicIfItDoesNotExist() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final boolean[] createdTopic = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createdTopic[0] = true;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(createdTopic[0]);
+    }
+
+    @Test
+    public void shouldNotFailIfExceptionOccursDuringCreatingTopic() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final boolean[] createTopicCalled = new boolean[]{false};
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createTopicCalled[0] = true;
+                throw new RuntimeException("Simulating failure during creating topic");
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK");
+        assertTrue(createTopicCalled[0]);
+    }
+
+    @Test
+    public void shouldCreateMultipleTopics() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_HOOK", false);
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createdTopics.put(topicName, true);
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+        assertTrue(createdTopics.get("ATLAS_HOOK"));
+        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+    }
+
+    @Test
+    public void shouldCreateTopicEvenIfEarlierOneFails() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                if (topicName.equals("ATLAS_HOOK")) {
+                    throw new RuntimeException("Simulating failure when creating ATLAS_HOOK topic");
+                } else {
+                    createdTopics.put(topicName, true);
+                }
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+        assertTrue(createdTopics.get("ATLAS_ENTITIES"));
+    }
+
+    @Test
+    public void shouldCloseResources() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        when(configuration.getString("atlas.authentication.method.kerberos")).thenReturn("false");
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+
+        verify(zookeeperUtils, times(1)).close();
+    }
+
+    @Test
+    public void shouldNotProcessTopicCreationIfSecurityFails() {
+        Configuration configuration = mock(Configuration.class);
+        when(configuration.getBoolean(AtlasTopicCreator.ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)).
+                thenReturn(true);
+        final ZkUtils zookeeperUtils = mock(ZkUtils.class);
+        final Map<String, Boolean> createdTopics = new HashMap<>();
+        createdTopics.put("ATLAS_HOOK", false);
+        createdTopics.put("ATLAS_ENTITIES", false);
+
+        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator() {
+            @Override
+            protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
+                return false;
+            }
+
+            @Override
+            protected ZkUtils createZkUtils(Configuration atlasProperties) {
+                return zookeeperUtils;
+            }
+
+            @Override
+            protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
+                createdTopics.put(topicName, true);
+            }
+
+            @Override
+            protected boolean handleSecurity(Configuration atlasProperties) {
+                return false;
+            }
+        };
+        atlasTopicCreator.createAtlasTopic(configuration, "ATLAS_HOOK", "ATLAS_ENTITIES");
+        assertFalse(createdTopics.get("ATLAS_HOOK"));
+        assertFalse(createdTopics.get("ATLAS_ENTITIES"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e13345e..6f75740 100755
--- a/pom.xml
+++ b/pom.xml
@@ -392,9 +392,11 @@
         <commons-conf.version>1.10</commons-conf.version>
         <commons-collections.version>3.2.2</commons-collections.version>
         <commons-logging.version>1.1.3</commons-logging.version>
+        <commons-lang.version>2.6</commons-lang.version>
         <javax-inject.version>1</javax-inject.version>
         <jettison.version>1.3.7</jettison.version>
         <paranamer.version>2.3</paranamer.version>
+        <zkclient.version>0.8</zkclient.version>
 
         <PermGen>64m</PermGen>
         <MaxPermGen>512m</MaxPermGen>
@@ -1266,6 +1268,12 @@
                 <version>3.4</version>
             </dependency>
 
+            <dependency>
+                <groupId>commons-lang</groupId>
+                <artifactId>commons-lang</artifactId>
+                <version>${commons-lang.version}</version>
+            </dependency>
+
             <!-- kafka -->
             <dependency>
                 <groupId>org.apache.kafka</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0a44790e/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 091e077..6000fc1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -22,6 +22,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
 ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
 ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
 ATLAS-837 Enhance Sqoop addon to handle export operation (venkatnrangan via shwethags)



Mime
View raw message