atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject [1/8] incubator-atlas git commit: ATLAS-694 Update Atlas code to use graph abstraction layer (jnhagelb via sumasai)
Date Mon, 10 Oct 2016 22:37:49 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 3e4f28f50 -> d2d6ff7d1


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
index f65cedb..33513c5 100755
--- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
@@ -18,38 +18,42 @@
 
 package org.apache.atlas.query
 
-import com.thinkaurelius.titan.core.TitanGraph
-import com.thinkaurelius.titan.core.util.TitanCleanup
+import org.apache.atlas.TestUtils
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
-import org.apache.atlas.query.Expressions._
-import org.apache.atlas.repository.graph.{TitanGraphProvider, GraphBackedMetadataRepository}
+import org.apache.atlas.query.Expressions._class
+import org.apache.atlas.query.Expressions._trait
+import org.apache.atlas.query.Expressions.id
+import org.apache.atlas.repository.graph.GraphBackedMetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
 import org.apache.atlas.typesystem.types.TypeSystem
-import org.testng.annotations.{Test,BeforeClass,AfterClass}
+import org.testng.annotations.AfterClass
+import org.testng.annotations.BeforeClass
+import org.testng.annotations.BeforeMethod
+import org.testng.annotations.Test
+import org.apache.atlas.repository.graph.AtlasGraphProvider
 
 class GremlinTest2 extends BaseGremlinTest {
 
-  var g: TitanGraph = null
-  var gProvider:TitanGraphProvider = null;
+  var g: AtlasGraph[_,_] = null
   var gp:GraphPersistenceStrategies = null;
 
+  @BeforeMethod
+  def resetRequestContext() {
+       TestUtils.resetRequestContext();
+  }
+  
   @BeforeClass
   def beforeAll() {
     TypeSystem.getInstance().reset()
     QueryTestsUtils.setupTypes
-    gProvider = new TitanGraphProvider();
-    gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider, null))
-    g = QueryTestsUtils.setupTestGraph(gProvider)
+    var repo = new GraphBackedMetadataRepository(null);
+    gp = new DefaultGraphPersistenceStrategy(repo)
+    g = QueryTestsUtils.setupTestGraph(repo)
   }
 
   @AfterClass
-  def afterAll() {
-    g.shutdown()
-    try {
-      TitanCleanup.clear(g);
-    } catch {
-      case ex: Exception =>
-        print("Could not clear the graph ", ex);
-    }
+  def afterAll() {     
+    AtlasGraphProvider.cleanup();
   }
 
   @Test def testTraitSelect {
@@ -111,7 +115,7 @@ class GremlinTest2 extends BaseGremlinTest {
           "LoadProcess",
           "inputTables",
           "outputTable",
-        None, Some(List("name")), true, GraphPersistenceStrategy1, g).evaluate()
+        None, Some(List("name")), true, getPersistenceStrategy(g), g).evaluate()
     validateJson(r)
   }
 
@@ -120,7 +124,7 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).graph
 
     println(r.toInstanceJson)
     //validateJson(r)
@@ -131,7 +135,7 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).evaluate()
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).evaluate()
     validateJson(r)
   }
 
@@ -140,8 +144,12 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).graph
     println(r.toInstanceJson)
   }
+  
+  private def getPersistenceStrategy(g: AtlasGraph[_,_]) : GraphPersistenceStrategies = {
+      return GraphPersistenceStrategy1(g);
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/HiveTitanSample.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/HiveTitanSample.scala b/repository/src/test/scala/org/apache/atlas/query/HiveTitanSample.scala
old mode 100755
new mode 100644
index 2dfb67a..fa0d341
--- a/repository/src/test/scala/org/apache/atlas/query/HiveTitanSample.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/HiveTitanSample.scala
@@ -18,307 +18,156 @@
 
 package org.apache.atlas.query
 
-import java.io.File
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Date, UUID}
-import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
+import scala.collection.JavaConversions._
 
-import com.thinkaurelius.titan.core.TitanGraph
-import org.apache.atlas.repository.Constants
-import org.apache.atlas.repository.graph.TitanGraphProvider
-import org.apache.atlas.TestUtils
-import org.apache.commons.io.FileUtils
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.atlas.typesystem.ITypedReferenceableInstance
+import org.apache.atlas.typesystem.json.TypedReferenceableInstanceSerializer
+import org.apache.atlas.utils.HiveModel.Column
+import org.apache.atlas.utils.HiveModel.DB
+import org.apache.atlas.utils.HiveModel.HiveOrder
+import org.apache.atlas.utils.HiveModel.LoadProcess
+import org.apache.atlas.utils.HiveModel.Partition
+import org.apache.atlas.utils.HiveModel.StorageDescriptor
+import org.apache.atlas.utils.HiveModel.Table
+import org.apache.atlas.utils.HiveModel.View
+import scala.collection.mutable.Buffer
 
