atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject [1/3] incubator-atlas git commit: ATLAS-1403 and Performance fixes for search, lineage
Date Wed, 28 Dec 2016 01:00:15 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/0.7-incubating 4c3a7f3dc -> dc0b29446


ATLAS-1403 and Performance fixes for search, lineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/3407303d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/3407303d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/3407303d

Branch: refs/heads/0.7-incubating
Commit: 3407303d7876ec8a90539c4d6737607116439989
Parents: 4c3a7f3
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Tue Dec 27 15:09:12 2016 -0800
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Tue Dec 27 15:09:12 2016 -0800

----------------------------------------------------------------------
 release-log.txt                                 |   2 +
 .../atlas/discovery/DataSetLineageService.java  | 178 +++++++++++++++----
 .../graph/DefaultGraphPersistenceStrategy.java  |   5 +
 .../graph/GraphBackedDiscoveryService.java      |   3 +-
 .../atlas/repository/graph/GraphHelper.java     |  18 +-
 .../org/apache/atlas/query/ClosureQuery.scala   |   6 +-
 .../query/GraphPersistenceStrategies.scala      |  22 ++-
 .../apache/atlas/query/GremlinEvaluator.scala   |   8 +-
 .../org/apache/atlas/BaseRepositoryTest.java    |   4 +-
 .../discovery/DataSetLineageServiceTest.java    |  64 ++++---
 .../GraphBackedMetadataRepositoryTest.java      |   4 +-
 .../org/apache/atlas/query/GremlinTest2.scala   |  12 +-
 .../titan/diskstorage/solr/Solr5Index.java      |   1 -
 13 files changed, 239 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 2543526..8f956f9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -32,6 +32,8 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-1403 Performance fixes for search, lineage
+ATLAS-1342 Titan Solrclient - Add timeouts for zookeeper connect and session (sumasai)
 ATLAS-1402 fix UI input validation
 ATLAS-1192 Atlas IE support (kevalbhatt)
 ATLAS-1215 Atlas UI not working in firefox due to fix in ATLAS-1199 (kevalbhatt)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
index c216469..5a3a8cc 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -18,7 +18,9 @@
 
 package org.apache.atlas.discovery;
 
+import com.google.common.base.Splitter;
 import com.thinkaurelius.titan.core.TitanGraph;
+import com.tinkerpop.blueprints.Vertex;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
@@ -26,17 +28,30 @@ import org.apache.atlas.AtlasProperties;
 import org.apache.atlas.GraphTransaction;
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.ClosureQuery;
+import org.apache.atlas.query.Expressions;
+import org.apache.atlas.query.GremlinEvaluator;
+import org.apache.atlas.query.GremlinQuery;
 import org.apache.atlas.query.GremlinQueryResult;
+import org.apache.atlas.query.GremlinTranslator;
 import org.apache.atlas.query.InputLineageClosureQuery;
 import org.apache.atlas.query.OutputLineageClosureQuery;
 import org.apache.atlas.query.QueryParams;
+import org.apache.atlas.query.QueryProcessor;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
 import org.apache.atlas.typesystem.exception.EntityNotFoundException;
 import org.apache.atlas.typesystem.exception.SchemaNotFoundException;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -45,6 +60,11 @@ import scala.collection.immutable.List;
 
 import javax.inject.Inject;
 import javax.inject.Singleton;
