flink-user mailing list archives

From Paul Smith <psm...@aconex.com>
Subject Re: Batch stream Sink delay ?
Date Wed, 15 Mar 2017 22:54:43 GMT
Thanks Fabian, I’m pretty sure you are correct here.  I can see in the metrics view that currentLowWaterMark is sitting at Long.MIN_VALUE by the looks of it, so watermarks are not being emitted at all until the end.  It stays that way all the way through the job.

I’m not sure why this is the case.  I’ve verified that my TimestampExtractor class is being called and returning the value I’d expect (the timestamp from the log line), which looks legitimate.  My WindowFunction, which does the aggregation, is not called until right at the end of the job, and yet the windows come out ok in the destination.
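One thing I plan to double-check is whether my extractor actually emits watermarks at all, rather than just timestamps.  Here is a minimal sketch of the kind of assigner I understand is needed; the names LogRecord, parsedRecords and getEventTimeMillis are placeholders, the real code is in the gist in my original message below:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without the EventTime characteristic, periodic watermarks are not generated.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// This extractor assigns the event timestamp AND periodically emits a
// watermark trailing the largest timestamp seen so far, which is what
// should move currentLowWaterMark off Long.MIN_VALUE.
DataStream<LogRecord> timestamped = parsedRecords
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<LogRecord>(Time.seconds(30)) {
            @Override
            public long extractTimestamp(LogRecord record) {
                return record.getEventTimeMillis(); // millis since epoch, from the log line
            }
        });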

I am not sure how to debug this one.  Does anyone have any other suggestions on how to work out why the windowing is not being triggered?  I can’t glean any further useful ideas from the metrics I can see.

Appreciate the help

Cheers,

Paul

From: Fabian Hueske <fhueske@gmail.com>
Reply-To: "user@flink.apache.org" <user@flink.apache.org>
Date: Tuesday, 14 March 2017 at 7:59 pm
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Batch stream Sink delay ?

Hi Paul,
This might be an issue with the watermarks. A window operation can only compute and emit its results when the watermark time is later than the end time of the window.
Each operator keeps track of the maximum watermark received from each of its input tasks and computes its own time as the minimum of those maximums.
If one (or all) of the watermarks received by the window operator is not later than the end time of the window, the window will not be computed.
When a file input is completely processed, Flink sends a final watermark with timestamp Long.MAX_VALUE, which might be what triggers the execution at the end of the job.
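As a concrete example with 15-minute windows: the window [12:00, 12:15) can only fire once the operator's watermark has passed the end of that window. If the sources assign timestamps but never emit watermarks, the operator's time stays at Long.MIN_VALUE for the whole run, and every window fires at once when the final Long.MAX_VALUE watermark arrives at end of input.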
I would try to debug the watermarks of your job. The web dashboard provides a few metrics
for that.
Best, Fabian

2017-03-14 2:47 GMT+01:00 Paul Smith <psmith@aconex.com>:
Using Flink 1.2.

Hi all, I have a question about batch processing and sinks.  I have a Flink job that parses a user request log file containing performance data per request. It accumulates metric values into 15-minute time windows.  Each log line is mapped to multiple records, so that each metric value can be 'billed' against a few different categories (User, Organisation, Project, Action).  The goal of the Flink job is to distil the volume of request data into more manageable summaries.  We can then see breakdowns of, say, user CPU utilised by distinct categories (e.g. Users), and look for trends/outliers.
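To make the shape concrete, the fan-out step looks roughly like this (the LogLine and MetricRecord types here are illustrative placeholders; the actual code is in the gist linked below):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Each parsed log line becomes one MetricRecord per billing category,
// all carrying the same metric values.
public static class CategoryFanOut implements FlatMapFunction<LogLine, MetricRecord> {
    @Override
    public void flatMap(LogLine line, Collector<MetricRecord> out) {
        out.collect(new MetricRecord("User", line.getUserId(), line.getMetrics()));
        out.collect(new MetricRecord("Organisation", line.getOrgId(), line.getMetrics()));
        out.collect(new MetricRecord("Project", line.getProjectId(), line.getMetrics()));
        out.collect(new MetricRecord("Action", line.getAction(), line.getMetrics()));
    }
}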

It is working very well as a batch, but with one unexpected behaviour.

What I can't understand is that the sink does not appear to get _any_ records until the rest of the chain has completed over the entire file.  I've played around with parallelism (1 -> 3), and also verified with logging that the sink isn't seeing any data until the entire previous chain is complete.  Is this expected behaviour? I was thinking that as each time window passed, a 'block' of the results would be emitted to the sink.  Since we use event time characteristics, shouldn't the batch job emit these chunks as each 15-minute segment passes?

You can see the base job here, with an example of a single log line with the metrics here:

https://gist.github.com/tallpsmith/a2e5212547fb3c7220b0e49846d2f152

Each category I've called an 'identifierType' (User, Organisation, ...), with the 'identifier' being the value of that (a UserId, for example).  I key the stream by this pair of fields and then fold the records over the time window, summing up each metric type's value.  I have yet to work out the proper use case for Fold versus Reduce, so I may have got that wrong, but I can't see how that changes the flow here.  A sketch of this step is below.
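In outline, the keyed windowed fold looks like this (names like MetricRecord and MetricSummary are placeholders; the real version is in the gist):

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Key by the (identifierType, identifier) pair, bucket into 15-minute
// event-time windows, and fold each window's records into one summary.
DataStream<MetricSummary> summaries = metricRecords
    .keyBy("identifierType", "identifier")
    .timeWindow(Time.minutes(15))
    .fold(new MetricSummary(), new FoldFunction<MetricRecord, MetricSummary>() {
        @Override
        public MetricSummary fold(MetricSummary acc, MetricRecord rec) {
            acc.add(rec); // sum each metric type's value into the accumulator
            return acc;
        }
    });

(For what it's worth, my understanding is that fold lets the accumulator type differ from the input type, whereas reduce requires input and output to be the same type.)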

The output is a beautiful rolled-up summary per 15 minutes for each identifierType & identifier.  I have yet to attempt a live streaming version of this, but had thought the batch version would also be concurrent and start emitting 15-minute windows as soon as the stream transitions into the next window.  Given the entire job takes about 5 minutes on my laptop for 8 million raw source lines whose data is spread out over an entire day, I would have thought some part of that 5 minutes would be spent emitting chunks of summary data to the sink?  But nothing turns up until the entire job is done.

Maybe the data is just too small.  Maybe there's buffering going on somewhere in the chain?

Any pointers would be appreciated in understanding the flow here.

Cheers,

Paul Smith


