flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6476) Table environment register row data stream
Date Mon, 08 May 2017 08:18:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16000418#comment-16000418 ]

Fabian Hueske commented on FLINK-6476:

Hi [~rtudoran],

you are right, it should be possible to convert a DataStream<Row> (or DataSet<Row>)
into a Table.
As you noticed, the TableEnvironment.getFieldInfo() lacks support for RowTypeInfo.
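
The gap in getFieldInfo() can be modeled in a few lines. What follows is a minimal sketch that assumes simplified stand-in classes. TypeInfoSketch, RowTypeInfoSketch, GenericTypeInfoSketch and FieldInfoSketch.fieldTypes() are hypothetical names, not Flink's API. A row type that records its field types can back a table schema, while a generic type that only knows its class has to be rejected.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Flink's type information classes (not Flink's API).
interface TypeInfoSketch {}

// Plays the role of RowTypeInfo: records the type of every field.
final class RowTypeInfoSketch implements TypeInfoSketch {
    final List<String> fieldTypes;

    RowTypeInfoSketch(String... fieldTypes) {
        this.fieldTypes = Arrays.asList(fieldTypes);
    }
}

// Plays the role of GenericTypeInfo<Row>: knows only the class, nothing about its fields.
final class GenericTypeInfoSketch implements TypeInfoSketch {
    final Class<?> clazz;

    GenericTypeInfoSketch(Class<?> clazz) {
        this.clazz = clazz;
    }
}

final class FieldInfoSketch {
    // Hypothetical analogue of getFieldInfo(): only a type that exposes its
    // fields can be turned into a table schema; anything else is rejected.
    static List<String> fieldTypes(TypeInfoSketch type) {
        if (type instanceof RowTypeInfoSketch) {
            return ((RowTypeInfoSketch) type).fieldTypes;
        }
        throw new IllegalArgumentException(
            "Source of type " + type.getClass().getSimpleName() + " cannot be converted into Table.");
    }
}
```

Supporting Row then amounts to adding a branch for the row type, provided the stream actually carries a row type rather than a generic one.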

Another issue is that the TypeInformation for Row cannot be reliably extracted, either from method signatures
or from object instances.
By default, Flink will use a GenericType<Row> for a DataSet<Row> or DataStream<Row>.
Since GenericType<Row> does not contain any information about the fields of a Row, it
cannot be used to create a Table. (This issue was addressed in FLINK-6059.)
Therefore, users must manually specify a RowTypeInfo to create a Table.
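
Extraction from instances is unreliable because a Row stores its fields as untyped objects. A null field carries no type, and different records of the same stream can disagree. The sketch below illustrates this under two assumptions: plain Object[] arrays stand in for Row, and inferFieldTypes is a hypothetical helper, not a Flink method.

```java
import java.util.ArrayList;
import java.util.List;

final class InstanceInferenceSketch {
    // Hypothetical helper: guesses field types by inspecting the values of one record.
    // A null value carries no runtime class, so that field's type is reported as "unknown".
    static List<String> inferFieldTypes(Object[] row) {
        List<String> types = new ArrayList<>();
        for (Object field : row) {
            types.add(field == null ? "unknown" : field.getClass().getSimpleName());
        }
        return types;
    }
}
```

For the record {"Hello", null, 1} the sketch yields [String, unknown, Integer]. For {"Hi", "World", null} it yields [String, String, unknown]. Neither record alone describes the full schema, which is why an explicit RowTypeInfo is needed.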

The Java API offers the DataStream.returns() method to provide type hints. In Scala, we declare an implicit
TypeInformation value to override the automatically extracted type:

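import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = List(Row.of("Hello", "Worlds", Int.box(1)))
implicit val tpe: TypeInformation[Row] = new RowTypeInfo(Types.STRING, Types.STRING, Types.INT)
// tpe is automatically chosen when an implicit value of type TypeInformation[Row] is requested.
val stream = env.fromCollection(data)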

> Table environment register row data stream
> ------------------------------------------
>                 Key: FLINK-6476
>                 URL: https://issues.apache.org/jira/browse/FLINK-6476
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>         Environment: java/scala
>            Reporter: radu
>            Assignee: radu
>              Labels: feature, patch
> Registering streams of Row as a table source is currently not possible:
> Java:
> DataStream<Row> ds = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c ...");
> org.apache.flink.table.api.TableException: Source of type Row(f0: Integer, f1: Long, f2: Integer, f3: String, f4: Integer) cannot be converted into Table.
> 	at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:680)
> 	at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:363)
> 	at org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:133)
> 	at org.apache.flink.table.api.java.stream.sql.SqlITCase.testRow2(SqlITCase.java:92)
> Scala:
> val ds:DataStream[Row] = ...
> tableEnv.registerDataStream("MyTableRow", ds, "a, b, c, d, e");
> org.apache.flink.api.java.typeutils.GenericTypeInfo cannot be cast to org.apache.flink.api.common.typeutils.CompositeType
> This can be supported by extending getFieldInfo() in org.apache.flink.table.api.TableEnvironment
> and by constructing the StreamTableSource correspondingly.
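
Once a row type with known arity is available, the proposed extension also has to bind the user's field names (for example "a, b, c, d, e") to the row's positional fields. What follows is a minimal sketch of that step. It assumes a hypothetical helper, FieldNameMappingSketch.mapFieldNames; it is not Flink's actual getFieldInfo() implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FieldNameMappingSketch {
    // Hypothetical helper: maps user-supplied names such as "a, b, c" to positions 0..n-1
    // of a row type with the given arity, rejecting mismatched counts, blanks and duplicates.
    static Map<String, Integer> mapFieldNames(String fieldExpr, int arity) {
        String[] names = fieldExpr.split(",");
        if (names.length != arity) {
            throw new IllegalArgumentException(
                "Expected " + arity + " field names but got " + names.length);
        }
        // LinkedHashMap keeps the names in declaration order.
        Map<String, Integer> indexByName = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            String name = names[i].trim();
            if (name.isEmpty() || indexByName.put(name, i) != null) {
                throw new IllegalArgumentException("Invalid or duplicate field name: '" + name + "'");
            }
        }
        return indexByName;
    }
}
```

For the five-field row in the Java stack trace, mapFieldNames("a, b, c, d, e", 5) yields {a=0, b=1, c=2, d=3, e=4}. Passing only two names for that row fails fast instead of producing a partial schema.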

This message was sent by Atlassian JIRA