+import javax.script.ScriptException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Hive implementation of Lineage service interface.
@@ -61,16 +81,39 @@ public class DataSetLineageService implements LineageService {
 
     public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
 
+    public static final String DATASET_SCHEMA_ATTRIBUTE = "atlas.lineage.schema.attribute.";
+
     private static final String HIVE_PROCESS_TYPE_NAME = "Process";
     private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
     private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
 
-    private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
-    private static final String DATASET_NAME_EXISTS_QUERY =
-            AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'";
+    private static final String INPUT_PROCESS_EDGE      =  "__Process.inputs";
+    private static final String OUTPUT_PROCESS_EDGE     =  "__Process.outputs";
 
     private static final Configuration propertiesConf;
 
+    private MetadataRepository metadataRepository;
+
+    private final TypeSystem typeSystem = TypeSystem.getInstance();
+
+    private final GraphHelper graphHelper = GraphHelper.getInstance();
+
+    private final TitanGraph titanGraph;
+    private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
+    private final GraphBackedDiscoveryService discoveryService;
+
+    private Map<String, String> schemaAttributeCache = new HashMap<>();
+
+    /**
+     *  Gremlin query to retrieve all (no fixed depth) input/output lineage for a DataSet entity.
+     *  return list of Atlas vertices paths.
+     */
+    private static final String FULL_LINEAGE_QUERY    = "g.v(%s).as('src').in('%s').out('%s')."
+
+        "loop('src', {((it.path.contains(it.object)) ? false : true)}, " +
+        "{((it.object.'__superTypeNames') ? " +
+        "(it.object.'__superTypeNames'.contains('DataSet')) : false)})." +
+        "enablePath().as('dest').select(['src', 'dest'], {[it.'Asset.name', it.'Referenceable.qualifiedName']}, {[it.'Asset.name', it.'Referenceable.qualifiedName']}).path().toList()";
+
     static {
         try {
             propertiesConf = ApplicationProperties.get();
@@ -80,14 +123,11 @@ public class DataSetLineageService implements LineageService {
     }
 
 
-    private final TitanGraph titanGraph;
-    private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
-    private final GraphBackedDiscoveryService discoveryService;
-
     @Inject
     DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
                           GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
         this.titanGraph = graphProvider.get();
+        this.metadataRepository = metadataRepository;
         this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
         this.discoveryService = discoveryService;
     }
@@ -103,8 +143,8 @@ public class DataSetLineageService implements LineageService {
     public String getOutputsGraph(String datasetName) throws AtlasException {
         LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
         datasetName = ParamChecker.notEmpty(datasetName, "dataset name");
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
-        return getOutputsGraphForId(datasetInstance.getId()._getId());
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
+        return getOutputsGraphForId(typeIdPair.right);
     }
 
     /**
@@ -118,8 +158,8 @@ public class DataSetLineageService implements LineageService {
     public String getInputsGraph(String tableName) throws AtlasException {
         LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
         tableName = ParamChecker.notEmpty(tableName, "table name");
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName);
-        return getInputsGraphForId(datasetInstance.getId()._getId());
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName);
+        return getInputsGraphForId(typeIdPair.right);
     }
 
     @Override
@@ -131,13 +171,22 @@ public class DataSetLineageService implements LineageService {
         return getInputsGraphForId(guid);
     }
 
-    private String getInputsGraphForId(String guid) {
+    private String getInputsGraphForId(String guid) throws AtlasException {
+
+        Vertex instanceVertex = GraphHelper.getInstance().getVertexForGUID(guid);
+        Object instanceVertexId = instanceVertex.getId();
+
+        String lineageQuery = String.format(FULL_LINEAGE_QUERY, instanceVertexId, OUTPUT_PROCESS_EDGE, INPUT_PROCESS_EDGE);
         InputLineageClosureQuery
-                inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
-                guid, HIVE_PROCESS_TYPE_NAME,
-                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
-                SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-        return inputsQuery.graph().toInstanceJson();
+            inputsQuery = new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID,
+            guid, HIVE_PROCESS_TYPE_NAME,
+            HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
+            SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+
+        LOG.info("Evaluating gremlin lineage input query ={}", lineageQuery);
+
+        GremlinQueryResult result = evaluateLineageQuery(inputsQuery, lineageQuery);
+        return inputsQuery.graph(result).toInstanceJson();
     }
 
     @Override
@@ -149,12 +198,29 @@ public class DataSetLineageService implements LineageService {
         return getOutputsGraphForId(guid);
     }
 
-    private String getOutputsGraphForId(String guid) {
+    private String getOutputsGraphForId(String guid) throws EntityNotFoundException {
+        Object instanceVertexId = graphHelper.getVertexId(guid);
+        String lineageQuery = String.format(FULL_LINEAGE_QUERY, instanceVertexId, INPUT_PROCESS_EDGE, OUTPUT_PROCESS_EDGE);
+
         OutputLineageClosureQuery outputsQuery =
                 new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid, HIVE_PROCESS_TYPE_NAME,
                         HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
                         SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-        return outputsQuery.graph().toInstanceJson();
+
+        LOG.info("Evaluating gremlin lineage output query ={}", lineageQuery);
+
+        GremlinQueryResult result = evaluateLineageQuery(outputsQuery, lineageQuery);
+        return outputsQuery.graph(result).toInstanceJson();
+    }
+
+    private GremlinQueryResult evaluateLineageQuery(ClosureQuery closureQuery, String lineageQuery) {
+        Expressions.Expression validatedExpression = QueryProcessor.validate(closureQuery.expr());
+        GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();
+        //Replace with handcrafted gremlin till we optimize the DSL query associated with guid + typeName
+        GremlinQuery optimizedQuery = new GremlinQuery(gremlinQuery.expr(), lineageQuery, gremlinQuery.resultMaping());
+
+
+        return new GremlinEvaluator(optimizedQuery, graphPersistenceStrategy, titanGraph).evaluate();
     }
 
     /**
@@ -168,12 +234,12 @@ public class DataSetLineageService implements LineageService {
     public String getSchema(String datasetName) throws AtlasException {
         datasetName = ParamChecker.notEmpty(datasetName, "table name");
         LOG.info("Fetching schema for tableName={}", datasetName);
-        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+        TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName);
 
-        return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+        return getSchemaForId(typeIdPair.left, typeIdPair.right);
     }
 
-    private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException {
+    private String getSchemaForId(String typeName, String guid) throws AtlasException {
         String configName = DATASET_SCHEMA_QUERY_PREFIX + typeName;
         if (propertiesConf.getString(configName) != null) {
             final String schemaQuery =
@@ -184,13 +250,45 @@ public class DataSetLineageService implements LineageService {
         throw new SchemaNotFoundException("Schema is not configured for type " + typeName + ". Configure " + configName);
     }
 
+    private String getSchemaForId(ITypedReferenceableInstance instance, ClassType type) throws AtlasException {
+        String configName = DATASET_SCHEMA_QUERY_PREFIX + instance.getTypeName();
+        String schemaAttrName = null;
+
+        if ( schemaAttributeCache.containsKey(instance.getTypeName()) ) {
+            schemaAttrName = schemaAttributeCache.get(instance.getTypeName());
+        } else {
+            schemaAttrName = getSchemaAttributeName(configName, instance.getTypeName());
+            schemaAttributeCache.put(instance.getTypeName(), schemaAttrName);
+        }
+
+        if (schemaAttrName != null) {
+            java.util.List schemaValue = (java.util.List) instance.get(schemaAttrName);
+            GremlinQueryResult queryResult = new GremlinQueryResult(schemaAttrName, type, schemaValue);
+            return queryResult.toJson();
+        }
+        throw new SchemaNotFoundException("Schema is not configured for type " + instance.getTypeName() + ". Configure " + configName);
+    }
+
+
+    private String getSchemaAttributeName(String configName, String typeName) throws SchemaNotFoundException {
+        String schemaQuery = propertiesConf.getString(configName);
+        final String[] configs = schemaQuery != null ? schemaQuery.split(",") : null;
+
+        if (configs != null && configs.length == 2) {
+            LOG.info("Extracted schema attribute {} for type {} with query {} ", configs[1], typeName, schemaQuery);
+            return configs[1].trim();
+        } else {
+            throw new SchemaNotFoundException("Schema is not configured as expected for type " + typeName);
+        }
+    }
+
     @Override
     @GraphTransaction
     public String getSchemaForEntity(String guid) throws AtlasException {
         guid = ParamChecker.notEmpty(guid, "Entity id");
         LOG.info("Fetching schema for entity guid={}", guid);
-        String typeName = validateDatasetExists(guid);
-        return getSchemaForId(typeName, guid);
+        Pair<ITypedReferenceableInstance, ClassType> instanceClassTypePair = validateDatasetExists(guid);
+        return getSchemaForId(instanceClassTypePair.getLeft(), instanceClassTypePair.getRight());
     }
 
     /**
@@ -198,14 +296,16 @@ public class DataSetLineageService implements LineageService {
      *
      * @param datasetName table name
      */
-    private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
-        final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
-        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0));
-        if (!(queryResult.rows().length() > 0)) {
-            throw new EntityNotFoundException(datasetName + " does not exist");
+    private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException {
+        Iterator<Vertex> results = titanGraph.query().has("Referenceable.qualifiedName", datasetName)
+                                             .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name())
+                                             .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE).limit(1)
+                                             .vertices().iterator();
+        while (results.hasNext()) {
+            Vertex vertex = results.next();
+            return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex));
         }
