flink-user mailing list archives

From MAHESH KUMAR <r.mahesh.kumar....@gmail.com>
Subject Re: Flink - Iteration and Backpressure
Date Wed, 14 Jun 2017 05:14:52 GMT
Hi Robert/Team,

Is there a recommended solution, or any other insight into how I should be
doing this?

Thanks and Regards,

On Thu, Jun 1, 2017 at 10:32 AM, MAHESH KUMAR <r.mahesh.kumar.blr@gmail.com>
wrote:

> Hi Robert,
> The Message Auditor System must monitor all 4 Kafka queues and gather
> information about which messages made it through all of them or,
> specifically, which queue a particular message did not make it through.
> We want the window time to be equal to our SLA time, so that any
> message that does not make it through all 4 stages is deemed
> failed (expired). If we make the window time equal to our SLA time, the
> buffers may fill up faster, since messages are only categorized as
> successful/failed at the end of the window. Iteration
> lets us maintain a smaller window: if a message has
> passed through all the stages within a very short interval (very small
> compared to the SLA), we can categorize it as successful and continue only
> with the messages that have not made it to the final stage (failed/expired).
> This is the reason we use Iteration.
> We could probably avoid Iteration and create a larger time window equal to
> the SLA time. The system may/will still face the same issue: back pressure
> won't allow new messages to go through, and the messages inside the window
> may expire although they have actually passed through the 4 stages. Is
> there any recommended way to go about it?
> Thanks and Regards,
> Mahesh
> On Thu, Jun 1, 2017 at 9:49 AM, Robert Metzger <rmetzger@apache.org>
> wrote:
>> Hi Mahesh,
>> why do you need to iterate over the elements?
>> I wonder if you can't just stream the data from kafka1-kafka4 through
>> your topology?
>> On Fri, May 26, 2017 at 7:21 PM, MAHESH KUMAR <
>> r.mahesh.kumar.blr@gmail.com> wrote:
>>> Hi Team,
>>> I am trying to build an audit-like system where I read messages from "n"
>>> Kafka queues, key them by a unique key, and then reduce them to a single
>>> message. If a message has passed through all "n" Kafka queues within a
>>> window of "m" hours/days, it has succeeded; otherwise it has expired.
>>> I can get it working in my test case, but not when there
>>> are millions of messages. Very few messages reach the
>>> success stage in each iteration and a huge number are sent back into
>>> the iteration, which creates back pressure, so the job no longer reads
>>> messages from the Kafka queues. Since no new messages are read, the
>>> messages inside the window can no longer succeed; they keep going through
>>> the iteration forever and expire although they should succeed.
>>> I read that the buffer, when full, creates back pressure and stops
>>> any more messages from being read. The system is supposed to be a
>>> lightweight audit system, and the audit messages created are very small.
>>> Is it possible to increase the size of the buffer to avoid back pressure?
>>> Is there an alternative solution to this issue?
>>> The code looks like this:
>>> val unionInputStream = union(kafka1, kafka2, kafka3, kafka4)
>>> def audit() = {
>>>   val reducedStream = unionInputStream.keyBy(keyFunction)
>>>     .window(TumblingProcessingTimeWindow).reduce(reduceFunction)
>>>   val splitStreams = reducedStream.split(splitFunction)
>>>   splitStreams.select(success).addSink(terminalSink)
>>>   splitStreams.select(expire).addSink(expireSink)
>>>   (splitStreams.select(replay), splitStreams.select(success))
>>> }
>>> unionInputStream.iterate(audit(_))
>>> Thanks and Regards,
>>> Mahesh
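
For readers following the thread, here is a minimal plain-Scala sketch of the categorization the quoted messages describe: merge partial records per key, then label each record success, expire, or replay. It does not use Flink. The names `Audit`, `merge`, `classify`, `stageCount`, and `slaMs` are assumptions made for illustration; they are not taken from the original job, whose `reduceFunction` and `splitFunction` were not posted.

```scala
object AuditSketch {
  // Hypothetical audit record: message key, time first seen, and the
  // set of queue indices (1..n) the message has been observed on so far.
  final case class Audit(key: String, firstSeenMs: Long, stages: Set[Int])

  sealed trait Outcome
  case object Success extends Outcome
  case object Expire extends Outcome
  case object Replay extends Outcome

  // Assumed counterpart of reduceFunction: combine two partial records
  // for the same key into one, keeping the earliest timestamp.
  def merge(a: Audit, b: Audit): Audit =
    Audit(a.key, math.min(a.firstSeenMs, b.firstSeenMs), a.stages ++ b.stages)

  // Assumed counterpart of splitFunction: success once every stage has
  // been seen, expire once the SLA has elapsed, otherwise replay.
  def classify(a: Audit, nowMs: Long, stageCount: Int, slaMs: Long): Outcome =
    if (a.stages.size >= stageCount) Success
    else if (nowMs - a.firstSeenMs >= slaMs) Expire
    else Replay

  def main(args: Array[String]): Unit = {
    val slaMs = 60000L
    // One record per Kafka queue for the same message key.
    val seen = Seq(
      Audit("m1", 0L, Set(1)), Audit("m1", 5L, Set(2)),
      Audit("m1", 9L, Set(3)), Audit("m1", 12L, Set(4)))
    println(classify(seen.reduce(merge), 20L, 4, slaMs)) // all 4 stages seen

    val partial = Audit("m2", 0L, Set(1, 2))
    println(classify(partial, 1000L, 4, slaMs))  // still inside the SLA
    println(classify(partial, 60000L, 4, slaMs)) // SLA elapsed
  }
}
```

Only the `Replay` branch needs to be looked at again later, so the volume fed back into the iteration depends on how many keys are still incomplete when each small window closes.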
