atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [6/7] incubator-atlas git commit: ATLAS-58 Make hive hook reliable (shwethags)
Date Tue, 22 Sep 2015 09:15:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 2bf3aba..589ef16 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -19,11 +19,11 @@
 package org.apache.atlas.hive.hook;
 
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.ParamChecker;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
@@ -75,7 +75,6 @@ public class HiveHookIT {
         hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, System.getProperty("user.dir") + "/target/metastore");
         hiveConf.set(HiveMetaStoreBridge.ATLAS_ENDPOINT, atlasEndpoint);
         hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:./target/metastore_db;create=true");
-        hiveConf.set(HiveHook.CONF_SYNC, "true");
         hiveConf.set(HiveMetaStoreBridge.HIVE_CLUSTER_NAME, CLUSTER_NAME);
         hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODE, true);  //to not use hdfs
         hiveConf.setVar(HiveConf.ConfVars.HIVETESTMODEPREFIX, "");
@@ -93,6 +92,7 @@ public class HiveHookIT {
         String dbName = "db" + random();
         runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");
         String dbId = assertDatabaseIsRegistered(dbName);
+
         Referenceable definition = dgiCLient.getEntity(dbId);
         Map params = (Map) definition.get("parameters");
         Assert.assertNotNull(params);
@@ -145,11 +145,10 @@ public class HiveHookIT {
         Referenceable tableRef = dgiCLient.getEntity(tableId);
         Assert.assertEquals(tableRef.get("tableType"), TableType.MANAGED_TABLE.name());
         Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
-        String entityName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, DEFAULT_DB, tableName);
+        String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
         Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName);
 
-        final Id sdId = (Id) tableRef.get("sd");
-        Referenceable sdRef = dgiCLient.getEntity(sdId.id);
+        final Referenceable sdRef = (Referenceable) tableRef.get("sd");
         Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_IS_STORED_AS_SUB_DIRS), false);
 
         //Create table where database doesn't exist, will create database instance as well
@@ -160,8 +159,7 @@ public class HiveHookIT {
         LOG.debug("Searching for column {}", colName);
         String query =
                 String.format("%s where name = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
-        return assertEntityIsRegistered(query, true);
-
+        return assertEntityIsRegistered(query);
     }
 
     @Test
@@ -171,8 +169,8 @@ public class HiveHookIT {
         String query = "create table " + ctasTableName + " as select * from " + tableName;
         runCommand(query);
 
-        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
         assertProcessIsRegistered(query);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
     }
 
     @Test
@@ -182,8 +180,8 @@ public class HiveHookIT {
         String query = "create view " + viewName + " as select * from " + tableName;
         runCommand(query);
 
-        assertTableIsRegistered(DEFAULT_DB, viewName);
         assertProcessIsRegistered(query);
+        assertTableIsRegistered(DEFAULT_DB, viewName);
     }
 
     @Test
@@ -257,7 +255,7 @@ public class HiveHookIT {
         assertProcessIsRegistered(query);
     }
 
-    @Test
+    @Test(enabled = false)
     public void testAlterTable() throws Exception {
         String tableName = createTable();
         String newName = tableName();
@@ -268,7 +266,7 @@ public class HiveHookIT {
         assertTableIsNotRegistered(DEFAULT_DB, tableName);
     }
 
-    @Test
+    @Test(enabled = false)
     public void testAlterView() throws Exception {
         String tableName = createTable();
         String viewName = tableName();
@@ -292,9 +290,7 @@ public class HiveHookIT {
         String gremlinQuery =
                 String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
                         normalize(queryStr));
-        JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
-        JSONArray results = response.getJSONArray(AtlasClient.RESULTS);
-        Assert.assertEquals(results.length(), 1);
+        assertEntityIsRegistered(gremlinQuery);
     }
 
     private String normalize(String str) {
@@ -304,27 +300,27 @@ public class HiveHookIT {
         return StringEscapeUtils.escapeJava(str.toLowerCase());
     }
 
-    private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
-        return assertTableIsRegistered(dbName, tableName, true);
-    }
-
-    private String assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
-        return assertTableIsRegistered(dbName, tableName, false);
+    private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
+        LOG.debug("Searching for table {}.{}", dbName, tableName);
+        String query = String.format(
+                "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
+                HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
+        assertEntityIsNotRegistered(query);
     }
 
-    private String assertTableIsRegistered(String dbName, String tableName, boolean registered) throws Exception {
+    private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
         LOG.debug("Searching for table {}.{}", dbName, tableName);
         String query = String.format(
                 "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
                 HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-        return assertEntityIsRegistered(query, registered);
+        return assertEntityIsRegistered(query);
     }
 
     private String assertDatabaseIsRegistered(String dbName) throws Exception {
         LOG.debug("Searching for database {}", dbName);
         String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
                 dbName.toLowerCase(), CLUSTER_NAME);
-        return assertEntityIsRegistered(query, true);
+        return assertEntityIsRegistered(query);
     }
 
     private void assertPartitionIsRegistered(String dbName, String tableName, String value) throws Exception {
@@ -338,27 +334,34 @@ public class HiveHookIT {
                         + "out('__%s.table').has('%s.tableName', '%s').out('__%s.db').has('%s.name', '%s')"
                         + ".has('%s.clusterName', '%s').back('p').toList()", typeName, typeName, value, typeName,
                 tableType, tableName.toLowerCase(), tableType, dbType, dbName.toLowerCase(), dbType, CLUSTER_NAME);
-        JSONObject response = dgiCLient.searchByGremlin(gremlinQuery);
-        JSONArray results = response.getJSONArray(AtlasClient.RESULTS);
-        Assert.assertEquals(results.length(), 1);
+        assertEntityIsRegistered(gremlinQuery);
     }
 
-    private String assertEntityIsRegistered(String dslQuery, boolean registered) throws Exception {
-        JSONArray results = dgiCLient.searchByDSL(dslQuery);
-        if (registered) {
-            Assert.assertEquals(results.length(), 1);
-            JSONObject row = results.getJSONObject(0);
-            if (row.has("$id$")) {
-                return row.getJSONObject("$id$").getString("id");
-            } else {
-                return row.getJSONObject("_col_0").getString("id");
+    private String assertEntityIsRegistered(final String query) throws Exception {
+        waitFor(2000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                JSONArray results = dgiCLient.search(query);
+                return results.length() == 1;
             }
+        });
+
+        JSONArray results = dgiCLient.search(query);
+        JSONObject row = results.getJSONObject(0);
+        if (row.has("__guid")) {
+            return row.getString("__guid");
+        } else if (row.has("$id$")) {
+            return row.getJSONObject("$id$").getString("id");
         } else {
-            Assert.assertEquals(results.length(), 0);
-            return null;
+            return row.getJSONObject("_col_0").getString("id");
         }
     }
 
+    private void assertEntityIsNotRegistered(String dslQuery) throws Exception {
+        JSONArray results = dgiCLient.searchByDSL(dslQuery);
+        Assert.assertEquals(results.length(), 0);
+    }
+
     @Test
     public void testLineage() throws Exception {
         String table1 = createTable(false);
@@ -371,16 +374,47 @@ public class HiveHookIT {
         String table1Id = assertTableIsRegistered(DEFAULT_DB, table1);
         String table2Id = assertTableIsRegistered(db2, table2);
 
-        String datasetName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, db2, table2);
+        String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2);
         JSONObject response = dgiCLient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(table1Id));
         Assert.assertTrue(vertices.has(table2Id));
 
-        datasetName = HiveMetaStoreBridge.getTableName(CLUSTER_NAME, DEFAULT_DB, table1);
+        datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1);
         response = dgiCLient.getOutputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(table1Id));
         Assert.assertTrue(vertices.has(table2Id));
     }
