From Ben Kirwin
Subject Coast, and a few implementation questions
Date Tue, 04 Nov 2014 19:06:32 GMT

Previously on this list, I mentioned I was working on a high-level
streaming framework, and trying to piggyback it on top of Samza. This
work has been going fairly well; the flow graph is compiling down to
Samza configs, and they run quite nicely on the hello-samza test
cluster. It's still very incomplete, but if you're feeling
adventurous: the project is called 'coast', and the code's up on

More links, examples, and bootstrap instructions are in the READMEs,
and I'm happy to answer any questions.

But all this is to say: I've been digging fairly deeply into Samza's
structure and implementation, and it's given me some questions of my
own.

By default, the posted version of coast does not preserve any offsets
or state. Enabling Samza's checkpoint / changelog is straightforward,
but I'm also particularly interested in supporting exactly-once
semantics where possible. This work is coming along, but there are a
few cases where I'm fighting against Samza's regular primitives, and
it's made things more complex or less performant than I'd like. Of
course, this is a problem of my own devising; but I'd be quite
grateful if anyone has comments or suggestions.

1) A few weeks ago, the idea of a 'replayable message chooser' was
being thrown around:

I've been implementing something akin to this, but it ends up being
fairly invasive -- it interacts in a tricky way with the checkpointing
mechanism, so it doesn't fit nicely behind the MessageChooser
interface. I suspect it would be cleaner with framework support, but
I'm not sure if there's broad enough interest to justify it vs.
waiting for Kafka to have transactional messaging. If there is, I'd be
happy to start an implementation discussion on the ticket.

2) To avoid sending certain messages twice, the StreamTask needs to
check its output stream on startup to see what the 'latest' offset is.
It looks like I can get this information from a SystemAdmin instance,
but this doesn't seem to be accessible to user code. Should I create
the system myself from config during task init, or is there a better
way?
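
For concreteness, here is a minimal sketch of the startup check
described in (2). It is a toy model in Python rather than Samza code:
OutputLog, DedupingSender, and every method name are hypothetical, and
it assumes a single writer per output partition and a task that
regenerates the same outputs in the same order when replayed.

```python
class OutputLog:
    """Hypothetical stand-in for one output stream partition (not Samza's API)."""

    def __init__(self):
        self.messages = []

    def latest_offset(self):
        # Offset of the last message written, or -1 if the log is empty.
        return len(self.messages) - 1

    def append(self, message):
        self.messages.append(message)


class DedupingSender:
    """Skips sends that are already present in the output log after a restart."""

    def __init__(self, log, checkpointed_next_offset):
        self.log = log
        # Read once at startup: offsets at or below this are already in the log.
        self.high_water_mark = log.latest_offset()
        # Output offset the next send would occupy, restored from the checkpoint.
        self.next_offset = checkpointed_next_offset

    def send(self, message):
        already_written = self.next_offset <= self.high_water_mark
        if not already_written:
            self.log.append(message)
        self.next_offset += 1
        return not already_written


# First run: three messages go out, but only the first one is checkpointed.
log = OutputLog()
first_run = DedupingSender(log, checkpointed_next_offset=0)
for m in ["a", "b", "c"]:
    first_run.send(m)

# Restart from the checkpoint: "b" and "c" are replayed and skipped, "d" is new.
second_run = DedupingSender(log, checkpointed_next_offset=1)
results = [second_run.send(m) for m in ["b", "c", "d"]]
```

The sketch pays for one offset lookup at startup instead of a read per
message; whether that lookup should go through a SystemAdmin is exactly
the open question above.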

3) To be sure the state is recoverable after a crash, I need to make
sure data is persisted in a particular order. The messages need to be
sent before the state is changelogged, and both need to be complete
before checkpointing the offsets. (Happy to go into more detail on
this if you like.) If I make both the output and changelog streams
synchronous, I think this should already work -- as long as I make the
calls in the correct order. (Right?)

But this is slow, since it requires multiple blocking network
roundtrips for every message. It should be more performant to buffer
all the changes, and then trigger a flush: first for the messages,
then the state changelog, then the offset checkpoints. Right now,
though, I don't think this is possible; the commit logic does send the
messages before triggering a checkpoint, but I can't find a way to
ensure certain output streams are flushed before others, or to flush a
single output stream explicitly. Am I missing anything? If not, would
it be worth adding support for this?
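
To make the intended ordering concrete, here is a minimal sketch of a
buffered commit with an explicit per-stream flush. It is a toy model in
Python, not Samza's API: BufferedStream, commit, and the shared journal
are hypothetical names used only to show the order in which writes
would become durable.

```python
class BufferedStream:
    """Hypothetical buffered stream; records stay in memory until flush()."""

    def __init__(self, name, journal):
        self.name = name
        self.pending = []
        # Shared list that records the order in which writes become durable.
        self.journal = journal

    def send(self, record):
        self.pending.append(record)

    def flush(self):
        # Make everything buffered so far durable, in send order.
        for record in self.pending:
            self.journal.append((self.name, record))
        self.pending.clear()


def commit(output, changelog, checkpoint):
    """Flush in the order needed for recovery: messages, then state, then offsets."""
    output.flush()
    changelog.flush()
    checkpoint.flush()


journal = []
output = BufferedStream("output", journal)
changelog = BufferedStream("changelog", journal)
checkpoint = BufferedStream("checkpoint", journal)

# Sends can interleave freely while processing; nothing is durable yet.
changelog.send("count=1")
output.send("msg-1")
checkpoint.send("input-offset=1")
changelog.send("count=2")
output.send("msg-2")
checkpoint.send("input-offset=2")

commit(output, changelog, checkpoint)
flushed_order = [name for name, _ in journal]
```

With this shape the network cost is one flush per stream per commit
rather than a blocking round trip per message, and a crash between any
two flushes leaves the checkpoint no further ahead than the changelog,
and the changelog no further ahead than the output.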

That's all for now, I think. Thanks again; and let me know if you'd
like me to elaborate on anything.

-- Ben

