flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [17/50] [abbrv] flink git commit: [Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.
Date Fri, 18 Mar 2016 13:48:11 GMT
[Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8028dbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8028dbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8028dbc

Branch: refs/heads/master
Commit: b8028dbc5f53edce2b337bd14c47feace39d0fbb
Parents: ba27832
Author: chengxiang li <chengxiang.li@intel.com>
Authored: Mon Feb 1 15:18:14 2016 +0800
Committer: vasia <vasia@apache.org>
Committed: Fri Mar 18 14:44:49 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/PlanGenException.scala |  26 ++++
 .../flink/api/table/plan/TypeConverter.scala    |  13 +-
 .../plan/functions/AggregateFunction.scala      |  71 +++++++++
 .../table/plan/functions/FunctionUtils.scala    |  37 +++++
 .../plan/functions/aggregate/Aggregate.scala    |  42 ++++++
 .../functions/aggregate/AggregateFactory.scala  | 135 +++++++++++++++++
 .../plan/functions/aggregate/AvgAggregate.scala | 148 +++++++++++++++++++
 .../functions/aggregate/CountAggregate.scala    |  34 +++++
 .../plan/functions/aggregate/MaxAggregate.scala | 136 +++++++++++++++++
 .../plan/functions/aggregate/MinAggregate.scala | 136 +++++++++++++++++
 .../plan/functions/aggregate/SumAggregate.scala | 130 ++++++++++++++++
 .../plan/nodes/dataset/DataSetGroupReduce.scala |   6 +-
 .../table/plan/nodes/dataset/DataSetJoin.scala  |   6 +-
 .../plan/nodes/dataset/DataSetReduce.scala      |   6 +-
 .../rules/dataset/DataSetAggregateRule.scala    |  17 ++-
 .../plan/rules/dataset/DataSetJoinRule.scala    | 102 ++++++++++++-
 16 files changed, 1025 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
new file mode 100644
index 0000000..2fd400d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanGenException.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+/**
+ * Exception thrown when a physical plan cannot be generated from the logical plan.
+ *
+ * @param message the detail message
+ * @param exception the underlying cause, may be null
+ */
+class PlanGenException(message: String, exception: Exception)
+    extends RuntimeException(message, exception) {
+
+  /** Creates a plan generation exception without an underlying cause. */
+  def this(message: String) {
+    this(message, null)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index 227b3e8..b7cb200 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.rel.core.JoinRelType._
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
@@ -29,8 +29,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{Row, TableException}
-
 import scala.collection.JavaConversions._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.sql.`type`.SqlTypeName
 
 object TypeConverter {
 
@@ -139,4 +142,10 @@ object TypeConverter {
     returnType.asInstanceOf[TypeInformation[Any]]
   }
 
+  /** Converts a Calcite SQL join type into the corresponding Flink join type. */
+  def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
+    case INNER => JoinType.INNER
+    case LEFT => JoinType.LEFT_OUTER
+    case RIGHT => JoinType.RIGHT_OUTER
+    case FULL => JoinType.FULL_OUTER
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
new file mode 100644
index 0000000..4abf2d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import java.lang.Iterable
+
+import com.google.common.base.Preconditions
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+
+import scala.collection.JavaConversions._
+
+/**
+ * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
+ * feed to the aggregates, and collect the record with aggregated value.
+ *
+ * @param aggregates Sql aggregate functions.
+ * @param fields  The grouped keys' index.
+ */
+class AggregateFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
+
+  override def open(config: Configuration) {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(fields)
+    Preconditions.checkArgument(aggregates.size == fields.size)
+  }
+
+  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
+    // Reset the aggregate state for every group. Initiating only once in open()
+    // would leak state across groups, because reduce() is invoked once per group.
+    aggregates.foreach(_.initiateAggregate)
+
+    var currentValue: Any = null
+
+    // iterate all input records, feed to each aggregate.
+    val aggregateAndField = aggregates.zip(fields)
+    records.foreach {
+      value =>
+        currentValue = value
+        aggregateAndField.foreach {
+          case (aggregate, field) =>
+            aggregate.aggregate(FunctionUtils.getFieldValue(value, field))
+        }
+    }
+
+    // reuse the latest record, and set all the aggregated values.
+    aggregateAndField.foreach {
+      case (aggregate, field) =>
+        FunctionUtils.putFieldValue(currentValue, field, aggregate.getAggregated())
+    }
+
+    out.collect(currentValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
new file mode 100644
index 0000000..9d62b7c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions
+
+import org.apache.flink.api.table.Row
+
+object FunctionUtils {
+
+  /** Reads the field at the given index from a record; only [[Row]] is supported. */
+  def getFieldValue(record: Any, fieldIndex: Int): Any = record match {
+    case row: Row => row.productElement(fieldIndex)
+    case _ => throw new UnsupportedOperationException("Do not support types other than Row now.")
+  }
+
+  /** Writes the field at the given index of a record; only [[Row]] is supported. */
+  def putFieldValue(record: Any, fieldIndex: Int, fieldValue: Any): Unit = record match {
+    case row: Row => row.setField(fieldIndex, fieldValue)
+    case _ => throw new UnsupportedOperationException("Do not support types other than Row now.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
new file mode 100644
index 0000000..5800d5f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/Aggregate.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/**
+ * Represents a SQL aggregate function. Callers must first initiate the aggregate,
+ * then feed it with the grouped aggregate field values of one group, and finally
+ * read the aggregated value. Instances are stateful and reusable across groups
+ * once re-initiated.
+ *
+ * @tparam T the type of the final aggregated value
+ */
+trait Aggregate[T] {
+  /**
+   * Initiate current aggregate state.
+   */
+  def initiateAggregate
+
+  /**
+   * Feed the aggregate field value.
+   * @param value the value of the aggregated field for one input record
+   */
+  def aggregate(value: Any)
+
+  /**
+   * Return final aggregated value.
+   * @return the aggregated result of all values fed since the last initiate
+   */
+  def getAggregated(): T
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
new file mode 100644
index 0000000..a95a163
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+import java.util
+
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.fun._
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.plan.PlanGenException
+import org.apache.flink.api.table.plan.functions.AggregateFunction
+
+object AggregateFactory {
+
+  /**
+   * Creates a GroupReduce UDF that wraps one [[Aggregate]] instance per SQL aggregate call.
+   * Currently only aggregates on a singleton field are supported.
+   *
+   * @param aggregateCalls the SQL aggregate calls of the logical aggregation
+   * @return the reduce function evaluating all aggregate calls
+   */
+  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
+    RichGroupReduceFunction[Any, Any] = {
+
+    val fieldIndexes = new Array[Int](aggregateCalls.size)
+    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
+    // foreach: the traversal is executed only for its side effects on the two arrays
+    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
+      val sqlType = aggregateCall.getType
+      val argList: util.List[Integer] = aggregateCall.getArgList
+      // currently assume only aggregate on singleton field.
+      if (argList.isEmpty) {
+        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+          // COUNT(*) takes no argument; any field may serve as its input.
+          fieldIndexes(index) = 0
+        } else {
+          throw new PlanGenException("Aggregate fields should not be empty.")
+        }
+      } else {
+        fieldIndexes(index) = argList.get(0)
+      }
+      aggregateCall.getAggregation match {
+        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
+          sqlType.getSqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new TinyIntSumAggregate
+            case SMALLINT =>
+              aggregates(index) = new SmallIntSumAggregate
+            case INTEGER =>
+              aggregates(index) = new IntSumAggregate
+            case BIGINT =>
+              aggregates(index) = new LongSumAggregate
+            case FLOAT =>
+              aggregates(index) = new FloatSumAggregate
+            case DOUBLE =>
+              aggregates(index) = new DoubleSumAggregate
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Sum aggregate does not support type:" + sqlType)
+          }
+        }
+        case _: SqlAvgAggFunction => {
+          sqlType.getSqlTypeName match {
+            case TINYINT =>
+              aggregates(index) = new TinyIntAvgAggregate
+            case SMALLINT =>
+              aggregates(index) = new SmallIntAvgAggregate
+            case INTEGER =>
+              aggregates(index) = new IntAvgAggregate
+            case BIGINT =>
+              aggregates(index) = new LongAvgAggregate
+            case FLOAT =>
+              aggregates(index) = new FloatAvgAggregate
+            case DOUBLE =>
+              aggregates(index) = new DoubleAvgAggregate
+            case sqlType: SqlTypeName =>
+              throw new PlanGenException("Avg aggregate does not support type:" + sqlType)
+          }
+        }
+        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
+          if (sqlMinMaxFunction.isMin) {
+            sqlType.getSqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyIntMinAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallIntMinAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMinAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMinAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMinAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMinAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Min aggregate does not support type:" + sqlType)
+            }
+          } else {
+            sqlType.getSqlTypeName match {
+              case TINYINT =>
+                aggregates(index) = new TinyIntMaxAggregate
+              case SMALLINT =>
+                aggregates(index) = new SmallIntMaxAggregate
+              case INTEGER =>
+                aggregates(index) = new IntMaxAggregate
+              case BIGINT =>
+                aggregates(index) = new LongMaxAggregate
+              case FLOAT =>
+                aggregates(index) = new FloatMaxAggregate
+              case DOUBLE =>
+                aggregates(index) = new DoubleMaxAggregate
+              case sqlType: SqlTypeName =>
+                throw new PlanGenException("Max aggregate does not support type:" + sqlType)
+            }
+          }
+        }
+        case _: SqlCountAggFunction =>
+          aggregates(index) = new CountAggregate
+        case unSupported: SqlAggFunction =>
+          throw new PlanGenException("unsupported Function: " + unSupported.getName)
+      }
+    }
+
+    new AggregateFunction(aggregates, fieldIndexes)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
new file mode 100644
index 0000000..e9c5f8f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/** Base class of all average aggregates. */
+abstract class AvgAggregate[T] extends Aggregate[T] {
+
+}
+
+// TinyInt average aggregate return Int as aggregated value.
+class TinyIntAvgAggregate extends AvgAggregate[Int] {
+  // running mean of the values seen so far, and how many were seen
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    // incremental mean update; avoids accumulating a large running sum
+    mean += (value.asInstanceOf[Byte] - mean) / n
+  }
+
+  override def getAggregated(): Int = mean.toInt
+}
+
+// SmallInt average aggregate return Int as aggregated value.
+class SmallIntAvgAggregate extends AvgAggregate[Int] {
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    mean += (value.asInstanceOf[Short] - mean) / n
+  }
+
+  override def getAggregated(): Int = mean.toInt
+}
+
+// Int average aggregate return Int as aggregated value.
+class IntAvgAggregate extends AvgAggregate[Int] {
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    mean += (value.asInstanceOf[Int] - mean) / n
+  }
+
+  override def getAggregated(): Int = mean.toInt
+}
+
+// Long average aggregate return Long as aggregated value.
+class LongAvgAggregate extends AvgAggregate[Long] {
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    mean += (value.asInstanceOf[Long] - mean) / n
+  }
+
+  override def getAggregated(): Long = mean.toLong
+}
+
+// Float average aggregate return Float as aggregated value.
+class FloatAvgAggregate extends AvgAggregate[Float] {
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    mean += (value.asInstanceOf[Float] - mean) / n
+  }
+
+  override def getAggregated(): Float = mean.toFloat
+}
+
+// Double average aggregate return Double as aggregated value.
+class DoubleAvgAggregate extends AvgAggregate[Double] {
+  private var mean: Double = 0
+  private var n: Int = 0
+
+  override def initiateAggregate: Unit = {
+    mean = 0
+    n = 0
+  }
+
+  override def aggregate(value: Any): Unit = {
+    n += 1
+    mean += (value.asInstanceOf[Double] - mean) / n
+  }
+
+  override def getAggregated(): Double = mean
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
new file mode 100644
index 0000000..ab6b170
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/CountAggregate.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/** Counts the input records; the aggregated value is the count as a Long. */
+class CountAggregate extends Aggregate[Long] {
+  private var counter: Long = 0L
+
+  override def initiateAggregate: Unit = {
+    counter = 0
+  }
+
+  // the value itself is irrelevant for COUNT; only the invocation is counted
+  override def aggregate(value: Any): Unit = {
+    counter += 1
+  }
+
+  override def getAggregated(): Long = counter
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
new file mode 100644
index 0000000..072eb3f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/** Base class of all max aggregates. */
+abstract class MaxAggregate[T] extends Aggregate[T]{
+
+}
+
+class TinyIntMaxAggregate extends MaxAggregate[Byte] {
+  // start from the smallest possible value so any input becomes the new max
+  private var max = Byte.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Byte.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Byte]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Byte = {
+    max
+  }
+}
+
+class SmallIntMaxAggregate extends MaxAggregate[Short] {
+  private var max = Short.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Short.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Short]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Short = {
+    max
+  }
+}
+
+class IntMaxAggregate extends MaxAggregate[Int] {
+  private var max = Int.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Int.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Int]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Int = {
+    max
+  }
+}
+
+class LongMaxAggregate extends MaxAggregate[Long] {
+  // note: sentinel must be Long.MinValue, not Int.MinValue, or large negative
+  // longs would never be reported as the maximum
+  private var max = Long.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Long.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Long]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Long = {
+    max
+  }
+}
+
+class FloatMaxAggregate extends MaxAggregate[Float] {
+  // Scala's Float.MinValue is the most negative finite float
+  private var max = Float.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Float.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Float]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Float = {
+    max
+  }
+}
+
+class DoubleMaxAggregate extends MaxAggregate[Double] {
+  private var max = Double.MinValue
+
+  override def initiateAggregate: Unit = {
+    max = Double.MinValue
+  }
+
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Double]
+    if (current > max) {
+      max = current
+    }
+  }
+
+  override def getAggregated(): Double = {
+    max
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
new file mode 100644
index 0000000..c233b8e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MinAggregate.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/**
+ * Base class for MIN aggregates; concrete subclasses fix the value type T.
+ */
+abstract class MinAggregate[T] extends Aggregate[T]
+
+class TinyIntMinAggregate extends MinAggregate[Byte] {
+
+  // Running minimum, seeded with the largest Byte so any input replaces it.
+  private var min: Byte = Byte.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    min = Byte.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val v = value.asInstanceOf[Byte]
+    min = if (v < min) v else min
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Byte = min
+}
+
+class SmallIntMinAggregate extends MinAggregate[Short] {
+
+  // Running minimum, seeded with the largest Short so any input replaces it.
+  private var min: Short = Short.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    min = Short.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val v = value.asInstanceOf[Short]
+    min = if (v < min) v else min
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Short = min
+}
+
+class IntMinAggregate extends MinAggregate[Int] {
+
+  // Running minimum, seeded with the largest Int so any input replaces it.
+  private var min: Int = Int.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    min = Int.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val v = value.asInstanceOf[Int]
+    min = if (v < min) v else min
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Int = min
+}
+
+class LongMinAggregate extends MinAggregate[Long] {
+  // Running minimum; seeded with Long.MaxValue so the first input always wins.
+  private var min = Long.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    // Must reset with the Long bound; the original reset to Int.MaxValue, so any
+    // value larger than Int.MaxValue could never be reported after a reset.
+    min = Long.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Long]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Long = {
+    min
+  }
+}
+
+class FloatMinAggregate extends MinAggregate[Float] {
+  // Running minimum; seeded with Float.MaxValue so any finite input replaces it.
+  private var min = Float.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    // Must reset with the Float bound; the original reset to Int.MaxValue, so
+    // floats above ~2.1e9 could never be reported after a reset.
+    min = Float.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Float]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Float = {
+    min
+  }
+}
+
+class DoubleMinAggregate extends MinAggregate[Double] {
+  // Running minimum; seeded with Double.MaxValue so any finite input replaces it.
+  private var min = Double.MaxValue
+
+  // Reset the accumulator so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    // Must reset with the Double bound; the original reset to Int.MaxValue, so
+    // doubles above ~2.1e9 could never be reported after a reset.
+    min = Double.MaxValue
+  }
+
+  // Fold one input value into the running minimum.
+  override def aggregate(value: Any): Unit = {
+    val current = value.asInstanceOf[Double]
+    if (current < min) {
+      min = current
+    }
+  }
+
+  // Return the minimum seen since the last reset.
+  override def getAggregated(): Double = {
+    min
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
new file mode 100644
index 0000000..14d1a73
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan.functions.aggregate
+
+/**
+ * Base class for SUM aggregates; concrete subclasses fix the result type T.
+ */
+abstract class SumAggregate[T] extends Aggregate[T]
+
+// TinyInt (Byte) inputs are summed into an Int result.
+class TinyIntSumAggregate extends SumAggregate[Int] {
+
+  // Running total since the last reset.
+  private var sumValue: Int = 0
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  // Fold one Byte input into the Int total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Byte]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Int = sumValue
+}
+
+// SmallInt (Short) inputs are summed into an Int result.
+class SmallIntSumAggregate extends SumAggregate[Int] {
+
+  // Running total since the last reset.
+  private var sumValue: Int = 0
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  // Fold one Short input into the Int total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Short]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Int = sumValue
+}
+
+// Int inputs are summed into an Int result.
+class IntSumAggregate extends SumAggregate[Int] {
+
+  // Running total since the last reset.
+  private var sumValue: Int = 0
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0
+  }
+
+  // Fold one Int input into the total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Int]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Int = sumValue
+}
+
+// Long inputs are summed into a Long result.
+class LongSumAggregate extends SumAggregate[Long] {
+
+  // Running total since the last reset.
+  private var sumValue: Long = 0L
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0L
+  }
+
+  // Fold one Long input into the total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Long]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Long = sumValue
+}
+
+// Float inputs are summed into a Float result.
+class FloatSumAggregate extends SumAggregate[Float] {
+
+  // Running total since the last reset.
+  private var sumValue: Float = 0.0f
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0.0f
+  }
+
+  // Fold one Float input into the total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Float]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Float = sumValue
+}
+
+// Double inputs are summed into a Double result.
+class DoubleSumAggregate extends SumAggregate[Double] {
+
+  // Running total since the last reset.
+  private var sumValue: Double = 0.0
+
+  // Reset the running total so this instance can be reused.
+  override def initiateAggregate: Unit = {
+    sumValue = 0.0
+  }
+
+  // Fold one Double input into the total.
+  override def aggregate(value: Any): Unit = {
+    sumValue = sumValue + value.asInstanceOf[Double]
+  }
+
+  // Return the sum accumulated since the last reset.
+  override def getAggregated(): Double = sumValue
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index ae76d29..70810c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
@@ -36,7 +36,7 @@ class DataSetGroupReduce(
     rowType: RelDataType,
     opName: String,
     groupingKeys: Array[Int],
-    func: GroupReduceFunction[Row, Row])
+    func: GroupReduceFunction[Any, Any])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index de436be..6f988be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -42,7 +42,7 @@ class DataSetJoin(
     joinKeysRight: Array[Int],
     joinType: JoinType,
     joinHint: JoinHint,
-    func: JoinFunction[Row, Row, Row])
+    func: JoinFunction[Any, Any, Any])
   extends BiRel(cluster, traitSet, left, right)
   with DataSetRel {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
index e6fc0f9..361f869 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
@@ -18,9 +18,9 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
@@ -36,7 +36,7 @@ class DataSetReduce(
     rowType: RelDataType,
     opName: String,
     groupingKeys: Array[Int],
-    func: ReduceFunction[Row])
+    func: ReduceFunction[Any])
   extends SingleRel(cluster, traits, input)
   with DataSetRel {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
index 1d17d63..9ecd9d0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala
@@ -21,9 +21,12 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetReduce}
+import org.apache.flink.api.table.plan.functions.aggregate.AggregateFactory
+import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetGroupReduce}
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkAggregate, FlinkConvention}
 
