From: fhueske@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Tue, 10 Jan 2017 21:30:21 -0000
Message-Id: <7c3c69ece6514526b2f8a3b49e59dcb2@git.apache.org>
Subject: [1/2] flink git commit: [FLINK-5280] [table] Refactor TableSource interface.

Repository: flink
Updated Branches:
  refs/heads/master d4d7cc326 -> 2af939a10


[FLINK-5280] [table] Refactor TableSource interface.

This closes #3039.
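At a glance, the refactoring replaces the TableSource trait's three schema getters with a single method; field names and indices now either derive from the return type or come from the new, optional DefinedFieldNames trait. A simplified sketch for orientation (condensed from the diff below; the two "before/after" traits obviously cannot coexist in one file):

    // Before this commit:
    trait TableSource[T] {
      def getNumberOfFields: Int
      def getFieldsNames: Array[String]
      def getFieldTypes: Array[TypeInformation[_]]
      def getReturnType: TypeInformation[T]
    }

    // After this commit: the schema travels inside the returned TypeInformation.
    trait TableSource[T] {
      def getReturnType: TypeInformation[T]
    }

    // Optional mix-in for sources that need custom names/indices.
    trait DefinedFieldNames {
      def getFieldNames: Array[String]
      def getFieldIndices: Array[Int]
    }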
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ded2bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ded2bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ded2bb

Branch: refs/heads/master
Commit: 38ded2bb00aeb5c9581fa7ef313e5b9f803f5c26
Parents: d4d7cc3
Author: Ivan Mushketyk
Authored: Thu Dec 22 21:26:34 2016 +0000
Committer: Fabian Hueske
Committed: Tue Jan 10 20:45:10 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTableSource.java      |  21 +--
 .../flink/table/api/BatchTableEnvironment.scala |   2 +-
 .../table/api/StreamTableEnvironment.scala      |   2 +-
 .../flink/table/api/TableEnvironment.scala      | 135 ++++++++++++++-----
 .../utils/UserDefinedFunctionUtils.scala        |  22 +--
 .../flink/table/plan/nodes/FlinkRel.scala       |  11 +-
 .../nodes/dataset/BatchTableSourceScan.scala    |   8 +-
 .../datastream/StreamTableSourceScan.scala      |   8 +-
 .../dataSet/BatchTableSourceScanRule.scala      |   6 +-
 ...ushProjectIntoBatchTableSourceScanRule.scala |   3 +-
 ...shProjectIntoStreamTableSourceScanRule.scala |   3 +-
 .../datastream/StreamTableSourceScanRule.scala  |   6 +-
 .../flink/table/plan/schema/FlinkTable.scala    |   6 +-
 .../table/plan/schema/TableSourceTable.scala    |  13 +-
 .../flink/table/sources/CsvTableSource.scala    |  11 +-
 .../flink/table/sources/DefinedFieldNames.scala |  35 +++++
 .../flink/table/sources/TableSource.scala       |  21 +--
 .../table/api/java/batch/TableSourceITCase.java |  11 +-
 .../flink/table/TableEnvironmentTest.scala      |   7 +-
 .../api/scala/batch/TableSourceITCase.scala     |  19 +++
 .../flink/table/utils/CommonTestData.scala      |  42 +++++-
 .../flink/api/scala/io/CsvInputFormatTest.scala |   2 -
 22 files changed, 259 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 9a9c85d..dd32bdd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -19,12 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Properties;
@@ -112,23 +112,8 @@ public abstract class KafkaTableSource implements StreamTableSource {
 	}
 
 	@Override
-	public int getNumberOfFields() {
-		return fieldNames.length;
-	}
-
-	@Override
-	public String[] getFieldsNames() {
-		return fieldNames;
-	}
-
-	@Override
-	public TypeInformation[] getFieldTypes() {
-		return fieldTypes;
-	}
-
-	@Override
 	public TypeInformation getReturnType() {
-		return new RowTypeInfo(fieldTypes);
+		return new RowTypeInfo(fieldTypes, fieldNames);
 	}
 
 	/**
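The Kafka change above works because a RowTypeInfo can carry field names alongside field types, which is what makes the removed getters redundant. A minimal sketch (not part of the commit; the field names and types are illustrative):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    object RowTypeInfoSketch extends App {
      // Stand-ins for KafkaTableSource's fieldTypes/fieldNames members.
      val fieldTypes: Array[TypeInformation[_]] =
        Array(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
      val fieldNames = Array("user", "timestamp")

      // The two-argument constructor keeps names and types together, so the
      // table source only needs to expose this one TypeInformation.
      val returnType = new RowTypeInfo(fieldTypes, fieldNames)

      println(returnType.getFieldNames.mkString(", ")) // user, timestamp
      println(returnType.getArity)                     // 2
      println(returnType.getTypeAt(0))                 // String
    }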
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 59cad80..4b9936d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -298,7 +298,7 @@ abstract class BatchTableEnvironment(
    * @return The [[DataSet]] that corresponds to the translated [[Table]].
    */
   protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
-    validateType(tpe)
+    TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataSetRel =>

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4e43001..c08b502 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -308,7 +308,7 @@ abstract class StreamTableEnvironment(
   protected def translate[A]
     (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
-    validateType(tpe)
+    TableEnvironment.validateType(tpe)
 
     logicalPlan match {
       case node: DataStreamRel =>
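Both translate methods now delegate to TableEnvironment.validateType, whose definition moves to the companion object in the next file. To make its contract concrete, a sketch with hypothetical classes (not from the commit):

    import org.apache.flink.api.java.typeutils.TypeExtractor
    import org.apache.flink.table.api.TableEnvironment

    // Top-level and public: passes validateType.
    case class Valid(name: String, age: Int)

    object ValidateTypeSketch extends App {
      TableEnvironment.validateType(TypeExtractor.getForClass(classOf[Valid]))

      // A local class is neither public nor does it have a canonical name,
      // so this call throws TableException.
      class Local(var x: Int)
      TableEnvironment.validateType(TypeExtractor.getForClass(classOf[Local]))
    }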
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 1c899e1..2dcfc95 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.table.api
 
-import _root_.java.util.concurrent.atomic.AtomicInteger
 import _root_.java.lang.reflect.Modifier
+import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
@@ -32,7 +32,8 @@ import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
@@ -48,6 +49,7 @@ import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.schema.RelTable
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
 
 import _root_.scala.collection.JavaConverters._
@@ -336,48 +338,16 @@ abstract class TableEnvironment(val config: TableConfig) {
     frameworkConfig
   }
 
-  protected def validateType(typeInfo: TypeInformation[_]): Unit = {
-    val clazz = typeInfo.getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-      !Modifier.isPublic(clazz.getModifiers) ||
-      clazz.getCanonicalName == null) {
-      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
-        s"static and globally accessible.")
-    }
-  }
-
   /**
     * Returns field names and field positions for a given [[TypeInformation]].
     *
-    * Field names are automatically extracted for
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    * The method fails if inputType is not a
-    * [[org.apache.flink.api.common.typeutils.CompositeType]].
-    *
     * @param inputType The TypeInformation to extract the field names and positions from.
     * @tparam A The type of the TypeInformation.
     * @return A tuple of two arrays holding the field names and corresponding field positions.
     */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-    (Array[String], Array[Int]) =
-  {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = inputType match {
-      case t: TupleTypeInfo[A] => t.getFieldNames
-      case c: CaseClassTypeInfo[A] => c.getFieldNames
-      case p: PojoTypeInfo[A] => p.getFieldNames
-      case r: RowTypeInfo => r.getFieldNames
-      case tpe =>
-        throw new TableException(s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
+    (Array[String], Array[Int]) = {
+    (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType))
   }
 
   /**
@@ -393,7 +363,7 @@ abstract class TableEnvironment(val config: TableConfig) {
     inputType: TypeInformation[A],
     exprs: Array[Expression]): (Array[String], Array[Int]) = {
 
-    validateType(inputType)
+    TableEnvironment.validateType(inputType)
 
     val indexedNames: Array[(Int, String)] = inputType match {
       case a: AtomicType[A] =>
@@ -554,4 +524,95 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract the field names from.
+    * @tparam A The type of the TypeInformation.
+    * @return An array holding the field names.
+    */
+  def getFieldNames[A](inputType: TypeInformation[A]): Array[String] = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: CompositeType[_] => t.getFieldNames
+      case a: AtomicType[_] => Array("f0")
+      case tpe =>
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+          s"Type $tpe lacks explicit field naming")
+    }
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    fieldNames
+  }
+
+  /**
+    * Validates whether the class represented by the typeInfo is static and globally accessible.
+    *
+    * @param typeInfo the type to check
+    * @throws TableException if the type does not meet these criteria
+    */
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+    val clazz = typeInfo.getTypeClass
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+      !Modifier.isPublic(clazz.getModifiers) ||
+      clazz.getCanonicalName == null) {
+      throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " +
+        s"static and globally accessible.")
+    }
+  }
+
+  /**
+    * Returns field indexes for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract the field positions from.
+    * @return An array holding the field positions.
+    */
+  def getFieldIndices(inputType: TypeInformation[_]): Array[Int] = {
+    getFieldNames(inputType).indices.toArray
+  }
+
+  /**
+    * Returns field types for a given [[TypeInformation]].
+    *
+    * @param inputType The TypeInformation to extract field types from.
+    * @return An array holding the field types.
+    */
+  def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] = {
+    validateType(inputType)
+
+    inputType match {
+      case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray
+      case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]])
+      case tpe =>
+        throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
+    }
+  }
+
+  /**
+    * Returns field names for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field names from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field names.
+    */
+  def getFieldNames[A](tableSource: TableSource[A]): Array[String] = tableSource match {
+    case d: DefinedFieldNames => d.getFieldNames
+    case _ => TableEnvironment.getFieldNames(tableSource.getReturnType)
+  }
+
+  /**
+    * Returns field indices for a given [[TableSource]].
+    *
+    * @param tableSource The TableSource to extract field indices from.
+    * @tparam A The type of the TableSource.
+    * @return An array holding the field indices.
+    */
+  def getFieldIndices[A](tableSource: TableSource[A]): Array[Int] = tableSource match {
+    case d: DefinedFieldNames => d.getFieldIndices
+    case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType)
+  }
 }
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index aa3fab0..fa4668d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
@@ -268,23 +268,9 @@ object UserDefinedFunctionUtils {
   def getFieldInfo(inputType: TypeInformation[_])
     : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
 
-    val fieldNames: Array[String] = inputType match {
-      case t: CompositeType[_] => t.getFieldNames
-      case a: AtomicType[_] => Array("f0")
-      case tpe =>
-        throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
-          s"Type $tpe lacks explicit field naming")
-    }
-    val fieldIndexes = fieldNames.indices.toArray
-    val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
-      inputType match {
-        case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
-        case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
-        case tpe =>
-          throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
-      }
-    }
-    (fieldNames, fieldIndexes, fieldTypes)
+    (TableEnvironment.getFieldNames(inputType),
+      TableEnvironment.getFieldIndices(inputType),
+      TableEnvironment.getFieldTypes(inputType))
   }
 
   /**

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 835f316..9b844be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -18,7 +18,9 @@
 package org.apache.flink.table.plan.nodes
 
-import org.apache.calcite.rel.`type`.RelDataType
+import java.util
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
@@ -103,10 +105,12 @@ trait FlinkRel {
   }
 
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+    val fieldList = rowType.getFieldList
+
-    rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
-      t match {
+    fieldList.map(_.getType.getSqlTypeName).zipWithIndex.foldLeft(0) { (s, t) =>
+      t._1 match {
         case SqlTypeName.TINYINT => s + 1
         case SqlTypeName.SMALLINT => s + 2
         case SqlTypeName.INTEGER => s + 4
@@ -120,6 +124,7 @@ trait FlinkRel {
         case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
         case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
         case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
+        case SqlTypeName.ROW => s + estimateRowSize(fieldList.get(t._2).getType()).asInstanceOf[Int]
         case _ => throw TableException(s"Unsupported data type encountered: $t")
       }
     }
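With the new ROW branch, estimateRowSize recurses into nested row types. A worked example using only the byte sizes visible in the hunk above (INTEGER = 4, TIME = 12, SMALLINT = 2, TINYINT = 1; illustrative arithmetic, not from the commit):

    // Estimated size of ROW(INTEGER, TIME, ROW(SMALLINT, TINYINT)):
    val nested = 2 + 1           // recursive call for ROW(SMALLINT, TINYINT)
    val total  = 4 + 12 + nested // 19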
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 09cb180..73dddc6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.BatchTableSource
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -57,7 +59,7 @@ class BatchTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("fields", tableSource.getFieldsNames.mkString(", "))
+      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
   override def translateToPlan(

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 702b6eb..7550593 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
@@ -38,7 +38,9 @@ class StreamTableSourceScan(
 
   override def deriveRowType() = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+    flinkTypeFactory.buildRowDataType(
+      TableEnvironment.getFieldNames(tableSource),
+      TableEnvironment.getFieldTypes(tableSource.getReturnType))
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -57,7 +59,7 @@ class StreamTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("fields", tableSource.getFieldsNames.mkString(", "))
+      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
   override def translateToPlan(

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index d699585..d9f5bf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -39,9 +39,9 @@ class BatchTableSourceScanRule
   /** Rule must only match if TableScan targets a [[BatchTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
-      case tst: TableSourceTable =>
+      case tst: TableSourceTable[_] =>
         tst.tableSource match {
           case _: BatchTableSource[_] =>
             true
@@ -57,7 +57,7 @@ class BatchTableSourceScanRule
     val scan: TableScan = rel.asInstanceOf[TableScan]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
-    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable]).tableSource
+    val tableSource = scan.getTable.unwrap(classOf[TableSourceTable[_]]).tableSource
       .asInstanceOf[BatchTableSource[_]]
     new BatchTableSourceScan(
       rel.getCluster,
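Both scan nodes now derive their row types from the new TableEnvironment helpers rather than asking the TableSource directly. A sketch of what those helpers return (illustrative types, not from the commit):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment

    case class Order(user: String, amount: Int)

    object FieldInfoSketch extends App {
      val composite = createTypeInformation[Order]
      println(TableEnvironment.getFieldNames(composite).mkString(","))   // user,amount
      println(TableEnvironment.getFieldIndices(composite).mkString(",")) // 0,1
      println(TableEnvironment.getFieldTypes(composite).mkString(","))   // String,Integer

      // Atomic types now yield a single implicit field "f0" instead of failing.
      val atomic = BasicTypeInfo.LONG_TYPE_INFO
      println(TableEnvironment.getFieldNames(atomic).mkString(","))      // f0
      println(TableEnvironment.getFieldIndices(atomic).mkString(","))    // 0
    }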
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
index 7adec48..70639b7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/PushProjectIntoBatchTableSourceScanRule.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.rules.dataSet
 
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.dataset.{BatchTableSourceScan, DataSetCalc}
 import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
@@ -47,7 +48,7 @@ class PushProjectIntoBatchTableSourceScanRule extends RelOptRule(
     val usedFields: Array[Int] = extractRefInputFields(calc.calcProgram)
 
     // if no fields can be projected, we keep the original plan.
-    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
       val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
       val newTableSource = originTableSource.projectFields(usedFields)
       val newScan = new BatchTableSourceScan(

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
index 654fb8f..a6d4b82 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/PushProjectIntoStreamTableSourceScanRule.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.rules.datastream
 
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamCalc, StreamTableSourceScan}
 import org.apache.flink.table.plan.rules.util.RexProgramProjectExtractor._
 import org.apache.flink.table.sources.{ProjectableTableSource, StreamTableSource}
@@ -48,7 +49,7 @@ class PushProjectIntoStreamTableSourceScanRule extends RelOptRule(
     val usedFields = extractRefInputFields(calc.calcProgram)
 
     // if no fields can be projected, we keep the original plan
-    if (scan.tableSource.getNumberOfFields != usedFields.length) {
+    if (TableEnvironment.getFieldNames(scan.tableSource).length != usedFields.length) {
       val originTableSource = scan.tableSource.asInstanceOf[ProjectableTableSource[_]]
       val newTableSource = originTableSource.projectFields(usedFields)
       val newScan = new StreamTableSourceScan(
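Both push-down rules now obtain the source's field count through TableEnvironment.getFieldNames instead of the removed getNumberOfFields. A condensed sketch of the guard they share (hypothetical helper; the rules assume ProjectableTableSource is mixed into a TableSource):

    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.sources.{ProjectableTableSource, TableSource}

    // Push-down applies only when the query uses fewer fields than the source
    // exposes; otherwise the original plan is kept.
    def maybeProject[T](source: TableSource[T], usedFields: Array[Int]): TableSource[_] =
      source match {
        case p: ProjectableTableSource[_]
            if TableEnvironment.getFieldNames(source).length != usedFields.length =>
          p.projectFields(usedFields).asInstanceOf[TableSource[_]]
        case _ =>
          source
      }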
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 296c86b..c6c7c59 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -40,9 +40,9 @@ class StreamTableSourceScanRule
   /** Rule must only match if TableScan targets a [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
-    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
-      case tst: TableSourceTable =>
+      case tst: TableSourceTable[_] =>
         tst.tableSource match {
           case _: StreamTableSource[_] =>
             true
@@ -59,7 +59,7 @@ class StreamTableSourceScanRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
 
     // The original registered table source
-    val table: TableSourceTable = scan.getTable.unwrap(classOf[TableSourceTable])
+    val table = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     val tableSource: StreamTableSource[_] = table.tableSource.asInstanceOf[StreamTableSource[_]]
 
     new StreamTableSourceScan(

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index 8bb5c81..971f54f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -22,7 +22,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 
 abstract class FlinkTable[T](
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
     typeInfo match {
-      case cType: CompositeType[T] =>
+      case cType: CompositeType[_] =>
         if (fieldNames.length != cType.getArity) {
           throw new TableException(
             s"Arity of type (" + cType.getFieldNames.deep + ") " +
               "not equal to number of field names " + fieldNames.deep + ".")
         }
         fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
-      case aType: AtomicType[T] =>
+      case aType: AtomicType[_] =>
         if (fieldIndexes.length != 1 || fieldIndexes(0) != 0) {
           throw new TableException(
             "Non-composite input type may have only a single field and its index must be 0.")
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index 0f55daf..4f82f5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,13 +18,12 @@
 package org.apache.flink.table.plan.schema
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.sources.TableSource
-import org.apache.flink.types.Row
 
 /** Table which defines an external table via a [[TableSource]] */
-class TableSourceTable(val tableSource: TableSource[_])
-  extends FlinkTable[Row](
-    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
-    fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
-    fieldNames = tableSource.getFieldsNames)
+class TableSourceTable[T](val tableSource: TableSource[T])
+  extends FlinkTable[T](
+    typeInfo = tableSource.getReturnType,
+    fieldIndexes = TableEnvironment.getFieldIndices(tableSource),
+    fieldNames = TableEnvironment.getFieldNames(tableSource))

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
index 20e8bb9..f59a331 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/CsvTableSource.scala
@@ -73,7 +73,7 @@ class CsvTableSource(
     throw TableException("Number of field names and field types must be equal.")
   }
 
-  private val returnType = new RowTypeInfo(fieldTypes: _*)
+  private val returnType = new RowTypeInfo(fieldTypes, fieldNames)
 
   private var selectedFields: Array[Int] = fieldTypes.indices.toArray
 
@@ -87,15 +87,6 @@ class CsvTableSource(
     execEnv.createInput(createCsvInput(), returnType)
   }
 
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = fieldNames
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = fieldNames.length
-
   /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
   override def getReturnType: RowTypeInfo = returnType
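After this change, a CsvTableSource advertises its schema solely through the RowTypeInfo returned by getReturnType. A usage sketch (the constructor arguments follow the usage visible in CommonTestData further down; the file path and fields are illustrative):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.table.sources.CsvTableSource

    val csvSource = new CsvTableSource(
      "/path/to/persons.csv", // hypothetical path
      Array("first", "last", "score"),
      Array[TypeInformation[_]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO))

    // Names and types now travel inside the single returned type.
    println(csvSource.getReturnType.getFieldNames.mkString(", ")) // first, last, score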
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
new file mode 100644
index 0000000..bead3e9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedFieldNames.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+/**
+  * Trait that defines custom field names and their indices in the underlying
+  * data type.
+  *
+  * Should be extended together with the [[TableSource]] trait.
+  */
+trait DefinedFieldNames {
+
+  /** Returns the names of the table fields. */
+  def getFieldNames: Array[String]
+
+  /** Returns the indices of the table fields. */
+  def getFieldIndices: Array[Int]
+
+}

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
index 9d4ba68..a3eb03d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
@@ -19,22 +19,23 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field names and types.
+/** Defines an external table by providing schema information and is used to produce a
+  * [[org.apache.flink.api.scala.DataSet]] or [[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and the corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource, one needs to implement [[TableSource#getReturnType]]. In this case,
+  * field names and field indices are derived from the returned type.
+  *
+  * If custom field names are required, one needs to additionally implement
+  * the [[DefinedFieldNames]] trait.
  *
  * @tparam T The return type of the [[TableSource]].
  */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
-  /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
-
   /** Returns the [[TypeInformation]] for the return type of the [[TableSource]]. */
   def getReturnType: TypeInformation[T]
 }
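Putting the two traits together: a minimal sketch of a source that derives its data type from getReturnType but overrides the field names via DefinedFieldNames (illustrative class, not part of the commit):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
    import org.apache.flink.types.Row

    class RenamingTableSource extends TableSource[Row] with DefinedFieldNames {

      // Without DefinedFieldNames, the names inside this RowTypeInfo
      // ("f0", "f1") would be used directly.
      override def getReturnType: TypeInformation[Row] =
        new RowTypeInfo(
          Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
          Array("f0", "f1"))

      // Custom names for the same underlying fields...
      override def getFieldNames: Array[String] = Array("name", "count")

      // ...and their positions in the return type.
      override def getFieldIndices: Array[Int] = Array(0, 1)
    }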
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index e5777f2..d67725e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -18,17 +18,15 @@
 package org.apache.flink.table.api.java.batch;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.utils.CommonTestData;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.utils.CommonTestData;
+import org.apache.flink.types.Row;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -91,5 +89,4 @@ public class TableSourceITCase extends TableProgramsTestBase {
 
 		compareResultAsText(results, expected);
 	}
-
 }

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index b90de97..f91aee9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -68,9 +68,12 @@ class TableEnvironmentTest {
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
   }
 
-  @Test(expected = classOf[TableException])
+  @Test
   def testGetFieldInfoAtomic(): Unit = {
-    tEnv.getFieldInfo(atomicType)
+    val fieldInfo = tEnv.getFieldInfo(atomicType)
+
+    fieldInfo._1.zip(Array("f0")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
   }
 
   @Test
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
index a9218ac..f5ab352 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.utils.CommonTestData
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -86,4 +87,22 @@ class TableSourceITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testNestedBatchTableSourceSQL(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val nestedTable = CommonTestData.getNestedTableSource
+
+    tableEnv.registerTableSource("NestedPersons", nestedTable)
+
+    val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+      "NestedPersons.address.street, NestedPersons.address.city AS city " +
+      "FROM NestedPersons " +
+      "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()
+
+    val expected = "Bob,Taylor,Pearse Street,Dublin"
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
 }
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
index 349b369..6e4859b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
@@ -19,9 +19,16 @@
 package org.apache.flink.table.utils
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.util
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.table.sources.CsvTableSource
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, TypeExtractor}
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource, TableSource}
+import org.apache.flink.api.scala._
 
 object CommonTestData {
 
@@ -60,4 +67,35 @@ object CommonTestData {
       ignoreComments = "%"
     )
   }
+
+  def getNestedTableSource: BatchTableSource[Person] = {
+    new BatchTableSource[Person] {
+      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
+        // Use the environment passed in by the caller.
+        execEnv.fromCollection(
+          util.Arrays.asList(
+            new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
+            new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
+            new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
+          getReturnType
+        )
+      }
+
+      override def getReturnType: TypeInformation[Person] = {
+        TypeExtractor.getForClass(classOf[Person])
+      }
+    }
+  }
+
+  class Person(var firstName: String, var lastName: String, var address: Address) {
+    def this() {
+      this(null, null, null)
+    }
+  }
+
+  class Address(var street: String, var city: String) {
+    def this() {
+      this(null, null)
+    }
+  }
 }

----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/38ded2bb/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 539a257..925ee78 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -19,8 +19,6 @@
 package org.apache.flink.api.scala.io
 
 import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.io.PojoCsvInputFormat
 import org.apache.flink.api.java.io.TupleCsvInputFormat
 import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO