atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject incubator-atlas git commit: ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)
Date Fri, 17 Jun 2016 18:00:50 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 4f6816572 -> 9d1040b7c


ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9d1040b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9d1040b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9d1040b7

Branch: refs/heads/master
Commit: 9d1040b7cf8e19728ad4c8d12ee24da28fb05ff8
Parents: 4f68165
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Fri Jun 17 10:56:52 2016 -0700
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Fri Jun 17 10:57:44 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 66 +++++++++++++++++++-
 .../org/apache/atlas/hive/hook/HiveHook.java    |  4 ++
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  8 ++-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |  4 +-
 release-log.txt                                 |  1 +
 5 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 9732bce..c1940a6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -27,6 +27,7 @@ import org.apache.atlas.fs.model.FSDataModel;
 import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
@@ -37,6 +38,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -239,6 +241,18 @@ public class HiveMetaStoreBridge {
         return String.format("%s@%s", dbName.toLowerCase(), clusterName);
     }
 
+    private String getCreateTableString(Table table, String location){
+        String colString = "";
+        List<FieldSchema> colList = table.getAllCols();
+        for(FieldSchema col:colList){
+            colString += col.getName()  + " " + col.getType() + ",";
+        }
+        colString = colString.substring(0, colString.length() - 1);
+        String query = "create external table " + table.getTableName() + "(" + colString
+ ")" +
+                " location '" + location + "'";
+        return query;
+    }
+
     /**
      * Imports all tables for the given db
      * @param databaseName
@@ -247,10 +261,45 @@ public class HiveMetaStoreBridge {
      */
     private void importTables(Referenceable databaseReferenceable, String databaseName) throws
Exception {
         List<String> hiveTables = hiveClient.getAllTables(databaseName);
-
+        LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
         for (String tableName : hiveTables) {
             Table table = hiveClient.getTable(databaseName, tableName);
             Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
+            if (table.getTableType() == TableType.EXTERNAL_TABLE){
+                String tableQualifiedName = getTableQualifiedName(clusterName, table);
+                Referenceable process = getProcessReference(tableQualifiedName);
+                if (process == null){
+                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
+                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+                    ArrayList<Referenceable> sourceList = new ArrayList<>();
+                    ArrayList<Referenceable> targetList = new ArrayList<>();
+                    String tableLocation = table.getDataLocation().toString();
+                    Referenceable path = fillHDFSDataSet(tableLocation);
+                    String query = getCreateTableString(table, tableLocation);
+                    sourceList.add(path);
+                    targetList.add(tableReferenceable);
+                    lineageProcess.set("inputs", sourceList);
+                    lineageProcess.set("outputs", targetList);
+                    lineageProcess.set("userName", table.getOwner());
+                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
+                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
+                    lineageProcess.set("operationType", "CREATETABLE");
+                    lineageProcess.set("queryText", query);
+                    lineageProcess.set("queryId", query);
+                    lineageProcess.set("queryPlan", "{}");
+                    lineageProcess.set("clusterName", clusterName);
+                    List<String> recentQueries = new ArrayList<>(1);
+                    recentQueries.add(query);
+                    lineageProcess.set("recentQueries", recentQueries);
+                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+                    lineageProcess.set(AtlasClient.NAME, query);
+                    registerInstance(lineageProcess);
+
+                }
+                else {
+                    LOG.info("Process {} is already registered", process.toString());
+                }
+            }
         }
     }
 
@@ -269,9 +318,21 @@ public class HiveMetaStoreBridge {
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }
 
+    private Referenceable getProcessReference(String qualifiedName) throws Exception{
+        LOG.debug("Getting reference for process {}", qualifiedName);
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String dslQuery = getProcessDSLQuery(typeName, qualifiedName);
+        return getEntityReferenceFromDSL(typeName, dslQuery);
+    }
+
+    static String getProcessDSLQuery(String typeName, String qualifiedName) throws Exception{
+        String dslQuery = String.format("%s as t where qualifiedName = '%s'", typeName, qualifiedName);
+        return dslQuery;
+    }
+
     static String getTableDSLQuery(String clusterName, String dbName, String tableName, String
typeName, boolean isTemporary) {
         String entityName = getTableQualifiedName(clusterName, dbName, tableName, isTemporary);
-        return String.format("%s as t where name = '%s'", typeName, entityName);
+        return String.format("%s as t where qualifiedName = '%s'", typeName, entityName);
     }
 
     /**
@@ -398,6 +459,7 @@ public class HiveMetaStoreBridge {
         String tableName = table.getTableName();
         LOG.info("Attempting to register table [" + tableName + "]");
         Referenceable tableReference = getTableReference(table);
+        LOG.info("Found result " + tableReference);
         if (tableReference == null) {
             tableReference = createTableInstance(dbReference, table);
             tableReference = registerInstance(tableReference);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 0ccb18b..23c82df 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -678,6 +678,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
             }};
 
             Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
inputs, outputs);
+            String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),
hiveTable);
+            if(isCreateOp(event)){
+                processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+            }
             entities.addAll(tables.values());
             entities.add(processReferenceable);
             messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index dec5fcb..856e5b1 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -90,7 +90,7 @@ public class HiveMetaStoreBridgeTest {
     public void testImportThatUpdatesRegisteredTable() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
 
-        setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -99,6 +99,9 @@ public class HiveMetaStoreBridgeTest {
                 HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
hiveTable);
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         bridge.importHiveMetadata();
@@ -140,6 +143,9 @@ public class HiveMetaStoreBridgeTest {
             TEST_TABLE_NAME,
             HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
             getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
hiveTable);
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
 
         Partition partition = mock(Partition.class);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 00c17e8..f5904d6 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -240,8 +240,8 @@ public class HiveHookIT {
         final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s)
location '%s'", DEFAULT_DB , tableName , colName + " int", "name string",  pFile);
         runCommand(query);
         assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
-
-        String processId = assertProcessIsRegistered(query);
+        String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+                HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName,
true), null);
         Referenceable processReference = atlasClient.getEntity(processId);
         assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 225cc8e..e4256bf 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -24,6 +24,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file
(dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via
shwethags)
 
 ALL CHANGES:
+ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)
 ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
 ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
 ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)


Mime
View raw message