+import scala.collection.JavaConversions._
+
 class DataSetAggregateRule
   extends ConverterRule(
     classOf[FlinkAggregate],
@@ -37,14 +40,20 @@ class DataSetAggregateRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
 
-    new DataSetReduce(
+    val grouping = agg.getGroupSet.asList().map {
+      case a: java.lang.Integer => a.intValue
+    }.toArray
+
+    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList)
+
+    new DataSetGroupReduce(
       rel.getCluster,
       traitSet,
       convInput,
       rel.getRowType,
       agg.toString,
-      Array[Int](),
-      null)
+      grouping,
+      aggregateFunction)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8028dbc/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
index 3d2117d..69c86c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala
@@ -20,10 +20,17 @@ package org.apache.flink.api.table.plan.rules.dataset
 
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataTypeField, RelDataType}
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.api.java.operators.join.JoinType
+import org.apache.calcite.rex.{RexCall, RexInputRef}
+import org.apache.calcite.sql.SqlKind
+import org.apache.flink.api.table.plan.PlanGenException
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetJoin}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkJoin}
+import org.apache.flink.api.table.plan.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 
 class DataSetJoinRule
   extends ConverterRule(
@@ -39,6 +46,10 @@ class DataSetJoinRule
     val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
     val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
 
+    val joinKeys = getJoinKeys(join)
+
+    // A FlinkProject after the FlinkJoin handles the output fields after the join,
+    // so we do not need a JoinFunction here for now.
     new DataSetJoin(
       rel.getCluster,
       traitSet,
@@ -46,12 +57,93 @@ class DataSetJoinRule
       convRight,
       rel.getRowType,
       join.toString,
-      Array[Int](),
-      Array[Int](),
-      JoinType.INNER,
+      joinKeys._1,
+      joinKeys._2,
+      sqlJoinTypeToFlinkJoinType(join.getJoinType),
       null,
       null)
   }
