flink-user mailing list archives

From Dawid Wysakowicz <dwysakow...@apache.org>
Subject Re: [FlinkSQL] is there a way to read/write local json data in Flink SQL like that of kafka?
Date Tue, 29 Oct 2019 10:14:57 GMT
Hi,

Unfortunately this is not possible out of the box. The only format the
filesystem connector currently supports is CSV.

As a workaround you could create a Table out of a DataStream reusing the
JsonRowDeserializationSchema. Have a look at the example below:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source of raw JSON bytes; replace with a source that reads the file record by record.
        DataStream<byte[]> input = env.fromElements(
            "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
        );

        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder(...).build();

        // Deserialize each record into a Row, keeping the schema's produced type information
        // so downstream operators know the Row layout.
        TypeInformation<Row> producedType = jsonSchema.getProducedType();
        SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
            .returns(producedType);

        // Register the stream as a table and query it with SQL.
        tEnv.registerDataStream("t", in);

        Table table = tEnv.sqlQuery("SELECT * FROM t");
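For the write side there is likewise no JSON filesystem sink, but the symmetric
trick should work: serialize the Rows back to JSON bytes with
JsonRowSerializationSchema and write them with a plain DataStream text sink.
A minimal sketch, assuming the `table` and `producedType` from the example
above; the output path is a placeholder:

```java
        // Sketch only: producedType is the TypeInformation<Row> obtained above,
        // and the output path is an assumption for illustration.
        JsonRowSerializationSchema serSchema =
            new JsonRowSerializationSchema.Builder(producedType).build();

        tEnv.toAppendStream(table, Row.class)
            .map(row -> new String(serSchema.serialize(row))) // Row -> JSON string
            .returns(Types.STRING)                            // help lambda type extraction
            .writeAsText("/path/to/output.json");
```

This writes one JSON object per line, which matches the record-by-record read
on the source side.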

Best,

Dawid

On 29/10/2019 10:59, Anyang Hu wrote:
> Hi guys,
>
> In flink 1.9, we can set `connector.type` to `kafka` and `format.type`
> to `json` to read json data from kafka or write json data to kafka.
>
> In my scenario, I wish to read local json data as a source table, since
> I need to debug locally and don't want to consume online kafka data.
>
> For example:
>
>     create table source (
>     first varchar,
>     id int
>     ) with (
>     'connector.type' = 'filesystem',
>     'connector.path' = '/path/to/json',
>     'format.type' = 'json'
>     )
>
>
> In addition, writing local json data is also needed.
>
> Does anyone have similar needs?
>
> Best regards,
> Anyang
