flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [1/3] flink git commit: [FLINK-7137] [table] Rework nullability handling
Date Mon, 24 Jul 2017 12:07:03 GMT
Repository: flink
Updated Branches:
  refs/heads/master bb118104b -> c0bad3b80


[FLINK-7137] [table] Rework nullability handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aa11565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aa11565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aa11565

Branch: refs/heads/master
Commit: 7aa115658b23c19fbcc8e3d1d83113608ebd7ce7
Parents: 2d1e08a
Author: twalthr <twalthr@apache.org>
Authored: Wed Jul 19 16:23:09 2017 +0200
Committer: twalthr <twalthr@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 195 ++++++++++++-------
 .../calcite/RelTimeIndicatorConverter.scala     |  10 +-
 .../flink/table/codegen/ExpressionReducer.scala |  12 +-
 .../flink/table/expressions/aggregations.scala  |   2 +-
 .../apache/flink/table/expressions/array.scala  |   2 +-
 .../apache/flink/table/expressions/call.scala   |   4 +-
 .../apache/flink/table/expressions/cast.scala   |   5 +-
 .../flink/table/expressions/literals.scala      |   2 +-
 .../apache/flink/table/expressions/time.scala   |   4 +-
 .../table/functions/utils/AggSqlFunction.scala  |   4 +-
 .../functions/utils/ScalarSqlFunction.scala     |   5 +-
 .../logical/rel/LogicalWindowAggregate.scala    |   2 +-
 .../logical/FlinkLogicalWindowAggregate.scala   |   2 +-
 .../table/plan/schema/ArrayRelDataType.scala    |   3 +-
 .../plan/schema/CompositeRelDataType.scala      |  13 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |   2 +-
 .../table/plan/schema/GenericRelDataType.scala  |  11 +-
 .../plan/schema/TimeIndicatorRelDataType.scala  |   6 +-
 .../api/stream/table/GroupWindowTest.scala      |   4 +-
 .../plan/TimeIndicatorConversionTest.scala      |   4 +-
 .../runtime/batch/table/AggregateITCase.scala   |  17 ++
 21 files changed, 200 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index b63a3ad..6c23b9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -49,48 +49,53 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
-    createTypeFromTypeInfo(typeInfo, nullable = false)
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
-    // simple type can be converted to SQL types and vice versa
-    if (isSimple(typeInfo)) {
-      val sqlType = typeInfoToSqlTypeName(typeInfo)
-      sqlType match {
-
-        case INTERVAL_YEAR_MONTH =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
-        case INTERVAL_DAY_SECOND =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
-        case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
-          if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
-            createRowtimeIndicatorType()
-          } else {
-            createProctimeIndicatorType()
-          }
-
-        case _ =>
-          createTypeWithNullability(createSqlType(sqlType), nullable)
+  private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
+
+  def createTypeFromTypeInfo(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean)
+    : RelDataType = {
+
+      // we cannot use seenTypes for simple types,
+      // because time indicators and timestamps would be the same
+
+      val relType = if (isSimple(typeInfo)) {
+        // simple types can be converted to SQL types and vice versa
+        val sqlType = typeInfoToSqlTypeName(typeInfo)
+        sqlType match {
+
+          case INTERVAL_YEAR_MONTH =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+          case INTERVAL_DAY_SECOND =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+          case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+            if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+              createRowtimeIndicatorType()
+            } else {
+              createProctimeIndicatorType()
+            }
+
+          case _ =>
+            createSqlType(sqlType)
+        }
+      } else {
+        // advanced types require specific RelDataType
+        // for storing the original TypeInformation
+        seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
       }
-    }
-    // advanced types require specific RelDataType
-    // for storing the original TypeInformation
-    else {
-      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
-    }
+
+    createTypeWithNullability(relType, isNullable)
   }
 
   /**
     * Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
     */
   def createProctimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -103,7 +108,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     * Creates a indicator type for event-time, but with similar properties as SQL timestamp.
     */
   def createRowtimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -113,6 +118,56 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   /**
+    * Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
+    */
+  private def createAdvancedType(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean): RelDataType = {
+
+    val relType = typeInfo match {
+
+      case ct: CompositeType[_] =>
+        new CompositeRelDataType(ct, isNullable, this)
+
+      case pa: PrimitiveArrayTypeInfo[_] =>
+        new ArrayRelDataType(
+          pa,
+          createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
+          isNullable)
+
+      case ba: BasicArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          ba,
+          createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case oa: ObjectArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          oa,
+          createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case mp: MapTypeInfo[_, _] =>
+        new MapRelDataType(
+          mp,
+          createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+          createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+          isNullable)
+
+      case ti: TypeInformation[_] =>
+        new GenericRelDataType(
+          ti,
+          isNullable,
+          getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+      case ti@_ =>
+        throw TableException(s"Unsupported type information: $ti")
+    }
+
+    canonize(relType)
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
@@ -153,13 +208,15 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         addedTimeAttributes += 1
       } else {
         val field = fields(i - addedTimeAttributes)
-        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
+        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true))
       }
     }
 
     logicalRowTypeBuilder.build
   }
 
