flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: Help on RowTypeInfo?
Date Sun, 29 Oct 2017 05:37:33 GMT
Hi,

the translate() method is an internal method. You can use 
"toRetractStream(table, Row.class)" or "toAppendStream(table, 
Row.class)" to convert you table into a stream. Make sure to use the 
correct StreamTableEnvironment for your API: 
org.apache.flink.table.api.java.StreamTableEnvironment

Regards,
Timo

Am 10/29/17 um 5:53 AM schrieb PaulWu:
> Please help how to "translate" table to DataStream in the fellowing code.
>
> StreamTableEnvironment ste =
> StreamTableEnvironment.getTableEnvironment(EXE_ENV);
>          ste.registerDataStreamInternal("abc", stream);
>          Table ts = ste.sql("select * from abc");
>          ts = ts.as("count,word");
>          System.out.println("ts=" + ts.getSchema());
>          ts.printSchema();
>          String[] names = new String[]{"count", "word"};
>          TypeInformation[] types = new TypeInformation[]{Types.STRING,
> Types.STRING};
>
>          RowTypeInfo tpe = Types.ROW(types);
>          DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
> true, tpe);
>          ds.print();
>
> It throws an exception:
> Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
> class org.apache.flink.api.java.typeutils.RowTypeInfo)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
> 	at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
> 	at
> com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Mime
View raw message