apex-dev mailing list archives

From Thomas Weise <tho...@datatorrent.com>
Subject Re: Window Transactions
Date Sun, 30 Aug 2015 22:18:09 GMT
It becomes a distributed transaction problem, unless all state including
checkpoint can be written in a single transaction (not possible when using
HDFS for checkpointing).

If your external system transaction completes and the checkpointing fails,
the window will be reprocessed. Hence the suggestion to record the completed
windowId as part of the transaction in the external system. More frequent
checkpointing reduces the number of windows replayed during recovery, but
does not eliminate the possibility of replay, so the solution has to live in
the operator.
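
To make the windowId technique concrete, here is a minimal sketch in Python
against an in-memory SQLite database standing in for the external system. It
is not Apex or Malhar code. The table names (counts, committed_window), the
functions open_store and process_window, and the assumption that window ids
only ever increase are all hypothetical and chosen for illustration. The
point it shows is that the data updates and the windowId are committed in the
same external transaction, so a replayed window can be detected and skipped.

```python
import sqlite3


def open_store():
    """Create the stand-in external system (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE counts (key TEXT PRIMARY KEY, count INTEGER NOT NULL)"
    )
    # Single-row table holding the id of the last window that was committed.
    conn.execute("CREATE TABLE committed_window (last_window INTEGER NOT NULL)")
    conn.execute("INSERT INTO committed_window VALUES (0)")
    conn.commit()
    return conn


def process_window(conn, window_id, keys):
    """Apply one window's increments atomically.

    Returns True if the window was applied, False if it was recognised as a
    replay. Assumes window ids are strictly increasing.
    """
    cur = conn.cursor()
    last = cur.execute("SELECT last_window FROM committed_window").fetchone()[0]
    if window_id <= last:
        return False  # already committed before the failure: skip the replay
    try:
        for key in keys:
            cur.execute(
                "INSERT OR IGNORE INTO counts (key, count) VALUES (?, 0)", (key,)
            )
            # Non-idempotent update, as in "SET count = count + 1".
            cur.execute("UPDATE counts SET count = count + 1 WHERE key = ?", (key,))
        # The window id goes into the same transaction as the data.
        cur.execute("UPDATE committed_window SET last_window = ?", (window_id,))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return True


if __name__ == "__main__":
    store = open_store()
    process_window(store, 1, ["a", "a", "b"])
    # Simulated recovery: window 1 is delivered again and must not double count.
    process_window(store, 1, ["a", "a", "b"])
    print(store.execute("SELECT key, count FROM counts ORDER BY key").fetchall())
```

If the process dies before commit, neither the increments nor the windowId
are visible and the replay applies them once. If it dies after commit but
before the checkpoint, the replay finds the windowId and does nothing.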

Thomas



On Sun, Aug 30, 2015 at 12:33 PM, Timothy Farkas <tim@datatorrent.com>
wrote:

> Hi Atri,
>
> As far as I understand point 2 can be done. You can just set your
> checkpoint window to be smaller than your application window.
>
> Someone else can correct me if I'm wrong but I believe 1) depends on the
> operator.
>
> If the operator of interest doesn't talk to an external system (HDFS, a
> database, an email server) and is idempotent, then I don't think any fault
> tolerance guarantees are gained by imposing this requirement.
>
> If the operator is talking to an external system then the pattern Thomas
> described could be used to achieve this. I assume you want this
> functionality because you want to talk to a system that doesn't support
> transactions? I believe implementing all or nothing windows for writing to
> external systems will always require operator level logic that utilizes
> transactions provided by the external system. This is required because if
> an external system doesn't provide transactions there will always be a race
> condition between the work done by the operator and the data written to the
> external system.
>
> For example, let's say Operator A is writing to System B, and System B
> doesn't support transactions:
>
> 1. ) Let's say Operator A works by writing tuples to System B and Operator
> A is checkpointed every application window.
> 2. ) Let's say Operator A was checkpointed before window 1 started and
> Operator A is on window 1.
> 3. ) Operator A writes tuples 1, 2, and 3 but fails before window 1
> completes.
> 4. ) Operator A is restored to the beginning of window 1 and writes tuples
> 1, 2, and 3 again.
> 5. ) Now we have duplicates.
>
> If we try to record individual tuples sent to System B we run into a
> similar problem.
>
> 1. ) Let's say Operator A works by writing a tuple to System B and then
> records somewhere that the tuple was written to System B.
> 2. ) Let's say Operator A was checkpointed before window 1 started and
> Operator A is on window 1.
> 3. ) Operator A writes tuple 1 and then records that tuple 1 was written.
> 4. ) Operator A writes tuple 2 and then fails.
> 5. ) Operator A is restored to the beginning of window 1.
> 6. ) Operator A checks if tuple 1 was written and sees that it was, so it
> skips tuple 1.
> 7. ) Operator A checks if tuple 2 was written and sees that it wasn't, so
> it writes tuple 2 again.
> 8. ) Now tuple 2 is duplicated.
>
> Reversing the order of writing a tuple and then recording the write will
> also produce a problem. Instead of data being duplicated, data could be
> lost.
>
> In summary, I think the implementation of all-or-nothing windows depends on
> the system you are talking to and the guarantees it gives you.
>
> Thanks,
> Tim
>
> On Sun, Aug 30, 2015 at 9:05 AM, Atri Sharma <atri.jiit@gmail.com> wrote:
>
> > Thanks Thomas and everyone.
> >
> > I agree with what Thomas explained and I have a few points to add to
> > it.
> >
> > I now understand that the ability to process or commit a transaction is
> > present. I am trying to understand the following use cases:
> >
> > 1) A window is defined. The operator either needs to process the entire
> > window or none of it. Can we have this functionality now?
> >
> > 2) Checkpoints within a window. If we fail, we can send the last seen
> > checkpoint to the source (if the source can handle it) and ask for the
> > data beyond that point.
> >
> > On 29 Aug 2015 04:38, "Thomas Weise" <thomas@datatorrent.com> wrote:
> >
> > > Atri,
> > >
> > > A concept of "transactional window" is needed for some applications that
> > > interact with external systems. A number of Malhar operators support it
> > > today. For example, a JDBC operator might perform all operations within a
> > > transaction that commences with the first write in a window and is
> > > committed in endWindow. The engine provides the callbacks, the operator
> > > implements the transaction based on the capabilities of the external
> > > system. Note that this does not imply batching, it merely speaks to
> > > transaction demarcation.
> > >
> > > But this is just part of the work needed to make the operator
> > > "transactional". Windows can be reprocessed based on the processing
> > > semantics. When a container goes down, the operator will reset to the
> > > recovery checkpoint and reprocess the windows from the checkpoint till the
> > > point where the failure occurred. Unless the processing done by the
> > > operator is idempotent, this would lead to incorrect results. For example,
> > > if the operation was "UPDATE sometable SET count = count + 1", we would
> > > double count.
> > >
> > > One technique to deal with this is to maintain the windowId as part of the
> > > state that gets committed to the external system. Now we can skip the
> > > processing if we find that the window was already processed. Of course,
> > > this requires that the upstream operators also deliver the tuples in an
> > > idempotent manner on a window replay.
> > >
> > > Thomas
> > >
> > > On Fri, Aug 28, 2015 at 2:14 PM, Chetan Narsude <chetan@datatorrent.com>
> > > wrote:
> > >
> > > > Atri,
> > > >
> > > >   BEGIN_WINDOW and END_WINDOW control events demarcate the
> > > > transaction. We do not hold the first event after BEGIN_WINDOW hostage
> > > > until the END_WINDOW is received. This allows us to provide almost zero
> > > > latency at per tuple level. This is one of the differentiating
> > > > paradigms for Apex.
> > > >
> > > >   If we did otherwise, the platform would degrade to micro-batch
> > > > processing mode. More details about it here:
> > > >
> > > > https://www.datatorrent.com/real-time-event-stream-processing-what-are-your-choices/
> > > >
> > > >   Let me know if this answers your question or if I misunderstood the
> > > > question.
> > > >
> > > > --
> > > > Chetan
> > > >
> > > >
> > > >
> > > > On Fri, Aug 28, 2015 at 1:37 PM, Atri Sharma <atri@apache.org> wrote:
> > > >
> > > > > Team,
> > > > >
> > > > > Does it make sense to have functionality for an all-or-nothing
> > > > > transactional system for windows? With future functionality to have
> > > > > dynamic operators, I feel it makes sense to allow either the data from
> > > > > an entire window to be processed or none of the data to be sent.
> > > > >
> > > > > I am not sure if window batching in its current form is a logical
> > > > > implementation of this feature.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Atri
> > > > >
> > > >
> > >
> >
>