+  // ----------------------------------------------------------------------------------------------
+
   override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
     // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
     // always set those to default value
@@ -170,52 +227,48 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     }
   }
 
-  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
-    new ArrayRelDataType(
+  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
+    val relType = new ArrayRelDataType(
       ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
       elementType,
-      true)
+      isNullable = false)
 
-  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    case ct: CompositeType[_] =>
-      new CompositeRelDataType(ct, this)
+    canonize(relType)
+  }
 
-    case pa: PrimitiveArrayTypeInfo[_] =>
-      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      isNullable: Boolean): RelDataType = {
 
-    case ba: BasicArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+    // nullability change not necessary
+    if (relDataType.isNullable == isNullable) {
+      return canonize(relDataType)
+    }
 
-    case oa: ObjectArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+    // change nullability
+    val newType = relDataType match {
 
-    case mp: MapTypeInfo[_, _] =>
-      new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
-        createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+      case composite: CompositeRelDataType =>
+        new CompositeRelDataType(composite.compositeType, isNullable, this)
 
-    case ti: TypeInformation[_] =>
-      createTypeWithNullability(
-        new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
-        nullable = true
-      )
+      case array: ArrayRelDataType =>
+        new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
 
-    case ti@_ =>
-      throw TableException(s"Unsupported type information: $ti")
-  }
+      case map: MapRelDataType =>
+        new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
+
+      case generic: GenericRelDataType =>
+        new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
+
+      case timeIndicator: TimeIndicatorRelDataType =>
+        timeIndicator
 
-  override def createTypeWithNullability(
-      relDataType: RelDataType,
-      nullable: Boolean)
-    : RelDataType = relDataType match {
-      case composite: CompositeRelDataType =>
-        // at the moment we do not care about nullability
-        canonize(composite)
-      case array: ArrayRelDataType =>
-        val elementType = createTypeWithNullability(array.getComponentType, nullable)
-        canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
       case _ =>
-        super.createTypeWithNullability(relDataType, nullable)
+        super.createTypeWithNullability(relDataType, isNullable)
     }
+
+    canonize(newType)
+  }
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index d76613e..eb14291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
 class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
 
   private val timestamp = rexBuilder
-    .getTypeFactory
-    .asInstanceOf[FlinkTypeFactory]
-    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
   override def visit(intersect: LogicalIntersect): RelNode =
     throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -341,7 +341,7 @@ object RelTimeIndicatorConverter {
       .getRexBuilder
       .getTypeFactory
       .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
     // convert all time indicators types to timestamps
     val fields = rootRel.getRowType.getFieldList.map { field =>
@@ -381,7 +381,7 @@ class RexTimeIndicatorMaterializer(
   private val timestamp = rexBuilder
     .getTypeFactory
     .asInstanceOf[FlinkTypeFactory]
-    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
   override def visitInputRef(inputRef: RexInputRef): RexNode = {
     // reference is interesting

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index cf36417..3e71c99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -53,15 +53,21 @@ class ExpressionReducer(config: TableConfig)
       // we need to cast here for RexBuilder.makeLiteral
       case (SqlTypeName.DATE, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIME, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIMESTAMP, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, e.getType.isNullable),
+            e)
         )
 
       // we don't support object literals yet, we skip those constant expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index a9901a3..c2d1bdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -67,7 +67,7 @@ case class Sum(child: Expression) extends Aggregation {
   override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
     val returnType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = true)
     new SqlSumAggFunction(returnType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7211733..3288478 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -38,7 +38,7 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
     val relDataType = relBuilder
       .asInstanceOf[FlinkRelBuilder]
       .getTypeFactory
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = false)
     val values = elements.map(_.toRexNode).toList.asJava
     relBuilder
       .getRexBuilder

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index bd4fa2f..eb4b402 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -104,7 +104,7 @@ case class OverCall(
     val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
     val aggResultType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(agg.resultType)
+      .createTypeFromTypeInfo(agg.resultType, isNullable = true)
 
     // assemble exprs by agg children
     val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
@@ -151,7 +151,7 @@ case class OverCall(
       case b: Literal =>
         val returnType = relBuilder
           .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-          .createTypeFromTypeInfo(Types.DECIMAL)
+          .createTypeFromTypeInfo(Types.DECIMAL, isNullable = true)
 
         val sqlOperator = new SqlPostfixOperator(
           sqlKind.name,

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
index 312bf12..ba08ccb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -30,12 +30,13 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val childRexNode = child.toRexNode
     relBuilder
       .getRexBuilder
       // we use abstract cast here because RelBuilder.cast() has to many side effects
       .makeAbstractCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
-        child.toRexNode)
+        typeFactory.createTypeFromTypeInfo(resultType, childRexNode.getType.isNullable),
+        childRexNode)
   }
 
   override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 053e7ed..eb9c4f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -121,7 +121,7 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     rexBuilder
       .makeCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true),
         rexBuilder.constantNull())
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 250ec0a..0a02666 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -103,7 +103,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
 
     // TODO convert this into Table API expressions to make the code more readable
     val rexBuilder = relBuilder.getRexBuilder
