atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject incubator-atlas git commit: ATLAS-247 Hive Column level lineage (rhbutani, svimal2106 via shwethags)
Date Fri, 30 Sep 2016 07:21:44 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 4c56c61f8 -> a52112d86


ATLAS-247 Hive Column level lineage (rhbutani, svimal2106 via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a52112d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a52112d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a52112d8

Branch: refs/heads/master
Commit: a52112d8633c2eb5dea0c3a6ffd3f308594a3996
Parents: 4c56c61
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Fri Sep 30 12:51:29 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Fri Sep 30 12:51:29 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/ColumnLineageUtils.java   | 121 +++++++++++++++++++
 .../org/apache/atlas/hive/hook/HiveHook.java    |  76 +++++++++++-
 .../hive/model/HiveDataModelGenerator.java      |  21 +++-
 .../apache/atlas/hive/model/HiveDataTypes.java  |   3 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 117 ++++++++++++++----
 release-log.txt                                 |   1 +
 6 files changed, 313 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
new file mode 100644
index 0000000..e4a20e1
--- /dev/null
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hive.bridge;
+
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.hive.hook.HiveHook;
+import org.apache.atlas.hive.model.HiveDataModelGenerator;
+import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.hadoop.hive.ql.hooks.LineageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class ColumnLineageUtils {
+    public static final Logger LOG = LoggerFactory.getLogger(ColumnLineageUtils.class);
+    public static class HiveColumnLineageInfo {
+        public final String depenendencyType;
+        public final String expr;
+        public final String inputColumn;
+
+        HiveColumnLineageInfo(LineageInfo.Dependency d, String inputCol) {
+            depenendencyType = d.getType().name();
+            expr = d.getExpr();
+            inputColumn = inputCol;
+        }
+
+        @Override
+        public String toString(){
+            return inputColumn;
+        }
+    }
+
+    public static String getQualifiedName(LineageInfo.DependencyKey key){
+        String db = key.getDataContainer().getTable().getDbName();
+        String table = key.getDataContainer().getTable().getTableName();
+        String col = key.getFieldSchema().getName();
+        return db + "." + table + "." + col;
+    }
+
+    public static Map<String, List<HiveColumnLineageInfo>> buildLineageMap(LineageInfo
lInfo) {
+        Map<String, List<HiveColumnLineageInfo>> m = new HashMap<>();
+
+        for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet())
{
+            List<HiveColumnLineageInfo> l = new ArrayList<>();
+            String k = getQualifiedName(e.getKey());
+            for (LineageInfo.BaseColumnInfo iCol : e.getValue().getBaseCols()) {
+                String db = iCol.getTabAlias().getTable().getDbName();
+                String table = iCol.getTabAlias().getTable().getTableName();
+                String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db
+ "." + table + "." + iCol.getColumn().getName();
+                l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName));
+            }
+            LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k);
+            m.put(k, l);
+        }
+        return m;
+    }
+
+    static String[] extractComponents(String qualifiedName) {
+        String[] comps = qualifiedName.split("\\.");
+        int lastIdx = comps.length - 1;
+        int atLoc = comps[lastIdx].indexOf('@');
+        if (atLoc > 0) {
+            comps[lastIdx] = comps[lastIdx].substring(0, atLoc);
+        }
+        return comps;
+    }
+
+    static void populateColumnReferenceableMap(Map<String, Referenceable> m,
+                                               Referenceable r) {
+        if (r.getTypeName().equals(HiveDataTypes.HIVE_TABLE.getName())) {
+            String qName = (String) r.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+            String[] qNameComps = extractComponents(qName);
+            for (Referenceable col : (List<Referenceable>) r.get(HiveDataModelGenerator.COLUMNS))
{
+                String cName = (String) col.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
+                String[] colQNameComps = extractComponents(cName);
+                String colQName = colQNameComps[0] + "." + colQNameComps[1] + "." + colQNameComps[2];
+                m.put(colQName, col);
+            }
+            String tableQName = qNameComps[0] + "." + qNameComps[1];
+            m.put(tableQName, r);
+        }
+    }
+
+
+    public static Map<String, Referenceable> buildColumnReferenceableMap(List<Referenceable>
inputs,
+                                                                         List<Referenceable>
outputs) {
+        Map<String, Referenceable> m = new HashMap<>();
+
+        for (Referenceable r : inputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        for (Referenceable r : outputs) {
+            populateColumnReferenceableMap(m, r);
+        }
+
+        return m;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index a3464a0..eaef337 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
+import org.apache.atlas.hive.bridge.ColumnLineageUtils;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.*;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.Entity.Type;
 import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
@@ -55,6 +57,7 @@ import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -182,6 +185,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
             event.setQueryStr(hookContext.getQueryPlan().getQueryStr());
             event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime());
             event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType());
