flink-user mailing list archives

From MAHESH KUMAR <r.mahesh.kumar....@gmail.com>
Subject Re: Flink - Iteration and Backpressure
Date Thu, 01 Jun 2017 16:32:03 GMT
Hi Robert,

The Message Auditor System must monitor all 4 Kafka queues and gather
information about which messages made it through all of them or, more
specifically, which queue a particular message did not make it through.
We want the window time to be equal to our SLA time so that any
message that does not make it through all 4 stages is deemed
failed (expired). However, if we make the window time equal to our SLA time,
the buffers may fill up faster, since messages are only categorized as
successful/failed at the end of the window. Iteration lets us keep a
smaller window: if a message has passed through all the stages within a
very short interval (very small compared to the SLA), we can categorize it
as successful right away and keep iterating only on the messages that have
not yet made it to the final stage (which may eventually be failed/expired).
This is the reason we use Iteration.
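A minimal sketch of the per-key logic this paragraph describes, written as plain Scala rather than Flink code: the reduce step ORs together a bitmask of the 4 queues a message was seen on, and the classification emits a message as soon as it is complete instead of waiting for an SLA-length window. The record layout (a stage bitmask plus a first-seen time) and all names here are assumptions, not taken from the original job.

```scala
// Plain-Scala toy, not Flink code. AuditRecord, merge, Verdict and classify are
// hypothetical names that only illustrate the per-key logic.
object EarlyClassification {
  val NumStages = 4                            // one bit per Kafka queue
  private val AllStages = (1 << NumStages) - 1 // 0b1111: seen on every queue

  // stagesSeen: bit i is set once the message was seen on queue i
  final case class AuditRecord(key: String, stagesSeen: Int, firstSeenMs: Long)

  // Reduce step for two records with the same key: union the stage bits
  def merge(a: AuditRecord, b: AuditRecord): AuditRecord =
    AuditRecord(a.key, a.stagesSeen | b.stagesSeen, math.min(a.firstSeenMs, b.firstSeenMs))

  sealed trait Verdict
  case object Success extends Verdict // complete: emit now instead of waiting for the SLA
  case object Replay extends Verdict  // incomplete, SLA not reached: feed back into the loop
  case object Expired extends Verdict // incomplete and the SLA has elapsed

  def classify(r: AuditRecord, nowMs: Long, slaMs: Long): Verdict =
    if (r.stagesSeen == AllStages) Success
    else if (nowMs - r.firstSeenMs < slaMs) Replay
    else Expired
}
```

In the job from the original message, `merge` could serve as `reduceFunction`, and `classify` could drive `splitFunction` by mapping the three verdicts to the `success`, `replay` and `expire` outputs.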

We could probably avoid Iteration and create a larger time window equal to
the SLA time. The system may (or will) still face the same issue: back
pressure won't allow new messages through, and the messages inside the
window may expire even though they actually passed through all 4 stages.
Is there a recommended way to go about this?
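The buffering trade-off between one SLA-length window and small windows with replay can be illustrated with a toy count of how many records are still held in window state at a given time. This is a simplified model, not Flink's windowing implementation: it assumes tumbling windows aligned to time 0, that a record enters state when its first stage arrives, that every message eventually completes, and that in the small-window design a record leaves at the end of the small window in which its last stage arrived. `Msg`, `heldWithSlaWindow` and `heldWithSmallWindows` are hypothetical names.

```scala
// Toy model of window buffering, not Flink's implementation. Msg,
// heldWithSlaWindow and heldWithSmallWindows are hypothetical names.
object WindowOccupancy {
  // arrivalMs: first stage seen; doneMs: last (4th) stage seen
  final case class Msg(arrivalMs: Long, doneMs: Long)

  // End of the tumbling window (aligned to 0) that contains timestamp ts
  private def windowEnd(ts: Long, sizeMs: Long): Long = (ts / sizeMs + 1) * sizeMs

  // Window = SLA: a record stays in state until its SLA-sized window closes
  def heldWithSlaWindow(msgs: Seq[Msg], slaMs: Long, t: Long): Int =
    msgs.count(m => m.arrivalMs <= t && t < windowEnd(m.arrivalMs, slaMs))

  // Small windows + replay: a record leaves at the end of the small window
  // in which its last stage arrived
  def heldWithSmallWindows(msgs: Seq[Msg], smallMs: Long, t: Long): Int =
    msgs.count(m => m.arrivalMs <= t && t < windowEnd(m.doneMs, smallMs))
}
```

For example, with a 1000 ms SLA window and 100 ms small windows, three messages that all completed within 60 ms are still held at t = 500 ms in the first design and already released in the second.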

Thanks and Regards,

On Thu, Jun 1, 2017 at 9:49 AM, Robert Metzger <rmetzger@apache.org> wrote:

> Hi Mahesh,
> why do you need to iterate over the elements?
> I wonder if you can't just stream the data from kafka1-kafka4 through your
> topology?
> On Fri, May 26, 2017 at 7:21 PM, MAHESH KUMAR <r.mahesh.kumar.blr@gmail.com> wrote:
>> Hi Team,
>> I am trying to build an audit-like system where I read messages from "n"
>> Kafka queues, key them by a unique key and then reduce them to a single
>> message. If a message has passed through all "n" Kafka queues within a
>> window time of "m" hours/days, it has succeeded; otherwise it has expired.
>> I can get it working in my test case but not when there are millions of
>> messages: very few messages reach the success stage in the iteration,
>> and a huge number of messages are sent back to the iteration. This
>> creates back pressure, and the job stops reading messages from the Kafka
>> queues. Since no new messages are read, the messages inside the window
>> can no longer succeed; they keep going through the iteration forever and
>> expire even though they should have succeeded.
>> I read that the buffers, when full, create back pressure so that no more
>> messages are read. The system is supposed to be a lightweight audit
>> system, and the audit messages it creates are very small. Is it
>> possible to increase the size of the buffer to avoid back pressure? Is
>> there an alternative solution to this issue?
>> The code looks like this:
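>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> // AuditEvent is a placeholder for the audit message type; kafka1..kafka4,
>> // keyFunction, reduceFunction, splitFunction and the sinks are defined elsewhere.
>> val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)
>> def audit(input: DataStream[AuditEvent]): (DataStream[AuditEvent], DataStream[AuditEvent]) = {
>>   val reducedStream = input.keyBy(keyFunction)
>>     .window(TumblingProcessingTimeWindows.of(Time.hours(m)))
>>     .reduce(reduceFunction)
>>   val splitStreams = reducedStream.split(splitFunction)
>>   splitStreams.select("success").addSink(terminalSink)
>>   splitStreams.select("expire").addSink(expireSink)
>>   // (feedback stream, output stream): "replay" records go around the loop again
>>   (splitStreams.select("replay"), splitStreams.select("success"))
>> }
>> unionInputStream.iterate(audit(_))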
>> Thanks and Regards,
>> Mahesh
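The back-pressure loop described in the original message (most records are sent back to the iteration, so the Kafka sources stop being read) can be illustrated with a toy model of one bounded buffer shared by new Kafka input and the iteration's feedback edge. It is not Flink's network stack: it assumes a fixed per-step throughput, a fixed replay percentage, and that fed-back records take buffer space before new source records do. `FeedbackLoopModel` and `simulate` are hypothetical names.

```scala
// Toy model, not Flink's network stack: one bounded buffer is shared by new
// Kafka input and the iteration's feedback edge. Names are hypothetical.
object FeedbackLoopModel {
  /** Returns how many new source records were admitted in each step. */
  def simulate(capacity: Int, throughput: Int, replayPercent: Int,
               offeredPerStep: Int, steps: Int): Seq[Int] = {
    var buffered = 0
    (1 to steps).map { _ =>
      val drained = math.min(buffered, throughput)   // records processed this step
      val replayed = drained * replayPercent / 100   // sent back into the loop
      buffered = buffered - drained + replayed       // feedback takes space first
      val admitted = math.min(offeredPerStep, capacity - buffered)
      buffered += admitted                           // the source gets what is left
      admitted
    }
  }
}
```

With `capacity = 10`, `throughput = 5` and `offeredPerStep = 5`, a 100% replay rate gives `Seq(5, 5, 0, 0)` over four steps, and an 80% replay rate gives `Seq(5, 5, 2, 1)`. In this model, once the buffer is full the source is admitted only `drained - replayed` records per step, so a larger `capacity` delays that point without raising the steady-state intake.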