-
-        return (ReferenceableInstance)queryResult.rows().apply(0);
+        throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist");
     }
 
     /**
@@ -213,14 +313,14 @@ public class DataSetLineageService implements LineageService {
      *
      * @param guid entity id
      */
-    private String validateDatasetExists(String guid) throws AtlasException {
-        final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
-        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0));
-        if (!(queryResult.rows().length() > 0)) {
+    private Pair<ITypedReferenceableInstance, ClassType> validateDatasetExists(String guid) throws AtlasException {
+
+        ITypedReferenceableInstance instance = metadataRepository.getEntityDefinition(guid);
+        String typeName = instance.getTypeName();
+        ClassType clsType = typeSystem.getDataType(ClassType.class, typeName);
+        if ( !clsType.superTypes.contains(AtlasClient.DATA_SET_SUPER_TYPE) ) {
             throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
         }
-
-        ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
-        return referenceable.getTypeName();
+        return Pair.of(instance, clsType);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
index b17eec7..5143fc8 100755
--- a/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/graph/DefaultGraphPersistenceStrategy.java
@@ -247,6 +247,11 @@ public class DefaultGraphPersistenceStrategy implements GraphPersistenceStrategi
     }
 
     @Override
+    public boolean filterBySubTypes() {
+        return GraphPersistenceStrategies$class.filterBySubTypes(this);
+    }
+
+    @Override
     public boolean addGraphVertexPrefix(scala.collection.Traversable<String> preStatements) {
         return GraphPersistenceStrategies$class.addGraphVertexPrefix(this, preStatements);
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
index 0c029bb..060b086 100755
--- a/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/graph/GraphBackedDiscoveryService.java
@@ -56,6 +56,7 @@ import javax.script.ScriptEngine;
 import javax.script.ScriptEngineManager;
 import javax.script.ScriptException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -150,7 +151,7 @@ public class GraphBackedDiscoveryService implements DiscoveryService {
         //If the final limit is 0, don't launch the query, return with 0 rows
         if (validatedExpression instanceof Expressions.LimitExpression
                 && ((Integer)((Expressions.LimitExpression) validatedExpression).limit().rawValue()) == 0) {
-            return new GremlinQueryResult(dslQuery, validatedExpression.dataType());
+            return new GremlinQueryResult(dslQuery, validatedExpression.dataType(), Collections.emptyList());
         }
 
         GremlinQuery gremlinQuery = new GremlinTranslator(validatedExpression, graphPersistenceStrategy).translate();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 334177c..fb2c2ee 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -19,8 +19,10 @@
 package org.apache.atlas.repository.graph;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.TitanProperty;
 import com.thinkaurelius.titan.core.TitanVertex;
@@ -30,6 +32,7 @@ import com.tinkerpop.blueprints.Element;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.pipes.util.structures.Row;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RequestContext;
@@ -50,11 +53,17 @@ import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.script.Bindings;
+import javax.script.ScriptContext;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
@@ -579,4 +588,11 @@ public final class GraphHelper {
         }
         return key;
     }
-}
+
+    public Object getVertexId(String guid) throws EntityNotFoundException {
+        Vertex instanceVertex = getVertexForGUID(guid);
+        Object instanceVertexId = instanceVertex.getId();
+        return instanceVertexId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
index c4621cd..f2dd4e6 100755
--- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
@@ -147,7 +147,7 @@ trait ClosureQuery {
     QueryProcessor.evaluate(e, g, persistenceStrategy)
   }
 
-  def graph : GraphResult = {
+  def graph(res: GremlinQueryResult) : GraphResult = {
 
     if (!withPath) {
       throw new ExpressionException(expr, "Graph requested for non Path Query")
@@ -155,8 +155,6 @@ trait ClosureQuery {
 
     import scala.collection.JavaConverters._
 
-    val res = evaluate()
-
     val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType])
     val vertexPayloadType = {
       val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName).
@@ -187,7 +185,7 @@ trait ClosureQuery {
      *   add an entry for the Src Vertex to the vertex Map
      *   add an entry for the Dest Vertex to the vertex Map
      */
-    res.rows.map(_.asInstanceOf[StructInstance]).foreach { r =>
+    res.rows.asScala.map(_.asInstanceOf[StructInstance]).foreach { r =>
 
       val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala
       val srcVertex = path.head.asInstanceOf[StructInstance]

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala
index f774d97..92b7e96 100755
--- a/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala
@@ -30,7 +30,9 @@ import org.apache.atlas.repository.graph.{GraphHelper, GraphBackedMetadataReposi
 import org.apache.atlas.typesystem.persistence.Id
 import org.apache.atlas.typesystem.types.DataTypes._
 import org.apache.atlas.typesystem.types._
+import org.apache.atlas.typesystem.types.cache.TypeCache
 import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance}
+import org.elasticsearch.common.collect.ImmutableList
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -149,10 +151,14 @@ trait GraphPersistenceStrategies {
      *
      * @return
      */
-    def collectTypeInstancesIntoVar = true
+    def collectTypeInstancesIntoVar = false
+
+    def filterBySubTypes = true
 
     def typeTestExpression(typeName : String, intSeq : IntSequence) : Seq[String] = {
-        if (collectTypeInstancesIntoVar)
+        if (filterBySubTypes)
+            typeTestExpressionUsingInFilter(typeName)
+        else if (collectTypeInstancesIntoVar)
             typeTestExpressionMultiStep(typeName, intSeq)
         else
             typeTestExpressionUsingFilter(typeName)
@@ -180,6 +186,18 @@ trait GraphPersistenceStrategies {
         )
     }
 
+    private def typeTestExpressionUsingInFilter(typeName: String) = {
+        val filters = collection.mutable.Map[TypeCache.TYPE_FILTER, String]();
+        filters put (TypeCache.TYPE_FILTER.SUPERTYPE, typeName)
+        val subTypes : com.google.common.collect.ImmutableList[String] = TypeSystem.getInstance().getTypeNames(filters)
+        val typeNames = new util.ArrayList[String]()
+        typeNames.add(typeName)
+        if ( !subTypes.isEmpty )
+          typeNames.addAll(subTypes)
+
+        Seq(s"""has("${typeAttributeName}", T.in, ${typeNames.mkString("['", "','", "']")})""")
+    }
+
     private def newSetVar(varName : String) = s"$varName = [] as Set"
 
     private def fillVarWithTypeInstances(typeName : String, fillVar : String) = {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala
index 10d66a9..2020b44 100755
--- a/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala
@@ -29,14 +29,12 @@ import org.json4s._
 import org.json4s.native.Serialization._
 import scala.language.existentials
 import org.apache.atlas.query.Expressions._
+import scala.collection.JavaConversions._
+
 
 case class GremlinQueryResult(query: String,
                               resultDataType: IDataType[_],
-                              rows: List[_]) {
-    def this(query: String,resultDataType: IDataType[_]) {
-      this(query,resultDataType,List.empty)
-    }
-  
+                              rows: java.util.List[_]) {
     def toJson = JsonHelper.toJson(this)
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
index 500a305..04eb8a4 100644
--- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.thinkaurelius.titan.core.TitanGraph;
 import com.thinkaurelius.titan.core.util.TitanCleanup;
-
 import org.apache.atlas.repository.MetadataRepository;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graph.GraphProvider;
@@ -46,7 +45,6 @@ import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.testng.annotations.Guice;
 
 import javax.inject.Inject;
-
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
@@ -329,7 +327,7 @@ public class BaseRepositoryTest {
         List<Referenceable> columns, String... traitNames) throws Exception {
         Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
         referenceable.set("name", name);
-        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
+        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name);
         referenceable.set("description", description);
         referenceable.set("owner", owner);
         referenceable.set("tableType", tableType);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
index b675459..a0ee26c 100644
--- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.commons.collections.ArrayStack;
 import org.apache.commons.lang.RandomStringUtils;
 import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         testInvalidArguments(expectedException, new Invoker() {
             @Override
             void run() throws AtlasException {
-                lineageService.getInputsGraphForEntity(tableName);
+                lineageService.getInputsGraph(tableName);
             }
         });
     }
 
     @Test
     public void testGetInputsGraph() throws Exception {
-        JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv"));
+        JSONObject results = getInputsGraph("sales_fact_monthly_mv");
         assertNotNull(results);
         System.out.println("inputs graph = " + results);
 
@@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
     @Test
     public void testCircularLineage() throws Exception{
-        JSONObject results = new JSONObject(lineageService.getInputsGraph("table2"));
+        JSONObject results = getInputsGraph("table2");
         assertNotNull(results);
         System.out.println("inputs graph = " + results);
 
@@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
     }
 
     @Test(dataProvider = "invalidArgumentsProvider")
-    public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException)
+    public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException)
             throws Exception {
         testInvalidArguments(expectedException, new Invoker() {
             @Override
             void run() throws AtlasException {
-                lineageService.getOutputsGraphForEntity(tableName);
+                lineageService.getOutputsGraphForEntity(tableId);
             }
         });
     }
 
     @Test
     public void testGetOutputsGraph() throws Exception {
-        JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact"));
+        JSONObject results = getOutputsGraph("sales_fact");
         assertNotNull(results);
         System.out.println("outputs graph = " + results);
 
@@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
     @Test(dataProvider = "tableNamesProvider")
     public void testGetSchema(String tableName, String expected) throws Exception {
-        JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+        JSONObject results = getSchema(tableName);
         assertNotNull(results);
         System.out.println("columns = " + results);
 
@@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         Assert.assertEquals(rows.length(), Integer.parseInt(expected));
 
         for (int index = 0; index < rows.length(); index++) {
-            final JSONObject row = rows.getJSONObject(index);
-            assertNotNull(row.getString("name"));
-            assertNotNull(row.getString("comment"));
-            assertNotNull(row.getString("dataType"));
-            Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+            assertColumn(rows.getJSONObject(index));
         }
     }
 
@@ -305,14 +302,17 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         Assert.assertEquals(rows.length(), Integer.parseInt(expected));
 
         for (int index = 0; index < rows.length(); index++) {
-            final JSONObject row = rows.getJSONObject(index);
-            assertNotNull(row.getString("name"));
-            assertNotNull(row.getString("comment"));
-            assertNotNull(row.getString("dataType"));
-            Assert.assertEquals(row.getString("$typeName$"), "hive_column");
+            assertColumn(rows.getJSONObject(index));
         }
     }
 
+    private void assertColumn(JSONObject jsonObject) throws JSONException {
+        assertNotNull(jsonObject.getString("name"));
+        assertNotNull(jsonObject.getString("comment"));
+        assertNotNull(jsonObject.getString("dataType"));
+        Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column");
+    }
+
     @Test(expectedExceptions = SchemaNotFoundException.class)
     public void testGetSchemaForDBEntity() throws Exception {
         String dbId = getEntityId(DATASET_SUBTYPE, "name", "dataSetSubTypeInst1");
@@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         });
     }
 
+    private JSONObject getSchema(String tableName) throws Exception {
+        return new JSONObject(lineageService.getSchema("qualified:" + tableName));
+    }
+
+    private JSONObject getInputsGraph(String tableName) throws Exception {
+        return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName));
+    }
+
+    private JSONObject getOutputsGraph(String tableName) throws Exception {
+        return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName));
+    }
+
     @Test
     public void testLineageWithDelete() throws Exception {
         String tableName = "table" + random();
         createTable(tableName, 3, true);
         String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
 
-        JSONObject results = new JSONObject(lineageService.getSchema(tableName));
+        JSONObject results = getSchema(tableName);
         assertEquals(results.getJSONArray("rows").length(), 3);
 
-        results = new JSONObject(lineageService.getInputsGraph(tableName));
+        results = getInputsGraph(tableName);
         Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true);
         Map<String, Struct> vertices = (Map) resultInstance.get("vertices");
         assertEquals(vertices.size(), 2);
         Struct vertex = vertices.get(tableId);
         assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name());
 
-        results = new JSONObject(lineageService.getOutputsGraph(tableName));
+        results = getOutputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
 
         results = new JSONObject(lineageService.getSchemaForEntity(tableId));
@@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2);
 
         try {
-            lineageService.getSchema(tableName);
+            getSchema(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
         }
 
         try {
-            lineageService.getInputsGraph(tableName);
+            getInputsGraph(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
         }
 
         try {
-            lineageService.getOutputsGraph(tableName);
+            getOutputsGraph(tableName);
             fail("Expected EntityNotFoundException");
         } catch (EntityNotFoundException e) {
             //expected
@@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest {
 
         //Create table again should show new lineage
         createTable(tableName, 2, false);
-        results = new JSONObject(lineageService.getSchema(tableName));
+        results = getSchema(tableName);
         assertEquals(results.getJSONArray("rows").length(), 2);
 
-        results = new JSONObject(lineageService.getOutputsGraph(tableName));
+        results = getOutputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
 
-        results = new JSONObject(lineageService.getInputsGraph(tableName));
+        results = getInputsGraph(tableName);
         assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0);
 
         tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
index 2541541..69bb45b 100755
--- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepositoryTest.java
@@ -135,8 +135,8 @@ public class GraphBackedMetadataRepositoryTest {
         }
     }
 
-    @Test
     //In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104
+    @Test
     public void testConcurrentCalls() throws Exception {
         final HierarchicalTypeDefinition<ClassType> refType =
                 createClassTypeDef(randomString(), ImmutableSet.<String>of());
@@ -188,7 +188,7 @@ public class GraphBackedMetadataRepositoryTest {
 
     private boolean assertEdge(String id, String typeName) throws Exception {
         TitanGraph graph = graphProvider.get();
-        Vertex vertex = (Vertex)graph.query().has(Constants.GUID_PROPERTY_KEY, id).vertices().iterator().next();
+        Vertex vertex = (Vertex) graph.query().has(Constants.GUID_PROPERTY_KEY, id).vertices().iterator().next();
         Iterable<Edge> edges = vertex.getEdges(Direction.OUT, Constants.INTERNAL_PROPERTY_KEY_PREFIX + typeName + ".ref");
         if (!edges.iterator().hasNext()) {
             ITypedReferenceableInstance entity = repositoryService.getEntityDefinition(id);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
index f65cedb..b0961b0 100755
--- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
@@ -116,11 +116,13 @@ class GremlinTest2 extends BaseGremlinTest {
   }
 
   @Test def testHighLevelLineageReturnGraph {
-    val r = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
+    val q = InputLineageClosureQuery("Table", "name", "sales_fact_monthly_mv",
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, GraphPersistenceStrategy1, g);
+    val gr = q.evaluate();
+    val r = q.graph(gr);
 
     println(r.toInstanceJson)
     //validateJson(r)
@@ -136,11 +138,13 @@ class GremlinTest2 extends BaseGremlinTest {
   }
 
   @Test def testHighLevelWhereUsedReturnGraph {
-    val r = OutputLineageClosureQuery("Table", "name", "sales_fact",
+    val q = OutputLineageClosureQuery("Table", "name", "sales_fact",
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, GraphPersistenceStrategy1, g)
+    val gr = q.evaluate();
+    val r = q.graph(gr);
     println(r.toInstanceJson)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/3407303d/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
----------------------------------------------------------------------
diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
index f3b9fd9..0176208 100644
--- a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
+++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java
@@ -224,7 +224,6 @@ public class Solr5Index implements IndexProvider {
 
             logger.info("Zookeeper session timeout : " + config.get(ZOOKEEPER_SESSION_TIMEOUT));
             cloudServer.setZkClientTimeout(config.get(ZOOKEEPER_SESSION_TIMEOUT));
-
             cloudServer.connect();
             solrClient = cloudServer;
         } else if (mode==Mode.HTTP) {



Mime
View raw message