+            event.setLineageInfo(hookContext.getLinfo());
 
             if (executor == null) {
                 fireAndForget(event);
@@ -616,7 +620,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
 
             if (source.size() > 0 || target.size() > 0) {
                 Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event,
sortedHiveInputs, sortedHiveOutputs, source, target);
-                entities.add(processReferenceable);
+                // setup Column Lineage
+                List<Referenceable> sourceList = new ArrayList<>(source.values());
+                List<Referenceable> targetList = new ArrayList<>(target.values());
+                List<Referenceable> colLineageProcessInstances = new ArrayList<>();
+                try {
+                    Map<String, Referenceable> columnQNameToRef =
+                            ColumnLineageUtils.buildColumnReferenceableMap(sourceList, targetList);
+                    colLineageProcessInstances = createColumnLineageProcessInstances(processReferenceable,
+                            event.lineageInfo,
+                            columnQNameToRef);
+                }catch (Exception e){
+                    LOG.warn("Column lineage process setup failed with exception {}", e);
+                }
+                colLineageProcessInstances.add(0, processReferenceable);
+                entities.addAll(colLineageProcessInstances);
                 event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(),
new ArrayList<>(entities)));
             } else {
                 LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()",
event.getQueryStr());
@@ -773,6 +791,51 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
         return processReferenceable;
     }
 
+
+    private List<Referenceable> createColumnLineageProcessInstances(
+            Referenceable processRefObj,
+            Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo,
+            Map<String, Referenceable> columnQNameToRef
+    ) {
+        List<Referenceable> l = new ArrayList<>();
+        for(Map.Entry<String, List<ColumnLineageUtils.HiveColumnLineageInfo>>
e :
+                lineageInfo.entrySet()) {
+            Referenceable destCol = columnQNameToRef.get(e.getKey());
+            if (destCol == null ) {
+                LOG.debug("Couldn't find output Column {}", e.getKey());
+                continue;
+            }
+            List<Referenceable> outRef = new ArrayList<>();
+            outRef.add(destCol);
+            List<Referenceable> inputRefs = new ArrayList<>();
+            for(ColumnLineageUtils.HiveColumnLineageInfo cLI : e.getValue()) {
+                Referenceable srcCol = columnQNameToRef.get(cLI.inputColumn);
+                if (srcCol == null ) {
+                    LOG.debug("Couldn't find input Column {}", cLI.inputColumn);
+                    continue;
+                }
+                inputRefs.add(srcCol);
+            }
+
+            if (inputRefs.size() > 0 ) {
+                Referenceable r = new Referenceable(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName());
+                r.set("name", processRefObj.get(AtlasClient.NAME) + ":" + outRef.get(0).get(AtlasClient.NAME));
+                r.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, processRefObj.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME)
+ ":" + outRef.get(0).get(AtlasClient.NAME));
+                r.set("inputs", inputRefs);
+                r.set("outputs", outRef);
+                r.set("query", processRefObj);
+                r.set("depenendencyType", e.getValue().get(0).depenendencyType);
+                r.set("expression", e.getValue().get(0).expr);
+                l.add(r);
+            }
+            else{
+                LOG.debug("No input references found for lineage of column {}", destCol.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
+            }
+        }
+
+        return l;
+    }
+
     @VisibleForTesting
     static String getProcessQualifiedName(HiveMetaStoreBridge dgiBridge, HiveEventContext
eventContext,
                                           final SortedSet<ReadEntity> sortedHiveInputs,
@@ -930,6 +993,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
         private String queryStr;
         private Long queryStartTime;
 
+        public Map<String, List<ColumnLineageUtils.HiveColumnLineageInfo>> lineageInfo;
+
         private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
 
         private String queryType;
@@ -978,6 +1043,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext
{
             this.queryType = queryType;
         }
 
+        public void setLineageInfo(LineageInfo lineageInfo){
+            try {
+                this.lineageInfo = ColumnLineageUtils.buildLineageMap(lineageInfo);
+                LOG.debug("Column Lineage Map => {} ", this.lineageInfo.entrySet());
+            }catch (Exception e){
+                LOG.warn("Column Lineage Map build failed with exception {}", e);
+            }
+        }
+
         public Set<ReadEntity> getInputs() {
             return inputs;
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
index 45f0bc9..28078f4 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java
@@ -20,7 +20,6 @@ package org.apache.atlas.hive.model;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasConstants;
 import org.apache.atlas.AtlasException;
@@ -107,6 +106,7 @@ public class HiveDataModelGenerator {
 
         // DDL/DML Process
         createProcessClass();
+        createColumnLineageClass();
     }
 
     public TypesDef getTypesDef() {
@@ -328,4 +328,23 @@ public class HiveDataModelGenerator {
         }
     }
 
+    private void createColumnLineageClass() throws AtlasException {
+
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+                new AttributeDefinition("query", HiveDataTypes.HIVE_PROCESS.getName(),
+                        Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("depenendencyType",DataTypes.STRING_TYPE.getName(),
+                        Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("expression",DataTypes.STRING_TYPE.getName(),
+                        Multiplicity.OPTIONAL, false, null)
+        };
+        HierarchicalTypeDefinition<ClassType> definition =
+                new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(),
null,
+                        ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions);
+        classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(), definition);
+        LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN_LINEAGE.getName());
+
+    }
+
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
index e094cb6..94010d0 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataTypes.java
@@ -42,7 +42,8 @@ public enum HiveDataTypes {
     HIVE_INDEX,
     HIVE_ROLE,
     HIVE_TYPE,
-    HIVE_PROCESS
+    HIVE_PROCESS,
+    HIVE_COLUMN_LINEAGE
     // HIVE_VIEW,
     ;
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index a5838b4..b6f55a1 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -54,18 +55,7 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 
 import static org.apache.atlas.AtlasClient.NAME;
 import static org.apache.atlas.hive.hook.HiveHook.IO_SEP;
@@ -320,6 +310,7 @@ public class HiveHookIT extends HiveITBase {
 
         assertProcessIsRegistered(constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT,
readEntities, writeEntities));
         assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+
     }
 
     private HiveHook.HiveEventContext constructEvent(String query, HiveOperation op, Set<ReadEntity>
inputs, Set<WriteEntity> outputs) {
@@ -1116,6 +1107,83 @@ public class HiveHookIT extends HiveITBase {
         );
     }
 
