flink-user mailing list archives

From "G.S.Vijay Raajaa" <gsvijayraa...@gmail.com>
Subject Default value - Time window expires with no data from source
Date Sat, 24 Jun 2017 16:22:36 GMT
Hi,

I am trying to implement a Flink job that uses Twitter as the source and
collects tweets for a list of hashtags. The job aggregates the volume of
tweets per hashtag over a given time window. That part works, but if no
tweets arrive for any of the hashtags during a window, I need to emit a
default value of 0 for every hashtag. I am not sure how to implement this.

Code snippet:

env.addSource(source)
    .flatMap(new ExtractHashTagsSymbols(tickers))
    .keyBy(0)
    .timeWindow(Time.seconds(Integer.parseInt(window_time)))
    .sum(1)
    .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
    .apply(new GetVolume(tickerVolumeMap))
    .addSink(new SinkFunction<JSONObject>() {
        public void invoke(JSONObject value) throws Exception {
            System.out.println("Twitter Volume:" + value.toString());
            //JsonParser jsonParser = new JsonParser();
            //JsonObject gsonObject = (JsonObject) jsonParser.parse(value.toString());
            pushToSocket(value, socket_url);
        }
    });

The above code waits for the window_time interval, computes the tweet
volume per hashtag, and sends out a JSON object.
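
To make the desired output concrete, here is a minimal sketch of the
zero-filling step in plain Java, independent of Flink. The class
DefaultVolumes and the method withDefaults are hypothetical names used only
for illustration; they are not part of the job above. Note that a Flink time
window is only created once an element arrives for it, so a helper like this
only covers windows that receive at least one element. A completely silent
interval would still need something to produce an element, for example a
periodic placeholder record merged into the stream, which is an assumption
and not shown here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original job.
class DefaultVolumes {

    // Returns one entry per tracked hashtag, in the order given.
    // Hashtags that are missing from `observed` get a count of 0.
    static Map<String, Long> withDefaults(List<String> hashtags,
                                          Map<String, Long> observed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tag : hashtags) {
            result.put(tag, observed.getOrDefault(tag, 0L));
        }
        return result;
    }
}
```

Something along these lines could be called from the window function that
builds the JSON (GetVolume in the snippet above), passing the list of tracked
hashtags and the counts collected for the window.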

Regards,

Vijay Raajaa GS