+
+  /**
+   * Extracts the equi-join keys from the join condition and returns them as
+   * parallel arrays: (_1 = indexes into the left input row type,
+   * _2 = indexes into the right input row type).
+   *
+   * Throws PlanGenException when a referenced field cannot be located in the
+   * expected input.
+   */
+  private def getJoinKeys(join: FlinkJoin): (Array[Int], Array[Int]) = {
+    val joinKeys = ArrayBuffer.empty[(Int, Int)]
+    parseJoinRexNode(join.getCondition.asInstanceOf[RexCall], joinKeys)
+
+    val joinedRowType= join.getRowType
+    val leftRowType = join.getLeft.getRowType
+    val rightRowType = join.getRight.getRowType
+
+    // The fetched join key index from Calcite is based on joined row type, we need
+    // the join key index based on left/right input row type.
+    val joinKeyPairs: ArrayBuffer[(Int, Int)] = joinKeys.map {
+      case (first, second) =>
+        // Try 'first' as a left-input field; if that fails, the pair must be
+        // swapped: 'second' belongs to the left input and 'first' to the right.
+        var leftIndex = findIndexInSingleInput(first, joinedRowType, leftRowType)
+        if (leftIndex == -1) {
+          leftIndex = findIndexInSingleInput(second, joinedRowType, leftRowType)
+          if (leftIndex == -1) {
+            // Neither operand resolves against the left input.
+            throw new PlanGenException("Invalid join condition, could not find " +
+              joinedRowType.getFieldNames.get(first) + " and " +
+              joinedRowType.getFieldNames.get(second) + " in left table")
+          }
+          val rightIndex = findIndexInSingleInput(first, joinedRowType, rightRowType)
+          if (rightIndex == -1) {
+            throw new PlanGenException("Invalid join condition could not find " +
+              joinedRowType.getFieldNames.get(first) + " in right table")
+          }
+          (leftIndex, rightIndex)
+        } else {
+          val rightIndex = findIndexInSingleInput(second, joinedRowType, rightRowType)
+          if (rightIndex == -1) {
+            throw new PlanGenException("Invalid join condition could not find " +
+              joinedRowType.getFieldNames.get(second) + " in right table")
+          }
+          (leftIndex, rightIndex)
+        }
+    }
+
+    // Split the (left, right) pairs into the two parallel index arrays.
+    val joinKeysPair = joinKeyPairs.unzip
+
+    (joinKeysPair._1.toArray, joinKeysPair._2.toArray)
+  }
+
+  /**
+   * Recursively parses the join condition and collects the index pair of every
+   * equality predicate into `joinKeys`. Only conjunctions (AND) of equality
+   * predicates are supported; anything else aborts planning.
+   */
+  private def parseJoinRexNode(condition: RexCall, joinKeys: ArrayBuffer[(Int, Int)]): Unit = {
+    condition.getOperator.getKind match {
+      case SqlKind.AND =>
+        // Descend into each conjunct; each must itself be a RexCall.
+        condition.getOperands.foreach {
+          operand => parseJoinRexNode(operand.asInstanceOf[RexCall], joinKeys)
+        }
+      case SqlKind.EQUALS =>
+        // NOTE(review): assumes both EQUALS operands are plain input references;
+        // a computed operand (e.g. a cast) would fail the asInstanceOf — confirm
+        // upstream guarantees this shape.
+        val operands = condition.getOperands
+        val leftIndex = operands(0).asInstanceOf[RexInputRef].getIndex
+        val rightIndex = operands(1).asInstanceOf[RexInputRef].getIndex
+        joinKeys += (leftIndex -> rightIndex)
+      case _ =>
+        // Do not support operands like OR in join condition due to the limitation
+        // of current Flink JoinOperator implementation.
+        throw new PlanGenException("Do not support operands other than " +
+          "AND and Equals in join condition now.")
+    }
+  }
+
+  /**
+   * Maps a field index of the joined row type to the index of the matching
+   * field (same name and same type) in one input row type.
+   *
+   * Returns -1 when the field does not belong to that input.
+   */
+  private def findIndexInSingleInput(
+      globalIndex: Int,
+      joinedRowType: RelDataType,
+      inputRowType: RelDataType): Int = {
+
+    val fieldType: RelDataTypeField = joinedRowType.getFieldList.get(globalIndex)
+    // indexWhere yields -1 when nothing matches, which is exactly the sentinel
+    // callers expect; it also avoids the original non-local `return` from
+    // inside the foreach lambda (implemented via control-flow exception).
+    inputRowType.getFieldList.indexWhere { inputFieldType =>
+      inputFieldType.getName.equals(fieldType.getName) &&
+        inputFieldType.getType.equals(fieldType.getType)
+    }
+  }
 }
 
 object DataSetJoinRule {


Mime
View raw message