flink-user-zh mailing list archives

From aven.wu <danxieai...@163.com>
Subject [SQL] [TableAPI] Table schema types of Table.sqlQuery(sql) and the tableSink do not match
Date Tue, 17 Dec 2019 13:20:01 GMT

Hi!
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink [aggregationTableSink] do not match.

SQL = SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) as tumTime FROM
abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)
With table.sqlQuery(SQL), the returned table schema is Query result schema: [cnt: Long, tumTime:
Timestamp].
However, when I use
JsonRowSchemaConverter.convert("{" +
"    type:'object'," +
"    properties:{" +
"        cnt: {" +
"            type: 'number'" +
"        }," +
"        tumTime:{" +
"            type:'string'," +
"            format:'date-time'" +
"        }" +
"    }" +
"}");
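to create the Elasticsearch6UpsertTableSink, the table schema is TableSink schema: [cnt: BigDecimal,
tumTime: Timestamp].
I also looked at JsonRowSchemaConverter.convert, and it converts all numeric types to BigDecimal,
so the schema returned by the SQL query cannot match the schema defined by the JSON.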
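
To make the mismatch concrete, here is a minimal sketch in plain Java that models the two schemas quoted above as maps from field name to Java class and compares them field by field. It does not use Flink and is not Flink's actual validation code; the class name SchemaMismatchSketch and its methods are hypothetical and exist only for illustration.

```java
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the two schemas; not Flink's validation code.
public class SchemaMismatchSketch {

    // Schema reported for the SQL query result: [cnt: Long, tumTime: Timestamp].
    public static Map<String, Class<?>> queryResultSchema() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("cnt", Long.class);
        schema.put("tumTime", Timestamp.class);
        return schema;
    }

    // Schema reported for the sink built from the JSON schema:
    // [cnt: BigDecimal, tumTime: Timestamp].
    public static Map<String, Class<?>> jsonDerivedSinkSchema() {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put("cnt", BigDecimal.class);
        schema.put("tumTime", Timestamp.class);
        return schema;
    }

    // Lists every query field whose type differs from the sink field of the same name.
    public static List<String> findMismatches(Map<String, Class<?>> query,
                                              Map<String, Class<?>> sink) {
        List<String> mismatches = new ArrayList<>();
        for (Map.Entry<String, Class<?>> field : query.entrySet()) {
            Class<?> sinkType = sink.get(field.getKey());
            if (!field.getValue().equals(sinkType)) {
                String sinkName = (sinkType == null) ? "missing" : sinkType.getSimpleName();
                mismatches.add(field.getKey() + ": "
                        + field.getValue().getSimpleName() + " vs " + sinkName);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Prints: [cnt: Long vs BigDecimal]
        System.out.println(findMismatches(queryResultSchema(), jsonDerivedSinkSchema()));
    }
}
```

Only the cnt field differs (Long on the query side, BigDecimal on the sink side); tumTime is Timestamp on both sides.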

Is this a problem with how I am using the API, or is it an issue in the framework itself?

The source code is attached below:

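// Imports assume Flink 1.9.x, the flink-connector-elasticsearch6 and flink-json modules, and Lombok.
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6UpsertTableSink;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.elasticsearch.common.xcontent.XContentType;

public class AggregationFunction {
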
    public static void main(String[] args) {
        String sql = "SELECT count(*) as cnt, TUMBLE_START(rowtime, INTERVAL '10' SECOND) "
                + "as tumTime FROM abc GROUP BY TUMBLE(rowtime, INTERVAL '10' SECOND)";
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);
        DataStream<User> source = senv.addSource(new SourceFunction<User>() {
            @Override
            public void run(SourceContext<User> sourceContext) throws Exception {
                int i = 1000;
                String[] names = {"Hanmeimei", "Lilei"};
                while (i > 1) {
                    sourceContext.collect(new User(names[i%2], i, new Timestamp(System.currentTimeMillis())));
                    Thread.sleep(10);
                    i--;
                }
            }
            @Override
            public void cancel() {

            }
        });
        tenv.registerDataStream("abc", source, "name, age, timestamp, rowtime.rowtime");
        Table table = tenv.sqlQuery(sql);
        List<Host> hosts = Arrays.asList(new Host("10.20.128.210", 19201, "http"));
        TypeInformation<Row> typeInformation = JsonRowSchemaConverter.convert("{" +
                "    type:'object'," +
                "    properties:{" +
                "        cnt: {" +
                "            type: 'number'" +
                "        }," +
                "        tumTime:{" +
                "            type:'string'," +
                "            format:'date-time'" +
                "        }" +
                "    }" +
                "}");
        RowTypeInfo typeInfo = (RowTypeInfo) typeInformation;
        TypeInformation<?>[] typeInformations = typeInfo.getFieldTypes();

        String[] fieldNames = typeInfo.getFieldNames();
        TableSchema.Builder builder = TableSchema.builder();
        for (int i = 0; i < typeInformations.length; i ++) {
            builder.field(fieldNames[i], typeInformations[i]);
        }
        Elasticsearch6UpsertTableSink establesink = new Elasticsearch6UpsertTableSink(
                true,
                builder.build(),
                hosts,
                "aggregation",
                "data",
                "$",
                "n/a",
                new JsonRowSerializationSchema.Builder(typeInformation).build(),
                XContentType.JSON,
                new IgnoringFailureHandler(),
                new HashMap<>()
        );
        tenv.registerTableSink("aggregationTableSink", establesink);
        table.insertInto("aggregationTableSink");
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class User {
        private String name;

        private Integer age;

        private Timestamp timestamp;
    }


}



Best wishes!