beam-user mailing list archives

From Ankur Chauhan <an...@malloc64.com>
Subject Re: Reprocessing historic data with streaming jobs
Date Mon, 01 May 2017 16:51:27 GMT
I have sort of a similar use case when dealing with failed / cancelled / broken streaming pipelines.
We have an operator that continuously monitors the min-watermark of the pipeline, and when
it detects that the watermark has not advanced for more than some threshold, we start a new
pipeline and initiate a "patcher" batch Dataflow job that reads the event backups over the possibly
broken time range (+/- 1 hour).
It works out well, but has the overhead of having to build an external operator process
that can detect when to run the batch Dataflow job.

Sent from my iPhone
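
A rough sketch of what such an operator loop could look like; fetchMinWatermark(),
restartStreamingPipeline(), and launchPatcherBatch() are hypothetical placeholders for
whatever monitoring API and job launcher are actually in use, and the threshold and
polling interval are made up:

    import java.time.Duration;
    import java.time.Instant;

    public class WatermarkWatcher {
      // Hypothetical threshold: act if the min-watermark has not advanced for this long.
      private static final Duration STALL_THRESHOLD = Duration.ofMinutes(30);

      public static void main(String[] args) throws InterruptedException {
        Instant lastWatermark = fetchMinWatermark("streaming-job-id");
        Instant lastAdvance = Instant.now();
        while (true) {
          Thread.sleep(Duration.ofMinutes(1).toMillis());
          Instant current = fetchMinWatermark("streaming-job-id");
          if (current.isAfter(lastWatermark)) {
            lastWatermark = current;
            lastAdvance = Instant.now();
          } else if (Duration.between(lastAdvance, Instant.now()).compareTo(STALL_THRESHOLD) > 0) {
            // Watermark stalled: start a fresh streaming pipeline, then patch the suspect
            // time range (+/- 1 hour around the stalled watermark) from the event backups.
            restartStreamingPipeline("streaming-job-id");
            launchPatcherBatch(lastWatermark.minus(Duration.ofHours(1)),
                               Instant.now().plus(Duration.ofHours(1)));
            lastAdvance = Instant.now();
          }
        }
      }

      // Placeholders: wire these to the runner's monitoring API and a batch job launcher.
      static Instant fetchMinWatermark(String jobId) { throw new UnsupportedOperationException(); }
      static void restartStreamingPipeline(String jobId) { throw new UnsupportedOperationException(); }
      static void launchPatcherBatch(Instant start, Instant end) { throw new UnsupportedOperationException(); }
    }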

> On May 1, 2017, at 09:37, Thomas Groh <tgroh@google.com> wrote:
> 
> You should also be able to simply add a Bounded Read from the backup data source to your
pipeline and flatten it with your Pubsub topic. Because all of the elements produced by both
the bounded and unbounded sources will have consistent timestamps, when you run the pipeline
the watermark will be held until all of the data is read from the bounded sources. Once this
is done, your pipeline can continue processing only elements from the PubSub source. If you
don't want the backlog and the current processing to occur in the same pipeline, running the
same pipeline but just reading from the archival data should be sufficient (all of the processing
would be identical, just the source would need to change).
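
A minimal sketch of that Flatten approach in Beam Java (imports omitted, as in the
snippet further down the thread; the bucket path, topic, timestamp attribute, and
parseEventTimestamp() are illustrative placeholders, not details from this thread):

    // Bounded read over the archived events in GCS.
    PCollection<String> archived = p.apply("ReadArchive",
        TextIO.read().from("gs://my-bucket/event-backup/*.json"));

    // Re-assign event-time timestamps so archived records line up with live ones.
    PCollection<String> archivedWithEventTime = archived.apply("AssignTimestamps",
        WithTimestamps.of((String line) -> parseEventTimestamp(line)));

    // Unbounded read from the live topic, taking event time from a message attribute.
    PCollection<String> live = p.apply("ReadPubsub",
        PubsubIO.readStrings()
            .fromTopic("projects/my-project/topics/events")
            .withTimestampAttribute("eventTimestamp"));

    // Flatten both; the watermark is held back until the bounded side is exhausted.
    PCollection<String> everything = PCollectionList.of(archivedWithEventTime).and(live)
        .apply("MergeSources", Flatten.pCollections());
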
> 
> If you read from both the "live" and "archival" sources within the same pipeline, you
will need to use additional machines so the backlog can be processed promptly if you use a
watermark based trigger; watermarks will be held until the bounded source is fully processed.
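
And the alternative from the same message (identical transforms, only the source swapped)
could be wired up roughly like this, again with placeholder names (the reprocessArchive
flag and MyEventTransforms are hypothetical):

    // Pick the source; keep every downstream transform identical.
    PCollection<String> source = reprocessArchive
        ? p.apply("ReadArchive", TextIO.read().from("gs://my-bucket/event-backup/*.json"))
          // (event-time timestamps would be assigned here as in the previous sketch)
        : p.apply("ReadPubsub", PubsubIO.readStrings()
              .fromTopic("projects/my-project/topics/events")
              .withTimestampAttribute("eventTimestamp"));

    // MyEventTransforms stands in for the shared PTransform (windowing, aggregation,
    // sinks), so the archival run and the live run produce comparable results.
    source.apply("Process", new MyEventTransforms());
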
> 
>> On Mon, May 1, 2017 at 9:29 AM, Lars BK <larsbkrogvig@gmail.com> wrote:
>> I did not see Lukasz's reply before I posted, and I will have to read it a bit later!
>> 
>>> man. 1. mai 2017 kl. 18.28 skrev Lars BK <larsbkrogvig@gmail.com>:
>>> Yes, precisely. 
>>> 
>>> I think that could work, yes. What you are suggesting sounds like idea 2) in
my original question.
>>> 
>>> My main concern is that I would have to allow a great deal of lateness and that
old windows would consume too much memory. Whether it works in my case or not I don't know
yet as I haven't tested it. 
>>> 
>>> What if I had to process even older data? Could I handle any "oldness" of data
by increasing the allowed lateness and throwing machines at the problem to hold all the old
windows in memory while the backlog is processed? If so, great! But I would have to dial the
allowed lateness back down when the processing has caught up with the present. 
>>> 
>>> Is there some intended way of handling reprocessing like this? Maybe not? Perhaps
it is more of a Pubsub and Dataflow question than a Beam question when it comes down to it.

>>> 
>>> 
>>>> man. 1. mai 2017 kl. 17.25 skrev Jean-Baptiste Onofré <jb@nanthrax.net>:
>>>> OK, so the messages are "re-published" on the topic, with the same timestamp as
>>>> the original, and consumed again by the pipeline.
>>>> 
>>>> Maybe you can play with the allowed lateness and late firings?
>>>> 
>>>> Something like:
>>>> 
>>>>            Window.into(FixedWindows.of(Duration.standardMinutes(xx)))
>>>>                .triggering(AfterWatermark.pastEndOfWindow()
>>>>                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                        .plusDelayOf(FIVE_MINUTES))
>>>>                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                        .plusDelayOf(TEN_MINUTES)))
>>>>                .withAllowedLateness(Duration.standardMinutes(xx))
>>>>                .accumulatingFiredPanes()
>>>> 
>>>> Thoughts ?
>>>> 
>>>> Regards
>>>> JB
>>>> 
>>>> On 05/01/2017 05:12 PM, Lars BK wrote:
>>>> > Hi Jean-Baptiste,
>>>> >
>>>> > I think the key point in my case is that I have to process or reprocess
"old"
>>>> > messages. That is, messages that are late because they are streamed
from an
>>>> > archive file and are older than the allowed lateness in the pipeline.
>>>> >
>>>> > In the case I described, the messages had already been processed once and were no
>>>> > longer in the topic, so they had to be sent and processed again. But
it might as
>>>> > well have been that I had received a backfill of data that absolutely
needs to
>>>> > be processed regardless of it being later than the allowed lateness
with respect
>>>> > to present time.
>>>> >
>>>> > So when I write this now it really sounds like I either need to allow
more
>>>> > lateness or somehow rewind the watermark!
>>>> >
>>>> > Lars
>>>> >
>>>> > man. 1. mai 2017 kl. 16.34 skrev Jean-Baptiste Onofré <jb@nanthrax.net
>>>> > <mailto:jb@nanthrax.net>>:
>>>> >
>>>> >     Hi Lars,
>>>> >
>>>> >     interesting use case indeed ;)
>>>> >
>>>> >     Just to understand: if possible, you don't want to re-consume the messages from
>>>> >     the PubSub topic, right? So, you want to "hold" the PCollections for late data
>>>> >     processing?
>>>> >
>>>> >     Regards
>>>> >     JB
>>>> >
>>>> >     On 05/01/2017 04:15 PM, Lars BK wrote:
>>>> >     > Hi,
>>>> >     >
>>>> >     > Is there a preferred way of approaching reprocessing historic
data with
>>>> >     > streaming jobs?
>>>> >     >
>>>> >     > I want to pose this as a general question, but I'm working
with Pubsub and
>>>> >     > Dataflow specifically. I am a fan of the idea of replaying/fast
forwarding
>>>> >     > through historic data to reproduce results (as you perhaps
would with Kafka),
>>>> >     > but I'm having a hard time unifying this way of thinking with
the concepts of
>>>> >     > watermarks and late data in Beam. I'm not sure how to best
mimic this with the
>>>> >     > tools I'm using, or if there is a better way.
>>>> >     >
>>>> >     > If there is a previous discussion about this I might have missed
(and I'm
>>>> >     > guessing there is), please direct me to it!
>>>> >     >
>>>> >     >
>>>> >     > The use case:
>>>> >     >
>>>> >     > Suppose I discover a bug in a streaming job with event time
windows and an
>>>> >     > allowed lateness of 7 days, and that I subsequently have to
reprocess all the
>>>> >     > data for the past month. Let us also assume that I have an
archive of my
>>>> >     source
>>>> >     > data (in my case in Google cloud storage) and that I can republish
it all
>>>> >     to the
>>>> >     > message queue I'm using.
>>>> >     >
>>>> >     > Some ideas that may or may not work I would love to get your
thoughts on:
>>>> >     >
>>>> >     > 1) Start a new instance of the job that reads from a separate
source to
>>>> >     which I
>>>> >     > republish all messages. This shouldn't work because 14 days
of my data is
>>>> >     later
> than the allowed limit, but the remaining 7 days should be
reprocessed as
>>>> >     intended.
>>>> >     >
>>>> >     > 2) The same as 1), but with allowed lateness of one month.
When the job is
>>>> >     > caught up, the lateness can be adjusted back to 7 days. I am
afraid this
>>>> >     > approach may consume too much memory since I'm letting a whole
month of
>>>> >     windows
>>>> >     > remain in memory. Also I wouldn't get the same triggering behaviour
as in the
>>>> >     > original job since most or all of the data is late with respect
to the
>>>> >     > watermark, which I assume is near real time when the historic
data enters the
>>>> >     > pipeline.
>>>> >     >
>>>> >     > 3) The same as 1), but with the republishing first and only
starting the
>>>> >     new job
>>>> >     > when all messages are already waiting in the queue. The watermark
should then
>>>> >     > start one month back in time and only catch up with the present
once all the
>>>> >     > data is reprocessed, yielding no late data. (Experiments I've
done with this
>>>> >     > approach produce somewhat unexpected results where early panes
that are older
>>>> >     > than 7 days appear to be both the first and the last firing
from their
>>>> >     > respective windows.) Early firings triggered by processing
time would probably
>>>> >     > differ, but the results should be the same? This approach also
feels a bit
>>>> >     awkward
>>>> >     > as it requires more orchestration.
>>>> >     >
>>>> >     > 4) Batch process the archived data instead and start a streaming
job in
>>>> >     > parallel. Would this in a sense be a more honest approach since
I'm actually
>>>> >     > reprocessing batches of archived data? The triggering behaviour
in the
>>>> >     streaming
>>>> >     > version of the job would not apply in batch, and I would want
to avoid
>>>> >     stitching
>>>> >     > together results from two jobs if I can.
>>>> >     >
>>>> >     >
>>>> >     > These are the approaches I've thought of currently, and any
input is much
>>>> >     > appreciated.  Have any of you faced similar situations, and
how did you
>>>> >     solve them?
>>>> >     >
>>>> >     >
>>>> >     > Regards,
>>>> >     > Lars
>>>> >     >
>>>> >     >
>>>> >
>>>> >     --
>>>> >     Jean-Baptiste Onofré
>>>> >     jbonofre@apache.org <mailto:jbonofre@apache.org>
>>>> >     http://blog.nanthrax.net
>>>> >     Talend - http://www.talend.com
>>>> >
>>>> 
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbonofre@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
> 
