Return-Path: X-Original-To: apmail-atlas-commits-archive@minotaur.apache.org Delivered-To: apmail-atlas-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E59618F85 for ; Tue, 16 Jun 2015 23:05:17 +0000 (UTC) Received: (qmail 32208 invoked by uid 500); 16 Jun 2015 23:05:17 -0000 Delivered-To: apmail-atlas-commits-archive@atlas.apache.org Received: (qmail 32181 invoked by uid 500); 16 Jun 2015 23:05:17 -0000 Mailing-List: contact commits-help@atlas.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.incubator.apache.org Delivered-To: mailing list commits@atlas.incubator.apache.org Received: (qmail 32134 invoked by uid 99); 16 Jun 2015 23:05:16 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 23:05:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 50ED6CE48D for ; Tue, 16 Jun 2015 23:05:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.134 X-Spam-Level: * X-Spam-Status: No, score=1.134 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.648, URIBL_BLOCKED=0.001, WEIRD_QUOTING=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id fVCHSNm8cBv3 for ; Tue, 16 Jun 2015 23:05:03 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 3A5EF4E5F0 for ; Tue, 16 Jun 2015 23:04:37 +0000 (UTC) Received: (qmail 30375 invoked by uid 99); 16 Jun 2015 
23:04:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 16 Jun 2015 23:04:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8B55CE3C55; Tue, 16 Jun 2015 23:04:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venkatesh@apache.org To: commits@atlas.incubator.apache.org Date: Tue, 16 Jun 2015 23:05:03 -0000 Message-Id: <22607ad5e1c940a78bdd0ec5febf55de@git.apache.org> In-Reply-To: <29f8990c10f64784b72a9cafbbc9676e@git.apache.org> References: <29f8990c10f64784b72a9cafbbc9676e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/58] [abbrv] incubator-atlas git commit: Refactor packages and scripts to Atlas (cherry picked from commit 414beba) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala new file mode 100755 index 0000000..b475407 --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/GraphPersistenceStrategies.scala @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import com.thinkaurelius.titan.core.TitanVertex +import com.tinkerpop.blueprints.Direction +import org.apache.atlas.query.Expressions.{ComparisonExpression, ExpressionException} +import org.apache.atlas.query.TypeUtils.FieldInfo +import org.apache.atlas.typesystem.persistence.Id +import org.apache.atlas.typesystem.types.DataTypes._ +import org.apache.atlas.typesystem.types._ +import org.apache.atlas.typesystem.{ITypedInstance, ITypedReferenceableInstance} + +import scala.collection.JavaConversions._ + +/** + * Represents the Bridge between the QueryProcessor and the Graph Persistence scheme used. + * Some of the behaviors captured are: + * - how is type and id information stored in the Vertex that represents an [[ITypedReferenceableInstance]] + * - how are edges representing trait and attribute relationships labelled. + * - how are attribute names mapped to Property Keys in Vertices. + * + * This is a work in progress. + */ +trait GraphPersistenceStrategies { + /** + * Name of attribute used to store typeName in vertex + */ + def typeAttributeName: String + + /** + * Name of attribute used to store super type names in vertex. 
+ */ + def superTypeAttributeName: String + + /** + * Name of attribute used to store guid in vertex + */ + def idAttributeName : String + + /** + * Given a dataType and a reference attribute, how is edge labeled + */ + def edgeLabel(iDataType: IDataType[_], aInfo: AttributeInfo): String + + def traitLabel(cls: IDataType[_], traitName: String): String + + def instanceToTraitEdgeDirection : String = "out" + def traitToInstanceEdgeDirection = instanceToTraitEdgeDirection match { + case "out" => "in" + case "in" => "out" + case x => x + } + + /** + * The propertyKey used to store the attribute in a Graph Vertex. + * @param dataType + * @param aInfo + * @return + */ + def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo): String + + /** + * from a vertex for an [[ITypedReferenceableInstance]] get the traits that it has. + * @param v + * @return + */ + def traitNames(v: TitanVertex): java.util.List[String] + + def edgeLabel(fInfo: FieldInfo): String = fInfo match { + case FieldInfo(dataType, aInfo, null, null) => edgeLabel(dataType, aInfo) + case FieldInfo(dataType, aInfo, reverseDataType, null) => edgeLabel(reverseDataType, aInfo) + case FieldInfo(dataType, null, null, traitName) => traitLabel(dataType, traitName) + } + + def fieldPrefixInSelect: String + + /** + * extract the Id from a Vertex. 
+ * @param dataTypeNm the dataType of the instance that the given vertex represents + * @param v + * @return + */ + def getIdFromVertex(dataTypeNm: String, v: TitanVertex): Id + + def constructInstance[U](dataType: IDataType[U], v: java.lang.Object): U + + def gremlinCompOp(op: ComparisonExpression) = op.symbol match { + case "=" => "T.eq" + case "!=" => "T.neq" + case ">" => "T.gt" + case ">=" => "T.gte" + case "<" => "T.lt" + case "<=" => "T.lte" + case _ => throw new ExpressionException(op, "Comparison operator not supported in Gremlin") + } + + def loopObjectExpression(dataType: IDataType[_]) = { + _typeTestExpression(dataType.getName, "it.object") + } + + def addGraphVertexPrefix(preStatements : Traversable[String]) = !collectTypeInstancesIntoVar + + /** + * Controls behavior of how instances of a Type are discovered. + * - query is generated in a way that indexes are exercised using a local set variable across multiple lookups + * - query is generated using an 'or' expression. + * + * '''This is a very bad idea: controlling query execution behavior via query generation.''' But our current + * knowledge seems to indicate we have no choice. See + * [[https://groups.google.com/forum/#!topic/gremlin-users/n1oV86yr4yU discussion in Gremlin group]]. + * Also this seems a fragile solution, dependent on the memory requirements of the Set variable. + * For now enabling via the '''collectTypeInstancesIntoVar''' behavior setting. Reverting back would require + * setting this to false. + * + * Long term have to get to the bottom of Gremlin: + * - there doesn't seem to be a way to see the physical query plan. Maybe we should directly interface with Titan. + * - At least from querying perspective a columnar db may be a better route. Daniel Abadi did some good work + * on showing how to use a columnar store as a Graph Db. 
+ * + * + * @return + */ + def collectTypeInstancesIntoVar = true + + def typeTestExpression(typeName : String, intSeq : IntSequence) : Seq[String] = { + if (collectTypeInstancesIntoVar) + typeTestExpressionMultiStep(typeName, intSeq) + else + typeTestExpressionUsingFilter(typeName) + } + + private def typeTestExpressionUsingFilter(typeName : String) : Seq[String] = { + Seq(s"""filter${_typeTestExpression(typeName, "it")}""") + } + + private def _typeTestExpression(typeName: String, itRef: String): String = { + s"""{(${itRef}.'${typeAttributeName}' == '${typeName}') | + |(${itRef}.'${superTypeAttributeName}' ? + |${itRef}.'${superTypeAttributeName}'.contains('${typeName}') : false)}""". + stripMargin.replace(System.getProperty("line.separator"), "") + } + + private def typeTestExpressionMultiStep(typeName : String, intSeq : IntSequence) : Seq[String] = { + + val varName = s"_var_${intSeq.next}" + Seq( + newSetVar(varName), + fillVarWithTypeInstances(typeName, varName), + fillVarWithSubTypeInstances(typeName, varName), + s"$varName._()" + ) + } + + private def newSetVar(varName : String) = s"$varName = [] as Set" + + private def fillVarWithTypeInstances(typeName : String, fillVar : String) = { + s"""g.V().has("${typeAttributeName}", "${typeName}").fill($fillVar)""" + } + + private def fillVarWithSubTypeInstances(typeName : String, fillVar : String) = { + s"""g.V().has("${superTypeAttributeName}", "${typeName}").fill($fillVar)""" + } +} + +object GraphPersistenceStrategy1 extends GraphPersistenceStrategies { + val typeAttributeName = "typeName" + val superTypeAttributeName = "superTypeNames" + val idAttributeName = "guid" + + def edgeLabel(dataType: IDataType[_], aInfo: AttributeInfo) = s"${dataType.getName}.${aInfo.name}" + + val fieldPrefixInSelect = "it" + + def traitLabel(cls: IDataType[_], traitName: String) = s"${cls.getName}.$traitName" + + def fieldNameInVertex(dataType: IDataType[_], aInfo: AttributeInfo) = aInfo.name + + def getIdFromVertex(dataTypeNm: 
String, v: TitanVertex): Id = + new Id(v.getId.toString, 0, dataTypeNm) + + def traitNames(v: TitanVertex): java.util.List[String] = { + val s = v.getProperty[String]("traitNames") + if (s != null) { + Seq[String](s.split(","): _*) + } else { + Seq() + } + } + + def constructInstance[U](dataType: IDataType[U], v: AnyRef): U = { + dataType.getTypeCategory match { + case DataTypes.TypeCategory.PRIMITIVE => dataType.convert(v, Multiplicity.OPTIONAL) + case DataTypes.TypeCategory.STRUCT + if dataType.getName == TypeSystem.getInstance().getIdType.getName => { + val sType = dataType.asInstanceOf[StructType] + val sInstance = sType.createInstance() + val tV = v.asInstanceOf[TitanVertex] + sInstance.set(TypeSystem.getInstance().getIdType.typeNameAttrName, + tV.getProperty[java.lang.String](typeAttributeName)) + sInstance.set(TypeSystem.getInstance().getIdType.idAttrName, + tV.getProperty[java.lang.String](idAttributeName)) + dataType.convert(sInstance, Multiplicity.OPTIONAL) + } + case DataTypes.TypeCategory.STRUCT => { + val sType = dataType.asInstanceOf[StructType] + val sInstance = sType.createInstance() + loadStructInstance(sType, sInstance, v.asInstanceOf[TitanVertex]) + dataType.convert(sInstance, Multiplicity.OPTIONAL) + } + case DataTypes.TypeCategory.TRAIT => { + val tType = dataType.asInstanceOf[TraitType] + val tInstance = tType.createInstance() + /* + * this is not right, we should load the Instance associated with this trait. + * for now just loading the trait struct. 
+ */ + loadStructInstance(tType, tInstance, v.asInstanceOf[TitanVertex]) + dataType.convert(tInstance, Multiplicity.OPTIONAL) + } + case DataTypes.TypeCategory.CLASS => { + val cType = dataType.asInstanceOf[ClassType] + val cInstance = constructClassInstance(dataType.asInstanceOf[ClassType], v.asInstanceOf[TitanVertex]) + dataType.convert(cInstance, Multiplicity.OPTIONAL) + } + case DataTypes.TypeCategory.ENUM => dataType.convert(v, Multiplicity.OPTIONAL) + case x => throw new UnsupportedOperationException(s"load for ${dataType} not supported") + } + } + + def loadStructInstance(dataType: IConstructableType[_, _ <: ITypedInstance], + typInstance: ITypedInstance, v: TitanVertex): Unit = { + import scala.collection.JavaConversions._ + dataType.fieldMapping().fields.foreach { t => + val fName = t._1 + val aInfo = t._2 + loadAttribute(dataType, aInfo, typInstance, v) + } + } + + def constructClassInstance(dataType: ClassType, v: TitanVertex): ITypedReferenceableInstance = { + val id = getIdFromVertex(dataType.name, v) + val tNms = traitNames(v) + val cInstance = dataType.createInstance(id, tNms: _*) + // load traits + tNms.foreach { tNm => + val tLabel = traitLabel(dataType, tNm) + val edges = v.getEdges(Direction.OUT, tLabel) + val tVertex = edges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex] + val tType = TypeSystem.getInstance().getDataType[TraitType](classOf[TraitType], tNm) + val tInstance = cInstance.getTrait(tNm).asInstanceOf[ITypedInstance] + loadStructInstance(tType, tInstance, tVertex) + } + loadStructInstance(dataType, cInstance, v) + cInstance + } + + def loadAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex): Unit = { + aInfo.dataType.getTypeCategory match { + case DataTypes.TypeCategory.PRIMITIVE => loadPrimitiveAttribute(dataType, aInfo, i, v) + case DataTypes.TypeCategory.ENUM => loadEnumAttribute(dataType, aInfo, i, v) + case DataTypes.TypeCategory.ARRAY => + throw new 
UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported") + case DataTypes.TypeCategory.MAP => + throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported") + case DataTypes.TypeCategory.STRUCT => loadStructAttribute(dataType, aInfo, i, v) + case DataTypes.TypeCategory.TRAIT => + throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported") + case DataTypes.TypeCategory.CLASS => loadStructAttribute(dataType, aInfo, i, v) + } + } + + private def loadEnumAttribute(dataType: IDataType[_], aInfo: AttributeInfo, i: ITypedInstance, v: TitanVertex) + : Unit = { + val fName = fieldNameInVertex(dataType, aInfo) + i.setInt(aInfo.name, v.getProperty[java.lang.Integer](fName)) + } + + private def loadPrimitiveAttribute(dataType: IDataType[_], aInfo: AttributeInfo, + i: ITypedInstance, v: TitanVertex): Unit = { + val fName = fieldNameInVertex(dataType, aInfo) + aInfo.dataType() match { + case x: BooleanType => i.setBoolean(aInfo.name, v.getProperty[java.lang.Boolean](fName)) + case x: ByteType => i.setByte(aInfo.name, v.getProperty[java.lang.Byte](fName)) + case x: ShortType => i.setShort(aInfo.name, v.getProperty[java.lang.Short](fName)) + case x: IntType => i.setInt(aInfo.name, v.getProperty[java.lang.Integer](fName)) + case x: LongType => i.setLong(aInfo.name, v.getProperty[java.lang.Long](fName)) + case x: FloatType => i.setFloat(aInfo.name, v.getProperty[java.lang.Float](fName)) + case x: DoubleType => i.setDouble(aInfo.name, v.getProperty[java.lang.Double](fName)) + case x: StringType => i.setString(aInfo.name, v.getProperty[java.lang.String](fName)) + case _ => throw new UnsupportedOperationException(s"load for ${aInfo.dataType()} not supported") + } + } + + private def loadStructAttribute(dataType: IDataType[_], aInfo: AttributeInfo, + i: ITypedInstance, v: TitanVertex): Unit = { + val eLabel = edgeLabel(FieldInfo(dataType, aInfo, null)) + val edges = v.getEdges(Direction.OUT, eLabel) + val sVertex = 
edges.iterator().next().getVertex(Direction.IN).asInstanceOf[TitanVertex] + if (aInfo.dataType().getTypeCategory == DataTypes.TypeCategory.STRUCT) { + val sType = aInfo.dataType().asInstanceOf[StructType] + val sInstance = sType.createInstance() + loadStructInstance(sType, sInstance, sVertex) + i.set(aInfo.name, sInstance) + } else { + val cInstance = constructClassInstance(aInfo.dataType().asInstanceOf[ClassType], sVertex) + i.set(aInfo.name, cInstance) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala new file mode 100755 index 0000000..edb190d --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/GremlinEvaluator.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.query + +import javax.script.{Bindings, ScriptEngine, ScriptEngineManager} + +import com.thinkaurelius.titan.core.TitanGraph +import com.tinkerpop.pipes.util.structures.Row +import org.apache.atlas.query.TypeUtils.ResultWithPathStruct +import org.apache.atlas.typesystem.json._ +import org.apache.atlas.typesystem.types._ +import org.json4s._ +import org.json4s.native.Serialization._ + +import scala.language.existentials + +case class GremlinQueryResult(query: String, + resultDataType: IDataType[_], + rows: List[_]) { + def toJson = JsonHelper.toJson(this) +} + +class GremlinEvaluator(qry: GremlinQuery, persistenceStrategy: GraphPersistenceStrategies, g: TitanGraph) { + + val manager: ScriptEngineManager = new ScriptEngineManager + val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy") + val bindings: Bindings = engine.createBindings + bindings.put("g", g) + + /** + * + * @param gResultObj is the object returned from gremlin. This must be a List + * @param qryResultObj is the object constructed for the output w/o the Path. 
+ * @return a ResultWithPathStruct + */ + def addPathStruct(gResultObj : AnyRef, qryResultObj : Any) : Any = { + if ( !qry.isPathExpresion) { + qryResultObj + } else { + import scala.collection.JavaConversions._ + import scala.collection.JavaConverters._ + val iPaths = gResultObj.asInstanceOf[java.util.List[AnyRef]].init + + val oPaths = iPaths.map { p => + persistenceStrategy.constructInstance(TypeSystem.getInstance().getIdType.getStructType, p) + }.toList.asJava + val sType = qry.expr.dataType.asInstanceOf[StructType] + val sInstance = sType.createInstance() + sInstance.set(ResultWithPathStruct.pathAttrName, oPaths) + sInstance.set(ResultWithPathStruct.resultAttrName, qryResultObj) + sInstance + } + } + + def instanceObject(v : AnyRef) : AnyRef = { + if ( qry.isPathExpresion ) { + import scala.collection.JavaConversions._ + v.asInstanceOf[java.util.List[AnyRef]].last + } else { + v + } + } + + def evaluate(): GremlinQueryResult = { + import scala.collection.JavaConversions._ + val rType = qry.expr.dataType + val oType = if (qry.isPathExpresion) qry.expr.children(0).dataType else rType + val rawRes = engine.eval(qry.queryStr, bindings) + + if (!qry.hasSelectList) { + val rows = rawRes.asInstanceOf[java.util.List[AnyRef]].map { v => + val iV = instanceObject(v) + val o = persistenceStrategy.constructInstance(oType, iV) + addPathStruct(v, o) + } + GremlinQueryResult(qry.expr.toString, rType, rows.toList) + } else { + val sType = oType.asInstanceOf[StructType] + val rows = rawRes.asInstanceOf[java.util.List[AnyRef]].map { r => + val rV = instanceObject(r).asInstanceOf[Row[java.util.List[AnyRef]]] + val sInstance = sType.createInstance() + val selExpr = + (if (qry.isPathExpresion) qry.expr.children(0) else qry.expr). 
+ asInstanceOf[Expressions.SelectExpression] + selExpr.selectListWithAlias.foreach { aE => + val cName = aE.alias + val (src, idx) = qry.resultMaping(cName) + val v = rV.getColumn(src).get(idx) + sInstance.set(cName, persistenceStrategy.constructInstance(aE.dataType, v)) + } + addPathStruct(r, sInstance) + } + GremlinQueryResult(qry.expr.toString, rType, rows.toList) + } + + } +} + +object JsonHelper { + + class GremlinQueryResultSerializer() + extends Serializer[GremlinQueryResult] { + def deserialize(implicit format: Formats) = { + throw new UnsupportedOperationException("Deserialization of GremlinQueryResult not supported") + } + + def serialize(implicit f: Formats) = { + case GremlinQueryResult(query, rT, rows) => + JObject(JField("query", JString(query)), + JField("dataType", TypesSerialization.toJsonValue(rT)(f)), + JField("rows", Extraction.decompose(rows)(f)) + ) + } + } + + implicit val formats = org.json4s.native.Serialization.formats(NoTypeHints) + new TypedStructSerializer + + new TypedReferenceableInstanceSerializer + new BigDecimalSerializer + new BigIntegerSerializer + + new GremlinQueryResultSerializer + + def toJson(r: GremlinQueryResult): String = { + writePretty(r) + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala b/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala new file mode 100755 index 0000000..914ed7a --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/GremlinQuery.scala @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import org.apache.atlas.query.Expressions._ +import org.apache.atlas.typesystem.types.DataTypes.TypeCategory + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +trait IntSequence { + def next: Int +} + +case class GremlinQuery(expr: Expression, queryStr: String, resultMaping: Map[String, (String, Int)]) { + + def hasSelectList = resultMaping != null + + def isPathExpresion = expr.isInstanceOf[PathExpression] +} + +trait SelectExpressionHandling { + + /** + * To aide in gremlinQuery generation add an alias to the input of SelectExpressions + */ + class AddAliasToSelectInput extends PartialFunction[Expression, Expression] { + + private var idx = 0 + + def isDefinedAt(e: Expression) = true + + class DecorateFieldWithAlias(aliasE: AliasExpression) + extends PartialFunction[Expression, Expression] { + def isDefinedAt(e: Expression) = true + + def apply(e: Expression) = e match { + case fe@FieldExpression(fieldName, fInfo, None) => + FieldExpression(fieldName, fInfo, Some(BackReference(aliasE.alias, aliasE.child, None))) + case _ => e + } + } + + def apply(e: Expression) = e match { + case SelectExpression(aliasE@AliasExpression(_, _), selList) => { + idx = idx + 1 + SelectExpression(aliasE, selList.map(_.transformUp(new DecorateFieldWithAlias(aliasE)))) + } + case SelectExpression(child, selList) => { + idx = idx + 1 + val 
aliasE = AliasExpression(child, s"_src$idx") + SelectExpression(aliasE, selList.map(_.transformUp(new DecorateFieldWithAlias(aliasE)))) + } + case _ => e + } + } + + def getSelectExpressionSrc(e: Expression): List[String] = { + val l = ArrayBuffer[String]() + e.traverseUp { + case BackReference(alias, _, _) => l += alias + } + l.toSet.toList + } + + def validateSelectExprHaveOneSrc: PartialFunction[Expression, Unit] = { + case SelectExpression(_, selList) => { + selList.foreach { se => + val srcs = getSelectExpressionSrc(se) + if (srcs.size > 1) { + throw new GremlinTranslationException(se, "Only one src allowed in a Select Expression") + } + } + } + } + + def groupSelectExpressionsBySrc(sel: SelectExpression): mutable.LinkedHashMap[String, List[Expression]] = { + val m = mutable.LinkedHashMap[String, List[Expression]]() + sel.selectListWithAlias.foreach { se => + val l = getSelectExpressionSrc(se.child) + if (!m.contains(l(0))) { + m(l(0)) = List() + } + m(l(0)) = m(l(0)) :+ se.child + } + m + } + + /** + * For each Output Column in the SelectExpression compute the ArrayList(Src) this maps to and the position within + * this list. 
+ * @param sel + * @return + */ + def buildResultMapping(sel: SelectExpression): Map[String, (String, Int)] = { + val srcToExprs = groupSelectExpressionsBySrc(sel) + val m = new mutable.HashMap[String, (String, Int)] + sel.selectListWithAlias.foreach { se => + val src = getSelectExpressionSrc(se.child)(0) + val srcExprs = srcToExprs(src) + var idx = srcExprs.indexOf(se.child) + m(se.alias) = (src, idx) + } + m.toMap + } + +} + +class GremlinTranslationException(expr: Expression, reason: String) extends +ExpressionException(expr, s"Unsupported Gremlin translation: $reason") + +class GremlinTranslator(expr: Expression, + gPersistenceBehavior: GraphPersistenceStrategies) + extends SelectExpressionHandling { + + val preStatements = ArrayBuffer[String]() + val postStatements = ArrayBuffer[String]() + + val wrapAndRule: PartialFunction[Expression, Expression] = { + case f: FilterExpression if !f.condExpr.isInstanceOf[LogicalExpression] => + FilterExpression(f.child, new LogicalExpression("and", List(f.condExpr))) + } + + val validateComparisonForm: PartialFunction[Expression, Unit] = { + case c@ComparisonExpression(_, left, right) => + if (!left.isInstanceOf[FieldExpression]) { + throw new GremlinTranslationException(c, s"lhs of comparison is not a field") + } + if (!right.isInstanceOf[Literal[_]]) { + throw new GremlinTranslationException(c, + s"rhs of comparison is not a literal") + } + () + } + + val counter = new IntSequence { + var i: Int = -1; + + def next: Int = { + i += 1; i + } + } + + def addAliasToLoopInput(c: IntSequence = counter): PartialFunction[Expression, Expression] = { + case l@LoopExpression(aliasE@AliasExpression(_, _), _, _) => l + case l@LoopExpression(inputExpr, loopExpr, t) => { + val aliasE = AliasExpression(inputExpr, s"_loop${c.next}") + LoopExpression(aliasE, loopExpr, t) + } + } + + def instanceClauseToTop(topE : Expression) : PartialFunction[Expression, Expression] = { + case le : LogicalExpression if (le fastEquals topE) => { + 
le.instance() + } + case ce : ComparisonExpression if (ce fastEquals topE) => { + ce.instance() + } + case he : hasFieldUnaryExpression if (he fastEquals topE) => { + he.instance() + } + } + + def traitClauseWithInstanceForTop(topE : Expression) : PartialFunction[Expression, Expression] = { + case te : TraitExpression if (te fastEquals topE) => { + val theTrait = te.as("theTrait") + val theInstance = theTrait.traitInstance().as("theInstance") + val outE = + theInstance.select(id("theTrait").as("traitDetails"), + id("theInstance").as("instanceInfo")) + QueryProcessor.validate(outE) + } + } + + def typeTestExpression(typeName : String) : String = { + val stats = gPersistenceBehavior.typeTestExpression(typeName, counter) + preStatements ++= stats.init + stats.last + } + + private def genQuery(expr: Expression, inSelect: Boolean): String = expr match { + case ClassExpression(clsName) => + typeTestExpression(clsName) + case TraitExpression(clsName) => + typeTestExpression(clsName) + case fe@FieldExpression(fieldName, fInfo, child) if fe.dataType.getTypeCategory == TypeCategory.PRIMITIVE => { + val fN = "\"" + gPersistenceBehavior.fieldNameInVertex(fInfo.dataType, fInfo.attrInfo) + "\"" + child match { + case Some(e) => s"${genQuery(e, inSelect)}.$fN" + case None => s"$fN" + } + } + case fe@FieldExpression(fieldName, fInfo, child) + if fe.dataType.getTypeCategory == TypeCategory.CLASS || fe.dataType.getTypeCategory == TypeCategory.STRUCT => { + val direction = if (fInfo.isReverse) "in" else "out" + val edgeLbl = gPersistenceBehavior.edgeLabel(fInfo) + val step = s"""$direction("$edgeLbl")""" + child match { + case Some(e) => s"${genQuery(e, inSelect)}.$step" + case None => step + } + } + case fe@FieldExpression(fieldName, fInfo, child) + if fInfo.traitName != null => { + val direction = gPersistenceBehavior.instanceToTraitEdgeDirection + val edgeLbl = gPersistenceBehavior.edgeLabel(fInfo) + val step = s"""$direction("$edgeLbl")""" + child match { + case Some(e) => 
s"${genQuery(e, inSelect)}.$step" + case None => step + } + } + case c@ComparisonExpression(symb, f@FieldExpression(fieldName, fInfo, ch), l) => { + val fieldGremlinExpr = s"${gPersistenceBehavior.fieldNameInVertex(fInfo.dataType, fInfo.attrInfo)}" + ch match { + case Some(child) => { + s"""${genQuery(child, inSelect)}.has("$fieldGremlinExpr", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)""" + } + case None => s"""has("$fieldGremlinExpr", ${gPersistenceBehavior.gremlinCompOp(c)}, $l)""" + } + } + case fil@FilterExpression(child, condExpr) => { + s"${genQuery(child, inSelect)}.${genQuery(condExpr, inSelect)}" + } + case l@LogicalExpression(symb, children) => { + s"""$symb${children.map("_()." + genQuery(_, inSelect)).mkString("(", ",", ")")}""" + } + case sel@SelectExpression(child, selList) => { + val m = groupSelectExpressionsBySrc(sel) + var srcNamesList: List[String] = List() + var srcExprsList: List[List[String]] = List() + val it = m.iterator + while (it.hasNext) { + val (src, selExprs) = it.next + srcNamesList = srcNamesList :+ s""""$src"""" + srcExprsList = srcExprsList :+ selExprs.map { selExpr => + genQuery(selExpr, true) + } + } + val srcNamesString = srcNamesList.mkString("[", ",", "]") + val srcExprsStringList = srcExprsList.map { + _.mkString("[", ",", "]") + } + val srcExprsString = srcExprsStringList.foldLeft("")(_ + "{" + _ + "}") + s"${genQuery(child, inSelect)}.select($srcNamesString)$srcExprsString" + } + case loop@LoopExpression(input, loopExpr, t) => { + val inputQry = genQuery(input, inSelect) + val loopingPathGExpr = genQuery(loopExpr, inSelect) + val loopGExpr = s"""loop("${input.asInstanceOf[AliasExpression].alias}")""" + val untilCriteria = if (t.isDefined) s"{it.loops < ${t.get.value}}" else "{true}" + val loopObjectGExpr = gPersistenceBehavior.loopObjectExpression(input.dataType) + s"""${inputQry}.${loopingPathGExpr}.${loopGExpr}${untilCriteria}${loopObjectGExpr}""" + } + case BackReference(alias, _, _) => + if (inSelect) 
gPersistenceBehavior.fieldPrefixInSelect else s"""back("$alias")""" + case AliasExpression(child, alias) => s"""${genQuery(child, inSelect)}.as("$alias")""" + case isTraitLeafExpression(traitName, Some(clsExp)) => + s"""out("${gPersistenceBehavior.traitLabel(clsExp.dataType, traitName)}")""" + case isTraitUnaryExpression(traitName, child) => + s"""out("${gPersistenceBehavior.traitLabel(child.dataType, traitName)}")""" + case hasFieldLeafExpression(fieldName, Some(clsExp)) => + s"""has("$fieldName")""" + case hasFieldUnaryExpression(fieldName, child) => + s"""${genQuery(child, inSelect)}.has("$fieldName")""" + case ArithmeticExpression(symb, left, right) => s"${genQuery(left, inSelect)} $symb ${genQuery(right, inSelect)}" + case l: Literal[_] => l.toString + case in@TraitInstanceExpression(child) => { + val direction = gPersistenceBehavior.traitToInstanceEdgeDirection + s"${genQuery(child, inSelect)}.$direction()" + } + case in@InstanceExpression(child) => { + s"${genQuery(child, inSelect)}" + } + case pe@PathExpression(child) => { + s"${genQuery(child, inSelect)}.path" + } + case x => throw new GremlinTranslationException(x, "expression not yet supported") + } + + def genFullQuery(expr: Expression): String = { + var q = genQuery(expr, false) + + if(gPersistenceBehavior.addGraphVertexPrefix(preStatements)) { + q = s"g.V.$q" + } + + q = s"$q.toList()" + + q = (preStatements ++ Seq(q) ++ postStatements).mkString("", ";", "") + /* + * the L:{} represents a groovy code block; the label is needed + * to distinguish it from a groovy closure. 
+ */ + s"L:{$q}" + } + + def translate(): GremlinQuery = { + var e1 = expr.transformUp(wrapAndRule) + + e1.traverseUp(validateComparisonForm) + + e1 = e1.transformUp(new AddAliasToSelectInput) + e1.traverseUp(validateSelectExprHaveOneSrc) + e1 = e1.transformUp(addAliasToLoopInput()) + e1 = e1.transformUp(instanceClauseToTop(e1)) + e1 = e1.transformUp(traitClauseWithInstanceForTop(e1)) + + e1 match { + case e1: SelectExpression => { + val rMap = buildResultMapping(e1) + GremlinQuery(e1, genFullQuery(e1), rMap) + } + case pe@PathExpression(se@SelectExpression(child, selectList)) => { + val rMap = buildResultMapping(se) + GremlinQuery(e1, genFullQuery(e1), rMap) + } + case e1 => GremlinQuery(e1, genFullQuery(e1), null) + } + + } + + /* + * Translation Issues: + * 1. back references in filters. For e.g. testBackreference: 'DB as db Table where (db.name = "Reporting")' + * this is translated to: + * g.V.has("typeName","DB").as("db").in("Table.db").and(_().back("db").has("name", T.eq, "Reporting")).map().toList() + * But the '_().back("db") within the and is ignored, the has condition is applied on the current element. + * The solution is to to do predicate pushdown and apply the filter immediately on top of the referred Expression. + */ + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala b/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala new file mode 100755 index 0000000..5b129fb --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/QueryParser.scala @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import org.apache.atlas.query.Expressions._ + +import scala.util.parsing.combinator.lexical.StdLexical +import scala.util.parsing.combinator.syntactical.StandardTokenParsers +import scala.util.parsing.combinator.{ImplicitConversions, PackratParsers} +import scala.util.parsing.input.CharArrayReader._ + +trait QueryKeywords { + this: StandardTokenParsers => + + import scala.language.implicitConversions + + protected case class Keyword(str: String) + + protected implicit def asParser(k: Keyword): Parser[String] = k.str + + protected val LPAREN = Keyword("(") + protected val RPAREN = Keyword(")") + protected val EQ = Keyword("=") + protected val LT = Keyword("<") + protected val GT = Keyword(">") + protected val NEQ = Keyword("!=") + protected val LTE = Keyword("<=") + protected val GTE = Keyword(">=") + protected val COMMA = Keyword(",") + protected val AND = Keyword("and") + protected val OR = Keyword("or") + protected val PLUS = Keyword("+") + protected val MINUS = Keyword("-") + protected val STAR = Keyword("*") + protected val DIV = Keyword("/") + protected val DOT = Keyword(".") + + protected val SELECT = Keyword("select") + protected val FROM = Keyword("from") + protected val WHERE = Keyword("where") + protected val GROUPBY = 
Keyword("groupby") + protected val LOOP = Keyword("loop") + protected val ISA = Keyword("isa") + protected val IS = Keyword("is") + protected val HAS = Keyword("has") + protected val AS = Keyword("as") + protected val TIMES = Keyword("times") + protected val WITHPATH = Keyword("withPath") +} + +trait ExpressionUtils { + + def loop(input: Expression, l: (Expression, Option[Literal[Integer]], Option[String])) = l match { + case (c, None, None) => input.loop(c) + case (c, t, None) => input.loop(c, t.get) + case (c, None, Some(a)) => input.loop(c).as(a) + case (c, t, Some(a)) => input.loop(c, t.get).as(a) + } + + def select(input: Expression, s: List[(Expression, Option[String])]) = { + val selList = s.map { t => + t._2 match { + case None => t._1 + case _ => t._1.as(t._2.get) + } + } + input.select(selList: _*) + } + + def leftmostId(e: Expression) = { + var le: IdExpression = null + e.traverseUp { case i: IdExpression if le == null => le = i} + le + } + + def notIdExpression = new PartialFunction[Expression, Expression] { + def isDefinedAt(x: Expression): Boolean = !x.isInstanceOf[IdExpression] + + def apply(e: Expression) = e + } + + def replaceIdWithField(id: IdExpression, fe: UnresolvedFieldExpression): PartialFunction[Expression, Expression] = { + case e: IdExpression if e == id => fe + } + + def merge(snglQuery1: Expression, sngQuery2: Expression): Expression = { + val leftSrcId = leftmostId(sngQuery2) + sngQuery2.transformUp(replaceIdWithField(leftSrcId, snglQuery1.field(leftSrcId.name))) + } +} + +class QueryParser extends StandardTokenParsers with QueryKeywords with ExpressionUtils with PackratParsers { + + import scala.language.higherKinds + + private val reservedWordsDelims: Seq[String] = this. 
+ getClass.getMethods.filter(_.getReturnType == classOf[Keyword]).map(_.invoke(this).asInstanceOf[Keyword].str) + + private val (queryreservedWords: Seq[String], querydelims: Seq[String]) = + reservedWordsDelims.partition(s => s.charAt(0).isLetter) + + override val lexical = new QueryLexer(queryreservedWords, querydelims) + + def apply(input: String): Either[NoSuccess, Expression] = { + phrase(queryWithPath)(new lexical.Scanner(input)) match { + case Success(r, x) => Right(r) + case f@Failure(m, x) => Left(f) + case e@Error(m, x) => Left(e) + } + } + + def queryWithPath = query ~ opt(WITHPATH) ^^ { + case q ~ None => q + case q ~ p => q.path() + } + + def query: Parser[Expression] = rep1sep(singleQuery, opt(COMMA)) ^^ { l => l match { + case h :: Nil => h + case h :: t => t.foldLeft(h)(merge(_, _)) + } + } + + def singleQuery = singleQrySrc ~ opt(loopExpression) ~ opt(selectClause) ^^ { + case s ~ None ~ None => s + case s ~ l ~ None => loop(s, l.get) + case s ~ l ~ sel if l.isDefined => select(loop(s, l.get), sel.get) + case s ~ None ~ sel => select(s, sel.get) + } + + /** + * A SingleQuerySrc can have the following forms: + * 1. FROM id [WHERE] [expr] -> from optionally followed by a filter + * 2. WHERE expr -> where clause, FROM is assumed to be the leftmost Id in the where clause + * 3. expr (that is not an IdExpression) -> where clause, FROM is assumed to be the leftmost Id in the expr + * 4. Id [WHERE] [expr] -> from optionally followed by a filter + * + * @return + */ + def singleQrySrc: Parser[Expression] = FROM ~ fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) ^^ { + case f ~ i ~ w ~ None => i + case f ~ i ~ w ~ c => i.where(c.get) + } | + WHERE ~ (expr ^? notIdExpression) ^^ { case w ~ e => { + val lId = leftmostId(e) + if (lId == null) { + failure("cannot infer Input from the where clause") + } + lId.where(e) + } + } | + expr ^? 
notIdExpression ^^ { case e => { + val lId = leftmostId(e) + if (lId == null) { + failure("cannot infer Input from the where clause") + } + lId.where(e) + } + } | + fromSrc ~ opt(WHERE) ~ opt(expr ^? notIdExpression) ^^ { + case i ~ w ~ None => i + case i ~ w ~ c => i.where(c.get) + } + + def fromSrc = identifier ~ AS ~ alias ^^ { case s ~ a ~ al => s.as(al)} | + identifier + + + def loopExpression: Parser[(Expression, Option[Literal[Integer]], Option[String])] = + LOOP ~ (LPAREN ~> query <~ RPAREN) ~ opt(intConstant <~ TIMES) ~ opt(AS ~> alias) ^^ { + case l ~ e ~ None ~ a => (e, None, a) + case l ~ e ~ Some(i) ~ a => (e, Some(int(i)), a) + } + + def selectClause: Parser[List[(Expression, Option[String])]] = SELECT ~ rep1sep(selectExpression, COMMA) ^^ { + case s ~ cs => cs + } + + def selectExpression: Parser[(Expression, Option[String])] = expr ~ opt(AS ~> alias) ^^ { + case e ~ a => (e, a) + } + + def expr: Parser[Expression] = compE ~ opt(rep(exprRight)) ^^ { + case l ~ None => l + case l ~ Some(r) => r.foldLeft(l) { (l, r) => l.logicalOp(r._1)(r._2)} + } + + def exprRight = (AND | OR) ~ compE ^^ { case op ~ c => (op, c)} + + def compE = + arithE ~ (LT | LTE | EQ | NEQ | GT | GTE) ~ arithE ^^ { case l ~ op ~ r => l.compareOp(op)(r)} | + arithE ~ (ISA | IS) ~ ident ^^ { case l ~ i ~ t => l.isTrait(t)} | + arithE ~ HAS ~ ident ^^ { case l ~ i ~ f => l.hasField(f)} | + arithE + + def arithE = multiE ~ opt(rep(arithERight)) ^^ { + case l ~ None => l + case l ~ Some(r) => r.foldLeft(l) { (l, r) => l.arith(r._1)(r._2)} + } + + def arithERight = (PLUS | MINUS) ~ multiE ^^ { case op ~ r => (op, r)} + + def multiE = atomE ~ opt(rep(multiERight)) ^^ { + case l ~ None => l + case l ~ Some(r) => r.foldLeft(l) { (l, r) => l.arith(r._1)(r._2)} + } + + def multiERight = (STAR | DIV) ~ atomE ^^ { case op ~ r => (op, r)} + + + def atomE = literal | identifier | LPAREN ~> expr <~ RPAREN + + def identifier = rep1sep(ident, DOT) ^^ { l => l match { + case h :: Nil => id(h) + case 
h :: t => { + t.foldLeft(id(h).asInstanceOf[Expression])(_.field(_)) + } + } + } + + def alias = ident | stringLit + + def literal = booleanConstant ^^ { + boolean(_) + } | + intConstant ^^ { + int(_) + } | + longConstant ^^ { + long(_) + } | + floatConstant ^^ { + float(_) + } | + doubleConstant ^^ { + double(_) + } | + stringLit ^^ { + string(_) + } + + def booleanConstant: Parser[String] = + elem("int", _.isInstanceOf[lexical.BooleanLiteral]) ^^ (_.chars) + + def intConstant: Parser[String] = + elem("int", _.isInstanceOf[lexical.IntLiteral]) ^^ (_.chars) + + def longConstant: Parser[String] = + elem("int", _.isInstanceOf[lexical.LongLiteral]) ^^ (_.chars) + + def floatConstant: Parser[String] = + elem("int", _.isInstanceOf[lexical.FloatLiteral]) ^^ (_.chars) + + def doubleConstant: Parser[String] = + elem("int", _.isInstanceOf[lexical.DoubleLiteral]) ^^ (_.chars) + +} + +class QueryLexer(val keywords: Seq[String], val delims: Seq[String]) extends StdLexical with ImplicitConversions { + + case class BooleanLiteral(chars: String) extends Token { + override def toString = chars + } + + case class IntLiteral(chars: String) extends Token { + override def toString = chars + } + + case class LongLiteral(chars: String) extends Token { + override def toString = chars + } + + case class FloatLiteral(chars: String) extends Token { + override def toString = chars + } + + case class DoubleLiteral(chars: String) extends Token { + override def toString = chars + } + + reserved ++= keywords.flatMap(w => allCaseVersions(w)) + + delimiters ++= delims + + override lazy val token: Parser[Token] = + ( + (trueP | falseP) + | longConstant ^^ LongLiteral + | intConstant ^^ IntLiteral + | floatConstant ^^ FloatLiteral + | dubConstant ^^ DoubleLiteral + | identifier ^^ processIdent + | string ^^ StringLit + | EofCh ^^^ EOF + | '\'' ~> failure("unclosed string literal") + | '"' ~> failure("unclosed string literal") + | delim + | '.' 
^^^ new Keyword(".") + | failure("illegal character") + ) + + override def identChar = letter | elem('_') + + def identifier = identChar ~ (identChar | digit).* ^^ { case first ~ rest => (first :: rest).mkString} | + '`' ~> chrExcept('`', '\n', EofCh).* <~ '`' ^^ { + _ mkString "" + } + + override def whitespace: Parser[Any] = + (whitespaceChar + | '/' ~ '*' ~ comment + | '/' ~ '/' ~ chrExcept(EofCh, '\n').* + | '#' ~ chrExcept(EofCh, '\n').* + | '/' ~ '*' ~ failure("unclosed comment") + ).* + + protected override def comment: Parser[Any] = ( + commentChar.* ~ '*' ~ '/' + ) + + protected def commentChar = chrExcept(EofCh, '*') | '*' ~ not('/') + + def string = '\"' ~> chrExcept('\"', '\n', EofCh).* <~ '\"' ^^ { + _ mkString "" + } | + '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^ { + _ mkString "" + } + + def zero: Parser[String] = '0' ^^^ "0" + + def nonzero = elem("nonzero digit", d => d.isDigit && d != '0') + + def sign = elem("sign character", d => d == '-' || d == '+') + + def exponent = elem("exponent character", d => d == 'e' || d == 'E') + + + def intConstant = opt(sign) ~> zero | intList + + def intList = opt(sign) ~ nonzero ~ rep(digit) ^^ { case s ~ x ~ y => (optString("", s) :: x :: y) mkString ""} + + def fracPart: Parser[String] = '.' ~> rep(digit) ^^ { r => + "." 
+ (r mkString "") + } + + def expPart = exponent ~ opt(sign) ~ rep1(digit) ^^ { case e ~ s ~ d => + e.toString + optString("", s) + d.mkString("") + } + + def dubConstant = opt(sign) ~ digit.+ ~ fracPart ~ opt(expPart) ^^ { + case s ~ i ~ f ~ e => { + optString("", s) + (i mkString "") + f + optString("", e) + } + } + + def floatConstant = opt(sign) ~ digit.* ~ fracPart ~ 'f' ^^ { case s ~ i ~ fr ~ f => + optString("", s) + i + fr + } | opt(sign) ~ digit.+ ~ opt(fracPart) ~ 'f' ^^ { case s ~ i ~ fr ~ f => + optString("", s) + i + optString("", fr) + } + + def longConstant = intConstant ~ 'l' ^^ { case i ~ l => i} + + def trueP = 't' ~ 'r' ~ 'u' ~ 'e' ^^^ BooleanLiteral("true") + + def falseP = 'f' ~ 'a' ~ 'l' ~ 's' ~ 'e' ^^^ BooleanLiteral("false") + + private def optString[A](pre: String, a: Option[A]) = a match { + case Some(x) => pre + x.toString + case None => "" + } + + /** Generate all variations of upper and lower case of a given string */ + def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { + if (s.isEmpty) { + Stream(prefix) + } else { + allCaseVersions(s.tail, prefix + s.head.toLower) #::: + allCaseVersions(s.tail, prefix + s.head.toUpper) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/QueryProcessor.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/QueryProcessor.scala b/repository/src/main/scala/org/apache/atlas/query/QueryProcessor.scala new file mode 100755 index 0000000..0d2a908 --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/QueryProcessor.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import com.thinkaurelius.titan.core.TitanGraph +import org.apache.atlas.query.Expressions._ +import org.slf4j.{Logger, LoggerFactory} + +object QueryProcessor { + val LOG : Logger = LoggerFactory.getLogger("org.apache.atlas.query.QueryProcessor") + + def evaluate(e: Expression, g: TitanGraph, gP : GraphPersistenceStrategies = GraphPersistenceStrategy1): + GremlinQueryResult = { + val e1 = validate(e) + val q = new GremlinTranslator(e1, gP).translate() + LOG.debug("Query: " + e1) + LOG.debug("Expression Tree:\n" + e1.treeString) + LOG.debug("Gremlin Query: " + q.queryStr) + new GremlinEvaluator(q, gP, g).evaluate() + } + + def validate(e: Expression): Expression = { + val e1 = e.transformUp(new Resolver()) + + e1.traverseUp { + case x: Expression if !x.resolved => + throw new ExpressionException(x, s"Failed to resolved expression $x") + } + + /* + * trigger computation of dataType of expression tree + */ + e1.dataType + + /* + * ensure fieldReferences match the input expression's dataType + */ + val e2 = e1.transformUp(FieldValidator) + val e3 = e2.transformUp(new Resolver()) + + e3.dataType + + e3 + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/Resolver.scala ---------------------------------------------------------------------- diff --git 
a/repository/src/main/scala/org/apache/atlas/query/Resolver.scala b/repository/src/main/scala/org/apache/atlas/query/Resolver.scala new file mode 100755 index 0000000..142ba8d --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/Resolver.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.query + +import org.apache.atlas.query.Expressions._ +import org.apache.atlas.typesystem.types.IDataType + +class Resolver(srcExpr: Option[Expression] = None, aliases: Map[String, Expression] = Map(), + connectClassExprToSrc: Boolean = false) + extends PartialFunction[Expression, Expression] { + + import org.apache.atlas.query.TypeUtils._ + + def isDefinedAt(x: Expression) = true + + def apply(e: Expression): Expression = e match { + case idE@IdExpression(name) => { + val backExpr = aliases.get(name) + if (backExpr.isDefined) { + return new BackReference(name, backExpr.get, None) + } + if (srcExpr.isDefined) { + val fInfo = resolveReference(srcExpr.get.dataType, name) + if (fInfo.isDefined) { + return new FieldExpression(name, fInfo.get, None) + } + } + val cType = resolveAsClassType(name) + if (cType.isDefined) { + return new ClassExpression(name) + } + val tType = resolveAsTraitType(name) + if (tType.isDefined) { + return new TraitExpression(name) + } + idE + } + case ce@ClassExpression(clsName) if connectClassExprToSrc && srcExpr.isDefined => { + val fInfo = resolveReference(srcExpr.get.dataType, clsName) + if (fInfo.isDefined) { + return new FieldExpression(clsName, fInfo.get, None) + } + ce + } + case f@UnresolvedFieldExpression(child, fieldName) if child.resolved => { + var fInfo: Option[FieldInfo] = None + + fInfo = resolveReference(child.dataType, fieldName) + if (fInfo.isDefined) { + return new FieldExpression(fieldName, fInfo.get, Some(child)) + } + val tType = resolveAsTraitType(fieldName) + if (tType.isDefined) { + return new FieldExpression(fieldName, FieldInfo(child.dataType, null, null, fieldName), Some(child)) + } + f + } + case isTraitLeafExpression(traitName, classExpression) + if srcExpr.isDefined && !classExpression.isDefined => + isTraitLeafExpression(traitName, srcExpr) + case hasFieldLeafExpression(traitName, classExpression) + if srcExpr.isDefined && !classExpression.isDefined => + 
hasFieldLeafExpression(traitName, srcExpr) + case f@FilterExpression(inputExpr, condExpr) if inputExpr.resolved => { + val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions) + return new FilterExpression(inputExpr, condExpr.transformUp(r)) + } + case SelectExpression(child, selectList) if child.resolved => { + val r = new Resolver(Some(child), child.namedExpressions) + return new SelectExpression(child, selectList.map { + _.transformUp(r) + }) + } + case l@LoopExpression(inputExpr, loopExpr, t) if inputExpr.resolved => { + val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions, true) + return new LoopExpression(inputExpr, loopExpr.transformUp(r), t) + } + case x => x + } +} + +/** + * - any FieldReferences that explicitly reference the input, can be converted to implicit references + * - any FieldReferences that explicitly reference a + */ +object FieldValidator extends PartialFunction[Expression, Expression] { + + def isDefinedAt(x: Expression) = true + + def isSrc(e: Expression) = e.isInstanceOf[ClassExpression] || e.isInstanceOf[TraitExpression] + + def validateQualifiedField(srcDataType: IDataType[_]): PartialFunction[Expression, Expression] = { + case FieldExpression(fNm, fInfo, Some(child)) + if (child.children == Nil && !child.isInstanceOf[BackReference] && child.dataType == srcDataType) => + FieldExpression(fNm, fInfo, None) + case fe@FieldExpression(fNm, fInfo, Some(child)) if isSrc(child) => + throw new ExpressionException(fe, s"srcType of field doesn't match input type") + case hasFieldUnaryExpression(fNm, child) if child.dataType == srcDataType => + hasFieldLeafExpression(fNm) + case hF@hasFieldUnaryExpression(fNm, child) if isSrc(child) => + throw new ExpressionException(hF, s"srcType of field doesn't match input type") + case isTraitUnaryExpression(fNm, child) if child.dataType == srcDataType => + isTraitLeafExpression(fNm) + case iT@isTraitUnaryExpression(fNm, child) if isSrc(child) => + throw new ExpressionException(iT, 
s"srcType of field doesn't match input type") + } + + def validateOnlyFieldReferencesInLoopExpressions(loopExpr: LoopExpression) + : PartialFunction[Expression, Unit] = { + case f: FieldExpression => () + case x => throw new ExpressionException(loopExpr, + s"Loop Expression can only contain field references; '${x.toString}' not supported.") + } + + def apply(e: Expression): Expression = e match { + case f@FilterExpression(inputExpr, condExpr) => { + val validatedCE = condExpr.transformUp(validateQualifiedField(inputExpr.dataType)) + if (validatedCE.fastEquals(condExpr)) { + f + } else { + new FilterExpression(inputExpr, validatedCE) + } + } + case SelectExpression(child, selectList) if child.resolved => { + val v = validateQualifiedField(child.dataType) + return new SelectExpression(child, selectList.map { + _.transformUp(v) + }) + } + case l@LoopExpression(inputExpr, loopExpr, t) => { + val validatedLE = loopExpr.transformUp(validateQualifiedField(inputExpr.dataType)) + val l1 = { + if (validatedLE.fastEquals(loopExpr)) l + else new LoopExpression(inputExpr, validatedLE, t) + } + l1.loopingExpression.traverseUp(validateOnlyFieldReferencesInLoopExpressions(l1)) + l1 + } + case x => x + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala b/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala new file mode 100755 index 0000000..4212a34 --- /dev/null +++ b/repository/src/main/scala/org/apache/atlas/query/TypeUtils.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.query + +import java.util +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.atlas.MetadataException +import org.apache.atlas.query.Expressions.{PathExpression, SelectExpression} +import org.apache.atlas.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory} +import org.apache.atlas.typesystem.types._ + +object TypeUtils { + val typSystem = TypeSystem.getInstance() + + def numericTypes : Seq[PrimitiveType[_]] = Seq(DataTypes.BYTE_TYPE, + DataTypes.SHORT_TYPE, + DataTypes.INT_TYPE, + DataTypes.FLOAT_TYPE, + DataTypes.LONG_TYPE, + DataTypes.DOUBLE_TYPE, + DataTypes.BIGINTEGER_TYPE, + DataTypes.BIGDECIMAL_TYPE) + + def combinedType(typ1 : IDataType[_], typ2 : IDataType[_]) : PrimitiveType[_] = { + val typ1Idx = if (numericTypes.contains(typ1)) Some(numericTypes.indexOf(typ1)) else None + val typ2Idx = if (numericTypes.contains(typ2)) Some(numericTypes.indexOf(typ2)) else None + + if ( typ1Idx.isDefined && typ2Idx.isDefined ) { + val rIdx = math.max(typ1Idx.get, typ2Idx.get) + + if ( (typ1 == DataTypes.FLOAT_TYPE && typ2 == DataTypes.LONG_TYPE) || + (typ1 == DataTypes.LONG_TYPE && typ2 == DataTypes.FLOAT_TYPE) ) { + return DataTypes.DOUBLE_TYPE + } + return numericTypes(rIdx) + } + + throw new MetadataException(s"Cannot combine types: ${typ1.getName} and ${typ2.getName}") + } + + var tempStructCounter : AtomicInteger = new 
AtomicInteger(0) + val TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct" + def createStructType(selectExprs : List[Expressions.AliasExpression]) : StructType = { + val aDefs = new Array[AttributeDefinition](selectExprs.size) + selectExprs.zipWithIndex.foreach { t => + val (e,i) = t + aDefs(i) = new AttributeDefinition(e.alias,e.dataType.getName, Multiplicity.OPTIONAL, false, null) + } + return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}", + null, + aDefs:_*); + } + + object ResultWithPathStruct { + val pathAttrName = "path" + val resultAttrName = "result" + val pathAttrType = DataTypes.arrayTypeName(typSystem.getIdType.getStructType) + + val pathAttr = new AttributeDefinition(pathAttrName, pathAttrType, Multiplicity.COLLECTION, false, null) + + def createType(pE : PathExpression, resultType : IDataType[_]) : StructType = { + val resultAttr = new AttributeDefinition(resultAttrName, resultType.getName, Multiplicity.REQUIRED, false, null) + val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}" + val m : java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]() + if ( pE.child.isInstanceOf[SelectExpression]) { + m.put(pE.child.dataType.getName, pE.child.dataType) + } + typSystem.defineQueryResultType(typName, m, pathAttr, resultAttr); + } + } + + /** + * Structure representing the Closure Graph. + * Returns: + * 1. A map of vertexId -> vertex Info(these are the attributes requested in the query) + * 2. A edges map: each entry is a mapping from an vertexId to the List of adjacent vertexIds. + * + * '''The Vertex Map doesn't contain all the vertices in the Graph. Only the ones for which Attributes are + * available.''' These are the vertices that represent the EntityType whose Closure was requested. For e.g. for + * Table Lineage the ''vertex map'' will contain information about Tables, but not about ''Load Process'' vertices + * that connect Tables. 
+ */ + object GraphResultStruct { + val SRC_PREFIX = "src" + val DEST_PREFIX = "dest" + + val verticesAttrName = "vertices" + val edgesAttrName = "edges" + val vertexIdAttrName = "vertexId" + + lazy val edgesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE, + typSystem.defineArrayType(DataTypes.STRING_TYPE)) + + def createType(resultWithPathType: StructType): StructType = { + val resultType = resultWithPathType.fieldMapping().fields.get(ResultWithPathStruct.resultAttrName).dataType() + + val verticesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE, + vertexType(resultType.asInstanceOf[StructType])) + val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}" + val verticesAttr = new AttributeDefinition(verticesAttrName, verticesAttrType.getName, + Multiplicity.REQUIRED, false, null) + val edgesAttr = new AttributeDefinition(edgesAttrName, edgesAttrType.getName, Multiplicity.REQUIRED, false, null) + + val m: java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]() + m.put(resultWithPathType.getName, resultWithPathType) + m.put(resultType.getName, resultType) + m.put(edgesAttrType.getName, edgesAttrType) + m.put(verticesAttrType.getName, verticesAttrType) + typSystem.defineQueryResultType(typName, m, verticesAttr, edgesAttr) + } + + private def vertexType(resultType: StructType): StructType = { + + import scala.collection.JavaConverters._ + + var attrs: List[AttributeDefinition] = + resultType.fieldMapping.fields.asScala.filter(_._1.startsWith(s"${SRC_PREFIX}_")).mapValues { aInfo => + + new AttributeDefinition(aInfo.name.substring(s"${SRC_PREFIX}_".length), aInfo.dataType.getName, + aInfo.multiplicity, aInfo.isComposite, aInfo.reverseAttributeName) + }.values.toList + + attrs = new AttributeDefinition(vertexIdAttrName, typSystem.getIdType.getStructType.name, + Multiplicity.REQUIRED, false, null) :: attrs + + return 
typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}", + null, + attrs: _*) + } + } + + def fieldMapping(iDataType: IDataType[_]) : Option[FieldMapping] = iDataType match { + case c : ClassType => Some(c.fieldMapping()) + case t : TraitType => Some(t.fieldMapping()) + case s : StructType => Some(s.fieldMapping()) + case _ => None + } + + def hasFields(iDataType: IDataType[_]) : Boolean = { + fieldMapping(iDataType).isDefined + } + + import scala.language.existentials + case class FieldInfo(dataType : IDataType[_], + attrInfo : AttributeInfo, + reverseDataType : IDataType[_] = null, + traitName : String = null) { + def isReverse = reverseDataType != null + override def toString : String = { + if ( traitName != null ) { + s"""FieldInfo("${dataType.getName}", "$traitName")""" + } + else if ( reverseDataType == null ) { + s"""FieldInfo("${dataType.getName}", "${attrInfo.name}")""" + } else { + s"""FieldInfo("${dataType.getName}", "${attrInfo.name}", "${reverseDataType.getName}")""" + } + } + } + + val FIELD_QUALIFIER = "(.*?)(->.*)?".r + + /** + * Given a ComposedType `t` and a name resolve using the following rules: + * - if `id` is a field in `t` resolve to the field + * - if `id` is the name of a Struct|Class|Trait Type and it has a field that is of type `t` then return that type + * + * For e.g. + * 1. if we have types Table(name : String, cols : List[Column]), Column(name : String) then + * `resolveReference(Table, "cols")` resolves to type Column. So a query can be "Table.cols" + * 2. But if we have Table(name : String), Column(name : String, tbl : Table) then "Table.Column" will resolve + * to type Column + * + * This way the language will support navigation even if the relationship is one-sided. 
+ * + * @param typ + * @param id + * @return + */ + def resolveReference(typ : IDataType[_], id : String) : Option[FieldInfo] = { + + val fMap = fieldMapping(typ) + if ( fMap.isDefined) { + + if (fMap.get.fields.containsKey(id)) { + return Some(FieldInfo(typ,fMap.get.fields.get(id))) + } + + try { + val FIELD_QUALIFIER(clsNm, rest) = id + val idTyp = typSystem.getDataType(classOf[IDataType[_]], clsNm) + val idTypFMap = fieldMapping(idTyp) + + if (rest != null ) { + val attrNm = rest.substring(2) + + if (idTypFMap.get.fields.containsKey(attrNm)) { + return Some(FieldInfo(typ,idTypFMap.get.fields.get(attrNm), idTyp)) + } + } + + if (idTypFMap.isDefined) { + import scala.collection.JavaConversions._ + val fields: Seq[AttributeInfo] = idTypFMap.get.fields.values().filter { aInfo => + aInfo.dataType() == typ || + ( aInfo.dataType().getTypeCategory == TypeCategory.ARRAY && + aInfo.dataType().asInstanceOf[ArrayType].getElemType == typ + ) + }.toSeq + if (fields.size == 1) { + return Some(FieldInfo(typ, fields(0), idTyp)) + } + /* + * is there only 1 array field of this type? + * If yes resolve to it. + * @todo: allow user to specify the relationship to follow by further qualifying the type. for e.g. 
+ * field("LoadProcess.inputTables") + */ + val aFields = fields.filter { aInfo => aInfo.dataType().getTypeCategory == TypeCategory.ARRAY} + if (aFields.size == 1) { + return Some(FieldInfo(typ, aFields(0), idTyp)) + } + } + } catch { + case _ : MetadataException => None + } + } + None + } + + def resolveAsClassType(id : String) : Option[ClassType] = { + try { + Some(typSystem.getDataType(classOf[ClassType], id)) + } catch { + case _ : MetadataException => None + } + } + + def resolveAsTraitType(id : String) : Option[TraitType] = { + try { + Some(typSystem.getDataType(classOf[TraitType], id)) + } catch { + case _ : MetadataException => None + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala b/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala deleted file mode 100755 index 7ebbf64..0000000 --- a/repository/src/main/scala/org/apache/hadoop/metadata/query/ClosureQuery.scala +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.query - -import java.util - -import Expressions._ -import com.thinkaurelius.titan.core.TitanGraph -import org.apache.hadoop.metadata.MetadataException -import org.apache.hadoop.metadata.typesystem.ITypedStruct -import org.apache.hadoop.metadata.typesystem.json.{InstanceSerialization, Serialization} -import org.apache.hadoop.metadata.typesystem.persistence.{Id, StructInstance} -import org.apache.hadoop.metadata.typesystem.types.{TypeSystem, StructType, DataTypes} -import org.apache.hadoop.metadata.typesystem.types.DataTypes.{MapType, PrimitiveType} - -/** - * Represents a Query to compute the closure based on a relationship between entities of a particular type. - * For e.g. Database Tables are related to each other to capture the '''Lineage''' of data in a Table based - * on other Tables. - * - * A Closure Query is specified by the following information: - * - The Type whose instances are in a closure relationship. For e.g. 'Table' - * - The Closure relation. This is specified as an ''Attribute path''. For e.g. if we have the following model: - * {{{ - * class Table { - * name : String, - * ... - * } - * - * class LoadTableProcess { - * name : String, - * inputTables : List[Table], - * outputTable : Table, - * ... - * } - * }}} - * ''LoadTable'' instance captures the relationship between the data in an output Table and a set of input Tables. - * In order to compute the '''Lineage''' of a Table, the ''Attribute path'' that relates 2 Tables is - * '''[(LoadTableProcess,outputTable), inputTables]'''. This list is saying that for any Table I want to connect to other - * tables via the LoadProcess.outputTable attribute, and then via the inputTables attribute. So each entry in the - * Attribute Path represents an attribute in an object. 
For reverse relations the Type and attribute must be specified, - * as in 'LoadTableProcess,outputTable)', whereas for forward relations the attribute name is sufficient. - * - The depth of the traversal. Certain times you are not interested in the complete closure, but to only - * discover related instances up to a certain depth. Specify the depth as number of hops, or you can ask for the - * complete closure. - * - You can ask for certain attributes to be returned. For e.g. you may only want the Table name, owner and - * creationDate. By default only the Ids of the related instances is returned. - * - For pair of related instances, you optionally ask for the Path of the relation to be returned. This is - * returned as a list of ''Id''s. - * - * Given these 5 things the ClosureQuery can be executed, it returns a GremlinQueryResult of the Closure Query. - */ -trait ClosureQuery { - - val SRC_PREFIX = TypeUtils.GraphResultStruct.SRC_PREFIX - val DEST_PREFIX = TypeUtils.GraphResultStruct.DEST_PREFIX - - sealed trait PathAttribute { - - def toExpr : Expression = this match { - case r : Relation => id(r.attributeName) - case rr : ReverseRelation => id(s"${rr.typeName}->${rr.attributeName}") - } - - def toFieldName : String = this match { - case r : Relation => r.attributeName - case rr : ReverseRelation => rr.typeName - } - } - case class ReverseRelation(typeName : String, attributeName : String) extends PathAttribute - case class Relation(attributeName : String) extends PathAttribute - - /** - * Type on whose instances the closure needs to be computed - * @return - */ - def closureType : String - - /** - * specify how instances are related. - */ - def closureRelation : List[PathAttribute] - - /** - * The maximum hops between related instances. A [[None]] implies there is maximum. - * @return - */ - def depth : Option[Int] - - /** - * The attributes to return for the instances. These will be prefixed by 'src_' and 'dest_' in the - * output rows. 
- * @return - */ - def selectAttributes : Option[List[String]] - - /** - * specify if the Path should be returned. - * @return - */ - def withPath : Boolean - - def persistenceStrategy: GraphPersistenceStrategies - def g: TitanGraph - - def pathExpr : Expressions.Expression = { - closureRelation.tail.foldLeft(closureRelation.head.toExpr)((b,a) => b.field(a.toFieldName)) - } - - def selectExpr(alias : String) : List[Expression] = { - selectAttributes.map { _.map { a => - id(alias).field(a).as(s"${alias}_$a") - } - }.getOrElse(List(id(alias))) - } - - /** - * hook to allow a filter to be added for the closureType - * @param expr - * @return - */ - def srcCondition(expr : Expression) : Expression = expr - - def expr : Expressions.Expression = { - val e = srcCondition(Expressions._class(closureType)).as(SRC_PREFIX).loop(pathExpr).as(DEST_PREFIX). - select((selectExpr(SRC_PREFIX) ++ selectExpr(DEST_PREFIX)):_*) - if (withPath) e.path else e - } - - def evaluate(): GremlinQueryResult = { - var e = expr - QueryProcessor.evaluate(e, g, persistenceStrategy) - } - - def graph : GraphResult = { - - if (!withPath) { - throw new ExpressionException(expr, "Graph requested for non Path Query") - } - - import scala.collection.JavaConverters._ - - val res = evaluate() - - val graphResType = TypeUtils.GraphResultStruct.createType(res.resultDataType.asInstanceOf[StructType]) - val vertexPayloadType = { - val mT = graphResType.fieldMapping.fields.get(TypeUtils.GraphResultStruct.verticesAttrName). - dataType().asInstanceOf[MapType] - mT.getValueType.asInstanceOf[StructType] - } - - def id(idObj : StructInstance) : String = idObj.getString(TypeSystem.ID_STRUCT_ID_ATTRNAME) - - def vertexStruct(idObj : StructInstance, resRow : ITypedStruct, attrPrefix : String) : StructInstance = { - val vP = vertexPayloadType.createInstance() - vP.set(TypeUtils.GraphResultStruct.vertexIdAttrName, idObj) - vertexPayloadType.fieldMapping.fields.asScala.keys. 
- filter(_ != TypeUtils.GraphResultStruct.vertexIdAttrName).foreach{a => - vP.set(a, resRow.get(s"${attrPrefix}$a")) - } - vP.asInstanceOf[StructInstance] - } - - val instance = graphResType.createInstance() - val vertices = new util.HashMap[String, AnyRef]() - val edges = new util.HashMap[String,java.util.List[String]]() - - /** - * foreach resultRow - * for each Path entry - * add an entry in the edges Map - * add an entry for the Src Vertex to the vertex Map - * add an entry for the Dest Vertex to the vertex Map - */ - res.rows.map(_.asInstanceOf[StructInstance]).foreach { r => - - val path = r.get(TypeUtils.ResultWithPathStruct.pathAttrName).asInstanceOf[java.util.List[_]].asScala - val srcVertex = path.head.asInstanceOf[StructInstance] - - var currVertex = srcVertex - path.tail.foreach { n => - val nextVertex = n.asInstanceOf[StructInstance] - val iList = if (!edges.containsKey(id(currVertex))) { - val l = new util.ArrayList[String]() - edges.put(id(currVertex), l) - l - } else { - edges.get(id(currVertex)) - } - if ( !iList.contains(id(nextVertex))) { - iList.add(id(nextVertex)) - } - currVertex = nextVertex - } - val vertex = r.get(TypeUtils.ResultWithPathStruct.resultAttrName) - vertices.put(id(srcVertex), vertexStruct(srcVertex, - r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], - s"${SRC_PREFIX}_")) - vertices.put(id(currVertex), vertexStruct(currVertex, - r.get(TypeUtils.ResultWithPathStruct.resultAttrName).asInstanceOf[ITypedStruct], - s"${DEST_PREFIX}_")) - } - - instance.set(TypeUtils.GraphResultStruct.verticesAttrName, vertices) - instance.set(TypeUtils.GraphResultStruct.edgesAttrName, edges) - GraphResult(res.query, instance) - } -} - -/** - * Closure for a single instance. Instance is specified by an ''attributeToSelectInstance'' and the value - * for the attribute. 
- * - * @tparam T - */ -trait SingleInstanceClosureQuery[T] extends ClosureQuery { - - def attributeToSelectInstance : String - - def attributeTyp : PrimitiveType[T] - def instanceValue : T - - override def srcCondition(expr : Expression) : Expression = { - expr.where( - Expressions.id(attributeToSelectInstance).`=`(Expressions.literal(attributeTyp, instanceValue)) - ) - } -} - -/** - * A ClosureQuery to compute '''Lineage''' for Hive tables. Assumes the Lineage relation is captured in a ''CTAS'' - * type, and the table relations are captured as attributes from a CTAS instance to Table instances. - * - * @param tableTypeName The name of the Table Type. - * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. - * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. - * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. - * @param depth depth as needed by the closure Query. - * @param selectAttributes as needed by the closure Query. - * @param withPath as needed by the closure Query. - * @param persistenceStrategy as needed to evaluate the Closure Query. - * @param g as needed to evaluate the Closure Query. 
- */ -case class HiveLineageQuery(tableTypeName : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph - ) extends SingleInstanceClosureQuery[String] { - - val closureType : String = tableTypeName - - val attributeToSelectInstance = "name" - val attributeTyp = DataTypes.STRING_TYPE - - val instanceValue = tableName - - lazy val closureRelation = List( - ReverseRelation(ctasTypeName, ctasOutputTableAttribute), - Relation(ctasInputTableAttribute) - ) -} - -/** - * A ClosureQuery to compute where a table is used based on the '''Lineage''' for Hive tables. - * Assumes the Lineage relation is captured in a ''CTAS'' - * type, and the table relations are captured as attributes from a CTAS instance to Table instances. - * - * @param tableTypeName The name of the Table Type. - * @param ctasTypeName The name of the Create Table As Select(CTAS) Type. - * @param ctasInputTableAttribute The attribute in CTAS Type that associates it to the ''Input'' tables. - * @param ctasOutputTableAttribute The attribute in CTAS Type that associates it to the ''Output'' tables. - * @param depth depth as needed by the closure Query. - * @param selectAttributes as needed by the closure Query. - * @param withPath as needed by the closure Query. - * @param persistenceStrategy as needed to evaluate the Closure Query. - * @param g as needed to evaluate the Closure Query. 
- */ -case class HiveWhereUsedQuery(tableTypeName : String, - tableName : String, - ctasTypeName : String, - ctasInputTableAttribute : String, - ctasOutputTableAttribute : String, - depth : Option[Int], - selectAttributes : Option[List[String]], - withPath : Boolean, - persistenceStrategy: GraphPersistenceStrategies, - g: TitanGraph - ) extends SingleInstanceClosureQuery[String] { - - val closureType : String = tableTypeName - - val attributeToSelectInstance = "name" - val attributeTyp = DataTypes.STRING_TYPE - - val instanceValue = tableName - - lazy val closureRelation = List( - ReverseRelation(ctasTypeName, ctasInputTableAttribute), - Relation(ctasOutputTableAttribute) - ) -} - -case class GraphResult(query: String, result : ITypedStruct) { - - def toTypedJson = Serialization.toJson(result) - - def toInstanceJson = InstanceSerialization.toJson(result) -} \ No newline at end of file