flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From PaulWu <zwu....@gmail.com>
Subject Help on RowTypeInfo?
Date Sun, 29 Oct 2017 04:53:03 GMT
Please help how to "translate" table to DataStream in the fellowing code.

StreamTableEnvironment ste =
StreamTableEnvironment.getTableEnvironment(EXE_ENV);
        ste.registerDataStreamInternal("abc", stream);
        Table ts = ste.sql("select * from abc");
        ts = ts.as("count,word");
        System.out.println("ts=" + ts.getSchema());
        ts.printSchema();
        String[] names = new String[]{"count", "word"};
        TypeInformation[] types = new TypeInformation[]{Types.STRING,
Types.STRING};

        RowTypeInfo tpe = Types.ROW(types);
        DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
true, tpe);
        ds.print();

It throws an exception:
Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
class org.apache.flink.api.java.typeutils.RowTypeInfo)
	at
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
	at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
	at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
	at
com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message