Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B1AE0200C67 for ; Sun, 30 Apr 2017 18:59:43 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B0760160B8C; Sun, 30 Apr 2017 16:59:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id AACEE160BA4 for ; Sun, 30 Apr 2017 18:59:42 +0200 (CEST) Received: (qmail 84152 invoked by uid 500); 30 Apr 2017 16:59:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 84135 invoked by uid 99); 30 Apr 2017 16:59:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 30 Apr 2017 16:59:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 317AEE0061; Sun, 30 Apr 2017 16:59:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: twalthr@apache.org To: commits@flink.apache.org Date: Sun, 30 Apr 2017 16:59:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-6377] [table] Support map types in the Table / SQL API archived-at: Sun, 30 Apr 2017 16:59:43 -0000 [FLINK-6377] [table] Support map types in the Table / SQL API This closes #3767. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b6e71ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b6e71ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b6e71ce Branch: refs/heads/master Commit: 5b6e71ceb12d1fdf7d09d70744f3c0a8a4722768 Parents: 0a33431 Author: Haohui Mai Authored: Mon Apr 24 23:12:29 2017 -0700 Committer: twalthr Committed: Sun Apr 30 18:59:05 2017 +0200 ---------------------------------------------------------------------- .../flink/table/calcite/FlinkTypeFactory.scala | 13 ++++-- .../flink/table/codegen/CodeGenerator.scala | 22 ++++++--- .../table/codegen/calls/ScalarOperators.scala | 34 +++++++++++++- .../flink/table/plan/nodes/FlinkRelNode.scala | 2 +- .../table/plan/schema/MapRelDataType.scala | 49 ++++++++++++++++++++ .../table/api/java/batch/sql/SqlITCase.java | 33 +++++++++++++ 6 files changed, 140 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 22a5c9f..7762ff8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -28,13 +28,12 @@ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo} +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.table.api.TableException -import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType} +import org.apache.flink.table.plan.schema.{ArrayRelDataType, CompositeRelDataType, GenericRelDataType, MapRelDataType} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple -import org.apache.flink.table.plan.schema.ArrayRelDataType import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName import org.apache.flink.types.Row @@ -123,6 +122,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp case oa: ObjectArrayTypeInfo[_, _] => new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true) + case mp: MapTypeInfo[_, _] => + new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo), + createTypeFromTypeInfo(mp.getValueTypeInfo), true) + case ti: TypeInformation[_] => new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem]) @@ -226,6 +229,10 @@ object FlinkTypeFactory { val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType] arrayRelDataType.typeInfo + case MAP if relDataType.isInstanceOf[MapRelDataType] => + val mapRelDataType = relDataType.asInstanceOf[MapRelDataType] + mapRelDataType.typeInfo + case _@t => throw TableException(s"Type is not supported: $t") } http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 298fb70..648efe6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -28,9 +28,9 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.GenericInputFormat -import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, RowTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.configuration.Configuration import org.apache.flink.table.api.TableConfig @@ -1414,11 +1414,19 @@ class CodeGenerator( generateArray(this, resultType, operands) case ITEM => - val array = operands.head - val index = operands(1) - requireArray(array) - requireInteger(index) - generateArrayElementAt(this, array, index) + operands.head.resultType match { + case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => + val array = operands.head + val index = operands(1) + requireInteger(index) + generateArrayElementAt(this, array, index) + + case map: MapTypeInfo[_, _] => + val key = operands(1) + generateMapGet(this, operands.head, key) + + case _ => throw new CodeGenException("Expect an array or a map.") + } case CARDINALITY => val array = operands.head http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 47a81ab..0c5baa6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -22,9 +22,9 @@ import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange} import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.codegen.CodeGenUtils._ -import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression} +import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils._ @@ -911,6 +911,36 @@ object ScalarOperators { } } + def generateMapGet( + codeGenerator: CodeGenerator, + map: GeneratedExpression, + key: GeneratedExpression) + : GeneratedExpression = { + + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val ty = map.resultType.asInstanceOf[MapTypeInfo[_,_]] + val resultType = ty.getValueTypeInfo + val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo) + val accessCode = if (codeGenerator.nullCheck) { + s""" + |${map.code} + |${key.code} + |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm}); + |$resultTypeTerm $resultTerm = $nullTerm ? + | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } else { + s""" + |${map.code} + |${key.code} + |$resultTypeTerm $resultTerm = ($resultTypeTerm) + | ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } + GeneratedExpression(resultTerm, nullTerm, accessCode, resultType) + } + // ---------------------------------------------------------------------------------------------- private def generateUnaryOperatorIfNotNull( http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala index ccdddef..7554ea9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala @@ -93,7 +93,7 @@ trait FlinkRelNode extends RelNode { case SqlTypeName.ARRAY => // 16 is an arbitrary estimate estimateDataTypeSize(t.getComponentType) * 16 - case SqlTypeName.ANY => 128 // 128 is an arbitrary estimate + case SqlTypeName.ANY | SqlTypeName.MAP => 128 // 128 is an arbitrary estimate case _ => throw TableException(s"Unsupported data type encountered: $t") } } http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala new file mode 100644 index 0000000..b3ff99f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/MapRelDataType.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.schema + +import com.google.common.base.Objects +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.`type`.MapSqlType +import org.apache.flink.api.common.typeinfo.TypeInformation + +class MapRelDataType( + val typeInfo: TypeInformation[_], + val keyType: RelDataType, + val valueType: RelDataType, + isNullable: Boolean) extends MapSqlType(keyType, valueType, isNullable) { + + override def toString: String = s"MAP($typeInfo)" + + def canEqual(other: Any): Boolean = other.isInstanceOf[MapRelDataType] + + override def equals(other: Any): Boolean = other match { + case that: MapRelDataType => + super.equals(that) && + (that canEqual this) && + keyType == that.keyType && + valueType == that.valueType && + isNullable == that.isNullable + case _ => false + } + + override def hashCode(): Int = { + Objects.hashCode(keyType, valueType) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/5b6e71ce/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java index 5ba67dd..114226c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java @@ -18,9 +18,14 @@ package org.apache.flink.table.api.java.batch.sql; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.table.api.java.BatchTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.types.Row; @@ -32,7 +37,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; @RunWith(Parameterized.class) public class SqlITCase extends TableProgramsCollectionTestBase { @@ -138,4 +146,29 @@ public class SqlITCase extends TableProgramsCollectionTestBase { String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"; compareResultAsText(results, expected); } + + @Test + public void testMap() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + List>> rows = new ArrayList<>(); + rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar"))); + rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam"))); + + TypeInformation>> ty = new TupleTypeInfo<>( + BasicTypeInfo.INT_TYPE_INFO, + new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + + DataSet>> ds1 = env.fromCollection(rows, ty); + tableEnv.registerDataSet("t1", ds1, "a, b"); + + String sqlQuery = "SELECT b['foo'] FROM t1"; + Table result = tableEnv.sql(sqlQuery); + + DataSet resultSet = tableEnv.toDataSet(result, Row.class); + List results = resultSet.collect(); + String expected = "bar\n" + "spam\n"; + compareResultAsText(results, expected); + } }