+    /*
+    The test is disabled by default
+    Reason : Atlas uses Hive version 1.2.x and the Hive patch HIVE-13112 which enables column
level lineage is not
+    committed in Hive version 1.2.x
+    This test will fail if the lineage information is not available from Hive
+    Once the patch for HIVE-13112 is committed to Hive branch 1.2.x, the test can be enabled
+    Please track HIVE-14706 to know the status of column lineage availability in latest Hive
versions i.e 2.1.x
+     */
+    @Test(enabled = false)
+    public void testColumnLevelLineage() throws Exception {
+        String sourceTable = "table" + random();
+        runCommand("create table " + sourceTable + "(a int, b int)");
+        String sourceTableGUID = assertTableIsRegistered(DEFAULT_DB, sourceTable);
+        String a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
DEFAULT_DB, sourceTable), "a"));
+        String b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
DEFAULT_DB, sourceTable), "b"));
+
+        String ctasTableName = "table" + random();
+        String query = "create table " + ctasTableName + " as " +
+                        "select sum(a+b) as a, count(*) as b from " + sourceTable;
+        runCommand(query);
+
+        String dest_a_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
DEFAULT_DB, ctasTableName), "a"));
+        String dest_b_guid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME,
DEFAULT_DB, ctasTableName), "b"));
+
+        final Set<ReadEntity> inputs = getInputs(sourceTable, Entity.Type.TABLE);
+        final Set<WriteEntity> outputs = getOutputs(ctasTableName, Entity.Type.TABLE);
+        HiveHook.HiveEventContext event = constructEvent(query, HiveOperation.CREATETABLE_AS_SELECT,
inputs, outputs);
+        assertProcessIsRegistered(event);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+
+        String processQName = sortEventsAndGetProcessQualifiedName(event);
+
+        List<String> aLineageInputs = Arrays.asList(a_guid, b_guid);
+        String aLineageProcessName = processQName + ":" + "a";
+        LOG.debug("Searching for column lineage process {} ", aLineageProcessName);
+        String guid = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, aLineageProcessName, null);
+        List<Id> processInputs = (List<Id>) atlasClient.getEntity(guid).get("inputs");
+        List<String> processInputsAsString = new ArrayList<>();
+        for(Id input: processInputs){
+            processInputsAsString.add(input._getId());
+        }
+        Collections.sort(processInputsAsString);
+        Collections.sort(aLineageInputs);
+        Assert.assertEquals(processInputsAsString, aLineageInputs);
+
+        List<String> bLineageInputs = Arrays.asList(sourceTableGUID);
+        String bLineageProcessName = processQName + ":" + "b";
+        LOG.debug("Searching for column lineage process {} ", bLineageProcessName);
+        String guid1 = assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN_LINEAGE.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, bLineageProcessName, null);
+        List<Id> bProcessInputs = (List<Id>) atlasClient.getEntity(guid1).get("inputs");
+        List<String> bProcessInputsAsString = new ArrayList<>();
+        for(Id input: bProcessInputs){
+            bProcessInputsAsString.add(input._getId());
+        }
+        Collections.sort(bProcessInputsAsString);
+        Collections.sort(bLineageInputs);
+        Assert.assertEquals(bProcessInputsAsString, bLineageInputs);
+
+        //Test lineage API response
+        JSONObject response = atlasClient.getInputGraphForEntity(dest_a_guid);
+        JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices");
+        JSONObject dest_a_val = (JSONObject) vertices.get(dest_a_guid);
+        JSONObject src_a_val = (JSONObject) vertices.get(a_guid);
+        JSONObject src_b_val = (JSONObject) vertices.get(b_guid);
+        Assert.assertNotNull(dest_a_val);
+        Assert.assertNotNull(src_a_val);
+        Assert.assertNotNull(src_b_val);
+
+
+        JSONObject b_response = atlasClient.getInputGraphForEntity(dest_b_guid);
+        JSONObject b_vertices = b_response.getJSONObject("values").getJSONObject("vertices");
+        JSONObject b_val = (JSONObject) b_vertices.get(dest_b_guid);
+        JSONObject src_tbl_val = (JSONObject) b_vertices.get(sourceTableGUID);
+        Assert.assertNotNull(b_val);
+        Assert.assertNotNull(src_tbl_val);
+    }
+
     @Test
     public void testTruncateTable() throws Exception {
         String tableName = createTable(false);
@@ -1620,19 +1688,22 @@ public class HiveHookIT extends HiveITBase {
         }
     }
 
