atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/2] incubator-atlas git commit: ATLAS-713 Entity lineage based on entity id (shwethags)
Date Wed, 18 May 2016 12:25:40 GMT
ATLAS-713 Entity lineage based on entity id (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/b65dd91c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/b65dd91c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/b65dd91c

Branch: refs/heads/master
Commit: b65dd91c3587d35abafc4ec136e162f9a5c92ac1
Parents: 857561a
Author: Shwetha GS <sshivalingamurthy@hortonworks.com>
Authored: Wed May 18 17:55:24 2016 +0530
Committer: Shwetha GS <sshivalingamurthy@hortonworks.com>
Committed: Wed May 18 17:55:24 2016 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/atlas/AtlasClient.java |  41 +-
 dashboardv2/public/js/models/VLineage.js        |   4 +-
 dashboardv2/public/js/models/VSchema.js         |   4 +-
 .../views/detail_page/DetailPageLayoutView.js   |   8 +-
 .../public/js/views/graph/LineageLayoutView.js  |   4 +-
 .../public/js/views/schema/SchemaLayoutView.js  |   2 +-
 distro/src/conf/atlas-application.properties    |  10 +-
 release-log.txt                                 |   1 +
 .../apache/atlas/RepositoryMetadataModule.java  |   4 +-
 .../atlas/discovery/DataSetLineageService.java  | 215 +++++++++
 .../atlas/discovery/HiveLineageService.java     | 222 ---------
 .../org/apache/atlas/query/ClosureQuery.scala   |  44 +-
 .../apache/atlas/BaseHiveRepositoryTest.java    | 377 ----------------
 .../org/apache/atlas/BaseRepositoryTest.java    | 377 ++++++++++++++++
 .../discovery/DataSetLineageServiceTest.java    | 447 +++++++++++++++++++
 .../GraphBackedDiscoveryServiceTest.java        |   4 +-
 .../atlas/discovery/HiveLineageServiceTest.java | 260 -----------
 .../org/apache/atlas/query/GremlinTest2.scala   |   8 +-
 .../apache/atlas/discovery/LineageService.java  |  44 +-
 .../main/resources/atlas-application.properties |   8 +-
 .../web/resources/DataSetLineageResource.java   | 162 +++++++
 .../web/resources/HiveLineageResource.java      | 166 -------
 .../atlas/web/resources/LineageResource.java    | 153 +++++++
 .../DataSetLineageJerseyResourceIT.java         | 306 +++++++++++++
 .../resources/HiveLineageJerseyResourceIT.java  | 257 -----------
 25 files changed, 1768 insertions(+), 1360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/client/src/main/java/org/apache/atlas/AtlasClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java
index b3ec95c..7e32cc2 100755
--- a/client/src/main/java/org/apache/atlas/AtlasClient.java
+++ b/client/src/main/java/org/apache/atlas/AtlasClient.java
@@ -90,7 +90,8 @@ public class AtlasClient {
     public static final String URI_ENTITY = "entities";
     public static final String URI_ENTITY_AUDIT = "audit";
     public static final String URI_SEARCH = "discovery/search";
-    public static final String URI_LINEAGE = "lineage/hive/table";
+    public static final String URI_NAME_LINEAGE = "lineage/hive/table";
+    public static final String URI_LINEAGE = "lineage/";
     public static final String URI_TRAITS = "traits";
 
     public static final String QUERY = "query";
@@ -416,7 +417,12 @@ public class AtlasClient {
         SEARCH_GREMLIN(BASE_URI + URI_SEARCH + "/gremlin", HttpMethod.GET, Response.Status.OK),
         SEARCH_FULL_TEXT(BASE_URI + URI_SEARCH + "/fulltext", HttpMethod.GET, Response.Status.OK),
 
-        //Lineage operations
+        //Lineage operations based on dataset name
+        NAME_LINEAGE_INPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+        NAME_LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+        NAME_LINEAGE_SCHEMA(BASE_URI + URI_NAME_LINEAGE, HttpMethod.GET, Response.Status.OK),
+
+        //Lineage operations based on entity id of the dataset
         LINEAGE_INPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
         LINEAGE_OUTPUTS_GRAPH(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK),
         LINEAGE_SCHEMA(BASE_URI + URI_LINEAGE, HttpMethod.GET, Response.Status.OK);
@@ -988,7 +994,7 @@ public class AtlasClient {
     }
 
     public JSONObject getInputGraph(String datasetName) throws AtlasServiceException {
-        JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
+        JSONObject response = callAPI(API.NAME_LINEAGE_INPUTS_GRAPH, null, datasetName, "/inputs/graph");
         try {
             return response.getJSONObject(AtlasClient.RESULTS);
         } catch (JSONException e) {
@@ -997,7 +1003,55 @@ public class AtlasClient {
     }
 
     public JSONObject getOutputGraph(String datasetName) throws AtlasServiceException {
-        JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+        JSONObject response = callAPI(API.NAME_LINEAGE_OUTPUTS_GRAPH, null, datasetName, "/outputs/graph");
+        try {
+            return response.getJSONObject(AtlasClient.RESULTS);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(e);
+        }
+    }
+
+    /**
+     * Returns the lineage inputs graph of the dataset entity with the given guid.
+     *
+     * @param datasetId guid of the dataset entity
+     * @return the {@link #RESULTS} object of the server response
+     * @throws AtlasServiceException if the server call fails or the response has no {@link #RESULTS} entry
+     */
+    public JSONObject getInputGraphForEntity(String datasetId) throws AtlasServiceException {
+        JSONObject response = callAPI(API.LINEAGE_INPUTS_GRAPH, null, datasetId, "/inputs/graph");
+        try {
+            return response.getJSONObject(AtlasClient.RESULTS);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(e);
+        }
+    }
+
+    /**
+     * Returns the lineage outputs graph of the dataset entity with the given guid.
+     *
+     * @param datasetId guid of the dataset entity
+     * @return the {@link #RESULTS} object of the server response
+     * @throws AtlasServiceException if the server call fails or the response has no {@link #RESULTS} entry
+     */
+    public JSONObject getOutputGraphForEntity(String datasetId) throws AtlasServiceException {
+        JSONObject response = callAPI(API.LINEAGE_OUTPUTS_GRAPH, null, datasetId, "/outputs/graph");
+        try {
+            return response.getJSONObject(AtlasClient.RESULTS);
+        } catch (JSONException e) {
+            throw new AtlasServiceException(e);
+        }
+    }
+
+    /**
+     * Returns the schema of the dataset entity with the given guid.
+     *
+     * @param datasetId guid of the dataset entity
+     * @return the {@link #RESULTS} object of the server response
+     * @throws AtlasServiceException if the server call fails or the response has no {@link #RESULTS} entry
+     */
+    public JSONObject getSchemaForEntity(String datasetId) throws AtlasServiceException {
+        JSONObject response = callAPI(API.LINEAGE_SCHEMA, null, datasetId, "/schema");
         try {
             return response.getJSONObject(AtlasClient.RESULTS);
         } catch (JSONException e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VLineage.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VLineage.js b/dashboardv2/public/js/models/VLineage.js
index e33488a..fa1be05 100644
--- a/dashboardv2/public/js/models/VLineage.js
+++ b/dashboardv2/public/js/models/VLineage.js
@@ -23,7 +23,7 @@ define(['require',
     'use strict';
     var VLineage = VBaseModel.extend({
 
-        urlRoot: Globals.baseURL + 'api/atlas/lineage/hive/table/assetName/outputs/graph',
+        urlRoot: Globals.baseURL + 'api/atlas/lineage/assetName/outputs/graph',
 
         defaults: {},
 
@@ -36,7 +36,7 @@ define(['require',
             this.bindErrorEvents();
         },
         toString: function() {
-            return this.get('name');
+            return this.get('id');
         },
     }, {});
     return VLineage;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/models/VSchema.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/models/VSchema.js b/dashboardv2/public/js/models/VSchema.js
index 1f8e0bb..24462e6 100644
--- a/dashboardv2/public/js/models/VSchema.js
+++ b/dashboardv2/public/js/models/VSchema.js
@@ -22,7 +22,7 @@ define(['require',
 ], function(require, Globals, VBaseModel) {
     'use strict';
     var VSchema = VBaseModel.extend({
-        urlRoot: Globals.baseURL + '/api/atlas/lineage/hive/table/log_fact_daily_mv/schema',
+        urlRoot: Globals.baseURL + '/api/atlas/lineage/log_fact_daily_mv/schema',
 
         defaults: {},
 
@@ -35,7 +35,7 @@ define(['require',
             this.bindErrorEvents();
         },
         toString: function() {
-            return this.get('name');
+            return this.get('id');
         },
     }, {});
     return VSchema;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
index 87adec0..0932208 100644
--- a/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
+++ b/dashboardv2/public/js/views/detail_page/DetailPageLayoutView.js
@@ -92,7 +92,7 @@ define(['require',
                     this.renderEntityDetailTableLayoutView();
                     this.renderTagTableLayoutView(tagGuid);
                     this.renderLineageLayoutView(tagGuid);
-                    this.renderSchemaLayoutView();
+                    this.renderSchemaLayoutView(tagGuid);
                 }, this);
             },
             onRender: function() {},
@@ -120,17 +120,17 @@ define(['require',
                 require(['views/graph/LineageLayoutView'], function(LineageLayoutView) {
                     that.RLineageLayoutView.show(new LineageLayoutView({
                         globalVent: that.globalVent,
-                        assetName: that.name,
+                        assetName: tagGuid,
                         guid: tagGuid
                     }));
                 });
             },
-            renderSchemaLayoutView: function() {
+            renderSchemaLayoutView: function(tagGuid) {
                 var that = this;
                 require(['views/schema/SchemaLayoutView'], function(SchemaLayoutView) {
                     that.RSchemaTableLayoutView.show(new SchemaLayoutView({
                         globalVent: that.globalVent,
-                        name: that.name,
+                        name: tagGuid,
                         vent: that.vent
                     }));
                 });

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/graph/LineageLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/graph/LineageLayoutView.js b/dashboardv2/public/js/views/graph/LineageLayoutView.js
index 973d091..31433c1 100644
--- a/dashboardv2/public/js/views/graph/LineageLayoutView.js
+++ b/dashboardv2/public/js/views/graph/LineageLayoutView.js
@@ -56,8 +56,8 @@ define(['require',
                 this.inputCollection = new VLineageList();
                 this.outputCollection = new VLineageList();
                 this.entityModel = new VEntity();
-                this.inputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/inputs/graph";
-                this.outputCollection.url = "/api/atlas/lineage/hive/table/" + this.assetName + "/outputs/graph";
+                this.inputCollection.url = "/api/atlas/lineage/" + this.assetName + "/inputs/graph";
+                this.outputCollection.url = "/api/atlas/lineage/" + this.assetName + "/outputs/graph";
                 this.bindEvents();
                 this.fetchGraphData();
                 this.data = {};

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/dashboardv2/public/js/views/schema/SchemaLayoutView.js
----------------------------------------------------------------------
diff --git a/dashboardv2/public/js/views/schema/SchemaLayoutView.js b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
index de558a7..301b993 100644
--- a/dashboardv2/public/js/views/schema/SchemaLayoutView.js
+++ b/dashboardv2/public/js/views/schema/SchemaLayoutView.js
@@ -73,7 +73,7 @@ define(['require',
             initialize: function(options) {
                 _.extend(this, _.pick(options, 'globalVent', 'name', 'vent'));
                 this.schemaCollection = new VSchemaList([], {});
-                this.schemaCollection.url = "/api/atlas/lineage/hive/table/" + this.name + "/schema";
+                this.schemaCollection.url = "/api/atlas/lineage/" + this.name + "/schema";
                 this.commonTableOptions = {
                     collection: this.schemaCollection,
                     includeFilter: false,

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 68a0021..d4722fb 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -63,15 +63,9 @@ atlas.kafka.auto.commit.enable=false
 
 
 #########  Hive Lineage Configs  #########
-# This models reflects the base super types for Data and Process
-#atlas.lineage.hive.table.type.name=DataSet
-#atlas.lineage.hive.process.type.name=Process
-#atlas.lineage.hive.process.inputs.name=inputs
-#atlas.lineage.hive.process.outputs.name=outputs
-
 ## Schema
-atlas.lineage.hive.table.schema.query.hive_table=hive_table where name='%s'\, columns
-atlas.lineage.hive.table.schema.query.Table=Table where name='%s'\, columns
+atlas.lineage.schema.query.hive_table=hive_table where __guid='%s'\, columns
+atlas.lineage.schema.query.Table=Table where __guid='%s'\, columns
 
 ## Server port configuration
 #atlas.server.http.port=21000

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b600fff..a68010a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -21,6 +21,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-713 Entity lineage based on entity id (shwethags)
 ATLAS-736 UI - BUG :: displaying timestamp values for hive_db description (kevalbhatt18 via yhemanth)
 ATLAS-784 Configure config.store.uri for Falcon hook IT (yhemanth)
 ATLAS-645 FieldMapping.output() results in stack overflow when instances reference each other (dkantor via shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
index 8dae968..68b707f 100755
--- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
+++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java
@@ -26,7 +26,7 @@ import com.google.inject.throwingproviders.ThrowingProviderBinder;
 import com.thinkaurelius.titan.core.TitanGraph;
 import org.aopalliance.intercept.MethodInterceptor;
 import org.apache.atlas.discovery.DiscoveryService;
-import org.apache.atlas.discovery.HiveLineageService;
+import org.apache.atlas.discovery.DataSetLineageService;
 import org.apache.atlas.discovery.LineageService;
 import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
 import org.apache.atlas.listener.EntityChangeListener;
@@ -83,7 +83,7 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule {
         // bind the DiscoveryService interface to an implementation
         bind(DiscoveryService.class).to(GraphBackedDiscoveryService.class).asEagerSingleton();
 
-        bind(LineageService.class).to(HiveLineageService.class).asEagerSingleton();
+        bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
 
         bindAuditRepository(binder());
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
new file mode 100644
index 0000000..39dde2a
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.discovery;
+
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.GraphTransaction;
+import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
+import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
+import org.apache.atlas.query.GremlinQueryResult;
+import org.apache.atlas.query.InputLineageClosureQuery;
+import org.apache.atlas.query.OutputLineageClosureQuery;
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.typesystem.exception.EntityNotFoundException;
+import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
+import org.apache.atlas.utils.ParamChecker;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Some;
+import scala.collection.immutable.List;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+/**
+ * Lineage service for entities of the {@link AtlasClient#DATA_SET_SUPER_TYPE} super type. Lineage is built by
+ * following {@code Process} entities through their {@code inputs} and {@code outputs} attributes.
+ */
+@Singleton
+public class DataSetLineageService implements LineageService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageService.class);
+
+    private static final Option<List<String>> SELECT_ATTRIBUTES =
+            Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
+    public static final String SELECT_INSTANCE_GUID = "__guid";
+
+    /** Prefix of the per-type configuration key holding the DSL query that returns a dataset's schema. */
+    public static final String DATASET_SCHEMA_QUERY_PREFIX = "atlas.lineage.schema.query.";
+
+    private static final String PROCESS_TYPE_NAME = "Process";
+    private static final String PROCESS_INPUT_ATTRIBUTE_NAME = "inputs";
+    private static final String PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs";
+
+    // NOTE(review): guids and names are substituted verbatim into these DSL queries, so a value containing
+    // a quote breaks the query. Confirm the DSL escaping rules before passing untrusted input through here.
+    private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'";
+    private static final String DATASET_NAME_EXISTS_QUERY =
+            AtlasClient.DATA_SET_SUPER_TYPE + " where name = '%s' and __state = 'ACTIVE'";
+
+    private static final Configuration propertiesConf;
+
+    static {
+        try {
+            propertiesConf = ApplicationProperties.get();
+        } catch (AtlasException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private final TitanGraph titanGraph;
+    private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
+    private final GraphBackedDiscoveryService discoveryService;
+
+    @Inject
+    DataSetLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
+                          GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
+        this.titanGraph = graphProvider.get();
+        this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
+        this.discoveryService = discoveryService;
+    }
+
+    /**
+     * Return the lineage outputs graph for the active dataset with the given name.
+     *
+     * @param datasetName name of the dataset
+     * @return Outputs Graph as JSON
+     * @throws EntityNotFoundException if no active dataset with this name exists
+     */
+    @Override
+    @GraphTransaction
+    public String getOutputsGraph(String datasetName) throws AtlasException {
+        LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName);
+        ParamChecker.notEmpty(datasetName, "dataset name");
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+        return getOutputsGraphForId(datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Return the lineage inputs graph for the active dataset with the given name.
+     *
+     * @param datasetName name of the dataset
+     * @return Inputs Graph as JSON
+     * @throws EntityNotFoundException if no active dataset with this name exists
+     */
+    @Override
+    @GraphTransaction
+    public String getInputsGraph(String datasetName) throws AtlasException {
+        LOG.info("Fetching lineage inputs graph for datasetName={}", datasetName);
+        ParamChecker.notEmpty(datasetName, "dataset name");
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+        return getInputsGraphForId(datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Return the lineage inputs graph for the dataset entity with the given guid.
+     *
+     * @param guid guid of the dataset entity
+     * @return Inputs Graph as JSON
+     * @throws EntityNotFoundException if no dataset with this guid exists
+     */
+    @Override
+    @GraphTransaction
+    public String getInputsGraphForEntity(String guid) throws AtlasException {
+        LOG.info("Fetching lineage inputs graph for entity={}", guid);
+        ParamChecker.notEmpty(guid, "Entity id");
+        validateDatasetExists(guid);
+        return getInputsGraphForId(guid);
+    }
+
+    /** Runs the input-lineage closure query rooted at the dataset with the given guid. */
+    private String getInputsGraphForId(String guid) {
+        InputLineageClosureQuery inputsQuery =
+                new InputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid,
+                        PROCESS_TYPE_NAME, PROCESS_INPUT_ATTRIBUTE_NAME, PROCESS_OUTPUT_ATTRIBUTE_NAME,
+                        Option.empty(), SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+        return inputsQuery.graph().toInstanceJson();
+    }
+
+    /**
+     * Return the lineage outputs graph for the dataset entity with the given guid.
+     *
+     * @param guid guid of the dataset entity
+     * @return Outputs Graph as JSON
+     * @throws EntityNotFoundException if no dataset with this guid exists
+     */
+    @Override
+    @GraphTransaction
+    public String getOutputsGraphForEntity(String guid) throws AtlasException {
+        LOG.info("Fetching lineage outputs graph for entity guid={}", guid);
+        ParamChecker.notEmpty(guid, "Entity id");
+        validateDatasetExists(guid);
+        return getOutputsGraphForId(guid);
+    }
+
+    /** Runs the output-lineage closure query rooted at the dataset with the given guid. */
+    private String getOutputsGraphForId(String guid) {
+        OutputLineageClosureQuery outputsQuery =
+                new OutputLineageClosureQuery(AtlasClient.DATA_SET_SUPER_TYPE, SELECT_INSTANCE_GUID, guid,
+                        PROCESS_TYPE_NAME, PROCESS_INPUT_ATTRIBUTE_NAME, PROCESS_OUTPUT_ATTRIBUTE_NAME,
+                        Option.empty(), SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
+        return outputsQuery.graph().toInstanceJson();
+    }
+
+    /**
+     * Return the schema for the active dataset with the given name.
+     *
+     * @param datasetName name of the dataset
+     * @return Schema as JSON
+     * @throws EntityNotFoundException if no active dataset with this name exists
+     * @throws DiscoveryException if no schema query is configured for the dataset's type
+     */
+    @Override
+    @GraphTransaction
+    public String getSchema(String datasetName) throws AtlasException {
+        ParamChecker.notEmpty(datasetName, "dataset name");
+        LOG.info("Fetching schema for datasetName={}", datasetName);
+        ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName);
+
+        return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId());
+    }
+
+    /**
+     * Runs the DSL query configured under {@link #DATASET_SCHEMA_QUERY_PREFIX} + {@code typeName} for the guid.
+     *
+     * @throws DiscoveryException if no schema query is configured for {@code typeName}
+     */
+    private String getSchemaForId(String typeName, String guid) throws DiscoveryException {
+        final String configKey = DATASET_SCHEMA_QUERY_PREFIX + typeName;
+        final String queryTemplate = propertiesConf.getString(configKey);
+        if (queryTemplate == null) {
+            // Without this check String.format(null, ...) fails with an unhelpful NullPointerException.
+            throw new DiscoveryException("No schema query configured for type " + typeName + " (" + configKey + ")");
+        }
+        return discoveryService.searchByDSL(String.format(queryTemplate, guid));
+    }
+
+    /**
+     * Return the schema for the dataset entity with the given guid.
+     *
+     * @param guid guid of the dataset entity
+     * @return Schema as JSON
+     * @throws EntityNotFoundException if no dataset with this guid exists
+     * @throws DiscoveryException if no schema query is configured for the dataset's type
+     */
+    @Override
+    @GraphTransaction
+    public String getSchemaForEntity(String guid) throws AtlasException {
+        ParamChecker.notEmpty(guid, "Entity id");
+        LOG.info("Fetching schema for entity guid={}", guid);
+        String typeName = validateDatasetExists(guid);
+        return getSchemaForId(typeName, guid);
+    }
+
+    /**
+     * Looks up an active dataset by name.
+     *
+     * @param datasetName dataset name
+     * @return the first matching dataset instance
+     * @throws EntityNotFoundException if no active dataset with this name exists
+     */
+    private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException {
+        final String datasetExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName);
+        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
+        if (queryResult.rows().length() == 0) {
+            throw new EntityNotFoundException(datasetName + " does not exist");
+        }
+
+        return (ReferenceableInstance) queryResult.rows().apply(0);
+    }
+
+    /**
+     * Looks up a dataset by guid.
+     *
+     * @param guid entity id
+     * @return type name of the dataset entity
+     * @throws EntityNotFoundException if no dataset with this guid exists
+     */
+    private String validateDatasetExists(String guid) throws AtlasException {
+        final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid);
+        GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery);
+        if (queryResult.rows().length() == 0) {
+            throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist");
+        }
+
+        ReferenceableInstance referenceable = (ReferenceableInstance) queryResult.rows().apply(0);
+        return referenceable.getTypeName();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
deleted file mode 100644
index 00905d7..0000000
--- a/repository/src/main/java/org/apache/atlas/discovery/HiveLineageService.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.discovery;
-
-import com.thinkaurelius.titan.core.TitanGraph;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.GraphTransaction;
-import org.apache.atlas.typesystem.exception.EntityNotFoundException;
-import org.apache.atlas.utils.ParamChecker;
-import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy;
-import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService;
-import org.apache.atlas.query.Expressions;
-import org.apache.atlas.query.GremlinQueryResult;
-import org.apache.atlas.query.HiveLineageQuery;
-import org.apache.atlas.query.HiveWhereUsedQuery;
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.typesystem.persistence.ReferenceableInstance;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.collection.immutable.List;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-/**
- * Hive implementation of Lineage service interface.
- */
-@Singleton
-public class HiveLineageService implements LineageService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HiveLineageService.class);
-
-    private static final Option<List<String>> SELECT_ATTRIBUTES =
-            Some.<List<String>>apply(List.<String>fromArray(new String[]{"name"}));
-
-    public static final String HIVE_TABLE_SCHEMA_QUERY_PREFIX = "atlas.lineage.hive.table.schema.query.";
-
-    private static final String HIVE_TABLE_TYPE_NAME;
-    private static final String HIVE_PROCESS_TYPE_NAME;
-    private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME;
-    private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME;
-
-    private static final String HIVE_TABLE_EXISTS_QUERY;
-
-    private static final Configuration propertiesConf;
-
-    static {
-        // todo - externalize this using type system - dog food
-        try {
-            propertiesConf = ApplicationProperties.get();
-            HIVE_TABLE_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.table.type.name", "DataSet");
-            HIVE_PROCESS_TYPE_NAME = propertiesConf.getString("atlas.lineage.hive.process.type.name", "Process");
-            HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.inputs.name", "inputs");
-            HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = propertiesConf.getString("atlas.lineage.hive.process.outputs.name", "outputs");
-
-            HIVE_TABLE_EXISTS_QUERY = propertiesConf.getString("atlas.lineage.hive.table.exists.query",
-                    "from " + HIVE_TABLE_TYPE_NAME + " where name=\"%s\"");
-        } catch (AtlasException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-
-    private final TitanGraph titanGraph;
-    private final DefaultGraphPersistenceStrategy graphPersistenceStrategy;
-    private final GraphBackedDiscoveryService discoveryService;
-
-    @Inject
-    HiveLineageService(GraphProvider<TitanGraph> graphProvider, MetadataRepository metadataRepository,
-            GraphBackedDiscoveryService discoveryService) throws DiscoveryException {
-        this.titanGraph = graphProvider.get();
-        this.graphPersistenceStrategy = new DefaultGraphPersistenceStrategy(metadataRepository);
-        this.discoveryService = discoveryService;
-    }
-
-    /**
-     * Return the lineage outputs for the given tableName.
-     *
-     * @param tableName tableName
-     * @return Lineage Outputs as JSON
-     */
-    @Override
-    @GraphTransaction
-    public String getOutputs(String tableName) throws AtlasException {
-        LOG.info("Fetching lineage outputs for tableName={}", tableName);
-        ParamChecker.notEmpty(tableName, "table name cannot be null");
-        validateTableExists(tableName);
-
-        HiveWhereUsedQuery outputsQuery =
-                new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                        HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
-                        SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
-        Expressions.Expression expression = outputsQuery.expr();
-        LOG.debug("Expression is [" + expression.toString() + "]");
-        try {
-            return discoveryService.evaluate(expression).toJson();
-        } catch (Exception e) { // unable to catch ExpressionException
-            throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
-        }
-    }
-
-    /**
-     * Return the lineage outputs graph for the given tableName.
-     *
-     * @param tableName tableName
-     * @return Outputs Graph as JSON
-     */
-    @Override
-    @GraphTransaction
-    public String getOutputsGraph(String tableName) throws AtlasException {
-        LOG.info("Fetching lineage outputs graph for tableName={}", tableName);
-        ParamChecker.notEmpty(tableName, "table name cannot be null");
-        validateTableExists(tableName);
-
-        HiveWhereUsedQuery outputsQuery =
-                new HiveWhereUsedQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                        HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
-                        SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-        return outputsQuery.graph().toInstanceJson();
-    }
-
-    /**
-     * Return the lineage inputs for the given tableName.
-     *
-     * @param tableName tableName
-     * @return Lineage Inputs as JSON
-     */
-    @Override
-    @GraphTransaction
-    public String getInputs(String tableName) throws AtlasException {
-        LOG.info("Fetching lineage inputs for tableName={}", tableName);
-        ParamChecker.notEmpty(tableName, "table name cannot be null");
-        validateTableExists(tableName);
-
-        HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
-                SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-
-        Expressions.Expression expression = inputsQuery.expr();
-        LOG.debug("Expression is [" + expression.toString() + "]");
-        try {
-            return discoveryService.evaluate(expression).toJson();
-        } catch (Exception e) { // unable to catch ExpressionException
-            throw new DiscoveryException("Invalid expression [" + expression.toString() + "]", e);
-        }
-    }
-
-    /**
-     * Return the lineage inputs graph for the given tableName.
-     *
-     * @param tableName tableName
-     * @return Inputs Graph as JSON
-     */
-    @Override
-    @GraphTransaction
-    public String getInputsGraph(String tableName) throws AtlasException {
-        LOG.info("Fetching lineage inputs graph for tableName={}", tableName);
-        ParamChecker.notEmpty(tableName, "table name cannot be null");
-        validateTableExists(tableName);
-
-        HiveLineageQuery inputsQuery = new HiveLineageQuery(HIVE_TABLE_TYPE_NAME, tableName, HIVE_PROCESS_TYPE_NAME,
-                HIVE_PROCESS_INPUT_ATTRIBUTE_NAME, HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME, Option.empty(),
-                SELECT_ATTRIBUTES, true, graphPersistenceStrategy, titanGraph);
-        return inputsQuery.graph().toInstanceJson();
-    }
-
-    /**
-     * Return the schema for the given tableName.
-     *
-     * @param tableName tableName
-     * @return Schema as JSON
-     */
-    @Override
-    @GraphTransaction
-    public String getSchema(String tableName) throws AtlasException {
-        LOG.info("Fetching schema for tableName={}", tableName);
-        ParamChecker.notEmpty(tableName, "table name cannot be null");
-        String typeName = validateTableExists(tableName);
-
-        final String schemaQuery =
-                String.format(propertiesConf.getString(HIVE_TABLE_SCHEMA_QUERY_PREFIX + typeName), tableName);
-        return discoveryService.searchByDSL(schemaQuery);
-    }
-
-    /**
-     * Validate if indeed this is a table type and exists.
-     *
-     * @param tableName table name
-     */
-    private String validateTableExists(String tableName) throws AtlasException {
-        final String tableExistsQuery = String.format(HIVE_TABLE_EXISTS_QUERY, tableName);
-        GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery);
-        if (!(queryResult.rows().length() > 0)) {
-            throw new EntityNotFoundException(tableName + " does not exist");
-        }
-
-        ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0);
-        return referenceable.getTypeName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
----------------------------------------------------------------------
diff --git a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
index 05dc6a4..c4621cd 100755
--- a/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
+++ b/repository/src/main/scala/org/apache/atlas/query/ClosureQuery.scala
@@ -256,21 +256,21 @@ trait SingleInstanceClosureQuery[T] extends ClosureQuery {
  * @param persistenceStrategy as needed to evaluate the Closure Query.
  * @param g as needed to evaluate the Closure Query.
  */
-case class HiveLineageQuery(tableTypeName : String,
-                           tableName : String,
-                        ctasTypeName : String,
-                      ctasInputTableAttribute : String,
-                      ctasOutputTableAttribute : String,
-                      depth : Option[Int],
-                      selectAttributes : Option[List[String]],
-                      withPath : Boolean,
-                        persistenceStrategy: GraphPersistenceStrategies,
-                        g: TitanGraph
+case class InputLineageClosureQuery(tableTypeName : String,
+                                    attributeToSelectInstance : String,
+                                    tableName : String,
+                                    ctasTypeName : String,
+                                    ctasInputTableAttribute : String,
+                                    ctasOutputTableAttribute : String,
+                                    depth : Option[Int],
+                                    selectAttributes : Option[List[String]],
+                                    withPath : Boolean,
+                                    persistenceStrategy: GraphPersistenceStrategies,
+                                    g: TitanGraph
                         ) extends SingleInstanceClosureQuery[String] {
 
   val closureType : String = tableTypeName
 
-  val attributeToSelectInstance = "name"
   val attributeTyp = DataTypes.STRING_TYPE
 
   val instanceValue = tableName
@@ -296,21 +296,21 @@ case class HiveLineageQuery(tableTypeName : String,
  * @param persistenceStrategy as needed to evaluate the Closure Query.
  * @param g as needed to evaluate the Closure Query.
  */
-case class HiveWhereUsedQuery(tableTypeName : String,
-                              tableName : String,
-                            ctasTypeName : String,
-                            ctasInputTableAttribute : String,
-                            ctasOutputTableAttribute : String,
-                            depth : Option[Int],
-                            selectAttributes : Option[List[String]],
-                            withPath : Boolean,
-                            persistenceStrategy: GraphPersistenceStrategies,
-                            g: TitanGraph
+case class OutputLineageClosureQuery(tableTypeName : String,
+                                     attributeToSelectInstance : String,
+                                     tableName : String,
+                                     ctasTypeName : String,
+                                     ctasInputTableAttribute : String,
+                                     ctasOutputTableAttribute : String,
+                                     depth : Option[Int],
+                                     selectAttributes : Option[List[String]],
+                                     withPath : Boolean,
+                                     persistenceStrategy: GraphPersistenceStrategies,
+                                     g: TitanGraph
                              ) extends SingleInstanceClosureQuery[String] {
 
   val closureType : String = tableTypeName
 
-  val attributeToSelectInstance = "name"
   val attributeTyp = DataTypes.STRING_TYPE
 
   val instanceValue = tableName

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
deleted file mode 100644
index 40f0d91..0000000
--- a/repository/src/test/java/org/apache/atlas/BaseHiveRepositoryTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.util.TitanCleanup;
-
-import org.apache.atlas.repository.MetadataRepository;
-import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
-import org.apache.atlas.repository.graph.GraphProvider;
-import org.apache.atlas.services.MetadataService;
-import org.apache.atlas.typesystem.ITypedReferenceableInstance;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.TypesSerialization;
-import org.apache.atlas.typesystem.persistence.Id;
-import org.apache.atlas.typesystem.types.AttributeDefinition;
-import org.apache.atlas.typesystem.types.ClassType;
-import org.apache.atlas.typesystem.types.DataTypes;
-import org.apache.atlas.typesystem.types.EnumTypeDefinition;
-import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
-import org.apache.atlas.typesystem.types.IDataType;
-import org.apache.atlas.typesystem.types.Multiplicity;
-import org.apache.atlas.typesystem.types.StructTypeDefinition;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
-import org.apache.atlas.typesystem.types.utils.TypesUtil;
-import org.testng.annotations.Guice;
-
-import javax.inject.Inject;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-/**
- *  Base Class to set up hive types and instances for tests
- */
-@Guice(modules = RepositoryMetadataModule.class)
-public class BaseHiveRepositoryTest {
-
-    @Inject
-    protected MetadataService metadataService;
-
-    @Inject
-    protected MetadataRepository repository;
-
-    @Inject
-    protected GraphProvider<TitanGraph> graphProvider;
-
-    protected void setUp() throws Exception {
-        setUpTypes();
-        new GraphBackedSearchIndexer(graphProvider);
-        RequestContext.createContext();
-        setupInstances();
-        TestUtils.dumpGraph(graphProvider.get());
-    }
-
-    protected void tearDown() throws Exception {
-        TypeSystem.getInstance().reset();
-        try {
-            graphProvider.get().shutdown();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        try {
-            TitanCleanup.clear(graphProvider.get());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void setUpTypes() throws Exception {
-        TypesDef typesDef = createTypeDefinitions();
-        String typesAsJSON = TypesSerialization.toJson(typesDef);
-        metadataService.createType(typesAsJSON);
-    }
-
-    private static final String DATABASE_TYPE = "hive_db";
-    private static final String HIVE_TABLE_TYPE = "hive_table";
-    private static final String COLUMN_TYPE = "hive_column";
-    private static final String HIVE_PROCESS_TYPE = "hive_process";
-    private static final String STORAGE_DESC_TYPE = "StorageDesc";
-    private static final String VIEW_TYPE = "View";
-    private static final String PARTITION_TYPE = "hive_partition";
-
-    TypesDef createTypeDefinitions() {
-        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
-            .createClassTypeDef(DATABASE_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
-                attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
-                attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
-
-        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
-            .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
-                attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
-
-        HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
-            .createClassTypeDef(STORAGE_DESC_TYPE, null,
-                attrDef("location", DataTypes.STRING_TYPE),
-                attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
-                attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
-
-
-        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
-            .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
-                attrDef("owner", DataTypes.STRING_TYPE),
-                attrDef("createTime", DataTypes.DATE_TYPE),
-                attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
-                attrDef("temporary", DataTypes.BOOLEAN_TYPE),
-                new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
-                // todo - uncomment this, something is broken
-                new AttributeDefinition("sd", STORAGE_DESC_TYPE,
-                                                Multiplicity.REQUIRED, true, null),
-                new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
-                    Multiplicity.COLLECTION, true, null));
-
-        HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
-            .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
-                attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
-                attrDef("endTime", DataTypes.LONG_TYPE),
-                attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
-                attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
-                attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
-                attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
-
-        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
-            .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
-                new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
-                new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
-                    Multiplicity.COLLECTION, false, null));
-
-        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
-            new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
-                Multiplicity.OPTIONAL, false, null),
-            new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
-            };
-        HierarchicalTypeDefinition<ClassType> partClsDef =
-            new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
-                attributeDefinitions);
-
-        HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
-
-        HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
-
-        HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
-
-        HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
-
-        HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
-
-        HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
-
-        HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
-
-        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
-            ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
-            ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
-    }
-
-    AttributeDefinition attrDef(String name, IDataType dT) {
-        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
-    }
-
-    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
-        return attrDef(name, dT, m, false, null);
-    }
-
-    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
-        String reverseAttributeName) {
-        Preconditions.checkNotNull(name);
-        Preconditions.checkNotNull(dT);
-        return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
-    }
-
-    private void setupInstances() throws Exception {
-        Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
-
-        Referenceable sd =
-            storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
-                column("time_id", "int", "time id")));
-
-        List<Referenceable> salesFactColumns = ImmutableList
-            .of(column("time_id", "int", "time id"),
-                column("product_id", "int", "product id"),
-                column("customer_id", "int", "customer id", "PII"),
-                column("sales", "double", "product id", "Metric"));
-
-        Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
-
-        List<Referenceable> logFactColumns = ImmutableList
-            .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
-                column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
-
-        List<Referenceable> timeDimColumns = ImmutableList
-            .of(column("time_id", "int", "time id"),
-                column("dayOfYear", "int", "day Of Year"),
-                column("weekDay", "int", "week Day"));
-
-        Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
-            "Dimension");
-
-        Id reportingDB =
-            database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
-
-        Id salesFactDaily =
-            table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
-                salesFactColumns, "Metric");
-
-        loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
-            ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
-
-        Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
-
-        Id loggingFactDaily =
-            table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
-                logFactColumns, "Log Data");
-
-        List<Referenceable> productDimColumns = ImmutableList
-            .of(column("product_id", "int", "product id"),
-                column("product_name", "string", "product name"),
-                column("brand_name", "int", "brand name"));
-
-        Id productDim =
-            table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
-                "Dimension");
-
-        view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
-
-        List<Referenceable> customerDimColumns = ImmutableList.of(
-            column("customer_id", "int", "customer id", "PII"),
-            column("name", "string", "customer name", "PII"),
-            column("address", "string", "customer address", "PII"));
-
-        Id customerDim =
-            table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
-                "Dimension");
-
-        view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
-
-        Id salesFactMonthly =
-            table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
-                "Managed", salesFactColumns, "Metric");
-
-        loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
-            ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
-        Id loggingFactMonthly =
-            table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
-                "Managed", logFactColumns, "Log Data");
-
-        loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
-            ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
-
-        partition(new ArrayList() {{ add("2015-01-01"); }}, salesFactDaily);
-    }
-
-    Id database(String name, String description, String owner, String locationUri, String... traitNames)
-        throws Exception {
-        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
-        referenceable.set("name", name);
-        referenceable.set("description", description);
-        referenceable.set("owner", owner);
-        referenceable.set("locationUri", locationUri);
-        referenceable.set("createTime", System.currentTimeMillis());
-
-        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
-        return createInstance(referenceable, clsType);
-    }
-
-    Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
-        throws Exception {
-        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
-        referenceable.set("location", location);
-        referenceable.set("inputFormat", inputFormat);
-        referenceable.set("outputFormat", outputFormat);
-        referenceable.set("compressed", compressed);
-        referenceable.set("cols", columns);
-
-        return referenceable;
-    }
-
-    Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
-        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
-        referenceable.set("name", name);
-        referenceable.set("dataType", dataType);
-        referenceable.set("comment", comment);
-
-        return referenceable;
-    }
-
-    Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
-        List<Referenceable> columns, String... traitNames) throws Exception {
-        Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
-        referenceable.set("name", name);
-        referenceable.set("description", description);
-        referenceable.set("owner", owner);
-        referenceable.set("tableType", tableType);
-        referenceable.set("temporary", false);
-        referenceable.set("createTime", new Date(System.currentTimeMillis()));
-        referenceable.set("lastAccessTime", System.currentTimeMillis());
-        referenceable.set("retention", System.currentTimeMillis());
-
-        referenceable.set("db", dbId);
-        // todo - uncomment this, something is broken
-        referenceable.set("sd", sd);
-        referenceable.set("columns", columns);
-
-        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
-        return createInstance(referenceable, clsType);
-    }
-
-    Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
-        String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
-        throws Exception {
-        Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
-        referenceable.set(AtlasClient.NAME, name);
-        referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
-        referenceable.set("description", description);
-        referenceable.set("user", user);
-        referenceable.set("startTime", System.currentTimeMillis());
-        referenceable.set("endTime", System.currentTimeMillis() + 10000);
-
-        referenceable.set("inputs", inputTables);
-        referenceable.set("outputs", outputTables);
-
-        referenceable.set("queryText", queryText);
-        referenceable.set("queryPlan", queryPlan);
-        referenceable.set("queryId", queryId);
-        referenceable.set("queryGraph", queryGraph);
-
-        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
-        return createInstance(referenceable, clsType);
-    }
-
-    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
-        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
-        referenceable.set("name", name);
-        referenceable.set("db", dbId);
-
-        referenceable.set("inputTables", inputTables);
-        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
-        return createInstance(referenceable, clsType);
-    }
-
-    Id partition(List<String> values, Id table, String... traitNames) throws Exception {
-        Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
-        referenceable.set("values", values);
-        referenceable.set("table", table);
-        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
-        return createInstance(referenceable, clsType);
-    }
-    private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
-        ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
-        List<String> guids = repository.createEntities(typedInstance);
-
-        // return the reference to created instance with guid
-        return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/b65dd91c/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
new file mode 100644
index 0000000..d1f9430
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.thinkaurelius.titan.core.TitanGraph;
+import com.thinkaurelius.titan.core.util.TitanCleanup;
+
+import org.apache.atlas.repository.MetadataRepository;
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
+import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.services.MetadataService;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.TypesDef;
+import org.apache.atlas.typesystem.json.TypesSerialization;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.AttributeDefinition;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.EnumTypeDefinition;
+import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.Multiplicity;
+import org.apache.atlas.typesystem.types.StructTypeDefinition;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.testng.annotations.Guice;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ *  Base class that registers a small hive-like type system (databases, tables, columns, storage
+ *  descriptors, processes, views, partitions and traits) and populates the repository with a fixed
+ *  set of instances for tests.
+ */
+@Guice(modules = RepositoryMetadataModule.class)
+public class BaseRepositoryTest {
+
+    @Inject
+    protected MetadataService metadataService;
+
+    @Inject
+    protected MetadataRepository repository;
+
+    @Inject
+    protected GraphProvider<TitanGraph> graphProvider;
+
+    /**
+     * Registers the test types, creates the search indexer, opens a request context, creates the
+     * test instances and finally dumps the graph via {@code TestUtils.dumpGraph}.
+     */
+    protected void setUp() throws Exception {
+        setUpTypes();
+        // The indexer instance is discarded; it is constructed only for its constructor's effects.
+        new GraphBackedSearchIndexer(graphProvider);
+        RequestContext.createContext();
+        setupInstances();
+        TestUtils.dumpGraph(graphProvider.get());
+    }
+
+    /**
+     * Resets the type system, then shuts down and clears the graph. A failure in either graph step is
+     * printed and ignored so that one failing step does not prevent the other from running.
+     */
+    protected void tearDown() throws Exception {
+        TypeSystem.getInstance().reset();
+        try {
+            graphProvider.get().shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        try {
+            TitanCleanup.clear(graphProvider.get());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Serializes the test type definitions to JSON and registers them through the metadata service. */
+    private void setUpTypes() throws Exception {
+        TypesDef typesDef = createTypeDefinitions();
+        String typesAsJSON = TypesSerialization.toJson(typesDef);
+        metadataService.createType(typesAsJSON);
+    }
+
+    protected static final String DATABASE_TYPE = "hive_db";
+    protected static final String HIVE_TABLE_TYPE = "hive_table";
+    private static final String COLUMN_TYPE = "hive_column";
+    private static final String HIVE_PROCESS_TYPE = "hive_process";
+    private static final String STORAGE_DESC_TYPE = "StorageDesc";
+    private static final String VIEW_TYPE = "View";
+    private static final String PARTITION_TYPE = "hive_partition";
+
+    /**
+     * Builds the class and trait type definitions used by the tests.
+     *
+     * @return the type definitions: no enums or structs, seven traits and seven class types
+     */
+    TypesDef createTypeDefinitions() {
+        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
+            .createClassTypeDef(DATABASE_TYPE, null,
+                TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
+                attrDef("description", DataTypes.STRING_TYPE), attrDef("locationUri", DataTypes.STRING_TYPE),
+                attrDef("owner", DataTypes.STRING_TYPE), attrDef("createTime", DataTypes.LONG_TYPE));
+
+        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
+            .createClassTypeDef(COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+                attrDef("dataType", DataTypes.STRING_TYPE), attrDef("comment", DataTypes.STRING_TYPE));
+
+        HierarchicalTypeDefinition<ClassType> storageDescClsDef = TypesUtil
+            .createClassTypeDef(STORAGE_DESC_TYPE, null,
+                attrDef("location", DataTypes.STRING_TYPE),
+                attrDef("inputFormat", DataTypes.STRING_TYPE), attrDef("outputFormat", DataTypes.STRING_TYPE),
+                attrDef("compressed", DataTypes.STRING_TYPE, Multiplicity.REQUIRED, false, null));
+
+
+        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
+            .createClassTypeDef(HIVE_TABLE_TYPE, ImmutableSet.of("DataSet"),
+                attrDef("owner", DataTypes.STRING_TYPE),
+                attrDef("createTime", DataTypes.DATE_TYPE),
+                attrDef("lastAccessTime", DataTypes.LONG_TYPE), attrDef("tableType", DataTypes.STRING_TYPE),
+                attrDef("temporary", DataTypes.BOOLEAN_TYPE),
+                new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+                // "sd" and "columns" are declared composite (fourth constructor argument is true)
+                new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
+                new AttributeDefinition("columns", DataTypes.arrayTypeName(COLUMN_TYPE),
+                    Multiplicity.COLLECTION, true, null));
+
+        HierarchicalTypeDefinition<ClassType> loadProcessClsDef = TypesUtil
+            .createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableSet.of("Process"),
+                attrDef("userName", DataTypes.STRING_TYPE), attrDef("startTime", DataTypes.LONG_TYPE),
+                attrDef("endTime", DataTypes.LONG_TYPE),
+                attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED),
+                attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED));
+
+        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
+            .createClassTypeDef(VIEW_TYPE, null, attrDef("name", DataTypes.STRING_TYPE),
+                new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
+                new AttributeDefinition("inputTables", DataTypes.arrayTypeName(HIVE_TABLE_TYPE),
+                    Multiplicity.COLLECTION, false, null));
+
+        AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{
+            new AttributeDefinition("values", DataTypes.arrayTypeName(DataTypes.STRING_TYPE.getName()),
+                Multiplicity.OPTIONAL, false, null),
+            new AttributeDefinition("table", HIVE_TABLE_TYPE, Multiplicity.REQUIRED, false, null),
+            };
+        HierarchicalTypeDefinition<ClassType> partClsDef =
+            new HierarchicalTypeDefinition<>(ClassType.class, PARTITION_TYPE, null, null,
+                attributeDefinitions);
+
+        HierarchicalTypeDefinition<TraitType> dimTraitDef = TypesUtil.createTraitTypeDef("Dimension", null);
+
+        HierarchicalTypeDefinition<TraitType> factTraitDef = TypesUtil.createTraitTypeDef("Fact", null);
+
+        HierarchicalTypeDefinition<TraitType> metricTraitDef = TypesUtil.createTraitTypeDef("Metric", null);
+
+        HierarchicalTypeDefinition<TraitType> etlTraitDef = TypesUtil.createTraitTypeDef("ETL", null);
+
+        HierarchicalTypeDefinition<TraitType> piiTraitDef = TypesUtil.createTraitTypeDef("PII", null);
+
+        HierarchicalTypeDefinition<TraitType> jdbcTraitDef = TypesUtil.createTraitTypeDef("JdbcAccess", null);
+
+        HierarchicalTypeDefinition<TraitType> logTraitDef = TypesUtil.createTraitTypeDef("Log Data", null);
+
+        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
+            ImmutableList.of(dimTraitDef, factTraitDef, piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef, logTraitDef),
+            ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, tblClsDef, loadProcessClsDef, viewClsDef, partClsDef));
+    }
+
+    /** Optional, non-composite attribute with no reverse attribute. */
+    AttributeDefinition attrDef(String name, IDataType dT) {
+        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
+    }
+
+    /** Non-composite attribute with the given multiplicity and no reverse attribute. */
+    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) {
+        return attrDef(name, dT, m, false, null);
+    }
+
+    /**
+     * Builds an attribute definition from a data type.
+     *
+     * @throws NullPointerException if {@code name} or {@code dT} is null
+     */
+    AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m, boolean isComposite,
+        String reverseAttributeName) {
+        Preconditions.checkNotNull(name);
+        Preconditions.checkNotNull(dT);
+        return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName);
+    }
+
+    /**
+     * Creates the fixture: Sales/Reporting/Logging databases, their tables and views, three load
+     * processes linking the fact tables, and one partition of sales_fact_daily_mv.
+     */
+    private void setupInstances() throws Exception {
+        Id salesDB = database("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales");
+
+        Referenceable sd =
+            storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(
+                column("time_id", "int", "time id")));
+
+        List<Referenceable> salesFactColumns = ImmutableList
+            .of(column("time_id", "int", "time id"),
+                column("product_id", "int", "product id"),
+                column("customer_id", "int", "customer id", "PII"),
+                column("sales", "double", "product id", "Metric"));
+
+        Id salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact");
+
+        List<Referenceable> logFactColumns = ImmutableList
+            .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"),
+                column("machine_id", "int", "machine id"), column("log", "string", "log data", "Log Data"));
+
+        List<Referenceable> timeDimColumns = ImmutableList
+            .of(column("time_id", "int", "time id"),
+                column("dayOfYear", "int", "day Of Year"),
+                column("weekDay", "int", "week Day"));
+
+        Id timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns,
+            "Dimension");
+
+        Id reportingDB =
+            database("Reporting", "reporting database", "Jane BI", "hdfs://host:8000/apps/warehouse/reporting");
+
+        Id salesFactDaily =
+            table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed",
+                salesFactColumns, "Metric");
+
+        loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim),
+            ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL");
+
+        Id logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging");
+
+        Id loggingFactDaily =
+            table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed",
+                logFactColumns, "Log Data");
+
+        List<Referenceable> productDimColumns = ImmutableList
+            .of(column("product_id", "int", "product id"),
+                column("product_name", "string", "product name"),
+                column("brand_name", "int", "brand name"));
+
+        Id productDim =
+            table("product_dim", "product dimension table", salesDB, sd, "John Doe", "Managed", productDimColumns,
+                "Dimension");
+
+        view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess");
+
+        List<Referenceable> customerDimColumns = ImmutableList.of(
+            column("customer_id", "int", "customer id", "PII"),
+            column("name", "string", "customer name", "PII"),
+            column("address", "string", "customer address", "PII"));
+
+        Id customerDim =
+            table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns,
+                "Dimension");
+
+        view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess");
+
+        Id salesFactMonthly =
+            table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI",
+                "Managed", salesFactColumns, "Metric");
+
+        loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily),
+            ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+        Id loggingFactMonthly =
+            table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL",
+                "Managed", logFactColumns, "Log Data");
+
+        loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily),
+            ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL");
+
+        // Typed immutable list instead of a raw, double-brace-initialised ArrayList (which created an
+        // anonymous subclass capturing this test instance and triggered an unchecked conversion).
+        partition(ImmutableList.of("2015-01-01"), salesFactDaily);
+    }
+
+    /** Creates a database entity whose createTime is the current wall-clock time in millis. */
+    Id database(String name, String description, String owner, String locationUri, String... traitNames)
+        throws Exception {
+        Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("description", description);
+        referenceable.set("owner", owner);
+        referenceable.set("locationUri", locationUri);
+        referenceable.set("createTime", System.currentTimeMillis());
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, DATABASE_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Builds (but does not store) a storage descriptor referenceable.
+     */
+    protected Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed, List<Referenceable> columns)
+        throws Exception {
+        Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
+        referenceable.set("location", location);
+        referenceable.set("inputFormat", inputFormat);
+        referenceable.set("outputFormat", outputFormat);
+        // NOTE(review): "compressed" is declared as STRING_TYPE in createTypeDefinitions() but a boolean is
+        // passed here; presumably converted to its string form on conversion - confirm.
+        referenceable.set("compressed", compressed);
+        // NOTE(review): "cols" is not declared on STORAGE_DESC_TYPE in createTypeDefinitions(); verify
+        // whether the conversion keeps or drops it.
+        referenceable.set("cols", columns);
+
+        return referenceable;
+    }
+
+    /** Builds (but does not store) a column referenceable with the given traits. */
+    protected Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("dataType", dataType);
+        referenceable.set("comment", comment);
+
+        return referenceable;
+    }
+
+    /**
+     * Creates a non-temporary table entity in the given database, with the given storage descriptor
+     * and columns; time attributes are taken from the current wall-clock time.
+     */
+    protected Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
+        List<Referenceable> columns, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("description", description);
+        referenceable.set("owner", owner);
+        referenceable.set("tableType", tableType);
+        referenceable.set("temporary", false);
+        referenceable.set("createTime", new Date(System.currentTimeMillis()));
+        referenceable.set("lastAccessTime", System.currentTimeMillis());
+        // NOTE(review): "retention" is not declared on HIVE_TABLE_TYPE in createTypeDefinitions()
+        referenceable.set("retention", System.currentTimeMillis());
+
+        referenceable.set("db", dbId);
+        referenceable.set("sd", sd);
+        referenceable.set("columns", columns);
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_TABLE_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Creates a process entity that reads {@code inputTables} and writes {@code outputTables}.
+     * {@code name} is used as both the name and the qualifiedName; endTime is 10 seconds after startTime.
+     */
+    protected Id loadProcess(String name, String description, String user, List<Id> inputTables, List<Id> outputTables,
+        String queryText, String queryPlan, String queryId, String queryGraph, String... traitNames)
+        throws Exception {
+        Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("qualifiedName", name);
+        referenceable.set("description", description);
+        // The process type declares "userName" (not "user"), so set the declared attribute.
+        referenceable.set("userName", user);
+        referenceable.set("startTime", System.currentTimeMillis());
+        referenceable.set("endTime", System.currentTimeMillis() + 10000);
+
+        referenceable.set("inputs", inputTables);
+        referenceable.set("outputs", outputTables);
+
+        referenceable.set("queryText", queryText);
+        referenceable.set("queryPlan", queryPlan);
+        referenceable.set("queryId", queryId);
+        referenceable.set("queryGraph", queryGraph);
+
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, HIVE_PROCESS_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /** Creates a View entity in the given database over the given input tables. */
+    Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("db", dbId);
+
+        referenceable.set("inputTables", inputTables);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, VIEW_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /** Creates a partition entity with the given values, attached to the given table. */
+    Id partition(List<String> values, Id table, String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(PARTITION_TYPE, traitNames);
+        referenceable.set("values", values);
+        referenceable.set("table", table);
+        ClassType clsType = TypeSystem.getInstance().getDataType(ClassType.class, PARTITION_TYPE);
+        return createInstance(referenceable, clsType);
+    }
+
+    /**
+     * Converts the referenceable to a typed instance, stores it, and returns an Id built from the last
+     * guid reported by the repository.
+     */
+    private Id createInstance(Referenceable referenceable, ClassType clsType) throws Exception {
+        ITypedReferenceableInstance typedInstance = clsType.convert(referenceable, Multiplicity.REQUIRED);
+        List<String> guids = repository.createEntities(typedInstance);
+
+        // NOTE(review): assumes the top-level instance's guid is the last one returned
+        return new Id(guids.get(guids.size() - 1), 0, referenceable.getTypeName());
+    }
+}


Mime
View raw message