samza-dev mailing list archives

From Jordan Lewis <>
Subject Re: Sharing heap memory in Samza
Date Tue, 28 Oct 2014 19:16:54 GMT
Hey Chris,

Thanks for the detailed response.

Your proposed solution for adding parallelism makes sense, but I don't yet
understand the importance of the blocking step in window(). Couldn't you
instead concurrently commit offsets for each owned partition by taking the
minimum offset of the threads working on that partition, minus one? That
way, in the worst case, you'd only screw up by forgetting to commit some
just-finished work until the next call to window().
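To make that concrete, here's roughly what I have in mind (a JDK-only sketch; the class and method names are mine, not Samza's):

```java
import java.util.Collection;

// Sketch of the "commit the minimum in-flight offset minus one" idea.
// Each worker thread reports the offset of the message it is currently
// processing; the committed offset is the smallest in-flight offset - 1,
// so a restart can only replay work, never skip it.
public class SafeOffsetTracker {
    // Returns the offset that is safe to commit for one partition, given
    // the offsets currently being processed by worker threads. If no
    // message is in flight, the last fully processed offset is safe.
    public static long safeCommitOffset(Collection<Long> inFlightOffsets,
                                        long lastProcessedOffset) {
        long min = Long.MAX_VALUE;
        for (long o : inFlightOffsets) {
            min = Math.min(min, o);
        }
        return inFlightOffsets.isEmpty() ? lastProcessedOffset : min - 1;
    }

    public static void main(String[] args) {
        // Threads are working on offsets 41, 37, and 44 of one partition:
        // only offsets up to 36 are certainly done.
        System.out.println(
            safeCommitOffset(java.util.List.of(41L, 37L, 44L), 44L)); // prints 36
    }
}
```

The worst case is exactly what I described: on restart you replay a little just-finished work, but you never lose anything.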

We've had some experience with this strategy before, actually. We used to
have each machine run a single Kafka consumer thread that read from all of
the partitions it owned and sent the messages it consumed to a worker
pool (sized proportionally to the number of cores on the machine) for
processing. As you mention, it's tricky to get the offset management right
that way. However, we recently switched to giving each machine as many
Kafka-managed consumer threads as cores, and did away with the separate
thread pool. We like this approach a lot - it's simple, easy to manage, and
doesn't expose us to data loss. Have you considered adding this kind of
partition/task-based parallelism to Samza? It seems to me that this model
isn't hard to understand, and it might produce less overhead. However, I
can also see the appeal of the simple one-thread-per-container model.
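In JDK terms (no Kafka client code - the class and its names are illustrative), the thread-per-partition model looks roughly like this:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;

// One worker thread per partition: messages from a partition are processed
// in order by a dedicated thread, so the highest processed offset per
// partition is always safe to commit -- no min-offset bookkeeping needed.
public class PartitionWorkers {
    private final ExecutorService[] workers;
    final AtomicLongArray committed; // last processed offset per partition

    public PartitionWorkers(int partitions) {
        workers = new ExecutorService[partitions];
        committed = new AtomicLongArray(partitions);
        for (int p = 0; p < partitions; p++) {
            workers[p] = Executors.newSingleThreadExecutor();
            committed.set(p, -1L);
        }
    }

    // Hand a consumed message to its partition's dedicated worker.
    public void dispatch(int partition, long offset, String message) {
        workers[partition].submit(() -> {
            // ... process message ...
            committed.set(partition, offset); // in-order, so always safe
        });
    }

    public void shutdown() {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) {
            try {
                w.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because each partition has exactly one thread, the last processed offset is always the safe one to commit, which is what makes the model so easy to reason about.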

Let's pretend for a moment that cross-task memory sharing was implemented,
and that we also choose the dangerous road of adding multithreading to our
task implementations. Since Samza was built with single-threaded containers
in mind, it seems to me that it might be tricky to get Samza to tell YARN
that it wants n compute units for a single container. Is there a way to
accomplish this?
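My guess is that it would go through the YARN container configs - something like the following, though I'm not certain of the exact key names in the current version:

```properties
# Sketch: ask YARN for a larger resource allocation per Samza container.
# These are the keys I believe Samza forwards to YARN's resource request;
# treat them as illustrative if they've changed.
yarn.container.memory.mb=4096
yarn.container.cpu.cores=4
```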

Jordan Lewis

On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <> wrote:

> Hey Jordan,
> Your question touches on a couple of things:
> 1. Shared objects between Samza tasks within one container.
> 2. Multi-threaded SamzaContainers.
> For (1), there is some discussion on shared state here:
> The outcome of this ticket was that it's something we want, but aren't
> implementing right now. The idea is to have a state store that's shared
> amongst all tasks in a container. The store would be immutable, and would
> be restored on startup via a stream that had all required data.
> An alternative to this is to just have a static variable that all tasks
> use. This will allow all tasks within one container to use the object.
> We've done this before, and it works reasonably well for immutable
> objects, which you have.
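A minimal sketch of that static-variable approach (the holder class here is illustrative, not a Samza API):

```java
// Sketch: share one immutable object across all StreamTask instances in a
// container via a static holder. Class loading guarantees the model is
// built exactly once per JVM, no matter how many tasks call get().
public class SharedModel {
    private SharedModel() {}

    private static class Holder {
        // Initialized once, on first access, under the JVM's class-init lock.
        static final java.util.Map<String, Integer> MODEL = load();
    }

    private static java.util.Map<String, Integer> load() {
        // In a real job this would deserialize the large immutable object;
        // a tiny stand-in map keeps the sketch self-contained.
        return java.util.Map.of("feature-a", 1, "feature-b", 2);
    }

    // Every task in the container gets the same instance.
    public static java.util.Map<String, Integer> get() {
        return Holder.MODEL;
    }
}
```

Since the object is immutable, every task in the container can read it concurrently with no extra synchronization.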
> For (2), we've actively tried to avoid adding threading to the
> SamzaContainer. Having a single threaded container has worked out pretty
> well for us, and greatly simplifies the mental model that people need to
> have to use Samza. Our advice to people who ask about adding parallelism
> is to tell them to add more containers.
> That said, it is possible to run threads inside a StreamTask if you really
> want to increase your parallelism. Again, I would advise against this. If
> not implemented properly, doing so can lead to data loss. The proper way
> to implement threading inside a StreamTask is to have a thread pool
> executor, and hand threads messages as process() is called. You must then
> disable automatic offset checkpointing by setting the commit interval to
> -1. Lastly, your
> task must implement WindowableTask. In the window method, it must block on
> all threads that are currently processing a message. When all threads have
> finished processing, it's then safe to checkpoint offsets, and the window
> method must call coordinator.commit().
> We've written a task that does this as well, and it works, but you have to
> know what you're doing to get it right.
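If I'm following, the pattern would be roughly this (plain JDK types standing in for Samza's StreamTask/WindowableTask interfaces, and a Runnable standing in for coordinator.commit()):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the threaded-StreamTask pattern: process() hands messages to
// a pool, and window() blocks until every in-flight message is done
// before committing offsets. Plain JDK types stand in for Samza's
// StreamTask/WindowableTask/TaskCoordinator interfaces.
public class ThreadedTask {
    private final ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    // The Phaser tracks in-flight messages; window() waits for it to drain.
    // One party (registered here) is the window() caller itself.
    private final Phaser inFlight = new Phaser(1);
    final AtomicLong processed = new AtomicLong();

    // Called once per message, like StreamTask.process().
    public void process(String message) {
        inFlight.register();
        pool.submit(() -> {
            try {
                processed.incrementAndGet(); // ... real work here ...
            } finally {
                inFlight.arriveAndDeregister();
            }
        });
    }

    // Called periodically, like WindowableTask.window(): block until all
    // submitted messages have finished, then it is safe to commit.
    public void window(Runnable commit) {
        inFlight.arriveAndAwaitAdvance(); // waits for all workers
        commit.run(); // stands in for coordinator.commit()
    }

    public void shutdown() {
        pool.shutdown();
    }
}
```

The barrier in window() is the load-bearing part: without it, you could commit an offset past a message a worker thread hasn't finished, which is exactly the data-loss trap.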
> So, I think the two state options are:
> 1. Wait for global state to be implemented (or implement it yourself :)).
> This could take a while.
> 2. Use static objects to share state among StreamTasks in a given
> SamzaContainer.
> And for parallelism:
> 1. Increase partition/container count for your job.
> 2. Add threads to your StreamTasks.
> Cheers,
> Chris
> On 10/27/14 12:52 PM, "Jordan Lewis" <> wrote:
> >Hi,
> >
> >My team is interested in trying out Samza to augment or replace our
> >hand-rolled Kafka-based stream processing system. I have a question about
> >sharing memory across task instances.
> >
> >Currently, our main stream processing application has some large,
> >immutable
> >objects that need to be loaded into JVM heap memory in order to process
> >messages on any partition of certain topics. We use thread-based
> >parallelism in our system, so that the Kafka consumer threads on each
> >machine listening to these topics can use the same instance of these large
> >heap objects. This is very convenient, as these objects are so large that
> >storing multiple copies of them would be quite wasteful.
> >
> >To use Samza, it seems as though each JVM would have to store copies of
> >these objects separately, even if we were to use LevelDB's off-heap
> >storage
> >- each JVM would eventually have to inflate the off-heap memory into
> >regular Java objects to be usable. One solution to this problem could be
> >using something like Google's Flatbuffers [0] for these large objects - so
> >that we could use accessors on large, off-heap ByteBuffers without having
> >to actually deserialize them. However, we think that doing this for all of
> >the relevant data we have would be a lot of work.
> >
> >Have you guys considered implementing a thread-based parallelism model for
> >Samza, whether as a replacement or alongside the current JVM-based
> >parallelism approach? What obstacles are there to making this happen,
> >assuming you've decided not to do it? This approach would be invaluable for
> >our use case, since we rely so heavily (perhaps unfortunately so) on these
> >shared heap data structures.
> >
> >Thanks,
> >Jordan Lewis
> >
> >[0]:
