atlas-commits mailing list archives

From suma...@apache.org
Subject incubator-atlas git commit: ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)
Date Thu, 07 Apr 2016 16:00:57 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 009330de2 -> 46365f8c4


ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/46365f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/46365f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/46365f8c

Branch: refs/heads/master
Commit: 46365f8c484b06fe4f2ef681d0f8533c698820ff
Parents: 009330d
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Thu Apr 7 09:00:47 2016 -0700
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Thu Apr 7 09:00:47 2016 -0700

----------------------------------------------------------------------
 .../atlas/fs/model/FSDataModelGenerator.java    |   3 +
 .../org/apache/atlas/fs/model/FSDataModel.scala |   5 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |  58 ++-
 .../org/apache/atlas/hive/hook/HiveHook.java    | 400 ++++++++++++-------
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 253 ++++++++++--
 .../apache/atlas/storm/hook/StormAtlasHook.java |   6 +-
 .../main/java/org/apache/atlas/AtlasClient.java |   3 +
 release-log.txt                                 |   1 +
 .../atlas/services/ReservedTypesRegistrar.java  |  12 +
 9 files changed, 540 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java b/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
index 555d565..444c1aa 100644
--- a/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
+++ b/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.atlas.fs.model;
 
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.addons.ModelDefinitionDump;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.TypesSerialization;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
----------------------------------------------------------------------
diff --git a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
index c964f73..c380a92 100644
--- a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
+++ b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
@@ -31,13 +31,10 @@ import scala.tools.scalap.scalax.rules.scalasig.ClassFileParser.EnumConstValue
  */
 object FSDataModel extends App {
 
-    var typesDef : TypesDef = null
-
     val typesBuilder = new TypesBuilder
-
     import typesBuilder._
 
-    typesDef = types {
+    val typesDef : TypesDef = types {
 
         // FS DataSet
         _class(FSDataTypes.FS_PATH.toString, List("DataSet", AtlasClient.REFERENCEABLE_SUPER_TYPE)) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 3a802d7..eb5f1e6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -18,18 +18,22 @@
 
 package org.apache.atlas.hive.bridge;
 
-import com.google.common.base.Joiner;
+import com.google.common.annotations.VisibleForTesting;
 import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.fs.model.FSDataModel;
+import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
+import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -67,6 +71,9 @@ public class HiveMetaStoreBridge {
 
     public static final String ATLAS_ENDPOINT = "atlas.rest.address";
 
+    private final String doAsUser;
+    private final UserGroupInformation ugi;
+
     private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class);
 
     public final Hive hiveClient;
@@ -82,6 +89,11 @@ public class HiveMetaStoreBridge {
         this(hiveConf, atlasConf, null, null);
     }
 
+    @VisibleForTesting
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
+        this(clusterName, hiveClient, atlasClient, null, null);
+    }
+
     public String getClusterName() {
         return clusterName;
     }
@@ -96,21 +108,16 @@ public class HiveMetaStoreBridge {
                                UserGroupInformation ugi) throws Exception {
         this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME),
                 Hive.get(hiveConf),
-                atlasConf, doAsUser, ugi);
-    }
-
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient,
-                        Configuration atlasConf, String doAsUser, UserGroupInformation ugi) {
-        this.clusterName = clusterName;
-        this.hiveClient = hiveClient;
-        String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
-        this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(","));
+                new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser), doAsUser, ugi);
     }
 
-    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) {
+    @VisibleForTesting
+    HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, String user, UserGroupInformation ugi) {
         this.clusterName = clusterName;
         this.hiveClient = hiveClient;
         this.atlasClient = atlasClient;
+        this.doAsUser = user;
+        this.ugi = ugi;
     }
 
     private AtlasClient getAtlasClient() {
@@ -306,7 +313,7 @@ public class HiveMetaStoreBridge {
     }
 
     private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference,
-                                                      Table hiveTable) throws Exception {
+                                                      final Table hiveTable) throws Exception {
         LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName());
 
         if (tableReference == null) {
@@ -348,6 +355,7 @@ public class HiveMetaStoreBridge {
 
         tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name());
         tableReference.set("temporary", hiveTable.isTemporary());
+
         return tableReference;
     }
 
@@ -453,6 +461,17 @@ public class HiveMetaStoreBridge {
         return sdReferenceable;
     }
 
+    public Referenceable fillHDFSDataSet(String pathUri) {
+        Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString());
+        ref.set("path", pathUri);
+//        Path path = new Path(pathUri);
+//        ref.set("name", path.getName());
+//        TODO - Fix after ATLAS-542 to use a shorter name
+        ref.set("name", pathUri);
+        ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);
+        return ref;
+    }
+
     public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) {
         final String[] parts = tableQualifiedName.split("@");
         final String tableName = parts[0];
@@ -488,6 +507,21 @@ public class HiveMetaStoreBridge {
         AtlasClient dgiClient = getAtlasClient();
 
         try {
+            dgiClient.getType(FSDataTypes.HDFS_PATH().toString());
+            LOG.info("HDFS data model is already registered!");
+        } catch(AtlasServiceException ase) {
+            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                //Run the FSDataModel App body so that its typesDef val is initialized
+                FSDataModel.main(null);
+
+                final String hdfsModelJson = TypesSerialization.toJson(FSDataModel.typesDef());
+                //Expected in case types do not exist
+                LOG.info("Registering HDFS data model : " + hdfsModelJson);
+                dgiClient.createType(hdfsModelJson);
+            }
+        }
+
+        try {
             dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName());
             LOG.info("Hive data model is already registered!");
         } catch(AtlasServiceException ase) {
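
The HDFS model registration above follows a check-then-create pattern against the type system: probe for the type with getType(), and push the serialized model only when the server answers NOT_FOUND. Below is a minimal standalone sketch of that pattern, using only calls that appear in this diff; the helper class and method names are illustrative and not part of this change.

import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TypeRegistrationSketch {
    private static final Logger LOG = LoggerFactory.getLogger(TypeRegistrationSketch.class);

    // Register a type definition only if the server does not know it yet.
    static void registerTypeIfMissing(AtlasClient client, String typeName, String typesDefJson)
            throws AtlasServiceException {
        try {
            client.getType(typeName);            // probe: throws AtlasServiceException when the type is unknown
            LOG.info("{} is already registered", typeName);
        } catch (AtlasServiceException ase) {
            if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
                LOG.info("Registering type {}", typeName);
                client.createType(typesDefJson); // expected path when the type does not exist yet
            } else {
                throw ase;                       // anything else is a genuine failure
            }
        }
    }
}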

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 68e32ff..4102263 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -27,7 +27,10 @@ import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -38,18 +41,20 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+
 import org.apache.hadoop.security.UserGroupInformation;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,18 +91,108 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     private static final long keepAliveTimeDefault = 10;
     private static final int queueSizeDefault = 10000;
 
-    class HiveEvent {
-        public Set<ReadEntity> inputs;
-        public Set<WriteEntity> outputs;
-
-        public String user;
-        public UserGroupInformation ugi;
-        public HiveOperation operation;
-        public HookContext.HookType hookType;
-        public JSONObject jsonPlan;
-        public String queryId;
-        public String queryStr;
-        public Long queryStartTime;
+    static class HiveEventContext {
+        private Set<ReadEntity> inputs;
+        private Set<WriteEntity> outputs;
+
+        private String user;
+        private UserGroupInformation ugi;
+        private HiveOperation operation;
+        private HookContext.HookType hookType;
+        private org.json.JSONObject jsonPlan;
+        private String queryId;
+        private String queryStr;
+        private Long queryStartTime;
+
+        private String queryType;
+
+        public void setInputs(Set<ReadEntity> inputs) {
+            this.inputs = inputs;
+        }
+
+        public void setOutputs(Set<WriteEntity> outputs) {
+            this.outputs = outputs;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public void setUgi(UserGroupInformation ugi) {
+            this.ugi = ugi;
+        }
+
+        public void setOperation(HiveOperation operation) {
+            this.operation = operation;
+        }
+
+        public void setHookType(HookContext.HookType hookType) {
+            this.hookType = hookType;
+        }
+
+        public void setJsonPlan(JSONObject jsonPlan) {
+            this.jsonPlan = jsonPlan;
+        }
+
+        public void setQueryId(String queryId) {
+            this.queryId = queryId;
+        }
+
+        public void setQueryStr(String queryStr) {
+            this.queryStr = queryStr;
+        }
+
+        public void setQueryStartTime(Long queryStartTime) {
+            this.queryStartTime = queryStartTime;
+        }
+
+        public void setQueryType(String queryType) {
+            this.queryType = queryType;
+        }
+
+        public Set<ReadEntity> getInputs() {
+            return inputs;
+        }
+
+        public Set<WriteEntity> getOutputs() {
+            return outputs;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public UserGroupInformation getUgi() {
+            return ugi;
+        }
+
+        public HiveOperation getOperation() {
+            return operation;
+        }
+
+        public HookContext.HookType getHookType() {
+            return hookType;
+        }
+
+        public org.json.JSONObject getJsonPlan() {
+            return jsonPlan;
+        }
+
+        public String getQueryId() {
+            return queryId;
+        }
+
+        public String getQueryStr() {
+            return queryStr;
+        }
+
+        public Long getQueryStartTime() {
+            return queryStartTime;
+        }
+
+        public String getQueryType() {
+            return queryType;
+        }
     }
 
     private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
@@ -114,22 +209,22 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault);
 
             executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS,
-                    new LinkedBlockingQueue<Runnable>(queueSize),
-                    new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
-
-                Runtime.getRuntime().addShutdownHook(new Thread() {
-                    @Override
-                    public void run() {
-                        try {
-                            executor.shutdown();
-                            executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-                            executor = null;
-                        } catch (InterruptedException ie) {
-                            LOG.info("Interrupt received in shutdown.");
-                        }
-                        // shutdown client
+                new LinkedBlockingQueue<Runnable>(queueSize),
+                new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build());
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        executor.shutdown();
+                        executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+                        executor = null;
+                    } catch (InterruptedException ie) {
+                        LOG.info("Interrupt received in shutdown.");
                     }
-                });
+                    // shutdown client
+                }
+            });
 
             setupOperationMap();
         } catch (Exception e) {
@@ -156,21 +251,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
     @Override
     public void run(final HookContext hookContext) throws Exception {
         // clone to avoid concurrent access
-        final HiveEvent event = new HiveEvent();
-        final HiveConf conf = new HiveConf(hookContext.getConf());
-
-        event.inputs = hookContext.getInputs();
-        event.outputs = hookContext.getOutputs();
 
-        event.user = getUser(hookContext.getUserName(), hookContext.getUgi());
-        event.ugi = hookContext.getUgi();
-        event.operation = OPERATION_MAP.get(hookContext.getOperationName());
-        event.hookType = hookContext.getHookType();
-        event.queryId = hookContext.getQueryPlan().getQueryId();
-        event.queryStr = hookContext.getQueryPlan().getQueryStr();
-        event.queryStartTime = hookContext.getQueryPlan().getQueryStartTime();
+        final HiveConf conf = new HiveConf(hookContext.getConf());
 
-        event.jsonPlan = getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan());
+        final HiveEventContext event = new HiveEventContext();
+        event.setInputs(hookContext.getInputs());
+        event.setOutputs(hookContext.getOutputs());
+        event.setJsonPlan(getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan()));
+        event.setHookType(hookContext.getHookType());
+        event.setUgi(hookContext.getUgi());
+        event.setUser(hookContext.getUserName());
+        event.setOperation(OPERATION_MAP.get(hookContext.getOperationName()));
+        event.setQueryId(hookContext.getQueryPlan().getQueryId());
+        event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
+        event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
+        event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
 
         boolean sync = conf.get(CONF_SYNC, "false").equals("true");
         if (sync) {
@@ -189,20 +284,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private void fireAndForget(HiveEvent event) throws Exception {
-        assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
+    private void fireAndForget(HiveEventContext event) throws Exception {
+        assert event.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!";
 
-        LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation);
+        LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation());
 
-        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.user, event.ugi);
+        HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.getUser(), event.getUgi());
 
-        switch (event.operation) {
+        switch (event.getOperation()) {
         case CREATEDATABASE:
             handleEventOutputs(dgiBridge, event, Type.DATABASE);
             break;
 
         case CREATETABLE:
-            handleEventOutputs(dgiBridge, event, Type.TABLE);
+            List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            if (tablesCreated != null && tablesCreated.size() > 0) {
+                handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
+            }
             break;
 
         case CREATETABLE_AS_SELECT:
@@ -221,25 +317,26 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             break;
 
         case ALTERTABLE_FILEFORMAT:
-        case ALTERTABLE_LOCATION:
         case ALTERTABLE_CLUSTER_SORT:
         case ALTERTABLE_BUCKETNUM:
         case ALTERTABLE_PROPERTIES:
         case ALTERVIEW_PROPERTIES:
         case ALTERTABLE_SERDEPROPERTIES:
         case ALTERTABLE_SERIALIZER:
-            alterTable(dgiBridge, event);
-            break;
-
         case ALTERTABLE_ADDCOLS:
         case ALTERTABLE_REPLACECOLS:
         case ALTERTABLE_RENAMECOL:
-            alterTable(dgiBridge, event);
+            handleEventOutputs(dgiBridge, event, Type.TABLE);
             break;
-
+        case ALTERTABLE_LOCATION:
+            List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            if (tablesUpdated != null && tablesUpdated.size() > 0) {
+                //Track altered lineage in case of external tables
+                handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight());
+            }
+            break;
         case ALTERDATABASE:
         case ALTERDATABASE_OWNER:
-            alterDatabase(dgiBridge, event);
+            handleEventOutputs(dgiBridge, event, Type.DATABASE);
             break;
 
         default:
@@ -248,61 +345,37 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         notifyEntities(messages);
     }
 
-    private void alterDatabase(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
-        assert event.outputs != null && event.outputs.size() > 0;
-
-        for (WriteEntity writeEntity : event.outputs) {
-            if (writeEntity.getType() == Type.DATABASE) {
-                //Create/update table entity
-                createOrUpdateEntities(dgiBridge, event.user, writeEntity);
-            }
-        }
-    }
-
-    private void alterTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
-        assert event.inputs != null && event.inputs.size() == 1;
-        assert event.outputs != null && event.outputs.size() > 0;
-
-        for (WriteEntity writeEntity : event.outputs) {
-           //Below check should  filter out partition related ddls
-           if (writeEntity.getType() == Entity.Type.TABLE) {
-               //Create/update table entity
-               createOrUpdateEntities(dgiBridge, event.user, writeEntity);
-           }
-        }
-    }
-
-    private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
+    private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
         //crappy, no easy way of getting the new name
-        assert event.inputs != null && event.inputs.size() == 1;
-        assert event.outputs != null && event.outputs.size() > 0;
+        assert event.getInputs() != null && event.getInputs().size() == 1;
+        assert event.getOutputs() != null && event.getOutputs().size() > 0;
 
         //Update entity if not exists
-        ReadEntity oldEntity = event.inputs.iterator().next();
+        ReadEntity oldEntity = event.getInputs().iterator().next();
         Table oldTable = oldEntity.getTable();
 
-        for (WriteEntity writeEntity : event.outputs) {
+        for (WriteEntity writeEntity : event.getOutputs()) {
             if (writeEntity.getType() == Entity.Type.TABLE) {
                 Table newTable = writeEntity.getTable();
                 if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName()
                     .equals(oldTable.getTableName())) {
 
                     //Create/update old table entity - create new entity and replace id
-                    Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
+                    Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity);
                     String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
-                            oldTable.getDbName(), oldTable.getTableName());
+                        oldTable.getDbName(), oldTable.getTableName());
                     tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName);
                     tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase());
 
                     String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
-                            newTable.getDbName(), newTable.getTableName());
+                        newTable.getDbName(), newTable.getTableName());
 
                     Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
                     newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName);
                     newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase());
-                    messages.add(new HookNotification.EntityPartialUpdateRequest(event.user,
-                            HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
-                            oldQualifiedName, newEntity));
+                    messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(),
+                        HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME,
+                        oldQualifiedName, newEntity));
                 }
             }
         }
@@ -346,12 +419,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return tableEntity;
     }
 
-    private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception {
-        for (WriteEntity entity : event.outputs) {
+    private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
+        List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
+        for (Entity entity : event.getOutputs()) {
             if (entity.getType() == entityType) {
-                createOrUpdateEntities(dgiBridge, event.user, entity);
+                Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity);
+                if (entityCreatedOrUpdated != null) {
+                    entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated));
+                }
             }
         }
+        return entitiesCreatedOrUpdated;
     }
 
     private String normalize(String str) {
@@ -361,9 +439,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return str.toLowerCase().trim();
     }
 
-    private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception {
-        Set<ReadEntity> inputs = event.inputs;
-        Set<WriteEntity> outputs = event.outputs;
+    private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception {
+        Set<ReadEntity> inputs = event.getInputs();
+        Set<WriteEntity> outputs = event.getOutputs();
 
         //Even explain CTAS has operation name as CREATETABLE_AS_SELECT
         if (inputs.isEmpty() && outputs.isEmpty()) {
@@ -371,64 +449,54 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             return;
         }
 
-        if (event.queryId == null) {
-            LOG.info("Query id/plan is missing for {}" , event.queryStr);
+        if (event.getQueryId() == null) {
+            LOG.info("Query id/plan is missing for {}", event.getQueryStr());
         }
 
-        String queryStr = normalize(event.queryStr);
-        LOG.debug("Registering query: {}", queryStr);
-
-        Map<String, Referenceable>  source = new LinkedHashMap<>();
-        Map<String, Referenceable> target = new LinkedHashMap<>();
+        final Map<String, Referenceable> source = new LinkedHashMap<>();
+        final Map<String, Referenceable> target = new LinkedHashMap<>();
 
         boolean isSelectQuery = isSelectQuery(event);
 
         // Also filter out select queries which do not modify data
         if (!isSelectQuery) {
-            for (ReadEntity readEntity : inputs) {
-                if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) {
-                    final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),readEntity.getTable().getDbName(), readEntity.getTable().getTableName());
-                    if (!source.containsKey(tblQFName)) {
-                        Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity);
-                        source.put(tblQFName, inTable);
-                    }
-                }
+            for (ReadEntity readEntity : event.getInputs()) {
+                processHiveEntity(dgiBridge, event, readEntity, source);
             }
 
-            for (WriteEntity writeEntity : outputs) {
-                if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) {
-                    Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity);
-                    final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), writeEntity.getTable().getDbName(), writeEntity.getTable().getTableName());
-                    if (!target.containsKey(tblQFName)) {
-                        target.put(tblQFName, outTable);
-                    }
-                }
+            for (WriteEntity writeEntity : event.getOutputs()) {
+                processHiveEntity(dgiBridge, event, writeEntity, target);
             }
 
             if (source.size() > 0 || target.size() > 0) {
-                Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-
-                List<Referenceable> sourceList = new ArrayList<>(source.values());
-                List<Referenceable> targetList = new ArrayList<>(target.values());
-
-                //The serialization code expected a list
-                processReferenceable.set("inputs", sourceList);
-                processReferenceable.set("outputs", targetList);
-                processReferenceable.set("name", queryStr);
-                processReferenceable.set("operationType", event.operation.getOperationName());
-                processReferenceable.set("startTime", event.queryStartTime);
-                processReferenceable.set("userName", event.user);
-                processReferenceable.set("queryText", queryStr);
-                processReferenceable.set("queryId", event.queryId);
-                processReferenceable.set("queryPlan", event.jsonPlan.toString());
-                processReferenceable.set("endTime", System.currentTimeMillis());
-                //TODO set queryGraph
-                messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable));
+                Referenceable processReferenceable = getProcessReferenceable(event,
+                    new ArrayList<Referenceable>() {{
+                        addAll(source.values());
+                    }},
+                    new ArrayList<Referenceable>() {{
+                        addAll(target.values());
+                    }});
+                messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
             } else {
-                LOG.info("Skipped query {} since it has no inputs or resulting outputs", queryStr);
+                LOG.info("Skipped query {} since it has no inputs or resulting outputs", event.getQueryStr());
             }
         } else {
-            LOG.info("Skipped query {} for processing since it is a select query ", queryStr);
+            LOG.info("Skipped query {} for processing since it is a select query ", event.getQueryStr());
+        }
+    }
+
+    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception {
+        if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
+            final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable().getDbName(), entity.getTable().getTableName());
+            if (!dataSets.containsKey(tblQFName)) {
+                Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity);
+                dataSets.put(tblQFName, inTable);
+            }
+        } else if (entity.getType() == Type.DFS_DIR) {
+            final String pathUri = normalize(new Path(entity.getLocation()).toString());
+            LOG.info("Registering DFS Path {} ", pathUri);
+            Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
+            dataSets.put(pathUri, hdfsPath);
         }
     }
 
@@ -444,13 +512,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }
 
-    private boolean isSelectQuery(HiveEvent event) {
-        if (event.operation == HiveOperation.QUERY) {
-            Set<WriteEntity> outputs = event.outputs;
+    private boolean isSelectQuery(HiveEventContext event) {
+        if (event.getOperation() == HiveOperation.QUERY) {
+            Set<WriteEntity> outputs = event.getOutputs();
 
             //Select query has only one output
-            if (outputs.size() == 1) {
-                WriteEntity output = outputs.iterator().next();
+            if (event.getOutputs().size() == 1) {
+                WriteEntity output = event.getOutputs().iterator().next();
                 /* Strangely select queries have DFS_DIR as the type which seems like a bug in hive. Filter out by checking if the path is a temporary URI
                  * Insert into/overwrite queries onto local or dfs paths have DFS_DIR or LOCAL_DIR as the type and WriteType.PATH_WRITE and tempUri = false
                  * Insert into a temporary table has isTempURI = false. So will not skip as expected
@@ -465,4 +533,50 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
         return false;
     }
+
+    private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException {
+        Table hiveTable = entity.getTable();
+        //Refresh to get the correct location
+        hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
+
+        if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) {
+            final String location = normalize(hiveTable.getDataLocation().toString());
+            LOG.info("Registering external table process {} ", event.getQueryStr());
+            List<Referenceable> inputs = new ArrayList<Referenceable>() {{
+                add(dgiBridge.fillHDFSDataSet(location));
+            }};
+
+            List<Referenceable> outputs = new ArrayList<Referenceable>() {{
+                add(tblRef);
+            }};
+
+            Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs);
+            messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
+        }
+    }
+
+    private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) {
+        Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+
+        String queryStr = normalize(hiveEvent.getQueryStr());
+        LOG.debug("Registering query: {}", queryStr);
+
+        //The serialization code expected a list
+        if (sourceList != null && !sourceList.isEmpty()) {
+            processReferenceable.set("inputs", sourceList);
+        }
+        if (targetList != null && !targetList.isEmpty()) {
+            processReferenceable.set("outputs", targetList);
+        }
+        processReferenceable.set("name", queryStr);
+        processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName());
+        processReferenceable.set("startTime", hiveEvent.getQueryStartTime());
+        processReferenceable.set("userName", hiveEvent.getUser());
+        processReferenceable.set("queryText", queryStr);
+        processReferenceable.set("queryId", hiveEvent.getQueryId());
+        processReferenceable.set("queryPlan", hiveEvent.getJsonPlan().toString());
+        processReferenceable.set("endTime", System.currentTimeMillis());
+        //TODO set queryGraph
+        return processReferenceable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index e17afb8..8ef8479 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -22,16 +22,19 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.Driver;
@@ -61,6 +64,9 @@ public class HiveHookIT {
     private Driver driver;
     private AtlasClient dgiCLient;
     private SessionState ss;
+    
+    private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS;
+    private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS;
 
     private enum QUERY_TYPE {
         GREMLIN,
@@ -81,9 +87,11 @@ public class HiveHookIT {
         SessionState.setCurrentSessionState(ss);
 
         Configuration configuration = ApplicationProperties.get();
+        dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
+
         HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, configuration);
         hiveMetaStoreBridge.registerHiveDataModel();
-        dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL));
+
     }
 
     private void runCommand(String cmd) throws Exception {
@@ -145,10 +153,15 @@ public class HiveHookIT {
         return tableName;
     }
 
-    private String createTable(boolean isPartitioned, boolean isTemporary) throws Exception {
+    private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception {
         String tableName = tableName();
-        runCommand("create " + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
-            " partitioned by(dt string)" : ""));
+
+        String location = "";
+        if (isExternal) {
+            location = " location '" +  createTestDFSPath("someTestPath") + "'";
+        }
+        runCommand("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ?
+            " partitioned by(dt string)" : "") + location);
         return tableName;
     }
 
@@ -182,6 +195,37 @@ public class HiveHookIT {
         assertDatabaseIsRegistered(DEFAULT_DB);
     }
 
+    @Test
+    public void testCreateExternalTable() throws Exception {
+        String tableName = tableName();
+        String dbName = createDatabase();
+        String colName = columnName();
+
+        String pFile = createTestDFSPath("parentPath");
+        final String query = String.format("create EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string",  pFile);
+        runCommand(query);
+        String tableId = assertTableIsRegistered(dbName, tableName);
+
+        Referenceable processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, pFile, INPUTS);
+        validateOutputTables(processReference, tableId);
+    }
+
+    private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
+       validateTables(processReference, OUTPUTS, expectedTableGuids);
+    }
+
+    private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception {
+        validateTables(processReference, INPUTS, expectedTableGuids);
+    }
+
+    private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception {
+        List<Id> tableRef = (List<Id>) processReference.get(attrName);
+        for(int i = 0; i < expectedTableGuids.length; i++) {
+            Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]);
+        }
+    }
+
     private String assertColumnIsRegistered(String colName) throws Exception {
         LOG.debug("Searching for column {}", colName.toLowerCase());
         String query =
@@ -265,9 +309,16 @@ public class HiveHookIT {
         Assert.assertEquals(vertices.length(), 0);
     }
 
+    private String createTestDFSPath(String path) throws Exception {
+        return "pfile://" + mkdir(path);
+    }
+
+    private String createTestDFSFile(String path) throws Exception {
+        return "pfile://" + file(path);
+    }
 
     @Test
-    public void testLoadData() throws Exception {
+    public void testLoadLocalPath() throws Exception {
         String tableName = createTable(false);
 
         String loadFile = file("load");
@@ -278,17 +329,69 @@ public class HiveHookIT {
     }
 
     @Test
-    public void testLoadDataIntoPartition() throws Exception {
+    public void testLoadLocalPathIntoPartition() throws Exception {
         String tableName = createTable(true);
 
         String loadFile = file("load");
         String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName +  " partition(dt = '2015-01-01')";
         runCommand(query);
 
+        validateProcess(query, 0, 1);
+    }
+
+    @Test
+    public void testLoadDFSPath() throws Exception {
+        String tableName = createTable(true, true, false);
+
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        String loadFile = createTestDFSFile("loadDFSFile");
+        String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')";
+        runCommand(query);
+
+        Referenceable processReference = validateProcess(query, 1, 1);
+
+        validateHDFSPaths(processReference, loadFile, INPUTS);
+
+        validateOutputTables(processReference, tableId);
+    }
+
+    private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception {
+        String processId = assertProcessIsRegistered(query);
+        Referenceable process = dgiCLient.getEntity(processId);
+        if (numInputs == 0) {
+            Assert.assertNull(process.get(INPUTS));
+        } else {
+            Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs);
+        }
+
+        if (numOutputs == 0) {
+            Assert.assertNull(process.get(OUTPUTS));
+        } else {
+            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs);
+        }
+
+        return process;
+    }
+
+    private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception {
         String processId = assertProcessIsRegistered(query);
         Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertNull(process.get("inputs"));
-        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+        if (inputs == null) {
+            Assert.assertNull(process.get(INPUTS));
+        } else {
+            Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length);
+            validateInputTables(process, inputs);
+        }
+
+        if (outputs == null) {
+            Assert.assertNull(process.get(OUTPUTS));
+        } else {
+            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length);
+            validateOutputTables(process, outputs);
+        }
+
+        return process;
     }
 
     @Test
@@ -299,13 +402,11 @@ public class HiveHookIT {
                 "insert into " + insertTableName + " select id, name from " + tableName;
 
         runCommand(query);
-        String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
-        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+
+        validateProcess(query, new String[] {inputTableId}, new String[] {opTableId});
     }
 
     @Test
@@ -316,10 +417,7 @@ public class HiveHookIT {
             "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName;
 
         runCommand(query);
-        String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
-        Assert.assertNull(process.get("outputs"));
+        validateProcess(query, 1, 0);
 
         assertTableIsRegistered(DEFAULT_DB, tableName);
     }
@@ -327,34 +425,32 @@ public class HiveHookIT {
     @Test
     public void testInsertIntoDFSDir() throws Exception {
         String tableName = createTable();
-        String pFile = "pfile://" + mkdir("somedfspath");
+        String pFile = createTestDFSPath("somedfspath");
         String query =
             "insert overwrite DIRECTORY '" + pFile  + "' select id, name from " + tableName;
 
         runCommand(query);
-        String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
-        Assert.assertNull(process.get("outputs"));
+        Referenceable processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, pFile, OUTPUTS);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        validateInputTables(processReference, tableId);
     }
 
     @Test
     public void testInsertIntoTempTable() throws Exception {
         String tableName = createTable();
-        String insertTableName = createTable(false, true);
+        String insertTableName = createTable(false, false, true);
         String query =
             "insert into " + insertTableName + " select id, name from " + tableName;
 
         runCommand(query);
-        String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
-        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+        validateProcess(query, 1, 1);
 
-        assertTableIsRegistered(DEFAULT_DB, tableName);
-        assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
     }
 
     @Test
@@ -365,10 +461,11 @@ public class HiveHookIT {
             "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName
                 + " where dt = '2015-01-01'";
         runCommand(query);
-        String processId = assertProcessIsRegistered(query);
-        Referenceable process = dgiCLient.getEntity(processId);
-        Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1);
-        Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1);
+        validateProcess(query, 1, 1);
+
+        String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+        String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName);
+        validateProcess(query, new String[] {ipTableId}, new String[] {opTableId});
     }
 
     private String random() {
@@ -390,19 +487,62 @@ public class HiveHookIT {
     }
 
     @Test
-    public void testExportImport() throws Exception {
+    public void testExportImportUnPartitionedTable() throws Exception {
         String tableName = createTable(false);
 
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
         String filename = "pfile://" + mkdir("export");
         String query = "export table " + tableName + " to \"" + filename + "\"";
         runCommand(query);
-        assertProcessIsRegistered(query);
+        Referenceable processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, filename, OUTPUTS);
+
+        validateInputTables(processReference, tableId);
 
+        //Import
         tableName = createTable(false);
+        tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
 
         query = "import table " + tableName + " from '" + filename + "'";
         runCommand(query);
-        assertProcessIsRegistered(query);
+        processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, filename, INPUTS);
+
+        validateOutputTables(processReference, tableId);
+
+    }
+
+    @Test
+    public void testExportImportPartitionedTable() throws Exception {
+        String tableName = createTable(true);
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        //Add a partition
+        String partFile = "pfile://" + mkdir("partition");
+        String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'";
+        runCommand(query);
+
+        String filename = "pfile://" + mkdir("export");
+        query = "export table " + tableName + " to \"" + filename + "\"";
+        runCommand(query);
+        Referenceable processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, filename, OUTPUTS);
+
+        validateInputTables(processReference, tableId);
+
+        //Import
+        tableName = createTable(true);
+        tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        query = "import table " + tableName + " from '" + filename + "'";
+        runCommand(query);
+        processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, filename, INPUTS);
+
+        validateOutputTables(processReference, tableId);
+
+
     }
 
     @Test
@@ -561,8 +701,9 @@ public class HiveHookIT {
 
     @Test
     public void testAlterTableLocation() throws Exception {
-        String tableName = createTable();
-        final String testPath = "file://" + System.getProperty("java.io.tmpdir", "/tmp") + File.pathSeparator + "testPath";
+        //It's an external table, so the HDFS location should also be registered as an entity
+        String tableName = createTable(true, true, false);
+        final String testPath = createTestDFSPath("testBaseDir");
         String query = "alter table " + tableName + " set location '" + testPath + "'";
         runCommand(query);
 
@@ -571,6 +712,38 @@ public class HiveHookIT {
         Referenceable tableRef = dgiCLient.getEntity(tableId);
         Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC);
         Assert.assertEquals(sdRef.get("location"), testPath);
+
+        Referenceable processReference = validateProcess(query, 1, 1);
+        validateHDFSPaths(processReference, testPath, INPUTS);
+
+        validateOutputTables(processReference, tableId);
+
+    }
+
+    private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
+        List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);
+
+        final String testPathNormed = normalize(new Path(testPath).toString());
+        String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
+        Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
+
+        Referenceable hdfsPathRef = dgiCLient.getEntity(hdfsPathId);
+        Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
+        Assert.assertEquals(hdfsPathRef.get("name"), testPathNormed);
+//        Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName());
+        Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+
+        return hdfsPathRef.getId()._getId();
+    }
+
+
+    private String assertHDFSPathIsRegistered(String path) throws Exception {
+        final String typeName = FSDataTypes.HDFS_PATH().toString();
+        final String parentTypeName = FSDataTypes.FS_PATH().toString();
+        String gremlinQuery =
+            String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()", typeName, parentTypeName,
+                normalize(path));
+        return assertEntityIsRegistered(gremlinQuery);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
----------------------------------------------------------------------
diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
index 5665856..70a72ef 100644
--- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
+++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java
@@ -219,8 +219,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
                 dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr);
                 dataSetReferenceable.set("path", hdfsPathStr);
                 dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal"));
-                final Path hdfsPath = new Path(hdfsPathStr);
-                dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
+                //Fix after ATLAS-542
+//                final Path hdfsPath = new Path(hdfsPathStr);
+//                dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName());
+                dataSetReferenceable.set(AtlasClient.NAME, hdfsPathStr);
                 break;
 
             case "HiveBolt":

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index c3b4ba9..938a0a3 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -91,6 +91,9 @@ public class AtlasClient {
     public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable";
     public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName";
 
+    public static final String PROCESS_ATTRIBUTE_INPUTS = "inputs";
+    public static final String PROCESS_ATTRIBUTE_OUTPUTS = "outputs";
+
     public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8";
     public static final String UNKNOWN_STATUS = "Unknown status";
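
The two new constants name the lineage attributes on process entities; HiveHookIT above reads them back off fetched hive_process instances. A small illustrative helper (hypothetical, mirroring the casts used in the test, where the attributes hold Id references):

import java.util.List;

import org.apache.atlas.AtlasClient;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;

public class ProcessLineageSketch {
    // Read the input/output lineage edges off a fetched hive_process entity.
    @SuppressWarnings("unchecked")
    static List<Id> inputsOf(Referenceable process) {
        return (List<Id>) process.get(AtlasClient.PROCESS_ATTRIBUTE_INPUTS);
    }

    @SuppressWarnings("unchecked")
    static List<Id> outputsOf(Referenceable process) {
        return (List<Id>) process.get(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS);
    }
}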
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 377ea5c..524cd33 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)
 ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai)
 ATLAS-605 Hook Notifications for DELETE entity needs to be supported (sumasai)
 ATLAS-607 Add Support for delete entity through a qualifiedName (sumasai via yhemanth)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java b/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java
index 430bb6b..41c0155 100644
--- a/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java
+++ b/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java
@@ -31,6 +31,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Comparator;
 
 public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar {
 
@@ -48,7 +50,17 @@ public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar {
             LOG.info("No types directory {} found - not registering any reserved types", typesDirName);
             return;
         }
+
         File[] typeDefFiles = typesDir.listFiles();
+        //TODO - Enforce the dependency order among models via their definitions rather than by modifiedTime as below
+        // Workaround - sort by modifiedTime so the models register in dependency order: hdfs first, then hive, which the storm and falcon models depend on.
+        // Sorting by time works because the modules build in that order in the pom, so the generated type definition files carry increasing timestamps.
+        Arrays.sort(typeDefFiles, new Comparator<File>() {
+            public int compare(File f1, File f2) {
+                return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified());
+            }
+        });
+
         for (File typeDefFile : typeDefFiles) {
             try {
                 String typeDefJSON = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);

