flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: Flink - Iteration and Backpressure
Date Thu, 01 Jun 2017 15:49:27 GMT
Hi Mahesh,

why do you need to iterate over the elements?

I wonder if you can't just stream the data from kafka1-kafka4 through your
topology?



On Fri, May 26, 2017 at 7:21 PM, MAHESH KUMAR <r.mahesh.kumar.blr@gmail.com>
wrote:

> Hi Team,
>
> I am trying to build an audit like system where I read messages from "n"
> Kafka queues, key by a unique key and then reduce them to a single message,
> if it has passed through all the "n" Kafka queues in a window time of "m"
> hours/days, the message has succeeded else it has expired.
>
> I can get it working in my test case but can't get it working when there
> are million of messages, there are very few messages that goes to the
> success stage in the iteration, huge amount of messages are sent back to
> the iteration, hence it create back pressure and it does not read the
> messages from Kafka queues anymore. Since no new messages are read, the
> messages inside the window no longer succeed, they keep going through the
> iterator forever and expire although they must succeed.
>
> I read about the buffer which when full creates back pressure and does not
> read any more messages. The system is suppose to be a light weight audit
> system and audit messages created are very small in size. Is it possible to
> increase the size of the buffer to avoid back pressure? Is there an
> alternative solution to this issue?
>
> The code looks like this:
>
> val unionInputStream = union(kafka1,kafka2,kafka3,kafka4)
>
> def audit() = {
>  reducedStream = unionInputStream.keyby(keyFunction).window(
> TumblingProcessingTimeWindow).reduce(reduceFunction)
> splitStreams = reducedStream.split(splitFunction)
> splitStreams.select(success).addSink(terminalSink)
> splitStreams.select(expire).addSink(expireSink)
> (splitStreams.select(replay), splitStreams.select(success))
> }
>
> unionInputStream.iterate(audit(_))
>
>
>
> Thanks and Regards,
> Mahesh
>

Mime
View raw message