From Ben Kirwin <>
Subject Re: Coast, and a few implementation questions
Date Fri, 28 Nov 2014 17:47:36 GMT
Sorry for the late reply on this!

I ended up writing up some of the tools I'm using to get exactly-once
semantics.[0] It's still a bit rough, but I decided that another week
sitting around on my disk wasn't going to do anything to improve it --
let me know if anything needs clarification.

That's also not a full answer, since it doesn't explain how those
primitives are used in `coast`. It's intentionally undocumented for
the moment, since it's a work in progress, but here's where the
coordination overhead sits right now:

- Any transformations / folds / aggregations that require state use
Samza's existing state / changelog machinery. `coast` tracks a couple
offsets up/downstream from the state along with the state itself, so
it adds a extra couple of `long`s per changelog message.
- If messages are grouped differently in the input and output, `coast`
adds the source partition/offset to the output messages -- which is
about 12 bytes of per-message overhead.
- To avoid duplicate messages, `coast` needs to checkpoint the input
and output offsets together. Samza doesn't give user code access to
the offsets, so I've been maintaining this within the task as
additional state.
- For jobs with multiple inputs, `coast` needs to remember the order
in which messages arrived so it can reproduce it if there's a failure.
This 'merge log' itself is not too expensive, but tracking the current
offset in that log has been a surprising pain, since it too needs to
be consistent with the checkpointed offsets. The only way I've found
so far involves having the same task both produce to and consume from
the same 'merge log' stream; this is not quite as awful as it sounds,
but it does create a lot of latency for no good reason.

So there's a couple places where more flexible offset handling would
substantially simplify things -- I'll think this over a bit more and
open a ticket. If I can work those issues out, though, the main cost
to enabling exactly-once becomes a few extra bytes of per-message
overhead for certain streams -- and I'm happy to live with that.

As it stands, though, `coast` *does* implement a exactly-once
semantics on top of the 0.8 Samza branch. I have some more cleanup,
testing, and polish to do, but I'm hoping to push out a new version of
`coast` that supports both the exactly-once backend and an
overhead-free at-least-once version. If all goes well, that should
happen sometime next week?


On Fri, Nov 7, 2014 at 1:05 PM, Ben Kirwin <> wrote:
>>> but I think coast actually has a pretty good shot at making that easier
>>>-- it has quite a lot of 'structural' knowledge about the flow of data,
>>>so it should be able to do a pretty good job of inserting the necessary
>>>checks / checkpoints / etc. one DAG node at a time.
>> True. Given that you know exactly what computation is going on, it seems
>> more tractable. I'm curious how you plan to implement exactly once. Do you
>> have any docs?
> Nothing worth reading, but I'm hoping to have something reasonable out
> in the next week or two. When that comes together, I'll send it along.

Ben Kirwin

