Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 37C08200C78 for ; Wed, 3 May 2017 14:10:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 361F9160BB5; Wed, 3 May 2017 12:10:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2E653160BD4 for ; Wed, 3 May 2017 14:10:10 +0200 (CEST) Received: (qmail 59035 invoked by uid 500); 3 May 2017 12:10:08 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 58278 invoked by uid 99); 3 May 2017 12:10:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 May 2017 12:10:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70573E04F2; Wed, 3 May 2017 12:10:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fhueske@apache.org To: commits@flink.apache.org Date: Wed, 03 May 2017 12:10:27 -0000 Message-Id: <9b47f176dd7f4bcc85e989d628d0e5a4@git.apache.org> In-Reply-To: <2de511ffe12d45ca85e8d3de85af107f@git.apache.org> References: <2de511ffe12d45ca85e8d3de85af107f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [20/50] [abbrv] flink git commit: [FLINK-6059] [table] Reject GenericType when converting DataSet or DataStream to Table. archived-at: Wed, 03 May 2017 12:10:11 -0000 [FLINK-6059] [table] Reject GenericType when converting DataSet or DataStream to Table. This closes #3546. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8eb55f1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8eb55f1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8eb55f1 Branch: refs/heads/table-retraction Commit: c8eb55f17d64722bb600c1083a478ab99e53f4ec Parents: 2c68085 Author: Fabian Hueske Authored: Wed Mar 15 13:24:42 2017 +0100 Committer: Fabian Hueske Committed: Sat Apr 29 00:44:55 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 13 +++++++- .../api/java/batch/TableEnvironmentITCase.java | 31 ++++++++++++++++++++ .../flink/table/TableEnvironmentTest.scala | 18 ++++++++++-- .../scala/batch/TableEnvironmentITCase.scala | 30 +++++++++++++++++++ 4 files changed, 88 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 16c40fe..bd974b0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -565,7 +565,14 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { - (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + + if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) { + throw new TableException( + "An 
input of GenericTypeInfo cannot be converted to Table. " + + "Please specify the type of the input with a RowTypeInfo.") + } else { + (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + } } /** @@ -584,6 +591,10 @@ abstract class TableEnvironment(val config: TableConfig) { TableEnvironment.validateType(inputType) val indexedNames: Array[(Int, String)] = inputType match { + case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] => + throw new TableException( + "An input of GenericTypeInfo cannot be converted to Table. " + + "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[A] => if (exprs.length != 1) { throw new TableException("Table of atomic type can only have a single field.") http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index e165983..cab3855 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.calcite.tools.RuleSets; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; @@ -46,6 +47,8 @@ import org.junit.Test; import org.junit.runner.RunWith; 
import org.junit.runners.Parameterized; +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { @@ -375,6 +378,34 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { } @Test(expected = TableException.class) + public void testGenericRow() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // use null value to enforce GenericType + DataSet dataSet = env.fromElements(Row.of(1, 2L, "Hello", null)); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet); + } + + @Test(expected = TableException.class) + public void testGenericRowWithAlias() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // use null value to enforce GenericType + DataSet dataSet = env.fromElements(Row.of((Integer)null)); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. Cannot import DataSet with GenericTypeInfo. 
+ tableEnv.fromDataSet(dataSet, "nullField"); + } + + @Test(expected = TableException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 3d93f45..9939a9c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -21,13 +21,13 @@ package org.apache.flink.table import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.scala._ +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor} import org.apache.flink.table.api.TableException import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference} import org.apache.flink.table.utils.{MockTableEnvironment, TableTestBase} -import org.apache.flink.table.utils.TableTestUtil._ - +import org.apache.flink.table.utils.TableTestUtil.{batchTableNode, binaryNode, streamTableNode, term, unaryNode} +import org.apache.flink.types.Row import org.junit.Test import org.junit.Assert.assertEquals @@ -46,6 +46,8 @@ class TableEnvironmentTest extends TableTestBase { val atomicType = INT_TYPE_INFO + val genericRowType = new 
GenericTypeInfo[Row](classOf[Row]) + @Test def testGetFieldInfoTuple(): Unit = { val fieldInfo = tEnv.getFieldInfo(tupleType) @@ -78,6 +80,11 @@ class TableEnvironmentTest extends TableTestBase { fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1)) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRow(): Unit = { + tEnv.getFieldInfo(genericRowType) + } + @Test def testGetFieldInfoTupleNames(): Unit = { val fieldInfo = tEnv.getFieldInfo( @@ -278,6 +285,11 @@ class TableEnvironmentTest extends TableTestBase { )) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRowAlias(): Unit = { + tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first"))) + } + @Test def testSqlWithoutRegisteringForBatchTables(): Unit = { val util = batchTestUtil() http://git-wip-us.apache.org/repos/asf/flink/blob/c8eb55f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index 6cbe834..e61e190 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch import java.util +import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets @@ -31,6 +32,7 @@ import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized 
+import org.junit.Assert.assertTrue import scala.collection.JavaConverters._ @@ -254,6 +256,34 @@ class TableEnvironmentITCase( CollectionDataSets.get3TupleDataSet(env) .toTable(tEnv, 'a as 'foo, 'b, 'c) } + + @Test(expected = classOf[TableException]) + def testGenericRow() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val dataSet = env.fromElements(Row.of(null)) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet) + } + + @Test(expected = classOf[TableException]) + def testGenericRowWithAlias() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val dataSet = env.fromElements(Row.of(null)) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. Cannot import DataSet with GenericTypeInfo. + tableEnv.fromDataSet(dataSet, "nullField") + } } object TableEnvironmentITCase {