-object HiveTitanSample {
-
-    private var nextVertexId: AtomicInteger = new AtomicInteger(0)
-    private var nextEdgeId: AtomicInteger = new AtomicInteger(1000)
-
-    trait Vertex {
-        val _id: String
-
-        def id = _id
-        val __version = 0
-        val __guid = s"""${UUID.randomUUID()}""".stripMargin
-
-        def addEdge(to: Vertex, label: String, edges: ArrayBuffer[String]): Int = {
-            val edgeId = nextEdgeId.incrementAndGet();
-            edges +=
-                s"""{"_id" : "${edgeId}", "_type" : "edge", "_inV" : "${to.id}", "_outV" : "$id", "_label" : "$label"}"""
-            edgeId
-        }
-
-        def toGSon(vertices: ArrayBuffer[String],
-                   edges: ArrayBuffer[String]): Unit = {
-
-            val sb = new StringBuilder
-            sb.append( s"""{"${Constants.ENTITY_TYPE_PROPERTY_KEY}" : "${this.getClass.getSimpleName}", "_type" : "vertex"""")
-
-            this.getClass.getDeclaredFields filter (_.getName != "traits") foreach { f =>
-                f.setAccessible(true)
-                val fV = f.get(this)
-                val convertedVal = fV match {
-                    case _: String => s""""$fV""""
-                    case ls: List[_] if isPrimitiveType(ls) =>
-                            s"""["${ls.mkString(",")}"]"""
-                    case d: Date => d.getTime
-                    case _ => fV
-                }
-
-                convertedVal match {
-                    case x: Vertex => addEdge(x, s"__${this.getClass.getSimpleName}.${f.getName}", edges)
-                    case l: List[_] => val edgeList = l.map(x =>
-                        s""""${addEdge(x.asInstanceOf[Vertex], s"__${this.getClass.getSimpleName}.${f.getName}", edges)}""""
-                    )
-                    if(l.head.isInstanceOf[Struct]) {
-                        sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : ${edgeList.mkString("[", ",", "]")}""")
-                    }
-                    case _ => sb.append( s""", "${f.getName}" : $convertedVal""")
-                        sb.append( s""", "${this.getClass.getSimpleName}.${f.getName}" : $convertedVal""")
-                }
-            }
-
-            this.getClass.getDeclaredFields filter (_.getName == "traits") foreach { f =>
-                f.setAccessible(true)
-                var traits = f.get(this).asInstanceOf[Option[List[Trait]]]
-
-                if (traits.isDefined) {
-                    val fV = traits.get.map(_.getClass.getSimpleName).mkString(",")
-                    sb.append( s""", "${Constants.TRAIT_NAMES_PROPERTY_KEY}" : "$fV"""")
-                }
-            }
-
-            sb.append("}")
-            vertices += sb.toString()
-        }
-
-        def isPrimitiveType(ls: List[_]) : Boolean = {
-            ls.head match {
-                case _: String => true
-                case _: Byte => true
-                case _: Short => true
-                case _: Int => true
-                case _: Long => true
-                case _: Float => true
-                case _: Double => true
-                case _: BigDecimal => true
-                case _: BigInt => true
-                case _: Boolean => true
-                case default => false
-            }
-        }
-    }
-
-    trait Trait extends Vertex
-
-    trait Struct extends Vertex
-
-    trait Instance extends Vertex {
-        val traits: Option[List[Trait]]
-
-        override def toGSon(vertices: ArrayBuffer[String],
-                            edges: ArrayBuffer[String]): Unit = {
-            super.toGSon(vertices, edges)
-
-            if (traits.isDefined) {
-                traits.get foreach { t =>
-                    t.toGSon(vertices, edges)
-                    addEdge(t, s"${t.getClass.getSimpleName}", edges)
-                }
-            }
-        }
-
-    }
-
-    case class JdbcAccess(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
-
-    case class PII(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
-
-    case class Dimension(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
-
-    case class Metric(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
-
-    case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait
-
-
-    case class DB(name: String, owner: String, createTime: Int, clusterName: String, traits: Option[List[Trait]] = None,
-                  _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
-
-    case class HiveOrder(col: String, order: Int,
-                  _id: String = "" + nextVertexId.incrementAndGet()) extends Struct
-
-    case class StorageDescriptor(inputFormat: String, outputFormat: String,
-                                 sortCols: List[Struct], _id: String = "" + nextVertexId.incrementAndGet()) extends Struct {
-
-        override def toGSon(vertices: ArrayBuffer[String],
-                   edges: ArrayBuffer[String]): Unit = {
-            sortCols.foreach(_.toGSon(vertices, edges))
-            super.toGSon(vertices, edges)
-        }
-    }
-
-    case class Column(name: String, dataType: String, sd: StorageDescriptor,
-                      traits: Option[List[Trait]] = None,
-                      _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
-
-    case class Table(name: String, db: DB, sd: StorageDescriptor,
-                     created: Date,
-                     traits: Option[List[Trait]] = None,
-                     _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
 
-    case class TableDef(name: String, db: DB, sd: StorageDescriptor,
-                        columns: List[(String, String, Option[List[Trait]])],
-                        traits: Option[List[Trait]] = None,
-                        created: Option[Date] = None) {
-        val createdDate : Date = created match {
-            case Some(x) => x
-            case None => new Date(TestUtils.TEST_DATE_IN_LONG)
-        }
 
-        val colDefs = columns map { c =>
-            Column(c._1, c._2, sd, c._3)
-        }
-        val tablDef = Table(name, db, sd, createdDate, traits)
-
-        def toGSon(vertices: ArrayBuffer[String],
-                   edges: ArrayBuffer[String]): Unit = {
-            sd.toGSon(vertices, edges)
-            colDefs foreach {
-                _.toGSon(vertices, edges)
-            }
-            tablDef.toGSon(vertices, edges)
-        }
-    }
-
-    case class Partition(values: List[String], table: Table, traits: Option[List[Trait]] = None,
-                  _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
-
-    case class LoadProcess(name: String, inputTables: List[Vertex],
-                           outputTable: Vertex,
-                           traits: Option[List[Trait]] = None,
-                           _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
-
-    case class View(name: String, db: DB, inputTables: List[Vertex],
-                    traits: Option[List[Trait]] = None,
-                    _id: String = "" + nextVertexId.incrementAndGet()) extends Instance
-
-    val salesDB = DB("Sales", "John ETL", 1000, "test")
-    val salesFact = TableDef("sales_fact",
+object HiveTitanSample {
+       
+    val MetricTrait = "Metric"
+    val DimensionTrait = "Dimension"
+    val ETLTrait = "ETL"
+    val JdbcAccessTrait = "JdbcAccess"
+    
+    val salesDB = new DB("Sales", "John ETL", 1000, "test")
+
+    
+    
+    val salesFact = new Table("sales_fact",
         salesDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("customer_id", 0))),
-        List(
-            ("time_id", "int", None),
-            ("product_id", "int", None),
-            ("customer_id", "int", None),
-            ("created", "date", None),
-            ("sales", "double", Some(List(Metric())))
-        ))
-    val productDim = TableDef("product_dim",
+        new StorageDescriptor("TextInputFormat",
+            "TextOutputFormat", List(new HiveOrder("customer_id", 0))),
+            List(
+                new Column("time_id", "int"),
+                new Column("product_id", "int"),
+                new Column("customer_id", "int"),
+                new Column("created", "date"),
+                new Column("sales", "double").withTrait(MetricTrait)
+                )
+        );
+    
+    
+    val productDim = new Table("product_dim",
         salesDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("product_id", 0))),
+        new StorageDescriptor("TextInputFormat",
+            "TextOutputFormat", List(new HiveOrder("product_id", 0))),
         List(
-            ("product_id", "int", None),
-            ("product_name", "string", None),
-            ("brand_name", "string", None)
-        ),
-        Some(List(Dimension())))
-    val timeDim = TableDef("time_dim",
+            new Column("product_id", "int"),
+            new Column("product_name", "string"),
+            new Column("brand_name", "string")
+        )
+    ).withTrait(DimensionTrait)
+
+    val timeDim = new Table("time_dim",
         salesDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("time_id", 0))),
+        new StorageDescriptor("TextInputFormat",
+            "TextOutputFormat", List(new HiveOrder("time_id", 0))),
         List(
-            ("time_id", "int", None),
-            ("dayOfYear", "int", None),
-            ("weekDay", "string", None)
-        ),
-        Some(List(Dimension())))
-    val customerDim = TableDef("customer_dim",
+             new Column("time_id", "int"),
+             new Column("dayOfYear", "int"),
+             new Column("weekDay", "string")
+        )
+    ).withTrait(DimensionTrait)
+        
+    val customerDim = new Table("customer_dim",
         salesDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("customer_id", 0))),
+        new StorageDescriptor("TextInputFormat",
+            "TextOutputFormat", List(new HiveOrder("customer_id", 0))),
         List(
-            ("customer_id", "int", None),
-            ("name", "int", None),
-            ("address", "string", Some(List(PII())))
-        ),
-        Some(List(Dimension())))
+             new Column("customer_id", "int"),
+             new Column("name", "int"),
+             new Column("address", "string").withTrait("PII")
+        )
+    ).withTrait(DimensionTrait)
+
 
-    val reportingDB = DB("Reporting", "Jane BI", 1500, "test")
-    val salesFactDaily = TableDef("sales_fact_daily_mv",
+    val reportingDB = new DB("Reporting", "Jane BI", 1500, "test")
+    val salesFactDaily = new Table("sales_fact_daily_mv",
         reportingDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("customer_id", 0))),
+        new StorageDescriptor("TextInputFormat",
+            "TextOutputFormat", List(new HiveOrder("customer_id", 0))),
         List(
-            ("time_id", "int", None),
-            ("product_id", "int", None),
-            ("customer_id", "int", None),
-            ("sales", "double", Some(List(Metric())))
-        ))
-    val loadSalesFactDaily = LoadProcess("loadSalesDaily",
-        List(salesFact.tablDef, timeDim.tablDef), salesFactDaily.tablDef,
-        Some(List(ETL())))
+             new Column("time_id", "int"),
+             new Column("product_id", "int"),
+             new Column("customer_id", "int"),
+             new Column("sales", "double").withTrait(MetricTrait)
+        )
+    )
+    
+    val loadSalesFactDaily = new LoadProcess(
+            "loadSalesDaily",
+            List(salesFact, timeDim), 
+            salesFactDaily
+    ).withTrait(ETLTrait)
+        
 
 
-    val productDimView = View("product_dim_view", reportingDB,
-        List(productDim.tablDef),
-        Some(List(Dimension(), JdbcAccess())))
+    val productDimView = new View(
+        "product_dim_view", 
+        reportingDB,
+        List(productDim)
+    ).withTraits(List(DimensionTrait, JdbcAccessTrait))
 
-    val customerDimView = View("customer_dim_view", reportingDB,
-        List(customerDim.tablDef),
-        Some(List(Dimension(), JdbcAccess())))
+    val customerDimView = new View(
+        "customer_dim_view", 
+        reportingDB,
+        List(customerDim)
+        
+    ).withTraits(List(DimensionTrait, JdbcAccessTrait))
 
-    val salesFactMonthly = TableDef("sales_fact_monthly_mv",
+    val salesFactMonthly = new Table("sales_fact_monthly_mv",
         reportingDB,
-        StorageDescriptor("TextInputFormat",
-            "TextOutputFormat", List(HiveOrder("customer_id", 0))),
+        new StorageDescriptor(
+                "TextInputFormat",
+                "TextOutputFormat", 
+                List(new HiveOrder("customer_id", 0))
+        ),
         List(
-            ("time_id", "int", None),
-            ("product_id", "int", None),
-            ("customer_id", "int", None),
-            ("sales", "double", Some(List(Metric())))
-        ))
-    val loadSalesFactMonthly = LoadProcess("loadSalesMonthly",
-        List(salesFactDaily.tablDef), salesFactMonthly.tablDef,
-        Some(List(ETL())))
-
-    val salesDailyPartition = Partition(List("2015-01-01"),salesFactDaily.tablDef)
-
-
-    val vertices: ArrayBuffer[String] = new ArrayBuffer[String]()
-    val edges: ArrayBuffer[String] = new ArrayBuffer[String]()
-
-    salesDB.toGSon(vertices, edges)
-    salesFact.toGSon(vertices, edges)
-    productDim.toGSon(vertices, edges)
-    timeDim.toGSon(vertices, edges)
-    customerDim.toGSon(vertices, edges)
-
-    reportingDB.toGSon(vertices, edges)
-    salesFactDaily.toGSon(vertices, edges)
-    loadSalesFactDaily.toGSon(vertices, edges)
-    productDimView.toGSon(vertices, edges)
-    customerDimView.toGSon(vertices, edges)
-    salesFactMonthly.toGSon(vertices, edges)
-    loadSalesFactMonthly.toGSon(vertices, edges)
-    salesDailyPartition.toGSon(vertices, edges)
-
-    def toGSon(): String = {
-        s"""{
-        "mode":"NORMAL",
-        "vertices": ${vertices.mkString("[\n\t", ",\n\t", "\n]")},
-        "edges": ${edges.mkString("[\n\t", ",\n\t", "\n]")}
-        }
-        """.stripMargin
-    }
-
-    def writeGson(fileName: String): Unit = {
-        FileUtils.writeStringToFile(new File(fileName), toGSon())
-    }
-
+             new Column("time_id", "int"),
+             new Column("product_id", "int"),
+             new Column("customer_id", "int"),
+             new Column("sales", "double").withTrait(MetricTrait)
+        )
+    )
+    val loadSalesFactMonthly = new LoadProcess("loadSalesMonthly",
+        List(salesFactDaily), salesFactMonthly).withTraits(List(ETLTrait))
+
+    val salesDailyPartition = new Partition(List("2015-01-01"), salesFactDaily)
+   
+    import scala.collection.JavaConversions._
+   
+   def getEntitiesToCreate() : Buffer[ITypedReferenceableInstance] = {
+       var list = salesDB.getTypedReferencebles() ++
+          salesFact.getTypedReferencebles() ++
+          productDim.getTypedReferencebles() ++
+          timeDim.getTypedReferencebles() ++
+          customerDim.getTypedReferencebles() ++
+          reportingDB.getTypedReferencebles() ++
+          salesFactDaily.getTypedReferencebles() ++
+          loadSalesFactDaily.getTypedReferencebles() ++
+          productDimView.getTypedReferencebles() ++
+          customerDimView.getTypedReferencebles() ++
+          salesFactMonthly.getTypedReferencebles() ++
+          loadSalesFactMonthly.getTypedReferencebles() ++
+          salesDailyPartition.getTypedReferencebles();
+       return list;
+       
+   }
+
+   
 
     val GremlinQueries = List(
         // 1. List all DBs
@@ -367,28 +216,28 @@ object HiveTitanSample {
     )
 }
 
-object TestApp extends App with GraphUtils {
-
-    val g: TitanGraph = TitanGraphProvider.getGraphInstance
-    val manager: ScriptEngineManager = new ScriptEngineManager
-    val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
-    val bindings: Bindings = engine.createBindings
-    bindings.put("g", g)
-
-    val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson"
-    HiveTitanSample.writeGson(hiveGraphFile)
-    bindings.put("hiveGraphFile", hiveGraphFile)
-
-    try {
-        engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)
-
-        println(engine.eval("g.V.typeName.toList()", bindings))
-
-        HiveTitanSample.GremlinQueries.foreach { q =>
-            println(q)
-            println("Result: " + engine.eval(q + ".toList()", bindings))
-        }
-    } finally {
-        g.shutdown()
-    }
-}
\ No newline at end of file
+//object TestApp extends App with GraphUtils {
+//
+//    val g: TitanGraph = TitanGraphProvider.getGraphInstance
+//    val manager: ScriptEngineManager = new ScriptEngineManager
+//    val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
+//    val bindings: Bindings = engine.createBindings
+//    bindings.put("g", g)
+//
+//    val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson"
+//    HiveTitanSample.writeGson(hiveGraphFile)
+//    bindings.put("hiveGraphFile", hiveGraphFile)
+//
+//    try {
+//        engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)
+//
+//        println(engine.eval("g.V.typeName.toList()", bindings))
+//
+//        HiveTitanSample.GremlinQueries.foreach { q =>
+//            println(q)
+//            println("Result: " + engine.eval(q + ".toList()", bindings))
+//        }
+//    } finally {
+//        g.shutdown()
+//    }
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
index c8b635a..bb44686 100755
--- a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
@@ -18,43 +18,51 @@
 
 package org.apache.atlas.query
 
-import com.thinkaurelius.titan.core.TitanGraph
-import com.thinkaurelius.titan.core.util.TitanCleanup
+import org.apache.atlas.TestUtils
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
-import org.apache.atlas.query.Expressions._
-import org.apache.atlas.repository.graph.{GraphBackedMetadataRepository, TitanGraphProvider}
+import org.apache.atlas.query.Expressions._class
+import org.apache.atlas.query.Expressions.id
+import org.apache.atlas.query.Expressions.int
+import org.apache.atlas.repository.graph.AtlasGraphProvider
+import org.apache.atlas.repository.graph.GraphBackedMetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
 import org.apache.atlas.typesystem.types.TypeSystem
-import org.testng.annotations.{Test,BeforeClass,AfterClass}
+import org.testng.annotations.AfterClass
+import org.testng.annotations.BeforeClass
+import org.testng.annotations.BeforeMethod
+import org.testng.annotations.Test
 
 class LineageQueryTest extends BaseGremlinTest {
 
-    var g: TitanGraph = null
-    var gProvider:TitanGraphProvider = null;
+    var g: AtlasGraph[_,_] = null
     var gp:GraphPersistenceStrategies = null;
 
+    @BeforeMethod
+    def resetRequestContext() {
+        TestUtils.resetRequestContext()
+    }
+
+
     @BeforeClass
     def beforeAll() {
-      TypeSystem.getInstance().reset()
-      QueryTestsUtils.setupTypes
-      gProvider = new TitanGraphProvider();
-      gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider, null))
-      g = QueryTestsUtils.setupTestGraph(gProvider)
+        TypeSystem.getInstance().reset()
+        var repo = new GraphBackedMetadataRepository(null);
+        TestUtils.setupGraphProvider(repo);
+        //force graph to be initialized first
+        AtlasGraphProvider.getGraphInstance();
+      
+        //create types and indices up front.  Without this, some of the property keys (particularly __traitNames and __superTypes)
+        //get ended up created implicitly with some graph backends with the wrong multiplicity.  This also makes the queries  
+        //we execute perform better :-)
+       QueryTestsUtils.setupTypesAndIndices()
+    
+       gp = new DefaultGraphPersistenceStrategy(repo);
+       g = QueryTestsUtils.setupTestGraph(repo)
     }
 
     @AfterClass
     def afterAll() {
-      try {
-          g.shutdown()
-      } catch {
-        case ex: Exception =>
-          print("Could not shutdown the graph ", ex);
-      }
-      try {
-        TitanCleanup.clear(g);
-      } catch {
-        case ex: Exception =>
-          print("Could not clear the graph ", ex);
-      }
+        AtlasGraphProvider.cleanup()
     }
 
     val PREFIX_SPACES_REGEX = ("\\n\\s*").r

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
index b5faaf3..33275d3 100755
--- a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
@@ -22,10 +22,8 @@ import java.io.File
 import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
 
 import com.google.common.collect.ImmutableList
-import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}
-import com.tinkerpop.blueprints.Vertex
+import org.apache.atlas.repository.graphdb.AtlasVertex
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.atlas.repository.graph.TitanGraphProvider
 import org.apache.atlas.typesystem.types._
 import org.apache.commons.configuration.{Configuration, ConfigurationException, MapConfiguration}
 import org.apache.commons.io.FileUtils
@@ -34,6 +32,13 @@ import org.json.JSONObject
 import org.skyscreamer.jsonassert.JSONAssert
 
 import scala.util.Random
+import org.apache.atlas.repository.MetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
+import org.apache.atlas.repository.graph.AtlasGraphProvider
+import java.net.URL
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer
+import org.apache.atlas.typesystem.TypesDef
+import org.apache.atlas.typesystem.ITypedReferenceableInstance
 
 
 trait GraphUtils {
@@ -52,12 +57,12 @@ trait GraphUtils {
     }
 
 
-    def titanGraph(conf: Configuration) = {
+    def graph(conf: Configuration) = {
         try {
-            val g = TitanFactory.open(conf)
+            val g = AtlasGraphProvider.getGraphInstance
             val mgmt = g.getManagementSystem
-            val typname = mgmt.makePropertyKey("typeName").dataType(classOf[String]).make()
-            mgmt.buildIndex("byTypeName", classOf[Vertex]).addKey(typname).buildCompositeIndex()
+            val typname = mgmt.makePropertyKey("typeName", classOf[String], null);
+            mgmt.createExactMatchIndex("byTypeName", false, List(typname));
             mgmt.commit()
             g
         } catch {
@@ -68,7 +73,21 @@ trait GraphUtils {
 
 object QueryTestsUtils extends GraphUtils {
 
-    def setupTypes: Unit = {
+     def setupTypesAndIndices() : Unit = {
+        val indexer = new GraphBackedSearchIndexer();
+        val typesDef : TypesDef = defineTypes;
+        val newTypes = TypeSystem.getInstance.defineTypes(typesDef);
+        indexer.onAdd(newTypes.values());        
+    }
+     
+     def setupTypes: Unit = {
+        
+        val types : TypesDef = defineTypes;
+        TypeSystem.getInstance.defineTypes(types);
+    }
+    
+     
+    def defineTypes: TypesDef = {
         def attrDef(name: String, dT: IDataType[_],
                     m: Multiplicity = Multiplicity.OPTIONAL,
                     isComposite: Boolean = false,
@@ -144,36 +163,29 @@ object QueryTestsUtils extends GraphUtils {
         def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "JdbcAccess", null, null,
             Array[AttributeDefinition]())
 
-        TypeSystem.getInstance().defineTypes(ImmutableList.of[EnumTypeDefinition],
-            ImmutableList.of[StructTypeDefinition](hiveOrderDef),
-            ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
+        TypesDef(Seq[EnumTypeDefinition](),
+           Seq[StructTypeDefinition](hiveOrderDef),
+            Seq[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
                 metricTraitDef, etlTraitDef, jdbcTraitDef),
-            ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
+            Seq[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
                 partitionClsDef, loadProcessClsDef, viewClsDef))
-
-        ()
     }
 
-    def setupTestGraph(gp: TitanGraphProvider): TitanGraph = {
-        var conf = TitanGraphProvider.getConfiguration
-        conf.setProperty("storage.directory",
-            conf.getString("storage.directory") + "/../graph-data/" + RandomStringUtils.randomAlphanumeric(10))
-        val g = TitanFactory.open(conf)
-        val manager: ScriptEngineManager = new ScriptEngineManager
-        val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
-        val bindings: Bindings = engine.createBindings
-        bindings.put("g", g)
-
-        val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson"
-        HiveTitanSample.writeGson(hiveGraphFile)
-        bindings.put("hiveGraphFile", hiveGraphFile)
-
-        engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)
+    def setupTestGraph(repo : MetadataRepository): AtlasGraph[_,_] = {
+
+        val g = AtlasGraphProvider.getGraphInstance();
+        val entities = HiveTitanSample.getEntitiesToCreate();
+        repo.createEntities(entities:_*)   
+        g.commit();
         g
     }
+    
+   
 
 }
 
+
+
 trait BaseGremlinTest {
   val STRUCT_NAME_REGEX = (TypeUtils.TEMP_STRUCT_NAME_PREFIX + "\\d+").r
   def validateJson(r: GremlinQueryResult, expected: String = null): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
index 06da32e..18ef2ee 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
@@ -41,6 +41,11 @@ public final class Multiplicity {
         this.isUnique = isUnique;
     }
 
+    public boolean isMany() {
+        return upper > 1;
+    }
+
+    
     public boolean nullAllowed() {
         return lower == 0;
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/typesystem/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index fb31462..108630b 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -19,6 +19,8 @@
 #system property
 atlas.data=${sys:user.dir}/target/data
 
+
+
 #re-use existing property
 atlas.graph.data=${atlas.data}/graph
 
@@ -30,10 +32,17 @@ atlas.db=${atlasdb}
 
 atlas.TypeSystem.impl=org.apache.atlas.typesystem.types.TypeSystem
 
+
+
 #########  Atlas Server Configs #########
 atlas.rest.address=http://localhost:31000
 
 #########  Graph Database Configs  #########
+
+
+# Graph database implementation.  Value inserted by maven.
+atlas.graphdb.backend=${graphdb.backend.impl}
+
 # Graph Storage
 atlas.graph.storage.backend=${titan.storage.backend}
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 8fe4b9b..82f307c 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -35,6 +35,7 @@
         <projectBaseDir>${project.basedir}/..</projectBaseDir>
         <debug.jetty.daemon>true</debug.jetty.daemon>
         <packages.to.exclude />
+        <log4j.configuration.url>file://${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</log4j.configuration.url>
     </properties>
 
     <profiles>
@@ -85,6 +86,18 @@
                 <packages.to.exclude>WEB-INF/lib/je-*.jar</packages.to.exclude>
             </properties>
         </profile>
+
+        <profile>
+            <id>Windows</id>
+            <activation>
+                <os>
+                    <family>windows</family>
+                </os>
+            </activation>
+            <properties>
+                <log4j.configuration.url>file:/${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</log4j.configuration.url>                
+            </properties>
+        </profile>    
     </profiles>
 
     <dependencies>
@@ -105,11 +118,6 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-graphdb-titan0</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-client</artifactId>
         </dependency>
 
@@ -148,6 +156,12 @@
             <artifactId>atlas-catalog</artifactId>
         </dependency>
 
+        <dependency>
+          <groupId>org.apache.atlas</groupId>
+          <artifactId>atlas-graphdb-impls</artifactId>
+          <type>pom</type>
+        </dependency>
+
         <!-- supports simple auth handler -->
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
@@ -455,7 +469,7 @@
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>file://${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</value>
+                            <value>${log4j.configuration.url}</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.file</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index a1d3187..7a8a4f0 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -18,26 +18,19 @@
 
 package org.apache.atlas.web.listeners;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.Provider;
-import com.google.inject.Stage;
-import com.google.inject.TypeLiteral;
-import com.google.inject.servlet.GuiceServletContextListener;
-import com.sun.jersey.api.core.PackagesResourceConfig;
-import com.sun.jersey.guice.JerseyServletModule;
-import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.tinkerpop.blueprints.Graph;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletContextEvent;
+
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.service.Services;
 import org.apache.atlas.web.filters.ActiveServerFilter;
 import org.apache.atlas.web.filters.AuditFilter;
@@ -48,9 +41,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
-import javax.servlet.ServletContextEvent;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Stage;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.guice.JerseyServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
 
 public class GuiceServletConfig extends GuiceServletContextListener {
 
@@ -159,10 +157,8 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         if(injector != null) {
             //stop services
             stopServices();
-
-            TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
-            Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
-            final Graph graph = graphProvider.get().get();
+            
+            final AtlasGraph graph = AtlasGraphProvider.getGraphInstance();
 
             try {
                 graph.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java b/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
index d43c8cc..dfd29b1 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
@@ -18,22 +18,31 @@
 
 package org.apache.atlas.web.resources;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-import org.apache.atlas.catalog.*;
-import org.apache.atlas.catalog.exception.*;
-import org.apache.atlas.repository.graph.TitanGraphProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.annotation.XmlRootElement;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.util.Collection;
 import java.util.Map;
 
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.atlas.catalog.JsonSerializer;
+import org.apache.atlas.catalog.Request;
+import org.apache.atlas.catalog.ResourceProvider;
+import org.apache.atlas.catalog.Result;
+import org.apache.atlas.catalog.exception.CatalogException;
+import org.apache.atlas.catalog.exception.CatalogRuntimeException;
+import org.apache.atlas.catalog.exception.InvalidPayloadException;
+import org.apache.atlas.catalog.exception.InvalidQueryException;
+import org.apache.atlas.catalog.exception.ResourceNotFoundException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
 /**
  * Base class for all v1 API services.
  */
@@ -135,7 +144,7 @@ public abstract class BaseService {
     //todo: abstract via AtlasTypeSystem
     // ensure that the thread wasn't re-pooled with an existing transaction
     protected void initializeGraphTransaction() {
-        TitanGraphProvider.getGraphInstance().rollback();
+        AtlasGraphProvider.getGraphInstance().rollback();
     }
 
     private RuntimeException wrapRuntimeException(RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 683a028..961154b 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -56,49 +56,55 @@ public class NotificationHookConsumerKafkaTest {
 
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
-        produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
-
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                createNewConsumer(kafkaNotification, false);
-        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user1");
-
-        // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
-        produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user2");
-
-        kafkaNotification.close();
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+    
+            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                    createNewConsumer(kafkaNotification, false);
+            LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+            NotificationHookConsumer notificationHookConsumer =
+                    new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+            NotificationHookConsumer.HookConsumer hookConsumer =
+                    notificationHookConsumer.new HookConsumer(consumer);
+    
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user1");
+    
+            // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
+            produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user2");
+        }
+        finally {
+            kafkaNotification.close();
+        }
     }
 
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
-        produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
-
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                createNewConsumer(kafkaNotification, true);
-        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user3");
-
-        // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
-        produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user3");
-
-        kafkaNotification.close();
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+    
+            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                    createNewConsumer(kafkaNotification, true);
+            LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+            NotificationHookConsumer notificationHookConsumer =
+                    new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+            NotificationHookConsumer.HookConsumer hookConsumer =
+                    notificationHookConsumer.new HookConsumer(consumer);
+    
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user3");
+    
+            // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
+            produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+    
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user3");
+        }
+        finally {
+            kafkaNotification.close();
+        }
     }
 
     NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
@@ -126,6 +132,7 @@ public class NotificationHookConsumerKafkaTest {
         final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
         entity.set("name", "db" + randomString());
         entity.set("description", randomString());
+        entity.set("qualifiedName", randomString());
         return entity;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
index 08bb125..88cfc63 100644
--- a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
+++ b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
@@ -16,20 +16,18 @@
  */
 package org.apache.atlas.web.listeners;
 
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.Provider;
-import com.google.inject.TypeLiteral;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.util.TitanCleanup;
+import javax.servlet.ServletContextEvent;
+
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.ServletContextEvent;
+import com.google.inject.Module;
+import com.thinkaurelius.titan.core.util.TitanCleanup;
 
 public class TestGuiceServletConfig extends GuiceServletConfig {
 
@@ -47,13 +45,11 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
         super.contextDestroyed(servletContextEvent);
 
         if(injector != null) {
-            TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
-            Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
-            TitanGraph graph = graphProvider.get().get();
+            AtlasGraph graph = AtlasGraphProvider.getGraphInstance();
 
             LOG.info("Clearing graph store");
             try {
-                TitanCleanup.clear(graph);
+                AtlasGraphProvider.cleanup();
             } catch (Exception e) {
                 LOG.warn("Clearing graph store failed ", e);
             }


Mime
View raw message