flink-user mailing list archives

From Dawid Wysakowicz <dwysakow...@apache.org>
Subject Re: Querying nested JSON stream?
Date Mon, 21 Oct 2019 07:30:00 GMT
Hi Srikanth,

Flink SQL supports nested objects, so you should not need to run
a separate flattening job. If you are using Kafka as the source for your
stream, it should be fairly easy: you just need to define a proper JSON
schema for the stream, as in this example[1][2]. If you use a different
source for your events, it might be a bit more involved, but you can still
reuse the `JsonRowDeserializationSchema` (unfortunately, Kafka is the
only table source that supports formats out of the box). For instance, have
a look at this simple example, where I use a collection of strings as input:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<byte[]> input = env.fromElements(
            "{\"lon\": 123.23, \"rideTime\": \"2019\", \"obj\": {\"numb\": 1234}}".getBytes()
        );

        JsonRowDeserializationSchema jsonSchema = new JsonRowDeserializationSchema.Builder("{" +
            "  type: 'object'," +
            "  properties: {" +
            "    lon: {" +
            "      type: 'number'" +
            "    }," +
            "    rideTime: {" +
            "      type: 'string'" +
            "    }," +
            "    obj: {" +
            "      type: 'object'," +
            "      properties: {" +
            "        numb: {" +
            "          type: 'number'" +
            "        }" +
            "      }" +
            "    }" +
            "  }" +
            "}").build();

        TypeInformation<Row> producedType = jsonSchema.getProducedType();
        SingleOutputStreamOperator<Row> in = input.map(jsonSchema::deserialize)
            .returns(producedType);

        tEnv.registerDataStream("t", in);

        Table table = tEnv.sqlQuery("SELECT obj.numb FROM t"); // you can query nested fields
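
Applied to the fields from your question, the schema and query could look
roughly like the sketch below. It is plain Java that only builds the two
strings you would pass to `JsonRowDeserializationSchema.Builder` and
`sqlQuery`. The field types (all strings), the table name `events`, and the
output aliases are assumptions on my side, and I left out `dns.answers.data`
because I do not know its shape. Note that each path segment is quoted
separately: `source.ip` inside a single pair of backticks is one flat column
name, not a nested field access.

```java
// Hypothetical sketch: builds a nested JSON schema string and a matching query.
// Field types, the table name "events", and the aliases are assumptions.
class NestedSchemaSketch {

    // JSON schema in the same relaxed notation as the example above.
    static String schema() {
        return "{"
            + "  type: 'object',"
            + "  properties: {"
            + "    source: {"
            + "      type: 'object',"
            + "      properties: { ip: { type: 'string' } }"
            + "    },"
            + "    destination: {"
            + "      type: 'object',"
            + "      properties: { ip: { type: 'string' } }"
            + "    },"
            + "    dns: {"
            + "      type: 'object',"
            + "      properties: { query: { type: 'string' } }"
            + "    }"
            + "  }"
            + "}";
    }

    // Nested access: every segment is its own identifier, with explicit
    // aliases so the two "ip" fields do not end up with the same column name.
    static String query() {
        return "SELECT e.`source`.`ip` AS source_ip, "
            + "e.`destination`.`ip` AS destination_ip, "
            + "e.`dns`.`query` AS dns_query "
            + "FROM events AS e";
    }

    public static void main(String[] args) {
        System.out.println(schema());
        System.out.println(query());
    }
}
```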

Hope that helps.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#kafka-connector

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#json-format

On 17/10/2019 15:58, srikanth flink wrote:
> Hi there,
>
> I'm using the Flink SQL client to run the jobs for me. My stream is JSON
> with nested objects. I couldn't find much documentation on querying
> nested JSON, so I had to flatten the JSON and query it as:
> SELECT `source.ip`, `destination.ip`, `dns.query`, `organization.id`,
> `dns.answers.data` FROM source;
>
> Can someone help me write a query against the nested JSON, so I can
> save the resources spent on running a flattening job?
>
>
> Thanks
> Srikanth