-    val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+    val resultType = relBuilder
+      .getTypeFactory()
+      .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
     var result = rexBuilder.makeReinterpretCast(
       resultType,
       temporal,

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 4197760..b7d9991 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -97,7 +97,7 @@ object AggSqlFunction {
               s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
 
         val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {
@@ -120,7 +120,7 @@ object AggSqlFunction {
 
     new SqlReturnTypeInference {
       override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
-        typeFactory.createTypeFromTypeInfo(resultType)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index b1b45cd..0776f7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -85,8 +85,7 @@ object ScalarSqlFunction {
               s"Expected: ${signaturesToString(scalarFunction, "eval")}")
         }
         val resultType = getResultTypeOfScalarFunction(scalarFunction, foundSignature.get)
-        val t = typeFactory.createTypeFromTypeInfo(resultType)
-        typeFactory.createTypeWithNullability(t, nullable = true)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }
@@ -111,7 +110,7 @@ object ScalarSqlFunction {
 
         val inferredTypes = scalarFunction
           .getParameterTypes(foundSignature)
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 4443d6c..81f6bf0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -76,7 +76,7 @@ class LogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8154738..3e605e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -81,7 +81,7 @@ class FlinkLogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
index f7d9e1d..ed64c62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -42,7 +42,8 @@ class ArrayRelDataType(
     case that: ArrayRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        isNullable == that.isNullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index 3694cc5..e0c6b6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -31,10 +31,12 @@ import scala.collection.JavaConverters._
   * Composite type for encapsulating Flink's [[CompositeType]].
   *
   * @param compositeType CompositeType to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeFactory Flink's type factory
   */
 class CompositeRelDataType(
     val compositeType: CompositeType[_],
+    val nullable: Boolean,
     typeFactory: FlinkTypeFactory)
   extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
 
@@ -46,7 +48,8 @@ class CompositeRelDataType(
     case that: CompositeRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        compositeType == that.compositeType
+        compositeType == that.compositeType &&
+        nullable == that.nullable
     case _ => false
   }
 
@@ -54,6 +57,8 @@ class CompositeRelDataType(
     compositeType.hashCode()
   }
 
+  override def isNullable: Boolean = nullable
+
 }
 
 object CompositeRelDataType {
@@ -73,11 +78,11 @@ object CompositeRelDataType {
         new RelDataTypeFieldImpl(
           name,
           index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
-            .asInstanceOf[RelDataTypeField]
+          // TODO the composite type should provide the information if subtypes are nullable
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), isNullable = true)
+        ).asInstanceOf[RelDataTypeField]
       }
       .toList
       .asJava
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 1c05883..27fc2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -78,7 +78,7 @@ class FlinkTableFunctionImpl[T](
     fieldNames
       .zip(fieldTypes)
       .foreach { f =>
-        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
       }
     builder.build
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
index d93908b..84dd669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -18,23 +18,27 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeSystem
 
 /**
   * Generic type for encapsulating Flink's [[TypeInformation]].
   *
   * @param typeInfo TypeInformation to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeSystem Flink's type system
   */
 class GenericRelDataType(
     val typeInfo: TypeInformation[_],
-    typeSystem: FlinkTypeSystem)
+    val nullable: Boolean,
+    typeSystem: RelDataTypeSystem)
   extends BasicSqlType(
     typeSystem,
     SqlTypeName.ANY) {
 
+  isNullable = nullable
+
   override def toString = s"ANY($typeInfo)"
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
@@ -43,7 +47,8 @@ class GenericRelDataType(
     case that: GenericRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        nullable == that.nullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index 5e27061..ace881a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -38,12 +38,14 @@ class TimeIndicatorRelDataType(
     case that: TimeIndicatorRelDataType =>
       super.equals(that) &&
         isEventTime == that.isEventTime
-    case that: BasicSqlType =>
-      super.equals(that)
     case _ => false
   }
 
   override def hashCode(): Int = {
     super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
   }
+
+  override def toString: String = {
+    s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index a024460..599c76b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -772,8 +772,8 @@ class GroupWindowTest extends TableTestBase {
       ),
       term("select",
         "string",
-        "+(CAST(TMP_0), 1) AS s1",
-        "+(CAST(TMP_0), 3) AS s2",
+        "+(TMP_0, 1) AS s1",
+        "+(TMP_0, 3) AS s2",
         "TMP_1 AS x",
         "TMP_1 AS x2",
         "TMP_2 AS x3",

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 90c8ea4..870025e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -161,8 +161,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
         term("invocation",
           s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
         term("function", func),
-        term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
-          "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+        term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
+          "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
         term("joinType", "INNER")
       ),
       term("select",

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
index 457142c..d563f96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
@@ -42,6 +42,23 @@ class AggregationsITCase(
   extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
+  def testAggregationWithCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
+    tEnv.registerDataSet("MyTable", inputTable)
+
+    val result = tEnv.scan("MyTable")
+      .where('a.get("_1") > 0)
+      .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
+
+    val expected = "2,6,3"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment


Mime
View raw message