flink-user mailing list archives

From "Kumar Bolar, Harshith" <hk...@arity.com>
Subject KeyBy is not creating different keyed streams for different keys
Date Tue, 29 Jan 2019 04:46:55 GMT
Hi all,

I'm reading a simple JSON string as input and keying the stream on two fields, A and B.
However, keyBy appears to put records with different values of B into the same keyed stream,
even though the key is the combination of A and B.

The input:

{
    "A": "352580084349898",
    "B": "1546559127",
    "C": "A"
}

This is the core logic of my Flink code:

DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
            .map(new MapFunction<String, GenericDataObject>() {
                @Override
                public GenericDataObject map(String s) throws Exception {
                    JSONObject jsonObject = new JSONObject(s);
                    GenericDataObject genericDataObject = new GenericDataObject();
                    genericDataObject.setA(jsonObject.getString("A"));
                    genericDataObject.setB(jsonObject.getString("B"));
                    genericDataObject.setC(jsonObject.getString("C"));
                    return genericDataObject;
                }
            });
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
            .keyBy("A", "B")
            .map(new MapFunction<GenericDataObject, GenericDataObject>() {
                @Override
                public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
                    return genericDataObject;
                }
            });
testStream.print();

GenericDataObject is a POJO with three fields: A, B and C.
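
For reference, here is a minimal sketch of what the POJO could look like. Only the three
setters appear in the job code above; the String field types follow from the getString calls,
while the getters, the no-argument constructor and the toString format are assumptions
inferred from the printed output and from what Flink generally expects of a POJO.

```java
// Hypothetical sketch of GenericDataObject; only setA/setB/setC are confirmed by the job code.
public class GenericDataObject {
    // Private fields with matching getters and setters (assumed layout).
    private String A;
    private String B;
    private String C;

    // Public no-argument constructor, as used by "new GenericDataObject()" in the map function.
    public GenericDataObject() {}

    public String getA() { return A; }
    public void setA(String a) { this.A = a; }

    public String getB() { return B; }
    public void setB(String b) { this.B = b; }

    public String getC() { return C; }
    public void setC(String c) { this.C = c; }

    // Assumed format, chosen to match the console output in this message.
    @Override
    public String toString() {
        return "GenericDataObject{A='" + A + "', B='" + B + "', C='" + C + "'}";
    }
}
```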

And this is the console output for different values of field B.

5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}

Notice lines 1 and 2 (line 5 repeats the record from line 1). Even though lines 1 and 2 have
different values of B, they carry the same prefix (5), so they seem to be put in the same
keyed stream. I must be doing something fundamentally wrong here.

Thanks,
Harshith
