atlas-commits mailing list archives

From suma...@apache.org
Subject incubator-atlas git commit: ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries(sumasai)
Date Fri, 01 Jul 2016 19:10:21 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master f623bddf8 -> f51c88615


ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries(sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f51c8861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f51c8861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f51c8861

Branch: refs/heads/master
Commit: f51c886158c9c0f7dc115f0c6f0aa0e08772e0b9
Parents: f623bdd
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Fri Jul 1 12:09:32 2016 -0700
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Fri Jul 1 12:09:32 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |   4 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    | 182 +++++--
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 483 +++++++++++++------
 .../java/org/apache/atlas/hook/AtlasHook.java   |   2 +-
 release-log.txt                                 |   1 +
 5 files changed, 473 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
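
For context: the hook builds a hive_process qualifiedName as <operation name> SEP <sorted inputs> IO_SEP <sorted outputs>, with SEP = ":" and IO_SEP = "->" (see HiveHook below). The sketch that follows is illustrative only; the cluster name "test", the table names and the path placeholder are hypothetical and not taken from this commit. It shows where HDFS paths do and do not appear after this patch:

    // Illustrative only -- rough shape of process qualifiedName values after this change.
    // Cluster "test", the table names and the path placeholder below are hypothetical.
    String sep = ":", ioSep = "->";

    // QUERY operations still leave HDFS paths out, so re-running an insert-overwrite
    // into a different directory maps to the same process entity:
    String queryName = "QUERY" + sep + "default.t1@test"
            + ioSep + sep + "INSERT" + sep + "default.t2@test";

    // LOAD into a non-partitioned table now keeps the source path in the name
    // (lowercased, with '/' removed since '/' breaks query parsing in Atlas):
    String loadName = "LOAD" + sep + "<source path, lowercased, '/' removed>"
            + ioSep + sep + "default.t1@test";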


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 0045780..cd0e964 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -426,8 +426,8 @@ public class HiveMetaStoreBridge {
                 createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
                 LOG.debug("Setting create time to {} ", createDate);
                 tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
-            } catch(NumberFormatException ne) {
-                LOG.error("Error while updating createTime for the table {} ", hiveTable.getCompleteName(), ne);
+            } catch(Exception ne) {
+                LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), ne);
             }
         }
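
For readability, the change above in isolation: the createTime conversion is unchanged, but any failure while setting it is now caught and logged, not just NumberFormatException. A minimal sketch using the names from the hunk (hiveTable, tableReference, LOG and MILLIS_CONVERT_FACTOR are the surrounding fields/constants of HiveMetaStoreBridge, so this is a fragment, not a standalone program):

    // Sketch of the widened error handling above (names come from HiveMetaStoreBridge).
    try {
        // Hive reports the create time as an int; MILLIS_CONVERT_FACTOR turns it into millis
        Date createDate = new Date(hiveTable.getTTable().getCreateTime() * MILLIS_CONVERT_FACTOR);
        LOG.debug("Setting create time to {} ", createDate);
        tableReference.set(HiveDataModelGenerator.CREATE_TIME, createDate);
    } catch (Exception e) {    // previously only NumberFormatException was caught here
        LOG.error("Error while setting createTime for the table {} ", hiveTable.getCompleteName(), e);
    }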
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index a1a00b3..99009ba 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -21,6 +21,7 @@ package org.apache.atlas.hive.hook;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import kafka.security.auth.Write;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
@@ -66,7 +67,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -86,8 +89,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     public static final String QUEUE_SIZE = CONF_PREFIX + "queueSize";
 
     public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-    private static final String SEP = ":".intern();
-    private static final String IO_SEP = "->".intern();
+    static final String SEP = ":".intern();
+    static final String IO_SEP = "->".intern();
 
     private static final Map<String, HiveOperation> OPERATION_MAP = new HashMap<>();
 
@@ -291,6 +294,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private void deleteDatabase(HiveMetaStoreBridge dgiBridge, HiveEventContext event) {
         if (event.getOutputs().size() > 1) {
             LOG.info("Starting deletion of tables and databases with cascade {} ", event.getQueryStr());
+        } else {
+            LOG.info("Starting deletion of database {} ", event.getQueryStr());
         }
 
         for (WriteEntity output : event.getOutputs()) {
@@ -549,10 +554,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return str.toLowerCase().trim();
     }
 
-    public static String normalize(String queryStr) {
-        return lower(queryStr);
-    }
-
     private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
         Set<ReadEntity> inputs = event.getInputs();
         Set<WriteEntity> outputs = event.getOutputs();
@@ -567,8 +568,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             LOG.info("Query id/plan is missing for {}", event.getQueryStr());
         }
 
-        final SortedMap<Entity, Referenceable> source = new TreeMap<>(entityComparator);
-        final SortedMap<Entity, Referenceable> target = new TreeMap<>(entityComparator);
+        final SortedMap<ReadEntity, Referenceable> source = new TreeMap<>(entityComparator);
+        final SortedMap<WriteEntity, Referenceable> target = new TreeMap<>(entityComparator);
 
         final Set<String> dataSets = new HashSet<>();
         final Set<Referenceable> entities = new LinkedHashSet<>();
@@ -577,16 +578,27 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
         // filter out select queries which do not modify data
         if (!isSelectQuery) {
-            for (ReadEntity readEntity : event.getInputs()) {
+
+            SortedSet<ReadEntity> sortedHiveInputs = new TreeSet<>(entityComparator);
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+
+            SortedSet<WriteEntity> sortedHiveOutputs = new TreeSet<>(entityComparator);
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+
+            for (ReadEntity readEntity : sortedHiveInputs) {
                 processHiveEntity(dgiBridge, event, readEntity, dataSets, source, entities);
             }
 
-            for (WriteEntity writeEntity : event.getOutputs()) {
+            for (WriteEntity writeEntity : sortedHiveOutputs) {
                 processHiveEntity(dgiBridge, event, writeEntity, dataSets, target, entities);
             }
 
             if (source.size() > 0 || target.size() > 0) {
-                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, source, target);
+                Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, sortedHiveInputs, sortedHiveOutputs, source, target);
                 entities.add(processReferenceable);
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities)));
             } else {
@@ -597,8 +609,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Set<String> dataSetsProcessed,
-        SortedMap<Entity, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
+    private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed,
+        SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
         if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
             final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
             if (!dataSetsProcessed.contains(tblQFName)) {
@@ -609,7 +621,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }
         } else if (entity.getType() == Type.DFS_DIR) {
             final String pathUri = lower(new Path(entity.getLocation()).toString());
-            LOG.info("Registering DFS Path {} ", pathUri);
+            LOG.debug("Registering DFS Path {} ", pathUri);
             if (!dataSetsProcessed.contains(pathUri)) {
                 Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
                 dataSets.put(entity, hdfsPath);
@@ -653,7 +665,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
 
     private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
         List<Referenceable> entities = new ArrayList<>();
-        final Entity hiveEntity = getEntityByType(event.getOutputs(), Type.TABLE);
+        final WriteEntity hiveEntity = (WriteEntity) getEntityByType(event.getOutputs(), Type.TABLE);
         Table hiveTable = hiveEntity.getTable();
         //Refresh to get the correct location
         hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
@@ -665,18 +677,25 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             dfsEntity.setTyp(Type.DFS_DIR);
             dfsEntity.setName(location);
 
-            SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+            SortedMap<ReadEntity, Referenceable> hiveInputsMap = new TreeMap<ReadEntity, Referenceable>(entityComparator) {{
                 put(dfsEntity, dgiBridge.fillHDFSDataSet(location));
             }};
 
-            SortedMap<Entity, Referenceable> outputs = new TreeMap<Entity, Referenceable>(entityComparator) {{
+            SortedMap<WriteEntity, Referenceable> hiveOutputsMap = new TreeMap<WriteEntity, Referenceable>(entityComparator) {{
                 put(hiveEntity, tables.get(Type.TABLE));
             }};
 
-            Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
+            SortedSet<ReadEntity> sortedIps = new TreeSet<>(entityComparator);
+            sortedIps.addAll(hiveInputsMap.keySet());
+            SortedSet<WriteEntity> sortedOps = new TreeSet<>(entityComparator);
+            sortedOps.addAll(hiveOutputsMap.keySet());
+
+            Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
+                sortedIps, sortedOps, hiveInputsMap, hiveOutputsMap);
             String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
 
             if (isCreateOp(event)){
+                LOG.info("Overriding process qualified name to {}", tableQualifiedName);
                 processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
             }
             entities.addAll(tables.values());
@@ -689,6 +708,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         if (HiveOperation.CREATETABLE.equals(hiveEvent.getOperation())
             || HiveOperation.CREATEVIEW.equals(hiveEvent.getOperation())
             || HiveOperation.ALTERVIEW_AS.equals(hiveEvent.getOperation())
+            || HiveOperation.ALTERTABLE_LOCATION.equals(hiveEvent.getOperation())
             || HiveOperation.CREATETABLE_AS_SELECT.equals(hiveEvent.getOperation())) {
             return true;
         }
@@ -696,11 +716,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     }
 
     private Referenceable getProcessReferenceable(HiveMetaStoreBridge dgiBridge, HiveEventContext hiveEvent,
-        SortedMap<Entity, Referenceable> source, SortedMap<Entity, Referenceable> target) {
+        final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> source, SortedMap<WriteEntity, Referenceable> target) {
         Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
 
         String queryStr = lower(hiveEvent.getQueryStr());
-        processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent.getOperation(), source, target));
+        processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(hiveEvent, sortedHiveInputs, sortedHiveOutputs, source, target));
 
         LOG.debug("Registering query: {}", queryStr);
         List<Referenceable> sourceList = new ArrayList<>(source.values());
@@ -733,51 +753,113 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     }
 
     @VisibleForTesting
-    static String getProcessQualifiedName(HiveOperation op, SortedMap<Entity, Referenceable> inputs, SortedMap<Entity, Referenceable> outputs) {
+    static String getProcessQualifiedName(HiveEventContext eventContext, final SortedSet<ReadEntity> sortedHiveInputs, final SortedSet<WriteEntity> sortedHiveOutputs, SortedMap<ReadEntity, Referenceable> hiveInputsMap, SortedMap<WriteEntity, Referenceable> hiveOutputsMap) {
+        HiveOperation op = eventContext.getOperation();
         StringBuilder buffer = new StringBuilder(op.getOperationName());
-        addDatasets(op, buffer, inputs);
+
+        boolean ignoreHDFSPathsinQFName = ignoreHDFSPathsinQFName(op, sortedHiveInputs, sortedHiveOutputs);
+        if ( ignoreHDFSPathsinQFName && LOG.isDebugEnabled()) {
+            LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, eventContext.getQueryStr());
+        }
+
+        addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, ignoreHDFSPathsinQFName);
         buffer.append(IO_SEP);
-        addDatasets(op, buffer, outputs);
+        addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, ignoreHDFSPathsinQFName);
         LOG.info("Setting process qualified name to {}", buffer);
         return buffer.toString();
     }
 
-    private static void addDatasets(HiveOperation op, StringBuilder buffer, final Map<Entity, Referenceable> refs) {
-        if (refs != null) {
-            for (Entity input : refs.keySet()) {
-                final Entity entity = input;
+    private static boolean ignoreHDFSPathsinQFName(final HiveOperation op, final Set<ReadEntity> inputs, final Set<WriteEntity> outputs) {
+        switch (op) {
+        case LOAD:
+        case IMPORT:
+            return isPartitionBasedQuery(outputs);
+        case EXPORT:
+            return isPartitionBasedQuery(inputs);
+        case QUERY:
+            return true;
+        }
+        return false;
+    }
 
-                //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
-                if (addQueryType(op, entity)) {
-                    buffer.append(SEP);
-                    buffer.append(((WriteEntity) entity).getWriteType().name());
+    private static boolean isPartitionBasedQuery(Set<? extends Entity> entities) {
+        for (Entity entity : entities) {
+            if (Type.PARTITION.equals(entity.getType())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void addInputs(HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+        if (refs != null) {
+            if (sortedInputs != null) {
+                Set<String> dataSetsProcessed = new LinkedHashSet<>();
+                for (Entity input : sortedInputs) {
+
+                    if (!dataSetsProcessed.contains(input.getName().toLowerCase())) {
+                        //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+                        if (ignoreHDFSPathsInQFName &&
+                            (Type.DFS_DIR.equals(input.getType()) || Type.LOCAL_DIR.equals(input.getType()))) {
+                            LOG.debug("Skipping dfs dir input addition to process qualified name {} ", input.getName());
+                        } else if (refs.containsKey(input)) {
+                            addDataset(buffer, refs.get(input));
+                        }
+                        dataSetsProcessed.add(input.getName().toLowerCase());
+                    }
                 }
-                if (Type.DFS_DIR.equals(entity.getType()) ||
-                    Type.LOCAL_DIR.equals(entity.getType())) {
-                    LOG.debug("Skipping dfs dir addition into process qualified name {} ", refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
-                } else {
-                    buffer.append(SEP);
-                    String dataSetQlfdName = (String) refs.get(input).get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
-                    // '/' breaks query parsing on ATLAS
-                    buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+
+            }
+        }
+    }
+
+    private static void addDataset(StringBuilder buffer, Referenceable ref) {
+        buffer.append(SEP);
+        String dataSetQlfdName = (String) ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+        // '/' breaks query parsing on ATLAS
+        buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
+    }
+
+    private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> refs, final boolean ignoreHDFSPathsInQFName) {
+        if (refs != null) {
+            Set<String> dataSetsProcessed = new LinkedHashSet<>();
+            if (sortedOutputs != null) {
+                for (Entity output : sortedOutputs) {
+                    final Entity entity = output;
+                    if (!dataSetsProcessed.contains(output.getName().toLowerCase())) {
+                        //HiveOperation.QUERY type encompasses INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
+                        if (addQueryType(op, (WriteEntity) entity)) {
+                            buffer.append(SEP);
+                            buffer.append(((WriteEntity) entity).getWriteType().name());
+                        }
+                        if (ignoreHDFSPathsInQFName &&
+                            (Type.DFS_DIR.equals(output.getType()) || Type.LOCAL_DIR.equals(output.getType()))) {
+                            LOG.debug("Skipping dfs dir output addition to process qualified name {} ", output.getName());
+                        } else if (refs.containsKey(output)) {
+                            addDataset(buffer, refs.get(output));
+                        }
+                        dataSetsProcessed.add(output.getName().toLowerCase());
+                    }
                 }
             }
         }
     }
 
-    private static boolean addQueryType(HiveOperation op, Entity entity) {
-        if (WriteEntity.class.isAssignableFrom(entity.getClass())) {
-            if (((WriteEntity) entity).getWriteType() != null &&
-                op.equals(HiveOperation.QUERY)) {
-                switch (((WriteEntity) entity).getWriteType()) {
-                case INSERT:
-                case INSERT_OVERWRITE:
-                case UPDATE:
-                case DELETE:
-                case PATH_WRITE:
+    private static boolean addQueryType(HiveOperation op, WriteEntity entity) {
+        if (((WriteEntity) entity).getWriteType() != null && HiveOperation.QUERY.equals(op)) {
+            switch (((WriteEntity) entity).getWriteType()) {
+            case INSERT:
+            case INSERT_OVERWRITE:
+            case UPDATE:
+            case DELETE:
+                return true;
+            case PATH_WRITE:
+                //Add query type only for DFS paths and ignore local paths since they are not added as outputs
+                if ( !Type.LOCAL_DIR.equals(entity.getType())) {
                     return true;
-                default:
                 }
+                break;
+            default:
             }
         }
         return false;
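
Since the hunk above is the heart of the change and is hard to read interleaved with removals, here is a condensed, standalone restatement of the new decision (same behavior as ignoreHDFSPathsinQFName/isPartitionBasedQuery above; method names are shortened for the sketch, and the Hive classes referenced are the ones HiveHook already imports):

    import java.util.Set;
    import org.apache.hadoop.hive.ql.hooks.Entity;
    import org.apache.hadoop.hive.ql.hooks.ReadEntity;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.plan.HiveOperation;

    class QualifiedNamePathRule {
        /** True when DFS_DIR/LOCAL_DIR entities are left out of the process qualifiedName. */
        static boolean ignoreHDFSPaths(HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
            switch (op) {
            case LOAD:
            case IMPORT:
                return hasPartition(outputs);   // partition-based load/import
            case EXPORT:
                return hasPartition(inputs);    // partition-based export
            case QUERY:
                return true;                    // INSERT, INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE
            default:
                return false;                   // other operations keep directory entities in the name
            }
        }

        static boolean hasPartition(Set<? extends Entity> entities) {
            for (Entity entity : entities) {
                if (Entity.Type.PARTITION.equals(entity.getType())) {
                    return true;
                }
            }
            return false;
        }
    }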

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index f9e1926..8ca47d9 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -62,15 +62,22 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.apache.atlas.AtlasClient.NAME;
 import static org.apache.atlas.hive.hook.HiveHook.entityComparator;
 import static org.apache.atlas.hive.hook.HiveHook.getProcessQualifiedName;
 import static org.apache.atlas.hive.hook.HiveHook.lower;
+import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
+import static org.apache.atlas.hive.hook.HiveHook.SEP;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -82,6 +89,8 @@ public class HiveHookIT {
     private static final String DGI_URL = "http://localhost:21000/";
     private static final String CLUSTER_NAME = "test";
     public static final String DEFAULT_DB = "default";
+    
+    private static final String PART_FILE = "2015-01-01";
     private Driver driver;
     private AtlasClient atlasClient;
     private HiveMetaStoreBridge hiveMetaStoreBridge;
@@ -262,7 +271,7 @@ public class HiveHookIT {
         validateHDFSPaths(processReference, INPUTS, pFile);
     }
 
-    private List<Entity> getInputs(String inputName, Entity.Type entityType) {
+    private Set<ReadEntity> getInputs(String inputName, Entity.Type entityType) {
         final ReadEntity entity = new ReadEntity();
 
         if ( Entity.Type.DFS_DIR.equals(entityType)) {
@@ -270,14 +279,13 @@ public class HiveHookIT {
             entity.setTyp(Entity.Type.DFS_DIR);
         } else {
             entity.setName(getQualifiedTblName(inputName));
-            entity.setTyp(Entity.Type.TABLE);
+            entity.setTyp(entityType);
         }
 
-        return new ArrayList<Entity>() {{ add(entity); }};
+        return new LinkedHashSet<ReadEntity>() {{ add(entity); }};
     }
 
-
-    private List<Entity> getOutputs(String inputName, Entity.Type entityType) {
+    private Set<WriteEntity> getOutputs(String inputName, Entity.Type entityType) {
         final WriteEntity entity = new WriteEntity();
 
         if ( Entity.Type.DFS_DIR.equals(entityType) || Entity.Type.LOCAL_DIR.equals(entityType)) {
@@ -285,27 +293,32 @@ public class HiveHookIT {
             entity.setTyp(entityType);
         } else {
             entity.setName(getQualifiedTblName(inputName));
-            entity.setTyp(Entity.Type.TABLE);
+            entity.setTyp(entityType);
         }
 
-        return new ArrayList<Entity>() {{ add(entity); }};
+        return new LinkedHashSet<WriteEntity>() {{ add(entity); }};
     }
 
-
-    private void validateOutputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+    private void validateOutputTables(Referenceable processReference, Set<WriteEntity> expectedTables) throws Exception {
        validateTables(processReference, OUTPUTS, expectedTables);
     }
 
-    private void validateInputTables(Referenceable processReference, List<Entity> expectedTables) throws Exception {
+    private void validateInputTables(Referenceable processReference, Set<ReadEntity> expectedTables) throws Exception {
         validateTables(processReference, INPUTS, expectedTables);
     }
 
-    private void validateTables(Referenceable processReference, String attrName, List<Entity> expectedTables) throws Exception {
+    private void validateTables(Referenceable processReference, String attrName, Set<? extends Entity> expectedTables) throws Exception {
         List<Id> tableRef = (List<Id>) processReference.get(attrName);
+
+        Iterator<? extends Entity> iterator = expectedTables.iterator();
         for(int i = 0; i < expectedTables.size(); i++) {
-            Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
-            LOG.debug("Validating output {} {} ", i, entity);
-            Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), expectedTables.get(i).getName());
+            Entity hiveEntity = iterator.next();
+            if (Entity.Type.TABLE.equals(hiveEntity.getType()) ||
+                Entity.Type.DFS_DIR.equals(hiveEntity.getType())) {
+                Referenceable entity = atlasClient.getEntity(tableRef.get(i)._getId());
+                LOG.debug("Validating output {} {} ", i, entity);
+                Assert.assertEquals(entity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), hiveEntity.getName());
+            }
         }
     }
 
@@ -338,18 +351,22 @@ public class HiveHookIT {
         String query = "create table " + ctasTableName + " as select * from " + tableName;
         runCommand(query);
 
-        final ReadEntity entity = new ReadEntity();
-        entity.setName(getQualifiedTblName(tableName));
-        entity.setTyp(Entity.Type.TABLE);
+        final Set<ReadEntity> readEntities = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> writeEntities = getOutputs(ctasTableName, Entity.Type.TABLE);
 
-        final WriteEntity writeEntity = new WriteEntity();
-        writeEntity.setTyp(Entity.Type.TABLE);
-        writeEntity.setName(getQualifiedTblName(ctasTableName));
-
-        assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, new ArrayList<Entity>() {{ add(entity); }}, new ArrayList<Entity>() {{ add(writeEntity); }});
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, readEntities, writeEntities));
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
     }
 
+    private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
+        HiveHook.HiveEventContext event = new HiveHook.HiveEventContext();
+        event.setQueryStr(query);
+        event.setOperation(op);
+        event.setInputs(inputs);
+        event.setOutputs(outputs);
+        return event;
+    }
+
     @Test
     public void testDropAndRecreateCTASOutput() throws Exception {
         String tableName = createTable();
@@ -359,10 +376,11 @@ public class HiveHookIT {
 
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs =  getOutputs(ctasTableName, Entity.Type.TABLE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs =  getOutputs(ctasTableName, Entity.Type.TABLE);
 
-        String processId = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        String processId = assertProcessIsRegistered(hiveEventContext);
 
         final String drpquery = String.format("drop table %s ", ctasTableName);
         runCommand(drpquery);
@@ -371,14 +389,13 @@ public class HiveHookIT {
         //Fix after ATLAS-876
         runCommand(query);
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
-        String process2Id = assertProcessIsRegistered(query, HiveOperation.CREATETABLE_AS_SELECT, inputs, outputs);
+        String process2Id = assertProcessIsRegistered(hiveEventContext, inputs, outputs);
 
         Assert.assertEquals(process2Id, processId);
 
         Referenceable processRef = atlasClient.getEntity(processId);
 
-        validateInputTables(processRef, inputs);
-        outputs.add(outputs.get(0));
+        outputs.add(outputs.iterator().next());
         validateOutputTables(processRef, outputs);
     }
 
@@ -389,7 +406,7 @@ public class HiveHookIT {
         String query = "create view " + viewName + " as select * from " + tableName;
         runCommand(query);
 
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(tableName, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
         assertTableIsRegistered(DEFAULT_DB, viewName);
     }
 
@@ -403,7 +420,7 @@ public class HiveHookIT {
         runCommand(query);
 
         String table1Id = assertTableIsRegistered(DEFAULT_DB, table1Name);
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table1Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
         String viewId = assertTableIsRegistered(DEFAULT_DB, viewName);
 
         //Check lineage which includes table1
@@ -419,7 +436,7 @@ public class HiveHookIT {
         runCommand(query);
 
         //Check if alter view process is registered
-        assertProcessIsRegistered(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATEVIEW, getInputs(table2Name, Entity.Type.TABLE), getOutputs(viewName, Entity.Type.TABLE)));
         String table2Id = assertTableIsRegistered(DEFAULT_DB, table2Name);
         Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId);
 
@@ -456,9 +473,7 @@ public class HiveHookIT {
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName;
         runCommand(query);
 
-        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
-
-        assertProcessIsRegistered(query, HiveOperation.LOAD, null, outputs);
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
     @Test
@@ -466,41 +481,56 @@ public class HiveHookIT {
         String tableName = createTable(true);
 
         String loadFile = file("load");
-        String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName +  " partition(dt = '2015-01-01')";
+        String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName +  " partition(dt = '"+ PART_FILE + "')";
         runCommand(query);
 
-        validateProcess(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE));
+        assertProcessIsRegistered(constructEvent(query, HiveOperation.LOAD, null, getOutputs(tableName, Entity.Type.TABLE)));
     }
 
     @Test
-    public void testLoadDFSPath() throws Exception {
+    public void testLoadDFSPathPartitioned() throws Exception {
         String tableName = createTable(true, true, false);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        String loadFile = createTestDFSFile("loadDFSFile");
-        String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
+        final String loadFile = createTestDFSFile("loadDFSFile");
+        String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
         runCommand(query);
 
-        final List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
-        Referenceable processReference = validateProcess(query, HiveOperation.LOAD, getInputs(loadFile, Entity.Type.DFS_DIR), outputs);
+        final Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+        final Set<ReadEntity> inputs = getInputs(loadFile, Entity.Type.DFS_DIR);
 
-        validateHDFSPaths(processReference, INPUTS, loadFile);
+        final Set<WriteEntity> partitionOps = new LinkedHashSet<>(outputs);
+        partitionOps.addAll(getOutputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION));
 
+        Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.LOAD, inputs, partitionOps), inputs, outputs);
+        validateHDFSPaths(processReference, INPUTS, loadFile);
         validateOutputTables(processReference, outputs);
+
+        final String loadFile2 = createTestDFSFile("loadDFSFile1");
+        query = "load data inpath '" + loadFile2 + "' into table " + tableName + " partition(dt = '"+ PART_FILE + "')";
+        runCommand(query);
+
+        Set<ReadEntity> process2Inputs = getInputs(loadFile2, Entity.Type.DFS_DIR);
+        Set<ReadEntity> expectedInputs = new LinkedHashSet<>();
+        expectedInputs.addAll(process2Inputs);
+        expectedInputs.addAll(inputs);
+
+        validateProcess(constructEvent(query, HiveOperation.LOAD, expectedInputs, partitionOps), expectedInputs, outputs);
+
     }
 
     private String getQualifiedTblName(String inputTable) {
         String inputtblQlfdName = inputTable;
 
-        if (inputTable != null && !inputTable.contains(".")) {
+        if (inputTable != null && !inputTable.contains("@")) {
             inputtblQlfdName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, inputTable);
         }
         return inputtblQlfdName;
     }
 
-    private Referenceable validateProcess(String query, HiveOperation op, List<Entity> inputTables, List<Entity> outputTables) throws Exception {
-        String processId = assertProcessIsRegistered(query, op, inputTables, outputTables);
+    private Referenceable validateProcess(HiveHook.HiveEventContext event, Set<ReadEntity> inputTables, Set<WriteEntity> outputTables) throws Exception {
+        String processId = assertProcessIsRegistered(event, inputTables, outputTables);
         Referenceable process = atlasClient.getEntity(processId);
         if (inputTables == null) {
             Assert.assertNull(process.get(INPUTS));
@@ -519,25 +549,47 @@ public class HiveHookIT {
         return process;
     }
 
+    private Referenceable validateProcess(HiveHook.HiveEventContext event) throws Exception {
+       return validateProcess(event, event.getInputs(), event.getOutputs());
+    }
+
     @Test
     public void testInsertIntoTable() throws Exception {
-        String tableName = createTable();
+        String inputTable1Name = createTable();
+        String inputTable2Name = createTable();
         String insertTableName = createTable();
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        assertTableIsRegistered(DEFAULT_DB, inputTable1Name);
         assertTableIsRegistered(DEFAULT_DB, insertTableName);
 
-        String query = "insert into " + insertTableName + " select id, name from " + tableName;
+        String query = "insert into " + insertTableName + " select t1.id, t1.name from " + inputTable2Name + " as t2, " + inputTable1Name + " as t1 where t1.id=t2.id";
 
         runCommand(query);
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        final Set<ReadEntity> inputs = getInputs(inputTable1Name, Entity.Type.TABLE);
+        inputs.addAll(getInputs(inputTable2Name, Entity.Type.TABLE));
+
+        Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        (outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
+
+        HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
+
+        Set<ReadEntity> expectedInputs = new TreeSet<ReadEntity>(entityComparator) {{
+            addAll(inputs);
+        }};
+        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        Referenceable processRef1 = validateProcess(event, expectedInputs, outputs);
 
-        Referenceable processRef1 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
+        //Test sorting of tbl names
+        SortedSet<String> sortedTblNames = new TreeSet<>();
+        sortedTblNames.add(getQualifiedTblName(inputTable1Name));
+        sortedTblNames.add(getQualifiedTblName(inputTable2Name));
+
+        //Verify sorted order of inputs in qualified name
+        Assert.assertEquals(Joiner.on(SEP).join("QUERY", sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), getQualifiedTblName(insertTableName))
+            , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
 
         //Rerun same query. Should result in same process
         runCommandWithDelay(query, 1000);
-        Referenceable processRef2 = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
+        Referenceable processRef2 = validateProcess(event, expectedInputs, outputs);
         Assert.assertEquals(processRef1.getId()._getId(), processRef2.getId()._getId());
 
     }
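
The assertion above spells out the expected ordering: the inputs are sorted with entityComparator, which the test mirrors with a natural-ordered set of qualified table names, so whichever input table sorts first appears first in the qualifiedName. With hypothetical table names on a cluster called "test", the expected value would resolve to something like:

    // Illustrative expansion of the expected qualifiedName asserted above
    // (hypothetical table names; SEP = ":", IO_SEP = "->"):
    String expected = "QUERY:default.tablea@test:default.tableb@test"
            + "->" + ":INSERT:default.tablec@test";
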
@@ -550,7 +602,7 @@ public class HiveHookIT {
             "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
 
         runCommand(query);
-        validateProcess(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null);
+        validateProcess(constructEvent(query, HiveOperation.QUERY, getInputs(tableName, Entity.Type.TABLE), null));
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
     }
@@ -564,72 +616,78 @@ public class HiveHookIT {
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
-        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+        ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
 
-        Referenceable processReference = validateProcess(query, HiveOperation.QUERY, inputs, outputs);
+        final HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, outputs);
+        Referenceable processReference = validateProcess(hiveEventContext);
         validateHDFSPaths(processReference, OUTPUTS, pFile1);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
         validateInputTables(processReference, inputs);
 
         //Rerun same query with same HDFS path
-
-        runCommand(query);
-        Referenceable process2Reference = validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
+        runCommandWithDelay(query, 1000);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        Referenceable process2Reference = validateProcess(hiveEventContext);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
 
         Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
 
-        //Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of qualifiedName.
+        //Rerun same query with a new HDFS path. Will result in same process since HDFS paths are not part of the qualified name for QUERY operations
         final String pFile2 = createTestDFSPath("somedfspath2");
         query = "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from " + tableName;
-        runCommand(query);
-        List<Entity> p3Outputs = new ArrayList<Entity>() {{
+        runCommandWithDelay(query, 1000);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
+        Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
             addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
             addAll(outputs);
         }};
 
-        Referenceable process3Reference = validateProcess(query,  HiveOperation.QUERY, inputs, p3Outputs);
+        Referenceable process3Reference = validateProcess(constructEvent(query,  HiveOperation.QUERY, inputs, p3Outputs));
         validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
 
         Assert.assertEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
     }
 
     @Test
-    public void testInsertIntoDFSDir() throws Exception {
-        String tableName = createTable();
+    public void testInsertIntoDFSDirPartitioned() throws Exception {
+
+        //Test with partitioned table
+        String tableName = createTable(true);
         String pFile1 = createTestDFSPath("somedfspath1");
         String query =
-            "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from " + tableName;
+            "insert overwrite DIRECTORY '" + pFile1  + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        final List<Entity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
-        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(pFile1, Entity.Type.DFS_DIR);
+        ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
 
-        Referenceable processReference = validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
-        validateHDFSPaths(processReference, OUTPUTS, pFile1);
+        final Set<ReadEntity> partitionIps = new LinkedHashSet<>(inputs);
+        partitionIps.addAll(getInputs(DEFAULT_DB + "@" + tableName + "@dt='" + PART_FILE + "'", Entity.Type.PARTITION));
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        Referenceable processReference = validateProcess(constructEvent(query,  HiveOperation.QUERY, partitionIps, outputs), inputs, outputs);
 
-        validateInputTables(processReference, inputs);
-
-        //Rerun same query with different HDFS path
+        //Rerun same query with a different HDFS path. Should not create another process but should update the existing one.
 
         final String pFile2 = createTestDFSPath("somedfspath2");
         query =
-            "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from " + tableName;
+            "insert overwrite DIRECTORY '" + pFile2  + "' select id, name from " + tableName + " where dt = '" + PART_FILE + "'";
 
         runCommand(query);
-        List<Entity> p2Outputs = new ArrayList<Entity>() {{
-            addAll(getOutputs(pFile2, Entity.Type.DFS_DIR));
+
+        final Set<WriteEntity> pFile2Outputs = getOutputs(pFile2, Entity.Type.DFS_DIR);
+        ((WriteEntity)pFile2Outputs.iterator().next()).setWriteType(WriteEntity.WriteType.PATH_WRITE);
+        //Now the process has 2 paths - one older with a deleted reference to the partition and another with the latest partition
+        Set<WriteEntity> p2Outputs = new LinkedHashSet<WriteEntity>() {{
+            addAll(pFile2Outputs);
             addAll(outputs);
         }};
 
-        Referenceable process2Reference = validateProcess(query, HiveOperation.QUERY, inputs, p2Outputs);
+        Referenceable process2Reference = validateProcess(constructEvent(query, HiveOperation.QUERY, partitionIps, pFile2Outputs), inputs, p2Outputs);
         validateHDFSPaths(process2Reference, OUTPUTS, pFile2);
 
         Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
@@ -647,12 +705,12 @@ public class HiveHookIT {
 
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        outputs.get(0).setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
-        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        outputs.iterator().next().setName(getQualifiedTblName(insertTableName + HiveMetaStoreBridge.TEMP_TABLE_PREFIX + SessionState.get().getSessionId()));
+        ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
 
-        validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
+        validateProcess(constructEvent(query,  HiveOperation.QUERY, inputs, outputs));
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName, null, true);
@@ -660,21 +718,40 @@ public class HiveHookIT {
 
     @Test
     public void testInsertIntoPartition() throws Exception {
-        String tableName = createTable(true);
-        String insertTableName = createTable(true);
+        final boolean isPartitionedTable = true;
+        String tableName = createTable(isPartitionedTable);
+        String insertTableName = createTable(isPartitionedTable);
         String query =
-            "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
-                + " where dt = '2015-01-01'";
+            "insert into " + insertTableName + " partition(dt = '"+ PART_FILE + "') select id, name from " + tableName
+                + " where dt = '"+ PART_FILE + "'";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
-        ((WriteEntity)outputs.get(0)).setWriteType(WriteEntity.WriteType.INSERT);
+        final Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(insertTableName, Entity.Type.TABLE);
+        ((WriteEntity)outputs.iterator().next()).setWriteType(WriteEntity.WriteType.INSERT);
+
+        final Set<ReadEntity> partitionIps = new LinkedHashSet<ReadEntity>() {
+            {
+                addAll(inputs);
+                add(getPartitionInput());
+
+            }
+        };
+
+        final Set<WriteEntity> partitionOps = new LinkedHashSet<WriteEntity>() {
+            {
+                addAll(outputs);
+                add(getPartitionOutput());
+
+            }
+        };
 
-        validateProcess(query,  HiveOperation.QUERY, inputs, outputs);
+        validateProcess(constructEvent(query,  HiveOperation.QUERY, partitionIps, partitionOps), inputs, outputs);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
         assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        //TODO - update
     }
 
     private String random() {
@@ -701,65 +778,111 @@ public class HiveHookIT {
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        String filename = "pfile://" + mkdir("export");
+        String filename = "pfile://" + mkdir("exportUnPartitioned");
         String query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
 
-        Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
+        Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
 
         validateHDFSPaths(processReference, OUTPUTS, filename);
         validateInputTables(processReference, inputs);
 
         //Import
-        tableName = createTable(false);
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String importTableName = createTable(false);
+        assertTableIsRegistered(DEFAULT_DB, importTableName);
 
-        query = "import table " + tableName + " from '" + filename + "'";
+        query = "import table " + importTableName + " from '" + filename + "'";
         runCommand(query);
-        outputs = getOutputs(tableName, Entity.Type.TABLE);
-        processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
-        validateHDFSPaths(processReference, INPUTS, filename);
+        outputs = getOutputs(importTableName, Entity.Type.TABLE);
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
 
-        validateOutputTables(processReference, outputs);
+        //Should create another process
+        filename = "pfile://" + mkdir("export2UnPartitioned");
+        query = "export table " + tableName + " to \"" + filename + "\"";
+        runCommand(query);
+
+        inputs = getInputs(tableName, Entity.Type.TABLE);
+        outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+
+        validateProcess(constructEvent(query, HiveOperation.EXPORT, inputs, outputs));
+
+        //Import again should create another process
+        query = "import table " + importTableName + " from '" + filename + "'";
+        runCommand(query);
+        outputs = getOutputs(importTableName, Entity.Type.TABLE);
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs));
     }
 
     @Test
     public void testExportImportPartitionedTable() throws Exception {
-        String tableName = createTable(true);
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        boolean isPartitionedTable = true;
+        final String tableName = createTable(isPartitionedTable);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
         //Add a partition
         String partFile = "pfile://" + mkdir("partition");
-        String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'";
+        String query = "alter table " + tableName + " add partition (dt='"+ PART_FILE + "') location '" + partFile + "'";
         runCommand(query);
 
         String filename = "pfile://" + mkdir("export");
         query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
 
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        List<Entity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
+        final Set<ReadEntity> expectedExportInputs = getInputs(tableName, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(filename, Entity.Type.DFS_DIR);
 
-        Referenceable processReference = validateProcess(query, HiveOperation.EXPORT, inputs, outputs);
-        validateHDFSPaths(processReference, OUTPUTS, filename);
+        //Note that export has only partition as input in this case
+        final Set<ReadEntity> partitionIps = getInputs(DEFAULT_DB + "@" + tableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
+        partitionIps.addAll(expectedExportInputs);
 
-        validateInputTables(processReference, inputs);
+        Referenceable processReference = validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs), expectedExportInputs, outputs);
+        validateHDFSPaths(processReference, OUTPUTS, filename);
 
         //Import
-        tableName = createTable(true);
-        tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        String importTableName = createTable(true);
+        assertTableIsRegistered(DEFAULT_DB, tableName);
 
-        query = "import table " + tableName + " from '" + filename + "'";
+        query = "import table " + importTableName + " from '" + filename + "'";
         runCommand(query);
 
-        outputs = getOutputs(tableName, Entity.Type.TABLE);
-        processReference = validateProcess(query, HiveOperation.IMPORT, getInputs(filename, Entity.Type.DFS_DIR), outputs);
-        validateHDFSPaths(processReference, INPUTS, filename);
+        final Set<ReadEntity> expectedImportInputs = getInputs(filename, Entity.Type.DFS_DIR);
+        final Set<WriteEntity> importOutputs = getOutputs(importTableName, Entity.Type.TABLE);
 
-        validateOutputTables(processReference, outputs);
+        final Set<WriteEntity> partitionOps = getOutputs(DEFAULT_DB + "@" + importTableName + "@dt=" + PART_FILE, Entity.Type.PARTITION);
+        partitionOps.addAll(importOutputs);
+
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, expectedImportInputs , partitionOps), expectedImportInputs, importOutputs);
+
+        //Export should update same process
+        filename = "pfile://" + mkdir("export2");
+        query = "export table " + tableName + " to \"" + filename + "\"";
+        runCommand(query);
+
+        final Set<WriteEntity> outputs2 = getOutputs(filename, Entity.Type.DFS_DIR);
+        Set<WriteEntity> p3Outputs = new LinkedHashSet<WriteEntity>() {{
+            addAll(outputs2);
+            addAll(outputs);
+        }};
+
+        validateProcess(constructEvent(query, HiveOperation.EXPORT, partitionIps, outputs2), expectedExportInputs, p3Outputs);
+
+        query = "alter table " + importTableName + " drop partition (dt='"+ PART_FILE + "')";
+        runCommand(query);
+
+        //Import should update same process
+        query = "import table " + importTableName + " from '" + filename + "'";
+        runCommandWithDelay(query, 1000);
+
+        final Set<ReadEntity> importInputs = getInputs(filename, Entity.Type.DFS_DIR);
+        final Set<ReadEntity> expectedImport2Inputs  = new LinkedHashSet<ReadEntity>() {{
+            addAll(importInputs);
+            addAll(expectedImportInputs);
+        }};
+
+        validateProcess(constructEvent(query, HiveOperation.IMPORT, importInputs, partitionOps), expectedImport2Inputs, importOutputs);
     }
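
Because the exports and imports above involve a partition, ignoreHDFSPathsinQFName is true for them, so the staging directory never enters the process qualifiedName; that is why exporting to a second directory and re-importing update the existing process entities instead of creating new ones. A rough, hypothetical illustration of the resulting shapes (names made up, cluster "test"):

    // Hypothetical shapes; the directories are omitted because the operations are
    // partition based, so every destination path maps to the same process:
    String exportName = "EXPORT:default.sourcetable@test->";
    String importName = "IMPORT->:default.importtable@test";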
 
     @Test
@@ -767,13 +890,14 @@ public class HiveHookIT {
         String tableName = createTable();
         String query = "select * from " + tableName;
         runCommand(query);
-        List<Entity> inputs = getInputs(tableName, Entity.Type.TABLE);
-        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
+        Set<ReadEntity> inputs = getInputs(tableName, Entity.Type.TABLE);
+        HiveHook.HiveEventContext hiveEventContext = constructEvent(query, HiveOperation.QUERY, inputs, null);
+        assertProcessIsNotRegistered(hiveEventContext);
 
         //check with uppercase table name
         query = "SELECT * from " + tableName.toUpperCase();
         runCommand(query);
-        assertProcessIsNotRegistered(query, HiveOperation.QUERY, inputs, null);
+        assertProcessIsNotRegistered(hiveEventContext);
     }
 
     @Test
@@ -1042,10 +1166,10 @@ public class HiveHookIT {
         String query = String.format("truncate table %s", tableName);
         runCommand(query);
 
-        List<Entity> outputs = getInputs(tableName, Entity.Type.TABLE);
+        Set<WriteEntity> outputs = getOutputs(tableName, Entity.Type.TABLE);
 
         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
-        validateProcess(query, HiveOperation.TRUNCATETABLE, null, outputs);
+        validateProcess(constructEvent(query, HiveOperation.TRUNCATETABLE, null, outputs));
 
         //Check lineage
         String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName);
@@ -1144,7 +1268,7 @@ public class HiveHookIT {
         String query = "alter table " + tableName + " set location '" + testPath + "'";
         runCommand(query);
 
-        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
+        assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() {
             @Override
             public void assertOnEntity(Referenceable tableRef) throws Exception {
                 Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
@@ -1152,10 +1276,11 @@ public class HiveHookIT {
             }
         });
 
-        List<Entity> inputs = getInputs(testPath, Entity.Type.DFS_DIR);
-        List<Entity> outputs = getOutputs(tableName, Entity.Type.TABLE);
+        String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, false), null);
+
+        Referenceable processReference = atlasClient.getEntity(processId);
 
-        Referenceable processReference = validateProcess(query, HiveOperation.ALTERTABLE_LOCATION, inputs, outputs);
         validateHDFSPaths(processReference, INPUTS, testPath);
     }
 
@@ -1302,6 +1427,20 @@ public class HiveHookIT {
         assertTableIsNotRegistered(DEFAULT_DB, tableName);
     }
 
+    private WriteEntity getPartitionOutput() {
+        WriteEntity partEntity = new WriteEntity();
+        partEntity.setName(PART_FILE);
+        partEntity.setTyp(Entity.Type.PARTITION);
+        return partEntity;
+    }
+
+    private ReadEntity getPartitionInput() {
+        ReadEntity partEntity = new ReadEntity();
+        partEntity.setName(PART_FILE);
+        partEntity.setTyp(Entity.Type.PARTITION);
+        return partEntity;
+    }
+
     @Test
     public void testDropDatabaseWithCascade() throws Exception {
         //Test Deletion of database and its corresponding tables
@@ -1550,26 +1689,66 @@ public class HiveHookIT {
         }
     }
 
-    private String assertProcessIsRegistered(final String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
-        String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
-        LOG.debug("Searching for process with query {}", processQFName);
-        return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
-            @Override
-            public void assertOnEntity(final Referenceable entity) throws Exception {
-                List<String> recentQueries = (List<String>) entity.get("recentQueries");
-                Assert.assertEquals(recentQueries.get(0), lower(queryStr));
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
             }
-        });
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+
+            String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+            LOG.debug("Searching for process with query {}", processQFName);
+            return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final Referenceable entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.get("recentQueries");
+                    Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
+                }
+            });
+        } catch (Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
+    }
+
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event, final Set<ReadEntity> inputTbls, final Set<WriteEntity> outputTbls) throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+            String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
+            LOG.debug("Searching for process with query {}", processQFName);
+            return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName, new AssertPredicate() {
+                @Override
+                public void assertOnEntity(final Referenceable entity) throws Exception {
+                    List<String> recentQueries = (List<String>) entity.get("recentQueries");
+                    Assert.assertEquals(recentQueries.get(0), lower(event.getQueryStr()));
+                }
+            });
+        } catch(Exception e) {
+            LOG.error("Exception : ", e);
+            throw e;
+        }
     }
 
     private String getDSTypeName(Entity entity) {
         return Entity.Type.TABLE.equals(entity.getType()) ? HiveDataTypes.HIVE_TABLE.name() : FSDataTypes.HDFS_PATH().toString();
     }
 
-    private SortedMap<Entity, Referenceable> getSortedProcessDataSets(List<Entity> inputTbls) {
-        SortedMap<Entity, Referenceable> inputs = new TreeMap<Entity, Referenceable>(entityComparator);
+    private <T extends Entity> SortedMap<T, Referenceable> getSortedProcessDataSets(Set<T> inputTbls) {
+        SortedMap<T, Referenceable> inputs = new TreeMap<T, Referenceable>(entityComparator);
         if (inputTbls != null) {
-            for (final Entity tbl : inputTbls) {
+            for (final T tbl : inputTbls) {
                 Referenceable inputTableRef = new Referenceable(getDSTypeName(tbl), new HashMap<String, Object>() {{
                     put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tbl.getName());
                 }});
@@ -1579,10 +1758,22 @@ public class HiveHookIT {
         return inputs;
     }
 
-    private void assertProcessIsNotRegistered(String queryStr, HiveOperation op, final List<Entity> inputTbls, final List<Entity> outputTbls) throws Exception {
-        String processQFName = getProcessQualifiedName(op, getSortedProcessDataSets(inputTbls), getSortedProcessDataSets(outputTbls));
-        LOG.debug("Searching for process with query {}", processQFName);
-        assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
+    private void assertProcessIsNotRegistered(HiveHook.HiveEventContext event) throws Exception {
+        try {
+            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null : new TreeSet<ReadEntity>(entityComparator);
+            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null : new TreeSet<WriteEntity>(entityComparator);
+            if ( event.getInputs() != null) {
+                sortedHiveInputs.addAll(event.getInputs());
+            }
+            if ( event.getOutputs() != null) {
+                sortedHiveOutputs.addAll(event.getOutputs());
+            }
+            String processQFName = getProcessQualifiedName(event, sortedHiveInputs, sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+            LOG.debug("Searching for process with query {}", processQFName);
+            assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processQFName);
+        } catch( Exception e) {
+            LOG.error("Exception : ", e);
+        }
     }
 
     private void assertTableIsNotRegistered(String dbName, String tableName, boolean isTemporaryTable) throws Exception {
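
The assert helpers above now derive the process qualified name from TreeSet copies of the event's inputs and outputs, ordered by the test's entityComparator, so entity ordering no longer affects the lookup. The null-guarded sorting block is repeated in all three helpers; the sketch below shows one way it could be factored out, assuming entityComparator is a Comparator over Hive Entity objects as in the test. The class and method names are illustrative only.

    import java.util.Comparator;
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.hadoop.hive.ql.hooks.Entity;

    final class SortedEntities {
        private SortedEntities() { }

        // Return a comparator-ordered copy of the given entities, or null when the
        // input is null, matching the behaviour of the inline blocks above.
        static <T extends Entity> SortedSet<T> sortedCopy(Set<T> entities, Comparator<? super T> comparator) {
            if (entities == null) {
                return null;
            }
            SortedSet<T> sorted = new TreeSet<T>(comparator);
            sorted.addAll(entities);
            return sorted;
        }
    }

Each helper could then open with sortedCopy(event.getInputs(), entityComparator) and sortedCopy(event.getOutputs(), entityComparator) before calling getProcessQualifiedName.
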

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 8bbe2d7..09b1c4b 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -128,7 +128,7 @@ public abstract class AtlasHook {
             } catch (Exception e) {
                 numRetries++;
                 if (numRetries < maxRetries) {
-                    LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
+                    LOG.info("Failed to notify atlas for entity {}. Retrying", message, e);
                 } else {
                     if (shouldLogFailedMessages && e instanceof NotificationException) {
                         List<String> failedMessages = ((NotificationException) e).getFailedMessages();
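
The AtlasHook change is a one-line log-level bump: intermediate notification failures are now reported at INFO instead of DEBUG, so retries are visible under default log configurations while the final-failure handling is unchanged. The sketch below shows the shape of such a retry loop; sendToAtlas, maxRetries and the give-up message are illustrative stand-ins, not taken from AtlasHook.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class NotifyRetrySketch {
        private static final Logger LOG = LoggerFactory.getLogger(NotifyRetrySketch.class);
        private static final int maxRetries = 3;   // illustrative default

        void notifyWithRetries(String message) {
            int numRetries = 0;
            while (true) {
                try {
                    sendToAtlas(message);   // hypothetical stand-in for the real send
                    return;                 // success: stop retrying
                } catch (Exception e) {
                    numRetries++;
                    if (numRetries < maxRetries) {
                        // After this commit the intermediate failure surfaces at INFO.
                        LOG.info("Failed to notify atlas for entity {}. Retrying", message, e);
                    } else {
                        LOG.error("Giving up on entity {} after {} attempts", message, maxRetries, e);
                        return;
                    }
                }
            }
        }

        // Placeholder for the actual notification call.
        private void sendToAtlas(String message) throws Exception {
            throw new UnsupportedOperationException("illustrative only");
        }
    }
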

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f51c8861/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5a6440c..5de6df1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
 ATLAS-966 Exit execution of import_hive.sh if HIVE_HOME is not set (svimal2106 via sumasai)
+ATLAS-917 Add hdfs paths to process qualified name for non-partition based queries (sumasai)
 
 
 --Release 0.7-incubating