+
+    public interface Predicate {
+
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @return the boolean result of the evaluation.
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        boolean evaluate() throws Exception;
+    }
+
+    /**
+     * Wait for a condition, expressed via a {@link Predicate} to become true.
+     *
+     * @param timeout maximum time in milliseconds to wait for the predicate to become true.
+     * @param predicate predicate waiting on.
+     */
+    protected void waitFor(int timeout, Predicate predicate) throws Exception {
+        ParamChecker.notNull(predicate, "predicate");
+        long mustEnd = System.currentTimeMillis() + timeout;
+
+        boolean eval;
+        while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
+            LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
+            Thread.sleep(100);
+        }
+        if (!eval) {
+            throw new Exception("Waiting timed out after " + timeout + " msec");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 2e27930..279d894 100755
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -64,11 +64,6 @@
         </dependency>
 
         <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
-        </dependency>
-
-        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/client/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/ApplicationProperties.java b/client/src/main/java/org/apache/atlas/ApplicationProperties.java
deleted file mode 100644
index 738ec53..0000000
--- a/client/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas;
-
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Iterator;
-
-public class ApplicationProperties extends PropertiesConfiguration {
-    private static final Logger LOG = LoggerFactory.getLogger(ApplicationProperties.class);
-
-    public static final String APPLICATION_PROPERTIES = "application.properties";
-    public static final String CLIENT_PROPERTIES = "client.properties";
-
-    private static Configuration INSTANCE = null;
-
-    private ApplicationProperties(URL url) throws ConfigurationException {
-        super(url);
-    }
-
-    public static Configuration get() throws AtlasException {
-        if (INSTANCE == null) {
-            synchronized (ApplicationProperties.class) {
-                if (INSTANCE == null) {
-                    Configuration applicationProperties = get(APPLICATION_PROPERTIES);
-                    Configuration clientProperties = get(CLIENT_PROPERTIES);
-                    INSTANCE = new CompositeConfiguration(Arrays.asList(applicationProperties, clientProperties));
-                }
-            }
-        }
-        return INSTANCE;
-    }
-
-    public static Configuration get(String fileName) throws AtlasException {
-        String confLocation = System.getProperty("atlas.conf");
-        try {
-            URL url = confLocation == null ? ApplicationProperties.class.getResource("/" + fileName)
-                    : new File(confLocation, fileName).toURI().toURL();
-            LOG.info("Loading {} from {}", fileName, url);
-
-            Configuration configuration = new ApplicationProperties(url).interpolatedConfiguration();
-            logConfiguration(configuration);
-            return configuration;
-        } catch (Exception e) {
-            throw new AtlasException("Failed to load application properties", e);
-        }
-    }
-
-    private static void logConfiguration(Configuration configuration) {
-        if (LOG.isDebugEnabled()) {
-            Iterator<String> keys = configuration.getKeys();
-            LOG.debug("Configuration loaded:");
-            while (keys.hasNext()) {
-                String key = keys.next();
-                LOG.debug("{} = {}", key, configuration.getProperty(key));
-            }
-        }
-    }
-
-    public static final Configuration getSubsetConfiguration(Configuration inConf, String prefix) {
-        return inConf.subset(prefix);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index 1c7d62a..06d9206 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -27,7 +27,7 @@ import org.apache.atlas.security.SecureClientUtils;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -40,6 +40,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
@@ -51,6 +52,7 @@ public class AtlasClient {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class);
     public static final String NAME = "name";
     public static final String GUID = "GUID";
+    public static final String TYPE = "type";
     public static final String TYPENAME = "typeName";
 
     public static final String DEFINITION = "definition";
@@ -60,11 +62,12 @@ public class AtlasClient {
     public static final String RESULTS = "results";
     public static final String COUNT = "count";
     public static final String ROWS = "rows";
+    public static final String DATATYPE = "dataType";
 
     public static final String BASE_URI = "api/atlas/";
     public static final String TYPES = "types";
+    public static final String URI_ENTITY = "entity";
     public static final String URI_ENTITIES = "entities";
-    public static final String URI_TRAITS = "traits";
     public static final String URI_SEARCH = "discovery/search";
     public static final String URI_LINEAGE = "lineage/hive/table";
 
@@ -77,11 +80,17 @@ public class AtlasClient {
     public static final String INFRASTRUCTURE_SUPER_TYPE = "Infrastructure";
     public static final String DATA_SET_SUPER_TYPE = "DataSet";
     public static final String PROCESS_SUPER_TYPE = "Process";
+    public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
+    public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
 
     public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
 
     private WebResource service;
 
+    protected AtlasClient() {
+        //do nothing. For LocalAtlasClient
+    }
+
     public AtlasClient(String baseUrl) {
         this(baseUrl, null, null);
     }
@@ -89,6 +98,8 @@ public class AtlasClient {
     public AtlasClient(String baseUrl, UserGroupInformation ugi, String doAsUser) {
         DefaultClientConfig config = new DefaultClientConfig();
         Configuration clientConfig = null;
+        int readTimeout = 60000;
+        int connectTimeout = 60000;
         try {
             clientConfig = getClientProperties();
             if (clientConfig.getBoolean(TLS_ENABLED, false)) {
@@ -97,6 +108,8 @@ public class AtlasClient {
                 // configuration object, persist it, then subsequently pass in an empty configuration to SSLFactory
                 SecureClientUtils.persistSSLClientConfiguration(clientConfig);
             }
+            readTimeout = clientConfig.getInt("atlas.client.readTimeoutMSecs", readTimeout);
+            connectTimeout = clientConfig.getInt("atlas.client.connectTimeoutMSecs", connectTimeout);
         } catch (Exception e) {
             LOG.info("Error processing client configuration.", e);
         }
@@ -106,6 +119,8 @@ public class AtlasClient {
 
         Client client = new Client(handler, config);
         client.resource(UriBuilder.fromUri(baseUrl).build());
+        client.setReadTimeout(readTimeout);
+        client.setConnectTimeout(connectTimeout);
 
         service = client.resource(UriBuilder.fromUri(baseUrl).build());
     }
@@ -124,14 +139,14 @@ public class AtlasClient {
 
         //Entity operations
         CREATE_ENTITY(BASE_URI + URI_ENTITIES, HttpMethod.POST),
-        GET_ENTITY(BASE_URI + URI_ENTITIES, HttpMethod.GET),
-        UPDATE_ENTITY(BASE_URI + URI_ENTITIES, HttpMethod.PUT),
-        LIST_ENTITY(BASE_URI + URI_ENTITIES, HttpMethod.GET),
+        GET_ENTITY(BASE_URI + URI_ENTITY, HttpMethod.GET),
+        UPDATE_ENTITY(BASE_URI + URI_ENTITY, HttpMethod.PUT),
+        LIST_ENTITIES(BASE_URI + URI_ENTITIES, HttpMethod.GET),
 
         //Trait operations
-        ADD_TRAITS(BASE_URI + URI_TRAITS, HttpMethod.POST),
-        DELETE_TRAITS(BASE_URI + URI_TRAITS, HttpMethod.DELETE),
-        LIST_TRAITS(BASE_URI + URI_TRAITS, HttpMethod.GET),
+        ADD_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.POST),
+        DELETE_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.DELETE),
+        LIST_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.GET),
 
         //Search operations
         SEARCH(BASE_URI + URI_SEARCH, HttpMethod.GET),
@@ -172,18 +187,8 @@ public class AtlasClient {
     }
 
     public List<String> listTypes() throws AtlasServiceException {
-        try {
-            final JSONObject jsonObject = callAPI(API.LIST_TYPES, null);
-            final JSONArray list = jsonObject.getJSONArray(AtlasClient.RESULTS);
-            ArrayList<String> types = new ArrayList<>();
-            for (int index = 0; index < list.length(); index++) {
-                types.add(list.getString(index));
-            }
-
-            return types;
-        } catch (JSONException e) {
-            throw new AtlasServiceException(API.LIST_TYPES, e);
-        }
+        final JSONObject jsonObject = callAPI(API.LIST_TYPES, null);
+        return extractResults(jsonObject);
     }
 
     public String getType(String typeName) throws AtlasServiceException {
@@ -203,18 +208,33 @@ public class AtlasClient {
 
     /**
      * Create the given entity
-     * @param entityAsJson entity(type instance) as json
-     * @return result json object
+     * @param entities entity(type instance) as json
+     * @return json array of guids
+     * @throws AtlasServiceException
+     */
+    public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
+        JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
+        try {
+            return response.getJSONArray(GUID);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(API.GET_ENTITY, e);
+        }
+    }
+
+    /**
+     * Create the given entity
+     * @param entitiesAsJson entity(type instance) as json
+     * @return json array of guids
      * @throws AtlasServiceException
      */
-    public JSONObject createEntity(String entityAsJson) throws AtlasServiceException {
-        return callAPI(API.CREATE_ENTITY, entityAsJson);
+    public JSONArray createEntity(String... entitiesAsJson) throws AtlasServiceException {
+        return createEntity(new JSONArray(Arrays.asList(entitiesAsJson)));
     }
 
     /**
      * Get an entity given the entity id
      * @param guid entity id
-     * @return result json object
+     * @return result object
      * @throws AtlasServiceException
      */
     public Referenceable getEntity(String guid) throws AtlasServiceException {
@@ -223,6 +243,62 @@ public class AtlasClient {
             String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION);
             return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true);
         } catch (JSONException e) {
+            throw new AtlasServiceException(API.GET_ENTITY, e);
+        }
+    }
+
+    public static String toString(JSONArray jsonArray) throws JSONException {
+        ArrayList<String> resultsList = new ArrayList<>();
+        for (int index = 0; index < jsonArray.length(); index++) {
+            resultsList.add(jsonArray.getString(index));
+        }
+        return StringUtils.join(resultsList, ",");
+    }
+
+    /**
+     * Get an entity given the entity id
+     * @param entityType entity type name
+     * @param attribute qualified name of the entity
+     * @param value
+     * @return result object
+     * @throws AtlasServiceException
+     */
+    public Referenceable getEntity(String entityType, String attribute, String value) throws AtlasServiceException {
+        WebResource resource = getResource(API.GET_ENTITY);
+        resource = resource.queryParam(TYPE, entityType);
+        resource = resource.queryParam(ATTRIBUTE_NAME, attribute);
+        resource = resource.queryParam(ATTRIBUTE_VALUE, value);
+        JSONObject jsonResponse = callAPIWithResource(API.GET_ENTITY, resource);
+        try {
+            String entityInstanceDefinition = jsonResponse.getString(AtlasClient.DEFINITION);
+            return InstanceSerialization.fromJsonReferenceable(entityInstanceDefinition, true);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(API.GET_ENTITY, e);
+        }
+    }
+
+    /**
+     * List entities for a given entity type
+     * @param entityType
+     * @return
+     * @throws AtlasServiceException
+     */
+    public List<String> listEntities(String entityType) throws AtlasServiceException {
+        WebResource resource = getResource(API.LIST_ENTITIES);
+        resource = resource.queryParam(TYPE, entityType);
+        JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITIES, resource);
+        return extractResults(jsonResponse);
+    }
+
+    private List<String> extractResults(JSONObject jsonResponse) throws AtlasServiceException {
+        try {
+            JSONArray results = jsonResponse.getJSONArray(AtlasClient.RESULTS);
+            ArrayList<String> resultsList = new ArrayList<>();
+            for (int index = 0; index < results.length(); index++) {
+                resultsList.add(results.getString(index));
+            }
+            return resultsList;
+        } catch (JSONException e) {
             throw new AtlasServiceException(e);
         }
     }
@@ -240,10 +316,22 @@ public class AtlasClient {
         return callAPIWithResource(API.UPDATE_ENTITY, resource);
     }
 
-    public JSONObject searchEntity(String searchQuery) throws AtlasServiceException {
+    /**
+     * Search using gremlin/dsl/full text
+     * @param searchQuery
+     * @return
+     * @throws AtlasServiceException
+     */
+    public JSONArray search(String searchQuery) throws AtlasServiceException {
         WebResource resource = getResource(API.SEARCH);
         resource = resource.queryParam(QUERY, searchQuery);
-        return callAPIWithResource(API.SEARCH, resource);
+        JSONObject result = callAPIWithResource(API.SEARCH, resource);
+        try {
+            return result.getJSONArray(RESULTS);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(e);
+        }
+
     }
 
     /**
@@ -276,7 +364,7 @@ public class AtlasClient {
         resource = resource.queryParam(QUERY, query);
         JSONObject result = callAPIWithResource(API.SEARCH_DSL, resource);
         try {
-            return result.getJSONObject(RESULTS).getJSONArray(ROWS);
+            return result.getJSONArray(RESULTS);
         } catch (JSONException e) {
             throw new AtlasServiceException(e);
         }
@@ -288,11 +376,16 @@ public class AtlasClient {
      * @return result json object
      * @throws AtlasServiceException
      */
-    public JSONObject searchByGremlin(String gremlinQuery) throws AtlasServiceException {
+    public JSONArray searchByGremlin(String gremlinQuery) throws AtlasServiceException {
         LOG.debug("Gremlin query: " + gremlinQuery);
         WebResource resource = getResource(API.SEARCH_GREMLIN);
         resource = resource.queryParam(QUERY, gremlinQuery);
-        return callAPIWithResource(API.SEARCH_GREMLIN, resource);
+        JSONObject result = callAPIWithResource(API.SEARCH_GREMLIN, resource);
+        try {
+            return result.getJSONArray(RESULTS);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/client/src/test/java/org/apache/atlas/ApplicationPropertiesTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/atlas/ApplicationPropertiesTest.java b/client/src/test/java/org/apache/atlas/ApplicationPropertiesTest.java
deleted file mode 100644
index 4acb3e1..0000000
--- a/client/src/test/java/org/apache/atlas/ApplicationPropertiesTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas;
-
-import org.apache.commons.configuration.Configuration;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class ApplicationPropertiesTest {
-
-    @Test
-    public void testVariables() throws Exception {
-        Configuration properties = ApplicationProperties.get();
-
-        //plain property without variables
-        Assert.assertEquals(properties.getString("atlas.service"), "atlas");
-
-        //property containing system property
-        String data = "/var/data/" + System.getProperty("user.name") + "/atlas";
-        Assert.assertEquals(properties.getString("atlas.data"), data);
-
-        //property referencing other property
-        Assert.assertEquals(properties.getString("atlas.graph.data"), data + "/graph");
-
-        //invalid system property - not substituted
-        Assert.assertEquals(properties.getString("atlas.db"), "${atlasdb}");
-    }
-
-    @Test
-    //variable substitutions should work with subset configuration as well
-    public void testSubset() throws Exception {
-        Configuration configuration = ApplicationProperties.get();
-        Configuration subConfiguration = configuration.subset("atlas");
-
-        Assert.assertEquals(subConfiguration.getString("service"), "atlas");
-        String data = "/var/data/" + System.getProperty("user.name") + "/atlas";
-        Assert.assertEquals(subConfiguration.getString("data"), data);
-        Assert.assertEquals(subConfiguration.getString("graph.data"), data + "/graph");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/client/src/test/resources/application.properties
----------------------------------------------------------------------
diff --git a/client/src/test/resources/application.properties b/client/src/test/resources/application.properties
deleted file mode 100644
index dbd6002..0000000
--- a/client/src/test/resources/application.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#system property
-atlas.data=/var/data/${sys:user.name}/atlas
-
-#re-use existing property
-atlas.graph.data=${atlas.data}/graph
-
-#plain property
-atlas.service=atlas
-
-#invalid system property
-atlas.db=${atlasdb}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..5498105
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-atlas</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>0.6-incubating-SNAPSHOT</version>
+    </parent>
+    <artifactId>atlas-common</artifactId>
+    <description>Apache Atlas Common</description>
+    <name>Apache Atlas Common</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-typesystem</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/common/src/main/java/org/apache/atlas/service/Service.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/service/Service.java b/common/src/main/java/org/apache/atlas/service/Service.java
new file mode 100644
index 0000000..6454f90
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/service/Service.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.service;
+
+import org.apache.atlas.AtlasException;
+
+/**
+ * Service interface to start any background jobs
+ */
+public interface Service {
+
+    void start() throws AtlasException;
+
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/common/src/main/java/org/apache/atlas/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/service/Services.java b/common/src/main/java/org/apache/atlas/service/Services.java
new file mode 100644
index 0000000..49b3fa7
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/service/Services.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.service;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * Utility for starting and stopping all services
+ */
+@Singleton
+public class Services {
+    public static final Logger LOG = LoggerFactory.getLogger(Services.class);
+
+    private final Set<Service> services;
+
+    @Inject
+    public Services(Set<Service> services) {
+        this.services = services;
+    }
+
+    public void start() {
+        try {
+            for (Service service : services) {
+                LOG.debug("Starting service {}", service.getClass().getName());
+                service.start();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void stop() {
+        for (Service service : services) {
+            LOG.debug("Stopping service {}", service.getClass().getName());
+            service.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/dashboard/public/modules/details/detailsResource.js
----------------------------------------------------------------------
diff --git a/dashboard/public/modules/details/detailsResource.js b/dashboard/public/modules/details/detailsResource.js
index af2f249..a559e6f 100644
--- a/dashboard/public/modules/details/detailsResource.js
+++ b/dashboard/public/modules/details/detailsResource.js
@@ -19,7 +19,7 @@
 'use strict';
 
 angular.module('dgc.details').factory('DetailsResource', ['$resource', function($resource) {
-    return $resource('/api/atlas/entities/:id', {}, {
+    return $resource('/api/atlas/entity/:id', {}, {
         get: {
             method: 'GET',
             transformResponse: function(data) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/dashboard/public/modules/search/searchController.js
----------------------------------------------------------------------
diff --git a/dashboard/public/modules/search/searchController.js b/dashboard/public/modules/search/searchController.js
index b5e6c37..46bc823 100644
--- a/dashboard/public/modules/search/searchController.js
+++ b/dashboard/public/modules/search/searchController.js
@@ -43,7 +43,7 @@ angular.module('dgc.search').controller('SearchController', ['$scope', '$locatio
             }, function searchSuccess(response) {
                 $scope.resultCount = response.count;
                 $scope.results = response.results;
-                $scope.resultRows = $scope.results.rows;
+                $scope.resultRows = $scope.results;
                 $scope.totalItems = $scope.resultCount;
                 $scope.transformedResults = {};
                 $scope.dataTransitioned = false;
@@ -59,7 +59,7 @@ angular.module('dgc.search').controller('SearchController', ['$scope', '$locatio
                 } else {
                     $scope.transformedResults = $scope.resultRows;
                 }
-                if ($scope.results.rows)
+                if ($scope.results)
                     $scope.searchMessage = $scope.resultCount + ' results matching your search query ' + $scope.query + ' were found';
                 else
                     $scope.searchMessage = '0 results matching your search query ' + $scope.query + ' were found';

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/distro/pom.xml
----------------------------------------------------------------------
diff --git a/distro/pom.xml b/distro/pom.xml
index d42d78f..1496eec 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -57,6 +57,35 @@
                 </python.path.l>
             </properties>
         </profile>
+
+        <profile>
+            <id>dist</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                                <phase>package</phase>
+                                <configuration>
+                                    <descriptors>
+                                        <descriptor>src/main/assemblies/standalone-package.xml</descriptor>
+                                        <descriptor>src/main/assemblies/src-package.xml</descriptor>
+                                    </descriptors>
+                                    <finalName>apache-atlas-${project.version}</finalName>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 
     <build>
@@ -69,6 +98,7 @@
                 <executions>
                     <execution>
                         <configuration>
+                            <skip>${skipTests}</skip>
                             <executable>python</executable>
                             <workingDirectory>src/test/python</workingDirectory>
                             <arguments>
@@ -86,26 +116,6 @@
                     </execution>
                 </executions>
             </plugin>
-
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <inherited>false</inherited>
-                <configuration>
-                    <descriptors>
-                        <descriptor>src/main/assemblies/standalone-package.xml</descriptor>
-                        <descriptor>src/main/assemblies/src-package.xml</descriptor>
-                    </descriptors>
-                    <finalName>apache-atlas-${project.version}</finalName>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/distro/src/bin/atlas_start.py
----------------------------------------------------------------------
diff --git a/distro/src/bin/atlas_start.py b/distro/src/bin/atlas_start.py
index 7680f04..c07b076 100755
--- a/distro/src/bin/atlas_start.py
+++ b/distro/src/bin/atlas_start.py
@@ -24,7 +24,7 @@ import atlas_config as mc
 METADATA_LOG_OPTS="-Datlas.log.dir=%s -Datlas.log.file=application.log"
 METADATA_COMMAND_OPTS="-Datlas.home=%s"
 METADATA_CONFIG_OPTS="-Datlas.conf=%s"
-DEFAULT_JVM_OPTS="-Xmx1024m -Dlog4j.configuration=atlas-log4j.xml"
+DEFAULT_JVM_OPTS="-Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml"
 
 def main():
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/distro/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index bf323a7..e7b1510 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -45,9 +45,16 @@ atlas.graph.index.search.elasticsearch.client-only=false
 atlas.graph.index.search.elasticsearch.local-mode=true
 atlas.graph.index.search.elasticsearch.create.sleep=2000
 
+
 #########  Notification Configs  #########
 atlas.notification.embedded=true
-atlas.notification.kafka.data=${sys:atlas.home}/data/kafka
+atlas.kafka.data=${sys:atlas.home}/data/kafka
+atlas.kafka.zookeeper.connect=localhost:9026
+atlas.kafka.bootstrap.servers=localhost:9027
+atlas.kafka.zookeeper.session.timeout.ms=400
+atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.auto.commit.interval.ms=1000
+
 
 #########  Hive Lineage Configs  #########
 # This models reflects the base super types for Data and Process

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/distro/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml
index 625396a..60a33b2 100755
--- a/distro/src/main/assemblies/standalone-package.xml
+++ b/distro/src/main/assemblies/standalone-package.xml
@@ -50,13 +50,6 @@
         </fileSet>
 
         <fileSet>
-            <directory>../addons/hive-bridge/src/bin</directory>
-            <outputDirectory>bin</outputDirectory>
-            <fileMode>0755</fileMode>
-            <directoryMode>0755</directoryMode>
-        </fileSet>
-
-        <fileSet>
             <directory>../logs</directory>
             <outputDirectory>logs</outputDirectory>
             <directoryMode>0777</directoryMode>
@@ -85,6 +78,13 @@
 
         <!-- addons/hive -->
         <fileSet>
+            <directory>../addons/hive-bridge/src/bin</directory>
+            <outputDirectory>bin</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
             <directory>../addons/hive-bridge/target/dependency/bridge</directory>
             <outputDirectory>bridge</outputDirectory>
         </fileSet>
@@ -93,12 +93,6 @@
             <directory>../addons/hive-bridge/target/dependency/hook</directory>
             <outputDirectory>hook</outputDirectory>
         </fileSet>
-
-        <fileSet>
-            <directory>../addons/hive-bridge/target/site</directory>
-            <outputDirectory>docs/hive</outputDirectory>
-        </fileSet>
-
     </fileSets>
 
     <files>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/distro/src/test/python/scripts/TestMetadata.py
----------------------------------------------------------------------
diff --git a/distro/src/test/python/scripts/TestMetadata.py b/distro/src/test/python/scripts/TestMetadata.py
index 57f247f..02eb938 100644
--- a/distro/src/test/python/scripts/TestMetadata.py
+++ b/distro/src/test/python/scripts/TestMetadata.py
@@ -51,13 +51,13 @@ class TestMetadata(unittest.TestCase):
         'org.apache.atlas.Main',
         ['-app', 'metadata_home/server/webapp/atlas'],
         'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib\\*:metadata_home/libext\\*',
-        ['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
+        ['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs')
     else:
       java_mock.assert_called_with(
         'org.apache.atlas.Main',
         ['-app', 'metadata_home/server/webapp/atlas'],
         'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*',
-        ['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-Dlog4j.configuration=atlas-log4j.xml'],  'metadata_home/logs')
+        ['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'],  'metadata_home/logs')
     pass
 
   def test_jar_java_lookups_fail(self):

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/pom.xml
----------------------------------------------------------------------
diff --git a/docs/pom.xml b/docs/pom.xml
index 4535fe0..1d5babe 100755
--- a/docs/pom.xml
+++ b/docs/pom.xml
@@ -40,18 +40,12 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-site-plugin</artifactId>
-                <version>3.3</version>
                 <dependencies>
                     <dependency>
                         <groupId>org.apache.maven.doxia</groupId>
                         <artifactId>doxia-module-twiki</artifactId>
                         <version>1.3</version>
                     </dependency>
-                    <dependency>
-                        <groupId>org.apache.maven.wagon</groupId>
-                        <artifactId>wagon-ssh-external</artifactId>
-                        <version>2.6</version>
-                    </dependency>
                 </dependencies>
                 <executions>
                     <execution>
@@ -62,45 +56,28 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <skip>${skipDocs}</skip>
-                    <reportPlugins>
-                        <plugin>
-                            <groupId>org.apache.maven.plugins</groupId>
-                            <artifactId>maven-project-info-reports-plugin</artifactId>
-                            <version>2.3</version>
-                            <reportSets>
-                                <reportSet>
-                                    <reports>
-                                        <report>index</report>
-                                        <report>project-team</report>
-                                        <report>mailing-list</report>
-                                        <report>issue-tracking</report>
-                                        <report>license</report>
-                                        <report>scm</report>
-                                    </reports>
-                                </reportSet>
-                            </reportSets>
-                            <configuration>
-                                <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
-                                <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
-                            </configuration>
-                        </plugin>
-                        <plugin>
-                            <groupId>org.apache.maven.plugins</groupId>
-                            <artifactId>maven-javadoc-plugin</artifactId>
-                            <version>2.7</version>
-                        </plugin>
-                        <plugin>
-                            <groupId>org.apache.maven.plugins</groupId>
-                            <artifactId>maven-jxr-plugin</artifactId>
-                            <version>2.1</version>
-                            <configuration>
-                                <aggregate>true</aggregate>
-                            </configuration>
-                        </plugin>
-                    </reportPlugins>
+                    <generateProjectInfo>false</generateProjectInfo>
+                    <generateReports>false</generateReports>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-project-info-reports-plugin</artifactId>
+                <version>2.8.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>project-team</goal>
+                            <goal>mailing-list</goal>
+                            <goal>cim</goal>
+                            <goal>issue-tracking</goal>
+                            <goal>license</goal>
+                            <goal>scm</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/architecture.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/architecture.png b/docs/src/site/resources/architecture.png
deleted file mode 100755
index 826df37..0000000
Binary files a/docs/src/site/resources/architecture.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/data-types.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/data-types.png b/docs/src/site/resources/data-types.png
deleted file mode 100755
index 3aa1904..0000000
Binary files a/docs/src/site/resources/data-types.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/guide-class-diagram.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/guide-class-diagram.png b/docs/src/site/resources/guide-class-diagram.png
deleted file mode 100644
index ca51239..0000000
Binary files a/docs/src/site/resources/guide-class-diagram.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/guide-instance-graph.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/guide-instance-graph.png b/docs/src/site/resources/guide-instance-graph.png
deleted file mode 100644
index a2c8f82..0000000
Binary files a/docs/src/site/resources/guide-instance-graph.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/architecture.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/architecture.png b/docs/src/site/resources/images/twiki/architecture.png
new file mode 100755
index 0000000..826df37
Binary files /dev/null and b/docs/src/site/resources/images/twiki/architecture.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/data-types.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/data-types.png b/docs/src/site/resources/images/twiki/data-types.png
new file mode 100755
index 0000000..3aa1904
Binary files /dev/null and b/docs/src/site/resources/images/twiki/data-types.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/guide-class-diagram.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/guide-class-diagram.png b/docs/src/site/resources/images/twiki/guide-class-diagram.png
new file mode 100644
index 0000000..ca51239
Binary files /dev/null and b/docs/src/site/resources/images/twiki/guide-class-diagram.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/guide-instance-graph.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/guide-instance-graph.png b/docs/src/site/resources/images/twiki/guide-instance-graph.png
new file mode 100644
index 0000000..a2c8f82
Binary files /dev/null and b/docs/src/site/resources/images/twiki/guide-instance-graph.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/notification.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/notification.png b/docs/src/site/resources/images/twiki/notification.png
new file mode 100644
index 0000000..ef30cd9
Binary files /dev/null and b/docs/src/site/resources/images/twiki/notification.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/images/twiki/types-instance.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/images/twiki/types-instance.png b/docs/src/site/resources/images/twiki/types-instance.png
new file mode 100755
index 0000000..6afca21
Binary files /dev/null and b/docs/src/site/resources/images/twiki/types-instance.png differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/resources/types-instance.png
----------------------------------------------------------------------
diff --git a/docs/src/site/resources/types-instance.png b/docs/src/site/resources/types-instance.png
deleted file mode 100755
index 6afca21..0000000
Binary files a/docs/src/site/resources/types-instance.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/site.xml
----------------------------------------------------------------------
diff --git a/docs/src/site/site.xml b/docs/src/site/site.xml
index b412569..8b7b2d7 100755
--- a/docs/src/site/site.xml
+++ b/docs/src/site/site.xml
@@ -17,7 +17,7 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" name="Metadata and Governance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" name="Apache Atlas"
          xmlns="http://maven.apache.org/DECORATION/1.3.0"
          xsi:schemaLocation="http://maven.apache.org/DECORATION/1.3.0 http://maven.apache.org/xsd/decoration-1.3.0.xsd">
 
@@ -100,21 +100,10 @@
 
         <menu name="Documentation">
             <!-- current points to latest release -->
-            <item name="current" href="./0.5.0-incubating/index.html"/>
+            <item name="current" href="./index.html"/>
             <item name="0.5-incubating" href="./0.5.0-incubating/index.html"/>
         </menu>
 
-        <menu name="Resources">
-            <item name="Overview" href="index.html"/>
-            <item name="Getting Started" href="./QuickStart.html"/>
-            <item name="Architecture" href="./Architecture.html"/>
-            <item name="Installation" href="./InstallationSteps.html"/>
-            <item name="Type System" href="./TypeSystem.html"/>
-            <item name="Configuration" href="./Configuration.html"/>
-            <item name="Security" href="./Security.html"/>
-            <item name="Rest API" href="./api/rest.html"/>
-        </menu>
-
         <menu name="ASF">
             <item name="How Apache Works" href="http://www.apache.org/foundation/how-it-works.html"/>
             <item name="Foundation" href="http://www.apache.org/foundation/"/>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/Architecture.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Architecture.twiki b/docs/src/site/twiki/Architecture.twiki
index 5f10fde..69449bd 100755
--- a/docs/src/site/twiki/Architecture.twiki
+++ b/docs/src/site/twiki/Architecture.twiki
@@ -2,5 +2,30 @@
 
 ---++ Introduction
 
----++ Metadata High Level Architecture - Overview
-<img src="architecture.png" height="400" width="600" />
+---++ Atlas High Level Architecture - Overview
+<img src="images/twiki/architecture.png" height="400" width="600" />
+
+
+---++ Bridges
+External components like hive/sqoop/storm/falcon should model their taxonomy using typesystem and register the types with Atlas. For every entity created in this external component, the corresponding entity should be registered in Atlas as well.
+This is typically done in a hook which runs in the external component and is called for every entity operation. Hook generally processes the entity asynchronously using a thread pool to avoid adding latency to the main operation.
+The hook can then build the entity and register the entity using Atlas REST APIs. Howerver, any failure in APIs because of network issue etc can in result entity not registered in Atlas and hence inconsistent metadata.
+
+Atlas exposes notification interface and can be used for reliable entity registration by hook as well. The hook can send notification message containing the list of entities to be registered.  Atlas service contains hook consumer that listens to these messages and registers the entities.
+
+Available bridges are:
+* [[hive/Bridge-Hive][Hive Bridge]]
+
+
+---++ Notification
+Notification is used for reliable entity registration from hooks and for entity/type change notifications. Atlas, by default, provides kafka integration, but its possible to provide other implementations as well. Atlas service starts embedded kafka server by default.
+
+Atlas also provides NotificationHookConsumer that runs in Atlas Service and listens to messages from hook and registers the entities in Atlas.
+<img src="images/twiki/notification.png" height="100" width="150" />
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/Bridge-Hive.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki
new file mode 100644
index 0000000..5d3fd9a
--- /dev/null
+++ b/docs/src/site/twiki/Bridge-Hive.twiki
@@ -0,0 +1,83 @@
+---+ Hive Atlas Bridge
+
+---++ Hive Model
+The default hive modelling is available in org.apache.atlas.hive.model.HiveDataModelGenerator. It defines the following types:
+<verbatim>
+hive_object_type(EnumType) - values [GLOBAL, DATABASE, TABLE, PARTITION, COLUMN]
+hive_resource_type(EnumType) - values [JAR, FILE, ARCHIVE]
+hive_principal_type(EnumType) - values [USER, ROLE, GROUP]
+hive_db(ClassType) - super types [Referenceable] - attributes [name, clusterName, description, locationUri, parameters, ownerName, ownerType]
+hive_order(StructType) - attributes [col, order]
+hive_resourceuri(StructType) - attributes [resourceType, uri]
+hive_serde(StructType) - attributes [name, serializationLib, parameters]
+hive_type(ClassType) - super types [] - attributes [name, type1, type2, fields]
+hive_storagedesc(ClassType) - super types [Referenceable] - attributes [cols, location, inputFormat, outputFormat, compressed, numBuckets, serdeInfo, bucketCols, sortCols, parameters, storedAsSubDirectories]
+hive_role(ClassType) - super types [] - attributes [roleName, createTime, ownerName]
+hive_column(ClassType) - super types [Referenceable] - attributes [name, type, comment]
+hive_table(ClassType) - super types [DataSet] - attributes [tableName, db, owner, createTime, lastAccessTime, comment, retention, sd, partitionKeys, columns, parameters, viewOriginalText, viewExpandedText, tableType, temporary]
+hive_partition(ClassType) - super types [Referenceable] - attributes [values, table, createTime, lastAccessTime, sd, columns, parameters]
+hive_process(ClassType) - super types [Process] - attributes [startTime, endTime, userName, operationType, queryText, queryPlan, queryId, queryGraph]
+</verbatim>
+
+The entities are created and de-duped using unique qualified name. They provide namespace and can be used for querying as well:
+hive_db - attribute qualifiedName - clustername.dbname
+hive_table - attribute name - clustername.dbname.tablename
+hive_partition - attribute qualifiedName - clustername.dbname.tablename.partitionvalues
+hive_process - attribute qualifiedName - queryText
+
+
+---++ Importing Hive Metadata
+org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this.
+Set-up the following configs in hive-site.xml of your hive set-up and set environment variable HIVE_CONFIG to the hive conf directory:
+   * Atlas endpoint - Add the following property with the Atlas endpoint for your set-up
+<verbatim>
+<property>
+  <name>atlas.rest.address</name>
+  <value>http://localhost:21000/</value>
+</property>
+<property>
+  <name>atlas.cluster.name</name>
+  <value>primary</value>
+</property>
+</verbatim>
+
+Usage: <atlas package>/bin/hive/import-hive.sh. The logs are in <atlas package>/logs/import-hive.log
+
+
+---++ Hive Hook
+Hive supports listeners on hive command execution using hive hooks. This is used to add/update/remove entities in Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator.
+The hook submits the request to a thread pool executor to avoid blocking the command execution. The thread submits the entities as message to the notification server and atlas server reads these messages and registers the entities.
+Follow these instructions in your hive set-up to add hive hook for Atlas:
+   * Set-up atlas hook and atlas endpoint in hive-site.xml:
+<verbatim>
+<property>
+  <name>hive.exec.post.hooks</name>
+  <value>org.apache.atlas.hive.hook.HiveHook</value>
+</property>
+</verbatim>
+<verbatim>
+<property>
+  <name>atlas.rest.address</name>
+  <value>http://localhost:21000/</value>
+</property>
+<property>
+  <name>atlas.cluster.name</name>
+  <value>primary</value>
+</property>
+</verbatim>
+   * Add 'export HIVE_AUX_JARS_PATH=<atlas package>/hook/hive' in hive-env.sh
+   * Copy <atlas package>/conf/application.properties to hive conf directory <hive package>/conf
+
+The following properties in hive-site.xml control the thread pool and notification details:
+   * atlas.hook.hive.synchronous - boolean, true to run the hook synchronously. default false
+   * atlas.hook.hive.numRetries - number of retries for notification failure. default 3
+   * atlas.hook.hive.minThreads - core number of threads. default 5
+   * atlas.hook.hive.maxThreads - maximum number of threads. default 5
+   * atlas.hook.hive.keepAliveTime - keep alive time in msecs. default 10
+
+Refer [[Configuration][Configuration]] for notification related configurations
+
+
+---++ Limitations
+   * Since database name, table name and column names are case insensitive in hive, the corresponding names in entities are lowercase. So, any search APIs should use lowercase while querying on the entity names
+   * Only the following hive operations are captured by hive hook currently - create database, create table, create view, CTAS, load, import, export, query, alter table rename and alter view rename
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index e2132cf..a94c6bb 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -49,6 +49,26 @@ atlas.lineage.hive.process.outputs.name=outputs
 atlas.lineage.hive.table.schema.query=hive_table where name=?, columns
 </verbatim>
 
+---+++ Notification Configs
+Refer http://kafka.apache.org/documentation.html#configuration for kafka configuration. All kafka configs should be prefixed with 'atlas.kafka.'
+
+<verbatim>
+atlas.notification.embedded=true
+atlas.kafka.data=${sys:atlas.home}/data/kafka
+atlas.kafka.zookeeper.connect=localhost:9026
+atlas.kafka.bootstrap.servers=localhost:9027
+atlas.kafka.zookeeper.session.timeout.ms=400
+atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.auto.commit.interval.ms=1000
+</verbatim>
+
+---+++ Client Configs
+<verbatim>
+atlas.client.readTimeoutMSecs=60000
+atlas.client.connectTimeoutMSecs=60000
+</verbatim>
+
+
 ---+++ Security Properties
 
 ---++++ SSL config

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 3d0351c..ea81efe 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -14,7 +14,7 @@ Once the build successfully completes, artifacts can be packaged for deployment.
 
 <verbatim>
 
-mvn clean package -DskipTests -DskipCheck=true
+mvn clean package -Pdist
 
 </verbatim>
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/QuickStart.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/QuickStart.twiki b/docs/src/site/twiki/QuickStart.twiki
index d899f95..9f3eb52 100644
--- a/docs/src/site/twiki/QuickStart.twiki
+++ b/docs/src/site/twiki/QuickStart.twiki
@@ -8,12 +8,12 @@ instance graph below.
 
 ---+++ Example Type Definitions
 
-<img src="guide-class-diagram.png"/>
+<img src="images/twiki/guide-class-diagram.png"/>
 
 
 ---+++ Example Instance Graph
 
-<img src="guide-instance-graph.png"/>
+<img src="images/twiki/guide-instance-graph.png"/>
 
 
 ---++ Running the example

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/TypeSystem.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/TypeSystem.twiki b/docs/src/site/twiki/TypeSystem.twiki
index 78c3503..b42de92 100755
--- a/docs/src/site/twiki/TypeSystem.twiki
+++ b/docs/src/site/twiki/TypeSystem.twiki
@@ -5,10 +5,10 @@
 ---++ Overview
 
 ---+++ Data Types Overview
-<img src="data-types.png" height="400" width="600" />
+<img src="images/twiki/data-types.png" height="400" width="600" />
 
 ---+++ Types Instances Overview
-<img src="types-instances.png" height="400" width="600" />
+<img src="images/twiki/types-instances.png" height="400" width="600" />
 
 ---++ Details
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/docs/src/site/twiki/index.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/index.twiki b/docs/src/site/twiki/index.twiki
index 40d2350..d900865 100755
--- a/docs/src/site/twiki/index.twiki
+++ b/docs/src/site/twiki/index.twiki
@@ -47,6 +47,8 @@ allows integration with the whole enterprise data ecosystem.
    * [[Search][Search]]
    * [[security][security]]
    * [[Configuration][Configuration]]
+   * Bridges
+      * [[Bridge-Hive][Hive Bridge]]
 
 
 ---++ API Documentation
@@ -56,4 +58,4 @@ allows integration with the whole enterprise data ecosystem.
 #LicenseInfo
 ---+ Licensing Information
 
-Metadata (Atlas) is distributed under [[http://www.apache.org/licenses/LICENSE-2.0][Apache License 2.0]].
+Atlas is distributed under [[http://www.apache.org/licenses/LICENSE-2.0][Apache License 2.0]].

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/notification/pom.xml
----------------------------------------------------------------------
diff --git a/notification/pom.xml b/notification/pom.xml
index b036855..2e12520 100644
--- a/notification/pom.xml
+++ b/notification/pom.xml
@@ -27,7 +27,7 @@
         <version>0.6-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>atlas-notification</artifactId>
-    <description>Apache Atlas Client</description>
+    <description>Apache Atlas Notification</description>
     <name>Apache Atlas Notification</name>
     <packaging>jar</packaging>
 
@@ -39,6 +39,11 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-typesystem</artifactId>
         </dependency>
 
@@ -53,8 +58,8 @@
         </dependency>
 
         <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
+            <groupId>com.google.inject.extensions</groupId>
+            <artifactId>guice-multibindings</artifactId>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 9978275..7b3cf89 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -30,6 +30,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -49,6 +50,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -57,13 +61,11 @@ import java.util.Properties;
 import java.util.concurrent.Future;
 
 @Singleton
-public class KafkaNotification extends NotificationInterface {
+public class KafkaNotification extends NotificationInterface implements Service {
     public static final Logger LOG = LoggerFactory.getLogger(KafkaNotification.class);
 
-    public static final String PROPERTY_PREFIX = NotificationInterface.PROPERTY_PREFIX + ".kafka";
+    public static final String PROPERTY_PREFIX = "atlas.kafka";
 
-    private static final int ATLAS_ZK_PORT = 9026;
-    private static final int ATLAS_KAFKA_PORT = 9027;
     private static final String ATLAS_KAFKA_DATA = "data";
 
     public static final String ATLAS_HOOK_TOPIC = "ATLAS_HOOK";
@@ -92,9 +94,8 @@ public class KafkaNotification extends NotificationInterface {
         }
     }
 
-    @Override
-    public void initialize(Configuration applicationProperties) throws AtlasException {
-        super.initialize(applicationProperties);
+    public KafkaNotification(Configuration applicationProperties) throws AtlasException {
+        super(applicationProperties);
         Configuration subsetConfiguration =
                 ApplicationProperties.getSubsetConfiguration(applicationProperties, PROPERTY_PREFIX);
         properties = ConfigurationConverter.getProperties(subsetConfiguration);
@@ -118,42 +119,42 @@ public class KafkaNotification extends NotificationInterface {
         properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
 
-        if (isEmbedded()) {
-            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + ATLAS_KAFKA_PORT);
-            properties.setProperty("zookeeper.connect", "localhost:" + ATLAS_ZK_PORT);
-        }
-
         //todo new APIs not available yet
 //        consumer = new KafkaConsumer(properties);
 //        consumer.subscribe(ATLAS_HOOK_TOPIC);
     }
 
-    @Override
-    protected void _startService() throws IOException {
-        startZk();
-        startKafka();
+    private URL getURL(String url) throws MalformedURLException {
+        try {
+            return new URL(url);
+        } catch(MalformedURLException e) {
+            return new URL("http://" + url);
+        }
     }
 
-    private String startZk() throws IOException {
-        //todo read zk endpoint from config
-        this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("0.0.0.0", ATLAS_ZK_PORT), 1024);
+    private String startZk() throws IOException, InterruptedException, URISyntaxException {
+        String zkValue = properties.getProperty("zookeeper.connect");
+        LOG.debug("Starting zookeeper at {}", zkValue);
+
+        URL zkAddress = getURL(zkValue);
+        this.factory = NIOServerCnxnFactory.createFactory(
+                new InetSocketAddress(zkAddress.getHost(), zkAddress.getPort()), 1024);
         File snapshotDir = constructDir("zk/txn");
         File logDir = constructDir("zk/snap");
 
-        try {
-            factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
+        factory.startup(new ZooKeeperServer(snapshotDir, logDir, 500));
         return factory.getLocalAddress().getAddress().toString();
     }
 
-    private void startKafka() {
+    private void startKafka() throws IOException, URISyntaxException {
+        String kafkaValue = properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+        LOG.debug("Starting kafka at {}", kafkaValue);
+        URL kafkaAddress = getURL(kafkaValue);
+
         Properties brokerConfig = properties;
         brokerConfig.setProperty("broker.id", "1");
-        //todo read kafka endpoint from config
-        brokerConfig.setProperty("host.name", "0.0.0.0");
-        brokerConfig.setProperty("port", String.valueOf(ATLAS_KAFKA_PORT));
+        brokerConfig.setProperty("host.name", kafkaAddress.getHost());
+        brokerConfig.setProperty("port", String.valueOf(kafkaAddress.getPort()));
         brokerConfig.setProperty("log.dirs", constructDir("kafka").getAbsolutePath());
         brokerConfig.setProperty("log.flush.interval.messages", String.valueOf(1));
 
@@ -162,6 +163,29 @@ public class KafkaNotification extends NotificationInterface {
         LOG.debug("Embedded kafka server started with broker config {}", brokerConfig);
     }
 
+    @Override
+    public void start() throws AtlasException {
+        if (isEmbedded()) {
+            try {
+                startZk();
+                startKafka();
+            } catch(Exception e) {
+                throw new AtlasException("Failed to start embedded kafka", e);
+            }
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+
+        if (factory != null) {
+            factory.shutdown();
+        }
+    }
+
     private static class SystemTime implements Time {
         @Override
         public long milliseconds() {
@@ -192,29 +216,6 @@ public class KafkaNotification extends NotificationInterface {
     }
 
     @Override
-    public void _shutdown() {
-        if (producer != null) {
-            producer.close();
-        }
-
-        if (consumer != null) {
-            consumer.close();
-        }
-
-        for (ConsumerConnector consumerConnector : consumerConnectors) {
-            consumerConnector.shutdown();
-        }
-
-        if (kafkaServer != null) {
-            kafkaServer.shutdown();
-        }
-
-        if (factory != null) {
-            factory.shutdown();
-        }
-    }
-
-    @Override
     public List<NotificationConsumer> createConsumers(NotificationType type, int numConsumers) {
         String topic = topicMap.get(type);
 
@@ -261,6 +262,24 @@ public class KafkaNotification extends NotificationInterface {
         }
     }
 
+    @Override
+    public void close() {
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
+
+        if (consumer != null) {
+            consumer.close();
+            consumer = null;
+        }
+
+        for (ConsumerConnector consumerConnector : consumerConnectors) {
+            consumerConnector.shutdown();
+        }
+        consumerConnectors.clear();
+    }
+
     //New API, not used now
     private List<String> receive(long timeout) throws NotificationException {
         Map<String, ConsumerRecords> recordsMap = consumer.poll(timeout);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
new file mode 100644
index 0000000..c97c726
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotificationProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.kafka;
+
+import com.google.inject.Provider;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+
+public class KafkaNotificationProvider implements Provider<KafkaNotification> {
+    @Override
+    @Provides
+    @Singleton
+    public KafkaNotification get() {
+        try {
+            Configuration applicationProperties = ApplicationProperties.get();
+            KafkaNotification instance = new KafkaNotification(applicationProperties);
+            return instance;
+        } catch(AtlasException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/1bfda02a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 36a62f0..29194a4 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -18,37 +18,43 @@
 package org.apache.atlas.notification;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
+import org.codehaus.jettison.json.JSONArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
-public class NotificationHookConsumer {
+/**
+ * Consumer of notifications from hooks e.g., hive hook etc
+ */
+@Singleton
+public class NotificationHookConsumer implements Service {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
 
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address";
 
     @Inject
-    private static NotificationInterface notificationInterface;
-
-    private static ExecutorService executors;
-    private static AtlasClient atlasClient;
+    private NotificationInterface notificationInterface;
+    private ExecutorService executors;
+    private AtlasClient atlasClient;
 
-    public static void start() throws AtlasException {
+    @Override
+    public void start() throws AtlasException {
         Configuration applicationProperties = ApplicationProperties.get();
-        notificationInterface.initialize(applicationProperties);
 
         String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000");
         atlasClient = new AtlasClient(atlasEndpoint);
-        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 2);
+        int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
         List<NotificationConsumer> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
         executors = Executors.newFixedThreadPool(consumers.size());
@@ -58,12 +64,20 @@ public class NotificationHookConsumer {
         }
     }
 
-    public static void stop() {
-        notificationInterface.shutdown();
-        executors.shutdown();
+    @Override
+    public void stop() {
+        //Allow for completion of outstanding work
+        notificationInterface.close();
+        try {
+            if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Failure in shutting down consumers");
+        }
     }
 
-    static class HookConsumer implements Runnable {
+    class HookConsumer implements Runnable {
         private final NotificationConsumer consumer;
 
         public HookConsumer(NotificationConsumer consumerInterface) {
@@ -74,12 +88,13 @@ public class NotificationHookConsumer {
         public void run() {
             while(consumer.hasNext()) {
                 String entityJson = consumer.next();
-                LOG.debug("Processing message {}", entityJson);
+                LOG.info("Processing message {}", entityJson);
                 try {
-                    atlasClient.createEntity(entityJson);
-                } catch (AtlasServiceException e) {
+                    JSONArray guids = atlasClient.createEntity(new JSONArray(entityJson));
+                    LOG.info("Create entities with guid {}", guids);
+                } catch (Exception e) {
                     //todo handle failures
-                    LOG.warn("Error handling message {}", entityJson);
+                    LOG.warn("Error handling message {}", entityJson, e);
                 }
             }
         }



Mime
View raw message