atlas-commits mailing list archives

From shweth...@apache.org
Subject [5/5] incubator-atlas git commit: ATLAS-622 Introduce soft delete (shwethags)
Date Sat, 16 Apr 2016 03:58:14 GMT
ATLAS-622 Introduce soft delete (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/40ee9492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/40ee9492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/40ee9492

Branch: refs/heads/master
Commit: 40ee9492192f8ea3afc4be7f6fa55803214e8247
Parents: daf812a
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Sat Apr 16 09:14:01 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Sat Apr 16 09:15:15 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   2 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    |   2 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 515 ++++++-----
 .../main/java/org/apache/atlas/AtlasClient.java |  10 +-
 .../org/apache/atlas/ApplicationProperties.java |  10 +
 release-log.txt                                 |   1 +
 .../apache/atlas/RepositoryMetadataModule.java  |  11 +
 .../repository/audit/EntityAuditListener.java   |  10 +-
 .../atlas/repository/graph/AtlasEdgeLabel.java  |   3 -
 .../atlas/repository/graph/DeleteHandler.java   | 375 ++++++++
 .../graph/GraphBackedMetadataRepository.java    |  81 +-
 .../graph/GraphBackedSearchIndexer.java         |  10 +-
 .../atlas/repository/graph/GraphHelper.java     | 204 ++--
 .../graph/GraphToTypedInstanceMapper.java       | 146 +--
 .../repository/graph/HardDeleteHandler.java     |  45 +
 .../repository/graph/SoftDeleteHandler.java     |  55 ++
 .../graph/TypedInstanceToGraphMapper.java       | 919 ++++++-------------
 .../typestore/GraphBackedTypeStore.java         |   5 +-
 .../apache/atlas/BaseHiveRepositoryTest.java    |   1 +
 .../test/java/org/apache/atlas/TestUtils.java   |  78 +-
 .../GraphBackedDiscoveryServiceTest.java        |  14 +-
 ...kedMetadataRepositoryDeleteEntitiesTest.java | 613 -------------
 ...hBackedMetadataRepositoryDeleteTestBase.java | 772 ++++++++++++++++
 .../GraphBackedMetadataRepositoryTest.java      |  96 +-
 .../GraphBackedRepositoryHardDeleteTest.java    | 121 +++
 .../GraphBackedRepositorySoftDeleteTest.java    | 121 +++
 .../graph/GraphRepoMapperScaleTest.java         |   7 +
 .../typestore/GraphBackedTypeStoreTest.java     |   8 +-
 .../service/DefaultMetadataServiceTest.java     | 204 ++--
 .../org/apache/atlas/query/GremlinTest.scala    |   2 +-
 .../org/apache/atlas/query/GremlinTest2.scala   |   2 +-
 .../apache/atlas/query/LineageQueryTest.scala   |   2 +-
 .../apache/atlas/query/QueryTestsUtils.scala    |   2 +-
 .../java/org/apache/atlas/RequestContext.java   |  55 ++
 .../org/apache/atlas/typesystem/IInstance.java  |   1 +
 .../apache/atlas/typesystem/Referenceable.java  |   5 +
 .../org/apache/atlas/typesystem/Struct.java     |   5 +
 .../persistence/DownCastStructInstance.java     |   5 +
 .../apache/atlas/typesystem/persistence/Id.java |  22 +-
 .../persistence/ReferenceableInstance.java      |  13 +
 .../typesystem/persistence/StructInstance.java  |   5 +
 .../apache/atlas/ApplicationPropertiesTest.java |  29 +-
 .../test/resources/atlas-application.properties |   4 +-
 .../NotificationHookConsumerIT.java             |  10 +-
 .../web/resources/EntityJerseyResourceIT.java   |  33 +-
 45 files changed, 2666 insertions(+), 1968 deletions(-)
----------------------------------------------------------------------
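Before the per-file hunks, the diffstat gives the shape of the change: deletion is
factored into a new DeleteHandler abstraction (375 added lines) with HardDeleteHandler
and SoftDeleteHandler implementations, bound through RepositoryMetadataModule and
driven by a new ApplicationProperties lookup, while mappers such as
TypedInstanceToGraphMapper shrink as they delegate to it. Below is a minimal,
self-contained sketch of that split, assuming soft delete flags graph elements
instead of removing them; the "__state" property name and all internals here are
assumptions, not read from this diff.

import java.util.HashMap;
import java.util.Map;

// Element stands in for a graph vertex or edge.
class Element {
    final Map<String, String> props = new HashMap<>();
    boolean removed = false;
}

// Each handler decides what "delete" means for a graph element.
abstract class DeleteHandler {
    abstract void delete(Element element);
}

// Hard delete: the element is physically removed from the graph.
class HardDeleteHandler extends DeleteHandler {
    @Override
    void delete(Element element) { element.removed = true; }
}

// Soft delete: the element stays in the graph but is flagged, so audit
// history and lineage involving deleted entities remain queryable.
class SoftDeleteHandler extends DeleteHandler {
    @Override
    void delete(Element element) { element.props.put("__state", "DELETED"); }
}

public class DeleteHandlerSketch {
    public static void main(String[] args) {
        Element e = new Element();
        new SoftDeleteHandler().delete(e);
        System.out.println(e.props);   // prints {__state=DELETED}
    }
}

The diffstat also introduces RequestContext (55 lines). A per-request ThreadLocal
holder is the usual shape for such a class; the sketch below is written under that
assumption, with illustrative member names.

// Hypothetical sketch of a per-thread request context.
public class RequestContext {
    private static final ThreadLocal<RequestContext> CURRENT = new ThreadLocal<>();

    private final long requestTime = System.currentTimeMillis();

    private RequestContext() {}

    // Called once at the start of handling a request.
    public static RequestContext createContext() {
        RequestContext context = new RequestContext();
        CURRENT.set(context);
        return context;
    }

    public static RequestContext get() { return CURRENT.get(); }

    // Must be called when the request ends to avoid leaks.
    public static void clear() { CURRENT.remove(); }

    public long getRequestTime() { return requestTime; }
}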


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index f007a32..6eb5194 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -459,7 +459,7 @@ public class HiveMetaStoreBridge {
         final String[] parts = tableQualifiedName.split("@");
         final String tableName = parts[0];
         final String clusterName = parts[1];
-        return String.format("%s.%s@%s", tableName, colName, clusterName);
+        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
     }
 
     public List<Referenceable> getColumns(List<FieldSchema> schemaList, String tableQualifiedName) throws Exception {
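The single change in this hunk lower-cases the column name when building a column's
qualified name, so lookups do not depend on the case Hive happens to report. A tiny
runnable sketch of the patched logic (the sample values are illustrative):

public class ColumnNameSketch {
    public static void main(String[] args) {
        // Mirrors the patched getColumnQualifiedName() above.
        String tableQualifiedName = "default.tab1@test";   // "<db>.<table>@<cluster>"
        String[] parts = tableQualifiedName.split("@");
        String colQualifiedName =
                String.format("%s.%s@%s", parts[0], "Id".toLowerCase(), parts[1]);
        System.out.println(colQualifiedName);   // prints default.tab1.id@test
    }
}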

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index b9f00fd..4ee15d6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -481,7 +481,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return entitiesCreatedOrUpdated;
     }
 
-    private String normalize(String str) {
+    public static String normalize(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
         }
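normalize() is widened from private to public static so HiveHookIT can statically
import it (see the import added in that file's hunk below) instead of carrying a
private duplicate, which this commit deletes. Reconstructed from that removed
duplicate, the complete helper reads:

import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;

public class NormalizeSketch {
    // Lower-case the query text, then Java-escape it so it is safe to
    // embed in a search expression.
    public static String normalize(String str) {
        if (StringUtils.isEmpty(str)) {
            return null;
        }
        return StringEscapeUtils.escapeJava(str.toLowerCase());
    }

    public static void main(String[] args) {
        System.out.println(normalize("SELECT * FROM \"t\""));   // select * from \"t\"
    }
}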

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 683f43c..5fda0d3 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -20,8 +20,10 @@ package org.apache.atlas.hive.hook;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
@@ -32,7 +34,6 @@ import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.testng.Assert;
@@ -53,26 +53,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.atlas.hive.hook.HiveHook.normalize;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
 
 public class HiveHookIT {
-    public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class);
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class);
 
     private static final String DGI_URL = "http://localhost:21000/";
     private static final String CLUSTER_NAME = "test";
     public static final String DEFAULT_DB = "default";
     private Driver driver;
-    private AtlasClient dgiCLient;
+    private AtlasClient atlasClient;
     private SessionState ss;
     
     private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
     private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
 
-    private enum QUERY_TYPE {
-        GREMLIN,
-        DSL
-    }
-
     @BeforeClass
     public void setUp() throws Exception {
         //Set-up hive session
@@ -87,9 +85,9 @@ public class HiveHookIT {
         SessionState.setCurrentSessionState(ss);
 
         Configuration configuration = ApplicationProperties.get();
-        dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
+        atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
 
-        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, dgiCLient);
+        HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();
 
     }
@@ -107,7 +105,7 @@ public class HiveHookIT {
         runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')");
         String dbId = assertDatabaseIsRegistered(dbName);
 
-        Referenceable definition = dgiCLient.getEntity(dbId);
+        Referenceable definition = atlasClient.getEntity(dbId);
         Map params = (Map) definition.get(HiveDataModelGenerator.PARAMETERS);
         Assert.assertNotNull(params);
         Assert.assertEquals(params.size(), 2);
@@ -115,11 +113,13 @@ public class HiveHookIT {
 
         //There should be just one entity per dbname
         runCommand("drop database " + dbName);
+        assertDBIsNotRegistered(dbName);
+
         runCommand("create database " + dbName);
         String dbid = assertDatabaseIsRegistered(dbName);
 
         //assert on qualified name
-        Referenceable dbEntity = dgiCLient.getEntity(dbid);
+        Referenceable dbEntity = atlasClient.getEntity(dbid);
         Assert.assertEquals(dbEntity.get("qualifiedName"), dbName.toLowerCase() + "@" + CLUSTER_NAME);
 
     }
@@ -149,7 +149,7 @@ public class HiveHookIT {
     private String createTable(boolean isPartitioned) throws Exception {
         String tableName = tableName();
         runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
-            " partitioned by(dt string)" : ""));
+                " partitioned by(dt string)" : ""));
         return tableName;
     }
 
@@ -174,14 +174,15 @@ public class HiveHookIT {
         assertTableIsRegistered(dbName, tableName);
 
         //there is only one instance of column registered
-        String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName));
-        Referenceable colEntity = dgiCLient.getEntity(colId);
+        String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName));
+        Referenceable colEntity = atlasClient.getEntity(colId);
         Assert.assertEquals(colEntity.get("qualifiedName"), String.format("%s.%s.%s@%s", dbName.toLowerCase(),
                 tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME));
 
         tableName = createTable();
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
+        Referenceable tableRef = atlasClient.getEntity(tableId);
         Assert.assertEquals(tableRef.get("tableType"), TableType.MANAGED_TABLE.name());
         Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment");
         String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -227,17 +228,19 @@ public class HiveHookIT {
     }
 
     private String assertColumnIsRegistered(String colName) throws Exception {
-        LOG.debug("Searching for column {}", colName.toLowerCase());
-        String query =
-                String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
-        return assertEntityIsRegistered(query);
+        return assertColumnIsRegistered(colName, null);
+    }
+
+    private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception {
+        LOG.debug("Searching for column {}", colName);
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                colName, assertPredicate);
     }
 
     private void assertColumnIsNotRegistered(String colName) throws Exception {
         LOG.debug("Searching for column {}", colName);
-        String query =
-            String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase());
-        assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
+        assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                colName);
     }
 
     @Test
@@ -277,7 +280,7 @@ public class HiveHookIT {
 
         //Check lineage which includes table1
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
-        JSONObject response = dgiCLient.getInputGraph(datasetName);
+        JSONObject response = atlasClient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(viewId));
         Assert.assertTrue(vertices.has(table1Id));
@@ -293,7 +296,7 @@ public class HiveHookIT {
         Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
 
         datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName);
-        response = dgiCLient.getInputGraph(datasetName);
+        response = atlasClient.getInputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(viewId));
 
@@ -304,7 +307,7 @@ public class HiveHookIT {
         Assert.assertTrue(vertices.has(table1Id));
 
         //Outputs dont exist
-        response = dgiCLient.getOutputGraph(datasetName);
+        response = atlasClient.getOutputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertEquals(vertices.length(), 0);
     }
@@ -358,7 +361,7 @@ public class HiveHookIT {
 
     private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception {
         String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
+        Referenceable process = atlasClient.getEntity(processId);
         if (numInputs == 0) {
             Assert.assertNull(process.get(INPUTS));
         } else {
@@ -376,7 +379,7 @@ public class HiveHookIT {
 
     private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception {
         String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
+        Referenceable process = atlasClient.getEntity(processId);
         if (inputs == null) {
             Assert.assertNull(process.get(INPUTS));
         } else {
@@ -406,7 +409,7 @@ public class HiveHookIT {
         String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
 
-        validateProcess(query, new String[] {inputTableId}, new String[] {opTableId});
+        validateProcess(query, new String[]{inputTableId}, new String[]{opTableId});
     }
 
     @Test
@@ -450,7 +453,7 @@ public class HiveHookIT {
 
         String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
-        validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
+        validateProcess(query, new String[]{ipTableId}, new String[]{opTableId});
     }
 
     @Test
@@ -465,7 +468,7 @@ public class HiveHookIT {
 
         String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
-        validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
+        validateProcess(query, new String[]{ipTableId}, new String[]{opTableId});
     }
 
     private String random() {
@@ -571,7 +574,7 @@ public class HiveHookIT {
 
     private List<Referenceable> getColumns(String dbName, String tableName) throws Exception {
         String tableId = assertTableIsRegistered(dbName, tableName);
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
+        Referenceable tableRef = atlasClient.getEntity(tableId);
         return ((List<Referenceable>)tableRef.get(HiveDataModelGenerator.COLUMNS));
     }
 
@@ -582,7 +585,9 @@ public class HiveHookIT {
         String query = "alter table " + tableName + " add columns (" + column + " string)";
         runCommand(query);
 
-        assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), column));
+        assertColumnIsRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName),
+                        column));
 
         //Verify the number of columns present in the table
         final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
@@ -595,13 +600,21 @@ public class HiveHookIT {
         final String colDropped = "id";
         String query = "alter table " + tableName + " replace columns (name string)";
         runCommand(query);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), colDropped));
+
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName),
+                        colDropped));
 
         //Verify the number of columns present in the table
-        final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
-        Assert.assertEquals(columns.size(), 1);
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable tableRef) throws Exception {
+                List<Referenceable> columns = (List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS);
+                Assert.assertEquals(columns.size(), 1);
+                Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "name");
 
-        Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "name");
+            }
+        });
     }
 
     @Test
@@ -612,12 +625,15 @@ public class HiveHookIT {
         String tableName = createTable();
         String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName);
         runCommand(query);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
-        assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+        assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName));
 
         //Verify the number of columns present in the table
         List<Referenceable> columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
+
         //Change column type
         oldColName = "name1";
         newColName = "name2";
@@ -627,46 +643,70 @@ public class HiveHookIT {
 
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
-
-        String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
-        assertColumnIsRegistered(newColQualifiedName);
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
 
-        Assert.assertEquals(columns.get(1).get("type"), "int");
+        String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
+        assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable entity) throws Exception {
+                assertEquals(entity.get("type"), "int");
+            }
+        });
 
         //Change name and add comment
         oldColName = "name2";
         newColName = "name3";
         final String comment = "added comment";
-        query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment);
+        query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName,
+                newColName, newColType, comment);
         runCommand(query);
 
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
 
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
-        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
-        assertColumnIsRegistered(newColQualifiedName);
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
 
-        Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.COMMENT), comment);
+        assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable entity) throws Exception {
+                assertEquals(entity.get(HiveDataModelGenerator.COMMENT), comment);
+            }
+        });
 
         //Change column position
         oldColName = "name3";
         newColName = "name4";
-        query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType);
+        query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName,
+                newColType);
         runCommand(query);
 
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
 
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
-        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+
+        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
         assertColumnIsRegistered(newColQualifiedName);
 
-        //Change col position again
-        Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), newColName);
-        Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id");
+        final String finalNewColName = newColName;
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+                    @Override
+                    public void assertOnEntity(Referenceable entity) throws Exception {
+                        List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS);
+                        assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), finalNewColName);
+                        assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id");
+                    }
+                }
+        );
 
+        //Change col position again
         oldColName = "name4";
         newColName = "name5";
         query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType);
@@ -675,16 +715,27 @@ public class HiveHookIT {
         columns = getColumns(DEFAULT_DB, tableName);
         Assert.assertEquals(columns.size(), 2);
 
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
-        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName));
+
+        newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName);
         assertColumnIsRegistered(newColQualifiedName);
 
         //Check col position
-        Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), newColName);
-        Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id");
+        final String finalNewColName2 = newColName;
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+                    @Override
+                    public void assertOnEntity(Referenceable entity) throws Exception {
+                        List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS);
+                        assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), finalNewColName2);
+                        assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id");
+                    }
+                }
+        );
     }
 
-    @Test()
+    @Test
     public void testTruncateTable() throws Exception {
         String tableName = createTable(false);
         String query = String.format("truncate table %s", tableName);
@@ -695,7 +746,7 @@ public class HiveHookIT {
 
         //Check lineage
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
-        JSONObject response = dgiCLient.getInputGraph(datasetName);
+        JSONObject response = atlasClient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
         //Below should be assertTrue - Fix https://issues.apache.org/jira/browse/ATLAS-653
         Assert.assertFalse(vertices.has(tableId));
@@ -708,15 +759,24 @@ public class HiveHookIT {
         String query = String.format("ALTER TABLE %s PARTITION COLUMN (dt %s)", tableName, newType);
         runCommand(query);
 
-        final String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        final String dtColId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt"));
-        Referenceable table = dgiCLient.getEntity(tableId);
-        Referenceable column = dgiCLient.getEntity(dtColId);
-        Assert.assertEquals(column.get("type"), newType);
+        String colQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt");
+        final String dtColId = assertColumnIsRegistered(colQualifiedName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable column) throws Exception {
+                Assert.assertEquals(column.get("type"), newType);
+            }
+        });
 
-        final List<Referenceable> partitionKeys = (List<Referenceable>) table.get("partitionKeys");
-        Assert.assertEquals(partitionKeys.size(), 1);
-        Assert.assertEquals(partitionKeys.get(0).getId()._getId(), dtColId);
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable table) throws Exception {
+                final List<Referenceable> partitionKeys = (List<Referenceable>) table.get("partitionKeys");
+                Assert.assertEquals(partitionKeys.size(), 1);
+                Assert.assertEquals(partitionKeys.get(0).getId()._getId(), dtColId);
+
+            }
+        });
     }
 
     @Test
@@ -742,17 +802,18 @@ public class HiveHookIT {
         String query = "alter table " + tableName + " set location '" + testPath + "'";
         runCommand(query);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        //Verify the number of columns present in the table
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
-        Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
-        Assert.assertEquals(sdRef.get("location"), testPath);
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable tableRef) throws Exception {
+                Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
+                Assert.assertEquals(sdRef.get("location"), testPath);
+            }
+        });
 
         Referenceable processReference = validateProcess(query, 1, 1);
         validateHDFSPaths(processReference, testPath, INPUTS);
 
         validateOutputTables(processReference, tableId);
-
     }
 
     private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
@@ -762,7 +823,7 @@ public class HiveHookIT {
         String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
         Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
 
-        Referenceable hdfsPathRef = dgiCLient.getEntity(hdfsPathId);
+        Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
         Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
         Assert.assertEquals(hdfsPathRef.get("name"), testPathNormed);
 //        Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName());
@@ -771,14 +832,9 @@ public class HiveHookIT {
         return hdfsPathRef.getId()._getId();
     }
 
-
     private String assertHDFSPathIsRegistered(String path) throws Exception {
-        final String typeName = FSDataTypes.HDFS_PATH().toString();
-        final String parentTypeName = FSDataTypes.FS_PATH().toString();
-        String gremlinQuery =
-            String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()", typeName, parentTypeName,
-                normalize(path));
-        return assertEntityIsRegistered(gremlinQuery);
+        LOG.debug("Searching for hdfs path {}", path);
+        return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), "name", path, null);
     }
 
     @Test
@@ -788,18 +844,25 @@ public class HiveHookIT {
         String query = "alter table " + tableName + " set FILEFORMAT " + testFormat;
         runCommand(query);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
-        Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
-        Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
-        Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
-        Assert.assertNotNull(sdRef.get("serdeInfo"));
-
-        Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
-        Assert.assertEquals(serdeInfo.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
-        Assert.assertNotNull(serdeInfo.get(HiveDataModelGenerator.PARAMETERS));
-        Assert.assertEquals(((Map<String, String>)serdeInfo.get(HiveDataModelGenerator.PARAMETERS)).get("serialization.format"), "1");
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable tableRef) throws Exception {
+                Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
+                Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT),
+                        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
+                Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT),
+                        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
+                Assert.assertNotNull(sdRef.get("serdeInfo"));
+
+                Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
+                Assert.assertEquals(serdeInfo.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde");
+                Assert.assertNotNull(serdeInfo.get(HiveDataModelGenerator.PARAMETERS));
+                Assert.assertEquals(
+                        ((Map<String, String>) serdeInfo.get(HiveDataModelGenerator.PARAMETERS))
+                                .get("serialization.format"),
+                        "1");
+            }
+        });
 
 
         /**
@@ -807,7 +870,7 @@ public class HiveHookIT {
          * query = "alter table " + tableName + " STORED AS " + testFormat.toUpperCase();
          * runCommand(query);
 
-         * tableRef = dgiCLient.getEntity(tableId);
+         * tableRef = atlasClient.getEntity(tableId);
          * sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
          * Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
          * Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
@@ -818,31 +881,37 @@ public class HiveHookIT {
     @Test
     public void testAlterTableBucketingClusterSort() throws Exception {
         String tableName = createTable();
-        ImmutableList<String> cols = ImmutableList.<String>of("id");
+        ImmutableList<String> cols = ImmutableList.of("id");
         runBucketSortQuery(tableName, 5, cols, cols);
 
-        cols = ImmutableList.<String>of("id", "name");
+        cols = ImmutableList.of("id", "name");
         runBucketSortQuery(tableName, 2, cols, cols);
     }
 
-    private void runBucketSortQuery(String tableName, int numBuckets,  ImmutableList<String> bucketCols,ImmutableList<String> sortCols) throws Exception {
+    private void runBucketSortQuery(String tableName, final int numBuckets,  final ImmutableList<String> bucketCols,
+                                    final ImmutableList<String> sortCols) throws Exception {
         final String fmtQuery = "alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS";
-        String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), stripListBrackets(sortCols.toString()), numBuckets);
+        String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()),
+                stripListBrackets(sortCols.toString()), numBuckets);
         runCommand(query);
-        verifyBucketSortingProperties(tableName, numBuckets, bucketCols, sortCols);
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable entity) throws Exception {
+                verifyBucketSortingProperties(entity, numBuckets, bucketCols, sortCols);
+            }
+        });
     }
 
     private String stripListBrackets(String listElements) {
         return StringUtils.strip(StringUtils.strip(listElements, "["), "]");
     }
 
-    private void verifyBucketSortingProperties(String tableName, int numBuckets, ImmutableList<String> bucketColNames, ImmutableList<String>  sortcolNames) throws Exception {
-
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
-        Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
-        Assert.assertEquals(((scala.math.BigInt) sdRef.get(HiveDataModelGenerator.STORAGE_NUM_BUCKETS)).intValue(), numBuckets);
+    private void verifyBucketSortingProperties(Referenceable tableRef, int numBuckets,
+                                               ImmutableList<String> bucketColNames,
+                                               ImmutableList<String>  sortcolNames) throws Exception {
+        Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
+        Assert.assertEquals(((scala.math.BigInt) sdRef.get(HiveDataModelGenerator.STORAGE_NUM_BUCKETS)).intValue(),
+                numBuckets);
         Assert.assertEquals(sdRef.get("bucketCols"), bucketColNames);
 
         List<Struct> hiveOrderStructList = (List<Struct>) sdRef.get("sortCols");
@@ -851,7 +920,7 @@ public class HiveHookIT {
 
         for (int i = 0; i < sortcolNames.size(); i++) {
             Assert.assertEquals(hiveOrderStructList.get(i).get("col"), sortcolNames.get(i));
-            Assert.assertEquals(((scala.math.BigInt)hiveOrderStructList.get(i).get("order")).intValue(), 1);
+            Assert.assertEquals(((scala.math.BigInt) hiveOrderStructList.get(i).get("order")).intValue(), 1);
         }
     }
 
@@ -882,8 +951,12 @@ public class HiveHookIT {
 
         final String query = String.format("drop table %s ", tableName);
         runCommand(query);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id"));
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "name"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName),
+                        "id"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName),
+                        "name"));
         assertTableIsNotRegistered(DEFAULT_DB, tableName);
     }
 
@@ -903,8 +976,11 @@ public class HiveHookIT {
         runCommand(query);
 
         //Verify columns are not registered for one of the tables
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "name"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]),
+                        "name"));
 
         for(int i = 0; i < numTables; i++) {
             assertTableIsNotRegistered(dbName, tableNames[i]);
@@ -974,8 +1050,12 @@ public class HiveHookIT {
         query = String.format("drop view %s ", viewName);
 
         runCommand(query);
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id"));
-        assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "name"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName),
+                        "id"));
+        assertColumnIsNotRegistered(HiveMetaStoreBridge
+                .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName),
+                        "name"));
         assertTableIsNotRegistered(DEFAULT_DB, viewName);
     }
 
@@ -1006,16 +1086,20 @@ public class HiveHookIT {
     @Test
     public void testAlterDBOwner() throws Exception {
         String dbName = createDatabase();
+        assertDatabaseIsRegistered(dbName);
+
         final String owner = "testOwner";
-        String dbId = assertDatabaseIsRegistered(dbName);
         final String fmtQuery = "alter database %s set OWNER %s %s";
         String query = String.format(fmtQuery, dbName, "USER", owner);
 
         runCommand(query);
 
-        assertDatabaseIsRegistered(dbName);
-        Referenceable entity = dgiCLient.getEntity(dbId);
-        Assert.assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner);
+        assertDatabaseIsRegistered(dbName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable entity) {
+                assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner);
+            }
+        });
     }
 
     @Test
@@ -1073,30 +1157,38 @@ public class HiveHookIT {
         testAlterProperties(Entity.Type.TABLE, viewName, fmtQuery);
     }
 
-    private void verifyEntityProperties(Entity.Type type, String entityName, Map<String, String> expectedProps, boolean checkIfNotExists) throws Exception {
-
-        String entityId = null;
-
+    private void verifyEntityProperties(Entity.Type type, String entityName, final Map<String, String> expectedProps,
+                                        final boolean checkIfNotExists) throws Exception {
         switch(type) {
         case TABLE:
-            entityId = assertTableIsRegistered(DEFAULT_DB, entityName);
+            assertTableIsRegistered(DEFAULT_DB, entityName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(Referenceable entity) throws Exception {
+                    verifyProperties(entity, expectedProps, checkIfNotExists);
+                }
+            });
             break;
         case DATABASE:
-            entityId = assertDatabaseIsRegistered(entityName);
+            assertDatabaseIsRegistered(entityName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(Referenceable entity) throws Exception {
+                    verifyProperties(entity, expectedProps, checkIfNotExists);
+                }
+            });
             break;
         }
-
-        Referenceable ref = dgiCLient.getEntity(entityId);
-        verifyProperties(ref, expectedProps, checkIfNotExists);
     }
 
-    private void verifyTableSdProperties(String tableName, String serdeLib, Map<String, String> expectedProps) throws Exception {
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        Referenceable tableRef = dgiCLient.getEntity(tableId);
-        Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
-        Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
-        Assert.assertEquals(serdeInfo.get("serializationLib"), serdeLib);
-        verifyProperties(serdeInfo, expectedProps, false);
+    private void verifyTableSdProperties(String tableName, final String serdeLib, final Map<String, String> expectedProps) throws Exception {
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+            @Override
+            public void assertOnEntity(Referenceable tableRef) throws Exception {
+                Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
+                Struct serdeInfo = (Struct) sdRef.get("serdeInfo");
+                Assert.assertEquals(serdeInfo.get("serializationLib"), serdeLib);
+                verifyProperties(serdeInfo, expectedProps, false);
+            }
+        });
     }
 
     private void verifyProperties(Struct referenceable, Map<String, String> expectedProps, boolean checkIfNotExists) {
@@ -1119,108 +1211,80 @@ public class HiveHookIT {
     }
 
     private String assertProcessIsRegistered(String queryStr) throws Exception {
-        //        String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
-        //                normalize(queryStr));
-        //        assertEntityIsRegistered(dslQuery, true);
-        //todo replace with DSL
-        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
-        String gremlinQuery =
-                String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
-                        normalize(queryStr));
-        return assertEntityIsRegistered(gremlinQuery);
+        LOG.debug("Searching for process with query {}", queryStr);
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr), null);
     }
 
     private void assertProcessIsNotRegistered(String queryStr) throws Exception {
-        //        String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(),
-        //                normalize(queryStr));
-        //        assertEntityIsRegistered(dslQuery, true);
-        //todo replace with DSL
-        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
-        String gremlinQuery =
-            String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName,
-                normalize(queryStr));
-        assertEntityIsNotRegistered(QUERY_TYPE.GREMLIN, gremlinQuery);
-    }
-
-    private String normalize(String str) {
-        if (StringUtils.isEmpty(str)) {
-            return null;
-        }
-        return StringEscapeUtils.escapeJava(str.toLowerCase());
+        LOG.debug("Searching for process with query {}", queryStr);
+        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr));
     }
 
     private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception {
         LOG.debug("Searching for table {}.{}", dbName, tableName);
-        String query = String.format(
-                "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
-                HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-        assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
+        String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName);
+        assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.NAME, tableQualifiedName);
     }
 
     private void assertDBIsNotRegistered(String dbName) throws Exception {
-        LOG.debug("Searching for database {}.{}", dbName);
-        String query = String.format(
-            "%s as d where name = '%s' and clusterName = '%s'" + " select d",
-            HiveDataTypes.HIVE_DB.getName(), dbName.toLowerCase(), CLUSTER_NAME);
-        assertEntityIsNotRegistered(QUERY_TYPE.DSL, query);
+        LOG.debug("Searching for database {}", dbName);
+        String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
+        assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName);
     }
 
     private String assertTableIsRegistered(String dbName, String tableName) throws Exception {
-        LOG.debug("Searching for table {}.{}", dbName, tableName);
-        String query = String.format(
-                "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
-                HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-        return assertEntityIsRegistered(query, "t");
+        return assertTableIsRegistered(dbName, tableName, null);
     }
 
-    private String getTableEntity(String dbName, String tableName) throws Exception {
+    private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception {
         LOG.debug("Searching for table {}.{}", dbName, tableName);
-        String query = String.format(
-            "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t",
-            HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME);
-        return assertEntityIsRegistered(query, "t");
+        String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName);
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.NAME, tableQualifiedName,
+                assertPredicate);
     }
 
     private String assertDatabaseIsRegistered(String dbName) throws Exception {
+        return assertDatabaseIsRegistered(dbName, null);
+    }
+
+    private String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception {
         LOG.debug("Searching for database {}", dbName);
-        String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(),
-                dbName.toLowerCase(), CLUSTER_NAME);
-        return assertEntityIsRegistered(query);
+        String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName);
+        return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                dbQualifiedName, assertPredicate);
     }
 
-    private String assertEntityIsRegistered(final String query, String... arg) throws Exception {
-        waitFor(60000, new Predicate() {
+    private String assertEntityIsRegistered(final String typeName, final String property, final String value,
+                                            final AssertPredicate assertPredicate) throws Exception {
+        waitFor(80000, new Predicate() {
             @Override
-            public boolean evaluate() throws Exception {
-                JSONArray results = dgiCLient.search(query);
-                return results.length() == 1;
+            public void evaluate() throws Exception {
+                Referenceable entity = atlasClient.getEntity(typeName, property, value);
+                assertNotNull(entity);
+                if(assertPredicate != null) {
+                    assertPredicate.assertOnEntity(entity);
+                }
             }
         });
-
-        String column = (arg.length > 0) ? arg[0] : "_col_0";
-
-        JSONArray results = dgiCLient.search(query);
-        JSONObject row = results.getJSONObject(0);
-        if (row.has("__guid")) {
-            return row.getString("__guid");
-        } else if (row.has("$id$")) {
-            return row.getJSONObject("$id$").getString("id");
-        } else {
-            return row.getJSONObject(column).getString("id");
-        }
+        Referenceable entity = atlasClient.getEntity(typeName, property, value);
+        return entity.getId()._getId();
     }
 
-    private void assertEntityIsNotRegistered(QUERY_TYPE queryType, String query) throws Exception {
-        JSONArray results = null;
-        switch(queryType) {
-        case DSL :
-            results = dgiCLient.searchByDSL(query);
-            break;
-        case GREMLIN :
-            results = dgiCLient.searchByGremlin(query);
-            break;
-        }
-        Assert.assertEquals(results.length(), 0);
+    private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception {
+        waitFor(80000, new Predicate() {
+            @Override
+            public void evaluate() throws Exception {
+                try {
+                    atlasClient.getEntity(typeName, property, value);
+                } catch (AtlasServiceException e) {
+                    if(e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                        return;
+                    }
+                }
+                fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, "
+                        + "attributeValue = %s", typeName, property, value));
+            }
+        });
     }
 
     @Test
@@ -1236,13 +1300,13 @@ public class HiveHookIT {
         String table2Id = assertTableIsRegistered(db2, table2);
 
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2);
-        JSONObject response = dgiCLient.getInputGraph(datasetName);
+        JSONObject response = atlasClient.getInputGraph(datasetName);
         JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(table1Id));
         Assert.assertTrue(vertices.has(table2Id));
 
         datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1);
-        response = dgiCLient.getOutputGraph(datasetName);
+        response = atlasClient.getOutputGraph(datasetName);
         vertices = response.getJSONObject("values").getJSONObject("vertices");
         Assert.assertTrue(vertices.has(table1Id));
         Assert.assertTrue(vertices.has(table2Id));
@@ -1255,15 +1319,18 @@ public class HiveHookIT {
         runCommand("show transactions");
     }
 
-    public interface Predicate {
+    public interface AssertPredicate {
+        void assertOnEntity(Referenceable entity) throws Exception;
+    }
 
+    public interface Predicate {
         /**
          * Perform a predicate evaluation.
          *
          * @return the boolean result of the evaluation.
          * @throws Exception thrown if the predicate evaluation could not evaluate.
          */
-        boolean evaluate() throws Exception;
+        void evaluate() throws Exception;
     }
 
     /**
@@ -1276,13 +1343,17 @@ public class HiveHookIT {
         ParamChecker.notNull(predicate, "predicate");
         long mustEnd = System.currentTimeMillis() + timeout;
 
-        boolean eval;
-        while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) {
-            LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis());
-            Thread.sleep(100);
-        }
-        if (!eval) {
-            throw new Exception("Waiting timed out after " + timeout + " msec");
+        while (true) {
+            try {
+                predicate.evaluate();
+                return;
+            } catch (Error | Exception e) {
+                if (System.currentTimeMillis() >= mustEnd) {
+                    fail("Assertion failed; timed out after waiting " + timeout + " msec", e);
+                }
+                LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e);
+                Thread.sleep(300);
+            }
         }
     }
 }
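
The rewritten waitFor turns boolean polling into retried assertions: the predicate throws on failure, and waitFor retries until the deadline, surfacing the last failure as the cause. A minimal standalone sketch of the same retry-until-timeout pattern (the helper name retryUntil is illustrative, not part of this patch):

    // Sketch: retry an assertion until it passes or the deadline expires.
    public static void retryUntil(long timeoutMsecs, Runnable assertion) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMsecs;
        while (true) {
            try {
                assertion.run();     // throws AssertionError or an unchecked exception on failure
                return;              // success - stop polling
            } catch (Error | RuntimeException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new AssertionError("Timed out after " + timeoutMsecs + " msec", e);
                }
                Thread.sleep(300);   // back off before the next attempt
            }
        }
    }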

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index cc87706..22a1726 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -57,6 +57,7 @@ import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
  */
 public class AtlasClient {
     private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class);
+
     public static final String NAME = "name";
     public static final String GUID = "GUID";
     public static final String TYPE = "type";
@@ -403,6 +404,7 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> createType(String typeAsJson) throws AtlasServiceException {
+        LOG.debug("Creating type definition: {}", typeAsJson);
         JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson);
         return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
@@ -429,6 +431,7 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public List<String> updateType(String typeAsJson) throws AtlasServiceException {
+        LOG.debug("Updating tyep definition: {}", typeAsJson);
         JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson);
         return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() {
             @Override
@@ -474,6 +477,7 @@ public class AtlasClient {
      * @throws AtlasServiceException
      */
     public JSONArray createEntity(JSONArray entities) throws AtlasServiceException {
+        LOG.debug("Creating entities: {}", entities);
         JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString());
         try {
             return response.getJSONArray(GUID);
@@ -522,6 +526,7 @@ public class AtlasClient {
 
     public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException {
         JSONArray entitiesArray = getEntitiesArray(entities);
+        LOG.debug("Updating entities: {}", entitiesArray);
         JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString());
         try {
             return response.getJSONArray(GUID);
@@ -538,6 +543,7 @@ public class AtlasClient {
      * @param value     property value
      */
     public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException {
+        LOG.debug("Updating entity id: {}, attribute name: {}, attribute value: {}", guid, attribute, value);
         callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() {
             @Override
             public WebResource createResource() {
@@ -555,7 +561,7 @@ public class AtlasClient {
         for (int i = 0; i < getNumberOfRetries(); i++) {
             WebResource resource = resourceCreator.createResource();
             try {
-                LOG.info("using resource {} for {} times", resource.getURI(), i);
+                LOG.debug("Using resource {} for {} times", resource.getURI(), i);
                 JSONObject result = callAPIWithResource(api, resource, requestObject);
                 return result;
             } catch (ClientHandlerException che) {
@@ -578,6 +584,7 @@ public class AtlasClient {
      */
     public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException {
         String entityJson = InstanceSerialization.toJson(entity, true);
+        LOG.debug("Updating entity id {} with {}", guid, entityJson);
         callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid);
     }
 
@@ -904,6 +911,7 @@ public class AtlasClient {
             clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE)
                 .method(api.getMethod(), ClientResponse.class, requestObject);
 
+            LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus());
             if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) {
                 String responseAsString = clientResponse.getEntity(String.class);
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/common/src/main/java/org/apache/atlas/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/atlas/ApplicationProperties.java b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
index ca72ffd..6a4dca3 100644
--- a/common/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -91,4 +91,14 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static Configuration getSubsetConfiguration(Configuration inConf, String prefix) {
         return inConf.subset(prefix);
     }
+
+    public static Class getClass(String propertyName, String defaultValue) {
+        try {
+            Configuration configuration = get();
+            String propertyValue = configuration.getString(propertyName, defaultValue);
+            return Class.forName(propertyValue);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
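
The new ApplicationProperties.getClass helper resolves a class named in configuration, falling back to a default and wrapping any lookup failure (such as ClassNotFoundException) in a RuntimeException. A usage sketch, using the property name and default that RepositoryMetadataModule wires up below:

    // Resolve the configured delete handler, defaulting to soft delete.
    Class deleteHandlerClass = ApplicationProperties.getClass(
            "atlas.DeleteHandler.impl",                               // looked up in atlas-application.properties
            "org.apache.atlas.repository.graph.SoftDeleteHandler");   // default when the property is unset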

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 64446f4..61c6e8b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -3,6 +3,7 @@ Apache Atlas Release Notes
 
 --trunk - unreleased
 INCOMPATIBLE CHANGES:
+ATLAS-622 Introduce soft delete (shwethags)
 ATLAS-494 UI Authentication (nixonrodrigues via shwethags)
 ATLAS-621 Introduce entity state in Id object (shwethags)
 ATLAS-474 Server does not start if the type is updated with same super type class information (dkantor via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 75d14f0..8dae968 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -35,9 +35,11 @@ import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.audit.EntityAuditListener;
 import org.apache.atlas.repository.audit.EntityAuditRepository;
 import org.apache.atlas.repository.audit.HBaseBasedAuditRepository;
+import org.apache.atlas.repository.graph.DeleteHandler;
 import org.apache.atlas.repository.graph.GraphBackedMetadataRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.SoftDeleteHandler;
 import org.apache.atlas.repository.graph.TitanGraphProvider;
 import org.apache.atlas.repository.typestore.GraphBackedTypeStore;
 import org.apache.atlas.repository.typestore.ITypeStore;
@@ -85,6 +87,8 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
 
         bindAuditRepository(binder());
 
+        bind(DeleteHandler.class).to(getDeleteHandler()).asEagerSingleton();
+
         //Add EntityAuditListener as EntityChangeListener
         Multibinder<EntityChangeListener> entityChangeListenerBinder =
                 Multibinder.newSetBinder(binder(), EntityChangeListener.class);
@@ -103,4 +107,11 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
         Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder, Service.class);
         serviceBinder.addBinding().to(HBaseBasedAuditRepository.class);
     }
+
+    private static final String DELETE_HANDLER_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandler.impl";
+
+    private Class<? extends DeleteHandler> getDeleteHandler() {
+        return ApplicationProperties.getClass(DELETE_HANDLER_IMPLEMENTATION_PROPERTY,
+                SoftDeleteHandler.class.getName());
+    }
 }
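
Operationally, this makes the delete behaviour switchable per deployment. Assuming the HardDeleteHandler added by this commit lives in the same package as SoftDeleteHandler, a deployment that wants the pre-ATLAS-622 hard-delete semantics would set, for example:

    # atlas-application.properties - override the soft-delete default
    atlas.DeleteHandler.impl=org.apache.atlas.repository.graph.HardDeleteHandler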

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
index 7f77feb..5b4bdbf 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java
@@ -45,7 +45,7 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
         List<EntityAuditEvent> events = new ArrayList<>();
-        long currentTime = System.currentTimeMillis();
+        long currentTime = RequestContext.get().getRequestTime();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE,
                     "Created: " + InstanceSerialization.toJson(entity, true));
@@ -62,7 +62,7 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
         List<EntityAuditEvent> events = new ArrayList<>();
-        long currentTime = System.currentTimeMillis();
+        long currentTime = RequestContext.get().getRequestTime();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE,
                     "Updated: " + InstanceSerialization.toJson(entity, true));
@@ -73,7 +73,7 @@ public class EntityAuditListener implements EntityChangeListener {
 
     @Override
     public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
-        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+        EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(),
                 EntityAuditEvent.EntityAuditAction.TAG_ADD,
                 "Added trait: " + InstanceSerialization.toJson(trait, true));
         auditRepository.putEvents(event);
@@ -81,7 +81,7 @@ public class EntityAuditListener implements EntityChangeListener {
 
     @Override
     public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
-        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+        EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(),
                 EntityAuditEvent.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
         auditRepository.putEvents(event);
     }
@@ -89,7 +89,7 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
         List<EntityAuditEvent> events = new ArrayList<>();
-        long currentTime = System.currentTimeMillis();
+        long currentTime = RequestContext.get().getRequestTime();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, currentTime,
                     EntityAuditEvent.EntityAuditAction.ENTITY_DELETE, "Deleted entity");
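
All audit events within a request now share RequestContext.get().getRequestTime() instead of sampling System.currentTimeMillis() per listener call, so create/update/delete events from one request carry one consistent timestamp. A plausible minimal sketch of such a thread-local context (the RequestContext class this commit actually adds also tracks deleted entity ids, and may differ in detail):

    // Sketch: one context instance, and hence one timestamp, per request thread.
    public class RequestContext {
        private static final ThreadLocal<RequestContext> CURRENT = new ThreadLocal<RequestContext>() {
            @Override
            protected RequestContext initialValue() {
                return new RequestContext();
            }
        };

        private final long requestTime = System.currentTimeMillis();   // captured once per request

        public static RequestContext get() {
            return CURRENT.get();
        }

        public long getRequestTime() {
            return requestTime;
        }
    }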

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java b/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
index da2ad9a..d905c01 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
@@ -31,9 +31,6 @@ public class AtlasEdgeLabel {
     private final String qualifiedAttributeName_;
     
     public AtlasEdgeLabel(String edgeLabel) {
-        if (!edgeLabel.startsWith(GraphHelper.EDGE_LABEL_PREFIX)) {
-            throw new IllegalArgumentException("Invalid edge label " + edgeLabel + ": missing required prefix " + GraphHelper.EDGE_LABEL_PREFIX);
-        }
         String labelWithoutPrefix = edgeLabel.substring(GraphHelper.EDGE_LABEL_PREFIX.length());
         String[] fields = labelWithoutPrefix.split("\\.", 3);
         if (fields.length < 2 || fields.length > 3) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
new file mode 100644
index 0000000..369a5d5
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graph;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.AttributeInfo;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.FieldMapping;
+import org.apache.atlas.typesystem.types.HierarchicalType;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.StructType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+
+public abstract class DeleteHandler {
+    public static final Logger LOG = LoggerFactory.getLogger(DeleteHandler.class);
+
+    private static final GraphHelper graphHelper = GraphHelper.getInstance();
+
+    protected TypeSystem typeSystem;
+    private boolean shouldUpdateReverseAttribute;
+
+    public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute) {
+        this.typeSystem = typeSystem;
+        this.shouldUpdateReverseAttribute = shouldUpdateReverseAttribute;
+
+    }
+
+    /**
+     * Deletes the entity vertex - deletes the traits and all the references
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    public void deleteEntity(Vertex instanceVertex) throws AtlasException {
+        String guid = GraphHelper.getIdFromVertex(instanceVertex);
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+        RequestContext.get().recordDeletedEntity(guid, typeName);
+
+        deleteAllTraits(instanceVertex);
+
+        deleteTypeVertex(instanceVertex);
+    }
+
+    protected abstract void deleteEdge(Edge edge) throws AtlasException;
+
+    /**
+     * Deletes a type vertex - can be an entity (class type) or just a vertex (struct/trait type)
+     * @param instanceVertex
+     * @param typeCategory
+     * @throws AtlasException
+     */
+    protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+        switch (typeCategory) {
+        case STRUCT:
+        case TRAIT:
+            deleteTypeVertex(instanceVertex);
+            break;
+
+        case CLASS:
+            deleteEntity(instanceVertex);
+            break;
+
+        default:
+            throw new IllegalStateException("Type category " + typeCategory + " not handled");
+        }
+    }
+
+    /**
+     * Deletes any type vertex. Walks over the complex attributes and removes their references
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    protected void deleteTypeVertex(Vertex instanceVertex) throws AtlasException {
+        LOG.debug("Deleting {}", string(instanceVertex));
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+        IDataType type = typeSystem.getDataType(IDataType.class, typeName);
+        FieldMapping fieldMapping = getFieldMapping(type);
+
+        for (AttributeInfo attributeInfo : fieldMapping.fields.values()) {
+            LOG.debug("Deleting attribute {} for {}", attributeInfo.name, string(instanceVertex));
+            String edgeLabel = GraphHelper.getEdgeLabel(type, attributeInfo);
+
+            switch (attributeInfo.dataType().getTypeCategory()) {
+            case CLASS:
+                //If it's a class attribute, delete the reference
+                deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite);
+                break;
+
+            case STRUCT:
+                //If it's a struct attribute, delete the reference
+                deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT);
+                break;
+
+            case ARRAY:
+                //For array attribute, if the element is struct/class, delete all the references
+                IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType();
+                DataTypes.TypeCategory elementTypeCategory = elementType.getTypeCategory();
+                if (elementTypeCategory == DataTypes.TypeCategory.STRUCT ||
+                        elementTypeCategory == DataTypes.TypeCategory.CLASS) {
+                    Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel);
+                    if (edges != null) {
+                        while (edges.hasNext()) {
+                            Edge edge = edges.next();
+                            deleteReference(edge, elementType, attributeInfo);
+                        }
+                    }
+                }
+                break;
+
+            case MAP:
+                //For map attribute, if the value type is struct/class, delete all the references
+                DataTypes.MapType mapType = (DataTypes.MapType) attributeInfo.dataType();
+                DataTypes.TypeCategory valueTypeCategory = mapType.getValueType().getTypeCategory();
+                String propertyName = GraphHelper.getQualifiedFieldName(type, attributeInfo.name);
+
+                if (valueTypeCategory == DataTypes.TypeCategory.STRUCT ||
+                        valueTypeCategory == DataTypes.TypeCategory.CLASS) {
+                    List<String> keys = instanceVertex.getProperty(propertyName);
+                    if (keys != null) {
+                        for (String key : keys) {
+                            String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key);
+                            deleteReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite);
+                        }
+                    }
+                }
+            }
+        }
+
+        deleteVertex(instanceVertex, type.getTypeCategory());
+    }
+
+    public void deleteReference(Edge edge, IDataType dataType, AttributeInfo attributeInfo) throws AtlasException {
+        deleteReference(edge, dataType.getTypeCategory(), attributeInfo.isComposite);
+    }
+
+    public void deleteReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite) throws AtlasException {
+        LOG.debug("Deleting {}", string(edge));
+        if (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT
+                || (typeCategory == DataTypes.TypeCategory.CLASS && isComposite)) {
+            //If the vertex is of type struct/trait, delete the edge and then the reference vertex, as the vertex is not shared by any other entities.
+            //If the vertex is of type class and it's a composite attribute, this reference vertex's lifecycle is controlled
+            //by this delete, hence delete both the edge and the reference vertex.
+            Vertex vertexForDelete = edge.getVertex(Direction.IN);
+
+            //If deleting the edge and then the in vertex, reverse attribute shouldn't be updated
+            deleteEdge(edge, false);
+            deleteTypeVertex(vertexForDelete, typeCategory);
+        } else {
+            //If the vertex is of type class and it's not a composite attribute, the reference vertex's lifecycle is not controlled
+            //by this delete. Hence just remove the reference edge and leave the reference vertex as is.
+
+            //If deleting just the edge, reverse attribute should be updated for any references
+            //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
+            deleteEdge(edge, true);
+        }
+    }
+
+    public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory)
+            throws AtlasException {
+        deleteReference(instanceVertex, edgeLabel, typeCategory, false);
+    }
+
+    public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory,
+                                boolean isComposite) throws AtlasException {
+        Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
+        if (edge != null) {
+            deleteReference(edge, typeCategory, isComposite);
+        }
+    }
+
+    protected void deleteEdge(Edge edge, boolean updateReverseAttribute) throws AtlasException {
+        //update reverse attribute
+        if (updateReverseAttribute) {
+            AttributeInfo attributeInfo = getAttributeForEdge(edge.getLabel());
+            if (attributeInfo.reverseAttributeName != null) {
+                deleteEdgeBetweenVertices(edge.getVertex(Direction.IN), edge.getVertex(Direction.OUT),
+                        attributeInfo.reverseAttributeName);
+            }
+        }
+
+        deleteEdge(edge);
+    }
+
+    protected void deleteVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+        //Update external references(incoming edges) to this vertex
+        LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex));
+        Iterator<Edge> edges = instanceVertex.getEdges(Direction.IN).iterator();
+
+        while(edges.hasNext()) {
+            Edge edge = edges.next();
+            String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY);
+            if (Id.EntityState.ACTIVE.name().equals(edgeState)) {
+                //Delete only the active edge references
+                AttributeInfo attribute = getAttributeForEdge(edge.getLabel());
+                deleteEdgeBetweenVertices(edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), attribute.name);
+                deleteEdge(edge);
+            }
+        }
+        _deleteVertex(instanceVertex);
+    }
+
+    protected abstract void _deleteVertex(Vertex instanceVertex);
+
+    /**
+     * Deletes the edge between outVertex and inVertex. The edge is for attribute attributeName of outVertex
+     * @param outVertex
+     * @param inVertex
+     * @param attributeName
+     * @throws AtlasException
+     */
+    protected void deleteEdgeBetweenVertices(Vertex outVertex, Vertex inVertex, String attributeName) throws AtlasException {
+        LOG.debug("Removing edge from {} to {} with attribute name {}", string(outVertex), string(inVertex),
+                attributeName);
+        String typeName = GraphHelper.getTypeName(outVertex);
+        String outId = GraphHelper.getIdFromVertex(outVertex);
+        if (outId != null && RequestContext.get().getDeletedEntityIds().contains(outId)) {
+            //If the reference vertex is marked for deletion, skip updating the reference
+            return;
+        }
+
+        IDataType type = typeSystem.getDataType(IDataType.class, typeName);
+        AttributeInfo attributeInfo = getFieldMapping(type).fields.get(attributeName);
+        String propertyName = GraphHelper.getQualifiedFieldName(type, attributeName);
+        String edgeLabel = EDGE_LABEL_PREFIX + propertyName;
+        Edge edge = null;
+
+        switch (attributeInfo.dataType().getTypeCategory()) {
+        case CLASS:
+            //If it's a class attribute, it's the only edge between the two vertices
+            //TODO need to enable this
+            //            if (refAttributeInfo.multiplicity == Multiplicity.REQUIRED) {
+            //                throw new AtlasException("Can't set attribute " + refAttributeName + " to null as its required attribute");
+            //            }
+            edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
+            break;
+
+        case ARRAY:
+            //If it's an array attribute, find the right edge between the two vertices and update the array property
+            List<String> elements = outVertex.getProperty(propertyName);
+            if (elements != null) {
+                elements = new ArrayList<>(elements);   //Make a copy, else list.remove reflects on titan.getProperty()
+                for (String elementEdgeId : elements) {
+                    Edge elementEdge = graphHelper.getEdgeById(elementEdgeId);
+                    if (elementEdge == null) {
+                        continue;
+                    }
+
+                    Vertex elementVertex = elementEdge.getVertex(Direction.IN);
+                    if (elementVertex.getId().toString().equals(inVertex.getId().toString())) {
+                        edge = elementEdge;
+
+                        if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+                            //if composite attribute, remove the reference as well. else, just remove the edge
+                            //for example, when table is deleted, process still references the table
+                            //but when column is deleted, table will not reference the deleted column
+                            LOG.debug("Removing edge {} from the array attribute {}", string(elementEdge),
+                                    attributeName);
+                            elements.remove(elementEdge.getId().toString());
+                            GraphHelper.setProperty(outVertex, propertyName, elements);
+                        }
+                        break;
+                    }
+                }
+            }
+            break;
+
+        case MAP:
+            //If it's a map attribute, find the right edge between the two vertices and update the map property
+            List<String> keys = outVertex.getProperty(propertyName);
+            if (keys != null) {
+                keys = new ArrayList<>(keys);   //Make a copy, else list.remove reflects on titan.getProperty()
+                for (String key : keys) {
+                    String keyPropertyName = propertyName + "." + key;
+                    String mapEdgeId = outVertex.getProperty(keyPropertyName);
+                    Edge mapEdge = graphHelper.getEdgeById(mapEdgeId);
+                    if (mapEdge == null) {
+                        //Edge may already have been removed, as in the array case above; skip stale map entries
+                        continue;
+                    }
+                    Vertex mapVertex = mapEdge.getVertex(Direction.IN);
+                    if (mapVertex.getId().toString().equals(inVertex.getId().toString())) {
+                        edge = mapEdge;
+
+                        if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+                            //remove this key
+                            LOG.debug("Removing edge {}, key {} from the map attribute {}", string(mapEdge), key,
+                                    attributeName);
+                            keys.remove(key);
+                            GraphHelper.setProperty(outVertex, propertyName, keys);
+                            GraphHelper.setProperty(outVertex, keyPropertyName, null);
+                        }
+                        break;
+                    }
+                }
+            }
+            break;
+
+        case STRUCT:
+        case TRAIT:
+            break;
+
+        default:
+            throw new IllegalStateException("There can't be an edge from " + string(outVertex) + " to "
+                    + string(inVertex) + " with attribute name " + attributeName + ", which is not a class/array/map attribute");
+        }
+
+        if (edge != null) {
+            deleteEdge(edge);
+            GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
+                    RequestContext.get().getRequestTime());
+        }
+    }
+
+    protected AttributeInfo getAttributeForEdge(String edgeLabel) throws AtlasException {
+        AtlasEdgeLabel atlasEdgeLabel = new AtlasEdgeLabel(edgeLabel);
+        IDataType referenceType = typeSystem.getDataType(IDataType.class, atlasEdgeLabel.getTypeName());
+        return getFieldMapping(referenceType).fields.get(atlasEdgeLabel.getAttributeName());
+    }
+
+    protected FieldMapping getFieldMapping(IDataType type) {
+        switch (type.getTypeCategory()) {
+        case CLASS:
+        case TRAIT:
+            return ((HierarchicalType)type).fieldMapping();
+
+        case STRUCT:
+            return ((StructType)type).fieldMapping();
+
+        default:
+            throw new IllegalStateException("Type " + type + " doesn't have any fields!");
+        }
+    }
+
+    /**
+     * Delete all traits from the specified vertex.
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    private void deleteAllTraits(Vertex instanceVertex) throws AtlasException {
+        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+        LOG.debug("Deleting traits {} for {}", traitNames, string(instanceVertex));
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+
+        for (String traitNameToBeDeleted : traitNames) {
+            String relationshipLabel = GraphHelper.getTraitLabel(typeName, traitNameToBeDeleted);
+            deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT);
+        }
+    }
+}
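
DeleteHandler leaves two hooks abstract - deleteEdge(Edge) and _deleteVertex(Vertex) - and this commit supplies SoftDeleteHandler (the default) and HardDeleteHandler as implementations. A simplified sketch of how the two strategies plausibly divide (the committed classes may differ, e.g. in how they obtain the graph and guard against repeated deletes; imports as in DeleteHandler above):

    // Sketch: hard delete removes graph elements; soft delete marks them DELETED.
    public class HardDeleteSketch extends DeleteHandler {
        public HardDeleteSketch(TypeSystem typeSystem) {
            super(typeSystem, true);    // edges really disappear, so reverse/array references must be updated
        }

        @Override
        protected void deleteEdge(Edge edge) {
            edge.remove();              // Blueprints Element.remove(): physically delete the edge
        }

        @Override
        protected void _deleteVertex(Vertex instanceVertex) {
            instanceVertex.remove();    // physically delete the vertex
        }
    }

    public class SoftDeleteSketch extends DeleteHandler {
        public SoftDeleteSketch(TypeSystem typeSystem) {
            super(typeSystem, false);   // edges survive as DELETED, so references can stay in place
        }

        @Override
        protected void deleteEdge(Edge edge) {
            edge.setProperty(Constants.STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
            edge.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
        }

        @Override
        protected void _deleteVertex(Vertex instanceVertex) {
            instanceVertex.setProperty(Constants.STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
            instanceVertex.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
        }
    }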


