flink-commits mailing list archives

From va...@apache.org
Subject [1/2] flink git commit: [FLINK-3597][tableAPI] Set relational expressions as DataSet operator names.
Date Mon, 14 Mar 2016 13:01:31 GMT
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 3f8cea74a -> 729bcb2ca


[FLINK-3597][tableAPI] Set relational expressions as DataSet operator names.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/239fc2a2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/239fc2a2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/239fc2a2

Branch: refs/heads/tableOnCalcite
Commit: 239fc2a28fae7b07514582c8032649065f742180
Parents: 3f8cea7
Author: Fabian Hueske <fhueske@apache.org>
Authored: Fri Mar 11 11:56:41 2016 +0100
Committer: vasia <vasia@apache.org>
Committed: Mon Mar 14 13:07:29 2016 +0100

----------------------------------------------------------------------
 .../plan/nodes/dataset/DataSetAggregate.scala   | 30 ++++++++++++++-
 .../table/plan/nodes/dataset/DataSetCalc.scala  | 40 +++++++++++++++++++-
 .../table/plan/nodes/dataset/DataSetJoin.scala  | 17 ++++++++-
 .../table/plan/nodes/dataset/DataSetRel.scala   | 25 ++++++++++++
 .../plan/nodes/dataset/DataSetSource.scala      |  5 ++-
 5 files changed, 111 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
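
As background for the labels added below: the DataSet API exposes a name(...) setter on each operator, and the string passed to it replaces the generic operator name in execution plans, logs, and the web UI. A minimal standalone sketch of the mechanism, using the Scala DataSet API with illustrative data and label (not taken from this commit):

    import org.apache.flink.api.scala._

    object NamedOperatorSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        env.fromElements((1, "a"), (2, "b"))
          .map(_._1)             // the operator that gets a readable label
          .name("select: (id)")  // shown in the plan instead of a plain "Map"
          .print()
      }
    }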


http://git-wip-us.apache.org/repos/asf/flink/blob/239fc2a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index d3416ee..6bf7309 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -92,14 +92,20 @@ class DataSetAggregate(
     .toArray
 
     val rowTypeInfo = new RowTypeInfo(fieldTypes, rowType.getFieldNames.asScala)
-    val mappedInput = inputDS.map(aggregateResult._1)
+    val aggString = aggregationToString
+    val mappedInput = inputDS.map(aggregateResult._1).name(s"prepare $aggString")
     val groupReduceFunction = aggregateResult._2
 
     if (groupingKeys.length > 0) {
+
+      val inFields = inputType.getFieldNames.asScala.toList
+      val groupByString = s"groupBy: (${grouping.map( inFields(_) ).mkString(", ")})"
+
       mappedInput.asInstanceOf[DataSet[Row]]
         .groupBy(groupingKeys: _*)
         .reduceGroup(groupReduceFunction)
         .returns(rowTypeInfo)
+        .name(groupByString + ", " + aggString)
         .asInstanceOf[DataSet[Any]]
     }
     else {
@@ -110,4 +117,25 @@ class DataSetAggregate(
         .asInstanceOf[DataSet[Any]]
     }
   }
+
+  private def aggregationToString: String = {
+
+    val inFields = inputType.getFieldNames.asScala.toList
+    val outFields = rowType.getFieldNames.asScala.toList
+    val aggs = namedAggregates.map(_.getKey)
+
+    val groupFieldsString = grouping.map( inFields(_) )
+    val aggsString = aggs.map( a => s"${a.getAggregation}(${inFields(a.getArgList.get(0))})")
+
+    val outFieldsString = (groupFieldsString ++ aggsString).zip(outFields).map {
+      case (f, o) => if (f == o) {
+        f
+      } else {
+        s"$f AS $o"
+      }
+    }
+
+    s"select: (${outFieldsString.mkString(", ")})"
+  }
+
 }
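
The aggregationToString helper above assembles the select-style label from the grouping fields and the named aggregates. A standalone sketch of that string-building logic, with made-up field names and a made-up SUM aggregate standing in for the Calcite structures:

    // Sketch of the naming scheme built by aggregationToString (inputs are illustrative).
    object AggNameSketch extends App {
      val inFields  = List("a", "b")      // input row fields
      val outFields = List("a", "total")  // output row fields
      val grouping  = Array(0)            // group on field "a"
      val aggs      = List(("SUM", 1))    // (aggregate function, input field index)

      val groupFields = grouping.map(inFields(_)).toList
      val aggStrings  = aggs.map { case (fun, arg) => s"$fun(${inFields(arg)})" }
      // alias with AS only when the output name differs from the expression
      val selectList = (groupFields ++ aggStrings).zip(outFields).map {
        case (f, o) => if (f == o) f else s"$f AS $o"
      }
      println(s"select: (${selectList.mkString(", ")})")
      // prints: select: (a, SUM(b) AS total)
    }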

http://git-wip-us.apache.org/repos/asf/flink/blob/239fc2a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
index d7c71cc..8cb901c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
@@ -28,8 +28,10 @@ import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.TypeConverter._
 import org.apache.flink.api.table.runtime.FlatMapRunner
 import org.apache.flink.api.table.TableConfig
-import org.apache.calcite.rex.RexProgram
+import org.apache.calcite.rex._
+
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
   * Flink RelNode which matches along with LogicalCalc.
@@ -147,6 +149,40 @@ class DataSetCalc(
       genFunction.code,
       genFunction.returnType)
 
-    inputDS.flatMap(mapFunc)
+    val calcDesc = calcProgramToString()
+
+    inputDS.flatMap(mapFunc).name(calcDesc)
   }
+
+  private def calcProgramToString(): String = {
+
+    val cond = calcProgram.getCondition
+    val proj = calcProgram.getProjectList.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    val projString = s"select: (${
+      proj
+        .map(getExpressionString(_, inFields, Some(localExprs)))
+        .zip(outFields).map { case (e, o) => {
+            if (e != o) {
+              e + " AS " + o
+            } else {
+              e
+            }
+          }
+        }
+        .mkString(", ")
+    })"
+    if (cond != null) {
+      val condString = s"where: (${getExpressionString(cond, inFields, Some(localExprs))})"
+
+      condString + ", " + projString
+    } else {
+      projString
+    }
+
+  }
+
 }
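
calcProgramToString above produces labels of the form "where: (...), select: (...)", with conditions and projections rendered in prefix notation by getExpressionString (defined in DataSetRel below). A small sketch assembling such a label from illustrative fragments:

    // The fragments are hand-written here; the real ones come from getExpressionString.
    object CalcNameSketch extends App {
      val condString = "where: (>(a, 10))"          // filter: a > 10
      val projString = "select: (a, +(b, 1) AS c)"  // project: a, b + 1 AS c
      println(condString + ", " + projString)
      // prints: where: (>(a, 10)), select: (a, +(b, 1) AS c)
    }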

http://git-wip-us.apache.org/repos/asf/flink/blob/239fc2a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index 1d293d2..5c1f9bb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -33,9 +33,11 @@ import org.apache.flink.api.table.{TableException, TableConfig}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.table.plan.TypeConverter._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
 import org.apache.calcite.rex.RexNode
 
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
 /**
   * Flink RelNode which matches along with JoinOperator and its related operations.
   */
@@ -142,8 +144,19 @@ class DataSetJoin(
       genFunction.code,
       genFunction.returnType)
 
+    val joinOpName = joinConditionToString()
+
     leftDataSet.join(rightDataSet).where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
-      .`with`(joinFun).asInstanceOf[DataSet[Any]]
+      .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+  }
+
+  private def joinConditionToString(): String = {
+
+    val inFields = joinRowType.getFieldNames.asScala.toList
+    val condString = s"where: (${getExpressionString(joinCondition, inFields, None)})"
+    val outFieldString = s"join: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+    condString + ", " + outFieldString
   }
 
 }
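
joinConditionToString above combines the prefix-rendered join predicate with the joined output field list. A sketch of the resulting label for a hypothetical equi-join of (a, b) with (c, d) on a = c:

    object JoinNameSketch extends App {
      val outFields  = List("a", "b", "c", "d")  // combined output row fields
      val condString = "where: (=(a, c))"        // prefix-rendered RexCall
      val outString  = s"join: (${outFields.mkString(", ")})"
      println(condString + ", " + outString)
      // prints: where: (=(a, c)), join: (a, b, c, d)
    }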

http://git-wip-us.apache.org/repos/asf/flink/blob/239fc2a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 35e23f7..d708048 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,10 +19,13 @@
 package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
 
+import scala.collection.JavaConversions._
+
 trait DataSetRel extends RelNode {
 
   /**
@@ -41,5 +44,27 @@ trait DataSetRel extends RelNode {
       expectedType: Option[TypeInformation[Any]] = None)
     : DataSet[Any]
 
+  private[flink] def getExpressionString(
+    expr: RexNode,
+    inFields: List[String],
+    localExprsTable: Option[List[RexNode]]): String = {
+
+    expr match {
+      case i: RexInputRef => inFields(i.getIndex)
+      case l: RexLiteral => l.toString
+      case l: RexLocalRef if localExprsTable.isEmpty =>
+        throw new IllegalArgumentException("Encountered RexLocalRef without local expression table")
+      case l: RexLocalRef =>
+        val lExpr = localExprsTable.get(l.getIndex)
+        getExpressionString(lExpr, inFields, localExprsTable)
+      case c: RexCall => {
+        val op = c.getOperator.toString
+        val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
+        s"$op(${ops.mkString(", ")})"
+      }
+      case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
+    }
+  }
+
 }
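
getExpressionString above recurses over a Calcite RexNode tree: input refs become field names, literals print themselves, local refs are resolved through the program's expression table, and calls render as op(args...). A standalone model of that recursion with a tiny ADT standing in for the Rex classes (all names and the sample expression are illustrative):

    object ExprStringSketch extends App {
      sealed trait Expr
      case class InputRef(index: Int)              extends Expr  // stands in for RexInputRef
      case class Literal(value: String)            extends Expr  // stands in for RexLiteral
      case class LocalRef(index: Int)              extends Expr  // stands in for RexLocalRef
      case class Call(op: String, ops: List[Expr]) extends Expr  // stands in for RexCall

      def exprString(e: Expr, inFields: List[String], locals: Option[List[Expr]]): String =
        e match {
          case InputRef(i) => inFields(i)
          case Literal(v)  => v
          case LocalRef(i) =>
            val table = locals.getOrElse(
              throw new IllegalArgumentException("LocalRef without local expression table"))
            exprString(table(i), inFields, locals)  // resolve through the expression table
          case Call(op, ops) =>
            s"$op(${ops.map(exprString(_, inFields, locals)).mkString(", ")})"
        }

      // render $0 > 10, where local expression 0 refers to input field 0
      val locals = List[Expr](InputRef(0))
      val expr   = Call(">", List(LocalRef(0), Literal("10")))
      println(exprString(expr, List("a", "b"), Some(locals)))
      // prints: >(a, 10)
    }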
 

http://git-wip-us.apache.org/repos/asf/flink/blob/239fc2a2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index 50e01e7..a3801c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -32,6 +32,7 @@ import org.apache.flink.api.table.plan.TypeConverter.determineReturnType
 import org.apache.flink.api.table.plan.schema.DataSetTable
 import org.apache.flink.api.table.runtime.MapRunner
 
+import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
 
 /**
@@ -112,7 +113,9 @@ class DataSetSource(
             genFunction.code,
             genFunction.returnType)
 
-          inputDataSet.map(mapFunc)
+          val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          inputDataSet.map(mapFunc).name(opName)
         }
         // no conversion necessary, forward
         else {
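
The source-side change above labels the conversion mapper with the table's field list. For a hypothetical table with fields a and b, the label comes out as:

    object SourceNameSketch extends App {
      val fieldNames = List("a", "b")                   // illustrative field names
      println(s"from: (${fieldNames.mkString(", ")})")  // prints: from: (a, b)
    }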

