flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "出发" <573693...@qq.com>
Subject flink array 查询解析问题
Date Fri, 10 Apr 2020 14:10:03 GMT
1.定义ddl解析array字段时候,假如select 那个字段可以解析出。2.当我去定义自己函数时候,会出现null,flink直接跳过解析array那个函数了吗?
CREATE TABLE sourceTable (
	event_time_line array<ROW (
		`rule_name` VARCHAR,
		`count` VARCHAR
	)&gt;
) WITH (
	'connector.type' = 'kafka',
	'connector.version' = 'universal',
	'connector.startup-mode' = 'earliest-offset',
	'connector.topic' = 'topic_test_1',
	'connector.properties.zookeeper.connect' = 'localhost:2181',
	'connector.properties.bootstrap.servers' = 'localhost:9092',
	'update-mode' = 'append',
	'format.type' = 'json',
	'format.derive-schema' = 'true'
);
--可以查出数据
select event_time_line from sourceTable ;
--当我定义自己函数时候,参数value没有传过去,但是数组的size长度过去了
select type_change(event_time_line) from sourceTable ;


public class TypeChange extends ScalarFunction {
    /**
     * 为null,但是数组有长度
     * @param rows
     * @return
     */
    public String eval(Row [] rows){
        return JSONObject.toJSONString(rows);
    }

}
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message