-    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws
Exception {
-        try {
-            SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null
: new TreeSet<ReadEntity>(entityComparator);
-            SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ?
null : new TreeSet<WriteEntity>(entityComparator);
+    private String sortEventsAndGetProcessQualifiedName(final HiveHook.HiveEventContext event)
throws HiveException{
+        SortedSet<ReadEntity> sortedHiveInputs = event.getInputs() == null ? null :
new TreeSet<ReadEntity>(entityComparator);
+        SortedSet<WriteEntity> sortedHiveOutputs = event.getOutputs() == null ? null
: new TreeSet<WriteEntity>(entityComparator);
 
-            if ( event.getInputs() != null) {
-                sortedHiveInputs.addAll(event.getInputs());
-            }
-            if ( event.getOutputs() != null) {
-                sortedHiveOutputs.addAll(event.getOutputs());
-            }
+        if ( event.getInputs() != null) {
+            sortedHiveInputs.addAll(event.getInputs());
+        }
+        if ( event.getOutputs() != null) {
+            sortedHiveOutputs.addAll(event.getOutputs());
+        }
+        return getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs, sortedHiveOutputs,
getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+    }
 
-            String processQFName = getProcessQualifiedName(hiveMetaStoreBridge, event, sortedHiveInputs,
sortedHiveOutputs, getSortedProcessDataSets(event.getInputs()), getSortedProcessDataSets(event.getOutputs()));
+    private String assertProcessIsRegistered(final HiveHook.HiveEventContext event) throws
Exception {
+        try {
+            String processQFName = sortEventsAndGetProcessQualifiedName(event);
             LOG.debug("Searching for process with query {}", processQFName);
             return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
processQFName, new AssertPredicate() {
                 @Override

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a52112d8/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1a691ca..9e185c7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements
for al
 ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai)
 
 ALL CHANGES:
+ATLAS-247 Hive Column level lineage (rhbutani,svimal2106 via shwethags)
 ATLAS-1184 ReservedTypesRegistrar checks for existence of 1st class type (svimal2106 via
shwethags)
 ATLAS-1199 Atlas UI not loading after fresh build due to jquery-asBreadcrumbs plugin upgrade
(kevalbhatt via shwethags)
 ATLAS-1174 Framework to apply updates to types in the type-system (sarath.kum4r@gmail.com
via shwethags)


Mime
View raw message