samza-dev mailing list archives

From Bart Wyatt
Subject consistency between input, output and changelog streams
Date Tue, 07 Apr 2015 14:27:22 GMT
We are trying to make sure that we are handling the proper edge cases in our stateful tasks
so that our processes can survive failure well.

Given that the changelog will recreate the KV store (state) up to the point in time of the last
durable changelog write (Ts), that the checkpoint will restart input from the point in time of
the last durable checkpoint write (Ti), and that the output stream will contain messages up to
a third point in time, that of the last durable output write (To), our current assumption is
that in all recovery cases:

Ti <= Ts <= To

This means that some input may be "replayed" from the point of view of the KV store, which
is handled by normal at-least-once-delivery processing, and that we may duplicate output
messages that would have been produced between Ts and To, which is also consistent with
at-least-once delivery.

However, I cannot find code that backs this assumption, and I'm hoping I've just missed it.

If To < Ts, then we may drop output, because the state assumes the output was already written
but, due to the timing of the actual writes to Kafka or to durability concerns, the output is
not there.  This is important for a job that, for example, emits "session started @ X" messages
on the first message for any given session key.  The state will treat a replayed message as a
duplicate and not emit the output.  I think this is solvable in the job as long as To >= Ti,
but I am not certain the solution is generally applicable to tasks where side effects of
previous input exist in the state and have an effect on future output.
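
As a rough illustration of the To < Ts case, here is a toy model of the session-start job. It
is not Samza code: `run`, `recover`, and the convention that Ti, Ts and To are counts of input
messages are all assumptions made for this sketch, and the session key stands in for the "X"
in the message.

```python
def run(inputs, state, start):
    """Process inputs[start:], emitting a start event for each key not yet in state."""
    out = []
    for key in inputs[start:]:
        if key not in state:
            state.add(key)
            out.append(f"session started @ {key}")
    return out


def recover(inputs, ti, ts, to):
    """Return the output stream as seen after a crash and restart (toy model)."""
    # State rebuilt from the changelog: side effects of the first `ts` inputs.
    state = set(inputs[:ts])
    # Output that was durable before the crash: produced by the first `to` inputs.
    durable = run(inputs[:to], set(), 0)
    # The restarted task resumes from the checkpoint and appends what it emits.
    return durable + run(inputs, state, ti)
```

With three inputs `a`, `b`, `c`, `recover(inputs, 1, 2, 3)` (Ti <= Ts <= To) yields a duplicate
start event for `c` but loses nothing, while `recover(inputs, 1, 3, 2)` (To < Ts) never emits
the start event for `c` at all.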

If Ts < Ti, then our stateful task will effectively drop input, even though it may have
produced some or all of the output for those messages in its previous incarnation, as the
state used for all future processing will not have the side effects of processing the messages
between Ts and Ti. We see no solution for this at the task level, as it would require collusion
between the two backing systems (checkpoints and changelogs) to correct, presumably by rewinding
Ti to Ts.
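
The Ts < Ti case can be sketched the same way. This is again a toy model rather than Samza
code: `missing_after_recovery` and its `rewind` flag are hypothetical names, offsets are counts
of input messages, and the state is an idempotent "set of seen messages" so that ordinary
replay is harmless.

```python
def missing_after_recovery(inputs, ti, ts, rewind=False):
    """Return the inputs whose side effects are absent from the state after recovery."""
    # State rebuilt from the changelog: side effects of the first `ts` inputs.
    state = set(inputs[:ts])
    # Resume from the checkpoint, or from min(Ti, Ts) if the checkpoint is rewound.
    start = min(ti, ts) if rewind else ti
    # Reprocessing is idempotent here, so replayed inputs do no harm.
    state.update(inputs[start:])
    return set(inputs) - state
```

For four inputs `m0`..`m3` with Ti = 3 and Ts = 1, the state never sees `m1` and `m2`; passing
`rewind=True`, which models rewinding Ti to Ts, leaves nothing missing.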

Perhaps my code search failed because I was expecting some colluding system that would wait
for output to write out before writing changelog entries and then again before checkpoints
and that was too presumptuous.  Is there something about the code, the assumption, or my
edge-case analysis that I've missed that addresses this?


