samza-dev mailing list archives

From Ben Kirwin <...@kirw.in>
Subject Re: Determinism and exactly-once semantics
Date Wed, 01 Oct 2014 03:50:33 GMT
Thanks for the comments -- responses inline.

> > Some context: I've been experimenting with the design of a high-level
> > dataflowy streaming framework, built around Kafka's partitioned logs. I've
> > been looking at using Samza as a backend for this, where the dataflow graph
> > gets compiled down to a set of Samza jobs -- so if my priorities seem
> > strange, it's because I'm motivated by this particular use-case.
>
> That sounds fun. Would love to hear more about your framework when you're
> ready to share.
>
>

It is fun! I'll let you know when I have something worth sharing.

> In https://issues.apache.org/jira/browse/SAMZA-300 we discussed the idea of
> "write locks" on streams, which could be used to enforce such exclusive
> ownership. Nothing implemented yet, though.
>

Ah, interesting. It seems there are two cases where things could go wrong
otherwise: two jobs might accidentally assume exclusive use of the same
stream / topic, or a job might leave two copies of the same producer running
because of poor failure detection. For the former, the idea of having a
centralized server where jobs and so on register seems like a very reasonable
one. For the latter, it seems one would need some minimal support on the
Kafka side?
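
(To make that second failure mode concrete: the kind of fencing I'm imagining
looks roughly like the sketch below. The FencedTopic class and the generation
counter are entirely hypothetical -- nothing like this exists in Kafka or Samza
today -- but it shows why the check would have to live on the write path, i.e.
on the broker.)

    // Hypothetical sketch: fence out a stale duplicate producer by tagging
    // every write with a "generation" number handed out by whatever grants
    // the write lock. A real version needs the broker to track the counter.
    import java.util.concurrent.atomic.AtomicLong;

    class FencedTopic {
        private final AtomicLong currentGeneration = new AtomicLong(0);

        long acquireWriteLock() {
            // A new owner bumps the generation; any older owner is now stale.
            return currentGeneration.incrementAndGet();
        }

        boolean append(long producerGeneration, String message) {
            // Reject writes from any producer whose generation was superseded.
            if (producerGeneration < currentGeneration.get()) {
                return false; // stale producer: drop the write
            }
            System.out.println("appended: " + message);
            return true;
        }
    }

    class FencingDemo {
        public static void main(String[] args) {
            FencedTopic topic = new FencedTopic();
            long oldOwner = topic.acquireWriteLock(); // first copy of the job
            long newOwner = topic.acquireWriteLock(); // restarted copy takes over
            System.out.println(topic.append(newOwner, "from new owner")); // true
            System.out.println(topic.append(oldOwner, "from zombie"));    // false
        }
    }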


> > Assuming (for a moment) that our tasks are deterministic, there's a couple
> > strategies that give you exactly-once behaviour:
> >
> > If a task owns the partition it's writing to, this behaviour is pretty easy
> > to get. Periodically, as the task is running, we can 'checkpoint' the
> > offset of the last message we wrote *out*, along with the input offset.
>
> This is quite like what we had in mind with Idempotent Producer (before
> the idea of idempotent producer got generalised into transactions in Kafka).
>

The main difference, I think, is that if we can assume only one producer
(modulo the issues mentioned above) we can just conflate the topic's offset
and the producer's 'sequence number' -- so we don't need to keep any
server-side metadata.
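
To make that concrete, the recovery step I have in mind looks roughly like the
sketch below. None of these names are real Samza or Kafka APIs; it's just the
shape of the logic, assuming a single exclusive producer per output topic:

    // Rough sketch of recovery from an (input offset, output offset) checkpoint.
    class ExactlyOnceRecovery {
        static class Checkpoint {
            final long inputOffset;  // last input whose outputs were all written
            final long outputOffset; // offset of the last output message we wrote
            Checkpoint(long in, long out) { inputOffset = in; outputOffset = out; }
        }

        // Because we're the only producer, everything in the output topic past
        // checkpoint.outputOffset was written by us after the checkpoint. When we
        // replay the input deterministically, we just skip re-sending that many
        // outputs before we start publishing again.
        static long outputsToSkip(Checkpoint checkpoint, long outputEnd) {
            return outputEnd - checkpoint.outputOffset;
        }

        public static void main(String[] args) {
            Checkpoint cp = new Checkpoint(1000, 420);
            long outputEnd = 425; // five messages made it out after the checkpoint
            System.out.println("resume input at offset " + (cp.inputOffset + 1)
                + ", re-derive the first " + outputsToSkip(cp, outputEnd)
                + " outputs without sending them");
        }
    }

The nice property is that the dedup needs no server-side state at all: the
output topic itself is the record of what has already been sent.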


> Our tendency has been that if Kafka is providing some facility, it would
> be better for Samza to use that facility, rather than to implement its own
> version of the same. Of course that runs counter to wanting to support
> other message brokers, and also we're deviating from that principle in some
> places (even when Kafka adds support for tracking consumer offsets, Samza
> will probably keep its own checkpointing implementation).
>
> That's just saying: it's not obvious whether Samza should be implementing
> something like idempotency, or whether it's best left to the messaging
> layer. There are good arguments in either direction.
>

Certainly! There are definitely a handful of complexity / performance /
redundancy / generality tradeoffs to weigh here. I suspect the 'right choice'
is quite application-specific.

> > The easiest way to extend this to nondeterministic streams is to just make
> > those streams deterministic: you just need to log all your 'choices' before
> > making them visible to the rest of the system, so you can replay them if
> > something goes wrong. For example, if the nondeterminism is from the
> > message arrival order, it's enough to take Chris Riccomini's suggestion and
> > log whenever the MessageChooser chooses a stream. In the worst case -- say,
> > if you have some nondeterministic code that sends messages and mutates
> > state -- you need to capture all output and state changes, and make sure
> > they're all logged together before continuing.
>
> I don't quite follow. What would that logging look like (assuming it
> involves requests to external systems like a remote database)? Would all
> external communication have to go through some kind of logging layer?


Suppose I'm going through a stream of Wikipedia edits, and I have a task
that hits some database to check that the user is not a bot, does some
calculation on the edited text, and emits the results on another stream. I
could always buffer up my output and write it to some internal stream I own
before sending it, and then I could be *absolutely* sure that I could
reproduce it after a failure. Alternatively, if I'm quite sure that my
calculation is pure, I could decide to log only the isABot boolean -- the
calculation should come out to the same answer on replay.
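
In rough code, the 'log only the choice' version might look something like the
following. All the names (BotDatabase, Stream, and so on) are invented for the
example; the only point is that the nondeterministic bit gets logged before
anything derived from it goes downstream:

    // Sketch only: log the nondeterministic lookup result before emitting, so a
    // replay can reuse the logged value instead of re-querying the database.
    import java.util.Map;

    class BotCheckTask {
        // Stand-ins for the real pieces: a remote lookup and two output "streams".
        interface BotDatabase { boolean isBot(String user); }
        interface Stream { void append(String message); }

        private final BotDatabase db;
        private final Stream choiceLog; // internal stream this task owns
        private final Stream results;   // downstream output
        private final Map<Long, Boolean> replayedChoices; // read back from choiceLog on restart

        BotCheckTask(BotDatabase db, Stream choiceLog, Stream results,
                     Map<Long, Boolean> replayedChoices) {
            this.db = db;
            this.choiceLog = choiceLog;
            this.results = results;
            this.replayedChoices = replayedChoices;
        }

        void process(long inputOffset, String user, String editedText) {
            Boolean isBot = replayedChoices.get(inputOffset);
            if (isBot == null) {
                // First time through: make the nondeterministic choice and log it
                // *before* anything derived from it becomes visible downstream.
                isBot = db.isBot(user);
                choiceLog.append(inputOffset + ":" + isBot);
            }
            if (!isBot) {
                // The calculation itself is pure, so it needn't be logged.
                results.append(user + ":" + editedText.length());
            }
        }
    }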


> Nondeterminism can also come from stuff like using the system clock,
> random number generator, order of iteration in a hash table, etc. You can
> say "just don't do that" (and indeed some frameworks do), but it's hard to
> enforce, and it would be hard for users to understand why a single
> Math.random() throws off the correctness of their entire job. Given that
> the intention of exactly-once semantics is to simplify the programming
> abstraction, I'm not totally convinced that's a win.
>

In general I agree -- 'transactional' is almost certainly the right
default, especially for imperative code. Most folks don't want to mess
around at this level of detail, and I'm confident most code could use the
safety net.

On the other hand, things look a bit different from the perspective of a
framework author. (Or someone else looking for fine-grained control.)
Suppose I'm trying to translate the following SQL to code: 'SELECT user_id,
COUNT(*) FROM PageViewEvent GROUP BY user_id'. I know that "getting the
user_id field from the event message" and "counting the events for a single
user" are both deterministic. I need some way to avoid counting the same
event twice, so 'at least once' is not a good fit -- but I also don't need
the full overhead of a transaction. If those are the only two options
available, it's a less appealing target for this sort of work.

Of course, that may turn out to be the right choice for Samza; but it
*seems* like it should be possible to support both use-cases.
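
For concreteness, the counting task I'm picturing is something like the sketch
below -- not real Samza API, just the shape of the per-key state, and it
assumes the counts and the last-seen offset get persisted together atomically:

    // Sketch: because the computation is deterministic and keyed, deduplicating
    // by input offset is enough for exactly-once counts, with no transaction.
    import java.util.HashMap;
    import java.util.Map;

    class PageViewCounter {
        private final Map<String, Long> countsByUser = new HashMap<>();
        private long lastSeenOffset = -1; // persisted alongside the counts

        // Returns true if the event was counted, false if it was a replayed duplicate.
        boolean onPageView(long inputOffset, String userId) {
            if (inputOffset <= lastSeenOffset) {
                return false; // already counted before the failure: skip it
            }
            countsByUser.merge(userId, 1L, Long::sum);
            lastSeenOffset = inputOffset;
            return true;
        }

        public static void main(String[] args) {
            PageViewCounter counter = new PageViewCounter();
            counter.onPageView(1, "alice");
            counter.onPageView(2, "bob");
            counter.onPageView(2, "bob"); // redelivered after a restart: ignored
            System.out.println(counter.countsByUser); // {alice=1, bob=1}
        }
    }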

> > For me, I think the only thing missing right now is a little more control
> > over the incoming messages. For example, the first exactly-once strategy I
> > outlined above needed to bootstrap itself from a stream of (input offset,
> > output offset) pairs, and then take that most recent input offset and start
> > consuming from *there* -- instead of whatever's in Samza's checkpoint
> > stream.
>
> Could you perhaps do this by implementing a CheckpointManager (steal the
> implementation from the Kafka checkpoint manager, but change it to suit
> your needs)?
>
> > On the other hand, I'm pretty sure a lower-level mechanism that allowed you
> > to 'pull' or 'request' particular input could support both my unusual
> > requirements and the existing checkpoint / message chooser / bootstrap
> > stream machinery on top.
>
> Something like this was requested on
> https://issues.apache.org/jira/browse/SAMZA-255 -- if that is what you're
> looking for, could you explain your use case on that issue please? It's
> always possible to add features if there's a compelling use case, we just
> want to make sure they are well thought out.
>

I don't think enough data gets passed through the CheckpointManager
interface to implement what I had in mind. I'll try and work up a full
example and add it to the issue.
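
Roughly, the hook I'm after would need to let the job hand back its own
per-partition starting offsets, computed from whatever state it likes;
something like the following entirely hypothetical interface, just to make the
gap concrete:

    // Entirely hypothetical -- not an existing Samza API. The job decides its own
    // starting offsets (e.g. derived from a stream of (input offset, output offset)
    // pairs) rather than taking whatever is in the checkpoint stream.
    import java.util.Map;

    interface StartingOffsetResolver {
        // Called once at startup, before any messages are delivered. Keys are
        // partition identifiers, values are the offsets to begin consuming from.
        Map<String, Long> resolveStartingOffsets();
    }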


> Best,
> Martin
>
>
-- Ben
