flink-user mailing list archives

From: Arvid Heise <ar...@ververica.com>
Subject: Re: Process available data and stop with savepoint
Date: Mon, 18 May 2020 20:05:44 GMT
I also previously had some low-volume data sources that I wanted to process,
and I was always convinced that the proper solution would be auto-scaling:
just decrease the used resources as much as possible (which is not trivial
because of state rescaling). But thinking a bit further, it would even need
to scale down to 0 to really meet the needs of some kind of hibernation mode
(maybe some K8s operator that implements a duality of real cluster and cron
job?).

Unfortunately, scaling down won't be in Flink before 1.12, and scaling up is
still limited. The vision of hibernation is even farther down the road
(possibly never).

So for the time being, your approach definitely looks correct to me, save
that I would take batch out of the equation. What you have are actually
mini-streams, not mini-batches.

Now to your questions:

1)

> > @Arvid: You could monitor the number of records being processed and
> trigger stop/cancel-with-savepoint accordingly.
>
> I was thinking in a similar direction, but got stuck on what records I
> should be counting and how in case of a JOIN... if I have two or more input
> streams and some arbitrary SQL transforms - the number of rows in the
> output may be different from the number of rows read.
>

I'd be pragmatic: sum the numBytesOut metric over all operators, and if it
does not increase for 10 minutes, assume processing is done.
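
If it helps, here is a rough, untested sketch of that idea (Scala; restBase
and jobId are placeholders, and the JSON responses of the monitoring REST API
are parsed with naive regexes just to keep the example self-contained - do
check the exact response shapes for your Flink version):

    import scala.io.Source

    object IdleWatcher {
      // vertex ids in the /jobs/:jobid overview are 32 hex characters
      private val vertexId = """"id"\s*:\s*"([0-9a-f]{32})"""".r
      // aggregated subtask metrics look like {"id":"numBytesOut","sum":...}
      private val sumValue = """"sum"\s*:\s*([0-9.E+-]+)""".r

      // Sums numBytesOut over all vertices of the job. NOTE: sketch only;
      // endpoints/format per the monitoring REST API docs.
      def totalBytesOut(restBase: String, jobId: String): Long = {
        val overview = Source.fromURL(s"$restBase/jobs/$jobId").mkString
        // ids may appear both under "vertices" and in the plan, hence distinct
        val ids = vertexId.findAllMatchIn(overview).map(_.group(1)).toSeq.distinct
        ids.map { v =>
          val metrics = Source.fromURL(
            s"$restBase/jobs/$jobId/vertices/$v/subtasks/metrics?get=numBytesOut"
          ).mkString
          sumValue.findFirstMatchIn(metrics)
            .map(_.group(1).toDouble.toLong).getOrElse(0L)
        }.sum
      }

      // Blocks until the sum has not increased for idleMillis (default 10
      // minutes), polling every 30 seconds.
      def awaitIdle(restBase: String, jobId: String,
                    idleMillis: Long = 10 * 60 * 1000L): Unit = {
        var last = totalBytesOut(restBase, jobId)
        var lastChange = System.currentTimeMillis()
        while (System.currentTimeMillis() - lastChange < idleMillis) {
          Thread.sleep(30 * 1000L)
          val current = totalBytesOut(restBase, jobId)
          if (current != last) {
            last = current
            lastChange = System.currentTimeMillis()
          }
        }
      }
    }

Once awaitIdle returns, you can stop with a savepoint (see 3 below).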

2) Re ContinuousFileReaderOperator.java#L222
<https://github.com/apache/flink/blob/release-1.10.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L222>:
this line is only executed on close(), which in turn is only called for
PROCESS_ONCE (and not during stop/cancel with savepoint). I can go into
details if you are interested. Stop with savepoint was previously called
cancel with savepoint, exactly because it cancels all tasks (but I guess that
name was deemed too technical).

3) For both stopWithSavepoint and the REST API call stop [1], you actually
have the option advanceToEndOfEventTime (a flag indicating whether the source
should inject a MAX_WATERMARK into the pipeline). Set it to false and no
window will be fired.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
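
In code, that can look roughly like this (a sketch assuming the JobClient you
get back from env.executeAsync() in 1.10; the job name and savepoint
directory are placeholders):

    // Stop with a savepoint, but do NOT advance to end of event time, so the
    // sources never emit MAX_WATERMARK and no windows fire on shutdown.
    val jobClient = env.executeAsync("hibernating-job")
    // ... wait until all available input has been processed, e.g. with the
    // metric polling sketched under 1) ...
    val savepointPath: String = jobClient
      .stopWithSavepoint(false, "s3://my-bucket/savepoints") // advanceToEndOfEventTime = false
      .get() // completes with the savepoint path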

4)

> I'm curious how Flink handles data in-flight during
> env.stopWithSavepoint():
>

Neither in checkpoints nor in savepoints will in-flight data be part of the
snapshot. (Well, we are actually developing unaligned checkpoints, which do
include in-flight data, but that's new.)
In both cases, checkpoint barriers are inserted at the sources and then
trickle through the graph. Whenever a barrier reaches an operator, that
operator's state is stored.
So if you have read all data at the sources and then trigger a
checkpoint/savepoint, it is guaranteed that no unprocessed data is left when
the checkpoint/savepoint finishes, as no record overtakes the barrier (for
unaligned checkpoints that's a different story).
That means you can trigger your hibernation savepoint after all source data
has been processed.

HOWEVER, if you use PROCESS_ONCE, things unfortunately don't work. Because
the source closes itself after having read the last file, a checkpoint
barrier will not be propagated through the source, and thus the checkpoint
never completes. So currently it is absolutely mandatory to use
PROCESS_CONTINUOUSLY.

5)

> Is there perhaps a way to send other types of special messages like
> Watermarks through the streams without them being treated as data? I wonder
> if I could send some special "InputWillBlock" message from the file readers
> and wait for it to propagate to the output to understand that processing
> has finished.
>
That is an interesting line of thinking. We have an idle StreamStatus that
is also propagated downstream, but it's currently not public API. It is
propagated up to the OperatorChain and then only used on the output side.

On Mon, May 18, 2020 at 8:48 PM Sergii Mikhtoniuk <mikhtoniuk@gmail.com>
wrote:

> Thanks all for responding.
>
> To give a bit more context:
> - I'm building a tool that performs *fully deterministic* stream
> processing of mission-critical data
> - all input data is in the form of an append-only event log (Parquet files)
> - users define streaming SQL transformations to do all kinds of analysis
> of those event logs (joins, enrichment, aggregating events into reports
> etc.)
> - results are also written as append-only event logs
>
> I think this use case fits very well with the "batch is a special case of
> streaming" idea. Even though I have all data history on disk, I want the SQL
> queries users write to be fully agnostic of how data is ingested, stored,
> how frequently new data arrives, or how frequently it's processed.
>
>
> Say for example you want to generate a weekly summary report of COVID-19
> cases per country:
>
> - You could write a batch job that reads the end date of the last processed
> week from the previous output, checks whether one full week has passed since
> then, checks that all input data sources have already posted full data for
> that week, and
> finally filters and aggregates the data.
>
> - ...But isn't it much more elegant to express it as a tumbling window
> aggregation, where watermarks do all the hard work for you? This kind of
> "write query once and run it forever" is what I'm aiming for and why I'm
> not using batch processing.
>
>
> As for why start-stop-continue behavior - most data I'm dealing with is
> low volume and low frequency. Think open datasets you find on government
> data portals, e.g. property assessment data, zoning, transit lines. All of
> those are updated very infrequently, so if I run the application constantly
> it will be idle 99% of the time. Thus I use the "pull" model, where the user
> runs the app to update some query result to the latest available data when
> necessary.
>
> I realize that this kind of usage is very different from how Flink is
> usually deployed, but imho it's not too far-fetched.
>
>
> Going back to specific problems that I encountered:
>
> It seems it is not possible to use Flink 1.10 for *unbounded* file streams
> at all. When reading files with FileProcessingMode.PROCESS_CONTINUOUSLY the
> following line emits the MAX_WATERMARK into the stream even when
> env.stopWithSavepoint() is called, prematurely closing my tumbling windows:
>
> https://github.com/apache/flink/blob/release-1.10.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L222
>
> I had to implement my own ContinuousFileReaderOperator as a workaround.
>
>
> > @Arvid: You could monitor the number of records being processed and
> trigger stop/cancel-with-savepoint accordingly.
>
> I was thinking in a similar direction, but got stuck on what records I
> should be counting and how in case of a JOIN... if I have two or more input
> streams and some arbitrary SQL transforms - the number of rows in the
> output may be different from the number of rows read.
>
> I'm curious how Flink handles data in-flight during
> env.stopWithSavepoint():
> - Will it wait for records to propagate through the topology? ...In this
> case all I need is to ensure that the reader has read all available data
> before calling stop.
> - ...Or will in-flight records become part of the savepoint? ...In this
> case I'll need to think of a way to make sure not only reading but all
> processing finishes too.
>
> Is there perhaps a way to send other types of special messages like
> Watermarks through the streams without them being treated as data? I wonder
> if I could send some special "InputWillBlock" message from the file readers
> and wait for it to propagate to the output to understand that processing
> has finished.
>
>
> Again, thanks everyone for your help!
> - Sergii
>
> On Mon, May 18, 2020 at 8:45 AM Thomas Huang <lyanghwy@hotmail.com> wrote:
>
>> Hi,
>>
>> Actually, it seems like Spark dynamic allocation saves more resources in
>> that case.
>>
>> ------------------------------
>> *From:* Arvid Heise <arvid@ververica.com>
>> *Sent:* Monday, May 18, 2020 11:15:09 PM
>> *To:* Congxian Qiu <qcx978132955@gmail.com>
>> *Cc:* Sergii Mikhtoniuk <mikhtoniuk@gmail.com>; user <
>> user@flink.apache.org>
>> *Subject:* Re: Process available data and stop with savepoint
>>
>> Hi Sergii,
>>
>> your requirements feel a bit odd. It's neither batch nor streaming.
>>
>> Could you tell us why it's not possible to let the job run as a streaming
>> job that runs continuously? Is it just a matter of saving costs?
>> If so, you could monitor the number of records being processed and
>> trigger stop/cancel-with-savepoint accordingly.
>>
>> On Mon, May 18, 2020 at 7:19 AM Congxian Qiu <qcx978132955@gmail.com>
>> wrote:
>>
>> Hi Sergii
>>
>> If I understand correctly, you want to process all the files in some
>> directory, and do not want to process them multiple times. I'm not sure if
>> using `FileProcessingMode#PROCESS_CONTINUOUSLY`
>> instead of `FileProcessingMode#PROCESS_ONCE`[1] can satisfy your needs
>> while keeping the job running 24/7.
>>
>> But be careful: under `FileProcessingMode#PROCESS_CONTINUOUSLY`, when a
>> file is modified, its contents are re-processed entirely. This can break
>> the “exactly-once” semantics, as appending data at the end of a file will
>> lead to all of its contents being re-processed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html#data-sources
>>
>> Best,
>> Congxian
>>
>>
>> On Mon, May 18, 2020 at 5:47 AM Sergii Mikhtoniuk <mikhtoniuk@gmail.com> wrote:
>>
>> Hello,
>>
>> I'm migrating my Spark-based stream processing application to Flink
>> (Calcite SQL and temporal tables look too attractive to resist).
>>
>> My Spark app works as follows:
>> - application is started periodically
>> - it reads a directory of Parquet files as a stream
>> - SQL transformations are applied
>> - resulting append stream is written to another directory
>> - it runs until all available data is processed
>> - checkpoints its state
>> - and **exits**
>> - upon next run it resumes where it left off, processing only new data
>>
>> I'm having difficulties replicating this start-stop-resume behavior with
>> Flink.
>>
>> When I setup my input stream using:
>>
>>     env.readFile[Row](..., FileProcessingMode.PROCESS_CONTINUOUSLY)
>>
>> ... I get an infinite stream, but the application will naturally keep
>> running until aborted manually.
>>
>> When I use FileProcessingMode.PROCESS_ONCE, the application exits after
>> exhausting all inputs, but it seems that Flink also treats the end of the
>> stream as the max watermark, so, for example, it will close all tumbling
>> windows that I don't want closed yet, since more data will arrive upon the
>> next run.
>>
>> Is there a way not to emit a max watermark with PROCESS_ONCE? If so, can
>> I still trigger a savepoint when env.execute() returns?
>>
>> Alternatively, if I use PROCESS_CONTINUOUSLY along with
>> env.executeAsync(), is there a way for me to detect when the file stream
>> has been exhausted, so that I can call job.stopWithSavepoint()?
>>
>> Thanks for your help!
>> - Sergii
>>
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
