samza-dev mailing list archives

From Jordan Lewis <jor...@knewton.com>
Subject Re: Sharing heap memory in Samza
Date Tue, 28 Oct 2014 21:29:48 GMT
On Tue, Oct 28, 2014 at 5:17 PM, Chris Riccomini <criccomini@linkedin.com.invalid> wrote:

> Hey Jordan,
>
> > Couldn't you instead concurrently commit offsets for each owned
> > partition by taking the minimum offset of the threads working on that
> > partition, minus one? That way, in the worst case, you'd only screw up
> > by forgetting to commit some just-finished work until the next call to
> > window().
>
> Yes, you could, but this would require changes to Samza itself. The
> window() approach works today with no changes to Samza.
>

I must be missing something - since in your suggested implementation the
Task itself manages the thread pool, what's stopping window() from doing
what I suggested without changing Samza? Oh, I guess the problem is that
Samza makes one Task instance per partition regardless of your parallelism
settings? So the thread pool you suggest is actually parallelism within a
single partition?
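
In any case, to make what I suggested concrete, the bookkeeping I had in
mind looks roughly like the sketch below (made-up names, not a real Samza
API; it assumes a single thread hands work to the pool):

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;

  // Tracks offsets currently being processed for one partition. The safe
  // offset to commit is (lowest in-flight offset - 1): everything below
  // that is known to be finished.
  public class InFlightTracker {
    private final ConcurrentMap<Long, Boolean> inFlight =
        new ConcurrentHashMap<Long, Boolean>();
    private volatile long highWaterMark = -1;

    // Called from the single dispatching thread before handing a message
    // to a worker.
    public void started(long offset) {
      inFlight.put(offset, Boolean.TRUE);
      highWaterMark = Math.max(highWaterMark, offset);
    }

    // Called by a worker thread when it finishes a message.
    public void finished(long offset) {
      inFlight.remove(offset);
    }

    // The highest offset that is safe to commit right now.
    public long safeToCommit() {
      long min = Long.MAX_VALUE;
      for (long o : inFlight.keySet()) {
        min = Math.min(min, o);
      }
      return min == Long.MAX_VALUE ? highWaterMark : min - 1;
    }
  }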



> One other random aside on the threading situation is that, if you care
> about message ordering, you'll need to make sure that messages that are
> handed off to threads are done so based on their key or the partition they
> came from. Otherwise, t2 could get m1, and t1 could get m2, and t1 might
> finish processing first, which would lead to out-of-order processing
> (multi-subscriber partitions within a single job).


Right - that makes sense.
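
Concretely, I'd expect the handoff to look something like this - hash the
key to pick a single-threaded executor, so that messages with the same key
are processed in order (a sketch only; the names are made up):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  public class KeyedDispatcher {
    // One single-threaded executor per slot: messages whose keys hash to
    // the same slot run in submission order, preserving per-key ordering.
    private final ExecutorService[] workers;

    public KeyedDispatcher(int numThreads) {
      workers = new ExecutorService[numThreads];
      for (int i = 0; i < numThreads; i++) {
        workers[i] = Executors.newSingleThreadExecutor();
      }
    }

    public void dispatch(Object key, Runnable work) {
      int slot = (key.hashCode() & Integer.MAX_VALUE) % workers.length;
      workers[slot].submit(work);
    }
  }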



> > However, we recently switched to having each machine have as many
> > Kafka-managed consumer threads as cores, and did away with the separate
> > thread pool.
>
> Unless I'm misunderstanding, this is exactly what Samza does, isn't it?
> Each SamzaContainer is single threaded, so running N of them on a machine,
> where N is the number of cores, results in the exact same model (since
> each SamzaContainer has its own consumer threads).
>

The only difference is that Samza has one JVM per core, each with a single
consumer thread (or perhaps more than one, but blocking on each other?),
whereas what we've been working with is one thread per core inside a single
JVM. That's a big deal to us mostly because of the large-object memory
sharing I was talking about before, but also because each extra JVM carries
non-trivial memory and CPU overhead.
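
For reference, our current setup is basically the standard high-level
consumer pattern - one stream per core, each drained by its own thread.
Roughly (from memory, with placeholder topic/group names, not our
production code):

  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  public class ThreadPerCoreConsumer {
    public static void main(String[] args) {
      int threads = Runtime.getRuntime().availableProcessors();

      Properties props = new Properties();
      props.put("zookeeper.connect", "localhost:2181"); // placeholder
      props.put("group.id", "my-group");                // placeholder
      ConsumerConnector connector =
          Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

      // Ask for one stream per core; Kafka balances partitions across the
      // streams and manages the group's committed offsets for us.
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put("my-topic", threads);
      List<KafkaStream<byte[], byte[]>> streams =
          connector.createMessageStreams(topicCountMap).get("my-topic");

      ExecutorService pool = Executors.newFixedThreadPool(threads);
      for (final KafkaStream<byte[], byte[]> stream : streams) {
        pool.submit(new Runnable() {
          public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
              process(it.next().message()); // shared objects visible here
            }
          }
        });
      }
    }

    static void process(byte[] message) { /* application logic */ }
  }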



> > Since Samza was built with single-threaded containers in mind, it seems
> > to me that it might be tricky to get Samza to tell YARN that it wants n
> > compute units for a single container. Is there a way to accomplish this?
>
>
> This trickiness is why we are encouraging the one core per container
> model. You can get around this by using the yarn.container.cpu.cores
> setting, though. Setting this to a higher number will tell YARN that more
> cores are being used.
>

Got it.
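
For posterity, I assume that ends up as a line like this in the job's
properties (the value here is just illustrative):

  # Tell YARN each container should be scheduled with 4 virtual cores.
  yarn.container.cpu.cores=4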

Thanks,
Jordan


> On 10/28/14 12:16 PM, "Jordan Lewis" <jordan@knewton.com> wrote:
>
> >Hey Chris,
> >
> >Thanks for the detailed response.
> >
> >Your proposed solution for adding parallelism makes sense, but I don't yet
> >understand the importance of the blocking step in window(). Couldn't you
> >instead concurrently commit offsets for each owned partition by taking the
> >minimum offset of the threads working on that partition, minus one? That
> >way, in the worst case, you'd only screw up by forgetting to commit some
> >just-finished work until the next call to window().
> >
> >We've had some experience with this strategy before, actually. We used
> >to have each machine use a single Kafka worker thread that read from all
> >of the partitions that it owned, and send the messages it consumed to a
> >worker pool (sized proportionally to the number of cores on the machine)
> >for processing. As you mention, it's tricky to do the offset management
> >right in this way. However, we recently switched to having each machine
> >have as many Kafka-managed consumer threads as cores, and did away with
> >the separate thread pool. We like this approach a lot - it's simple,
> >easy to manage, and doesn't expose us to data loss. Have you considered
> >adding this kind of partition/task based parallelism to Samza? It seems
> >to me that this isn't so hard to understand, and seems like it might
> >produce less overhead. However, I can also see the appeal of having the
> >simple one thread per container model.
> >
> >Let's pretend for a moment that cross-task memory sharing was
> >implemented, and that we also choose the dangerous road of adding
> >multithreading to our task implementations. Since Samza was built with
> >single-threaded containers in mind, it seems to me that it might be
> >tricky to get Samza to tell YARN that it wants n compute units for a
> >single container. Is there a way to accomplish this?
> >
> >Thanks,
> >Jordan Lewis
> >
> >On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <criccomini@linkedin.com.invalid> wrote:
> >
> >> Hey Jordan,
> >>
> >> Your question touches on a couple of things:
> >>
> >> 1. Shared objects between Samza tasks within one container.
> >> 2. Multi-threaded SamzaContainers.
> >>
> >> For (1), there is some discussion on shared state here:
> >>
> >>   https://issues.apache.org/jira/browse/SAMZA-402
> >>
> >> The outcome of this ticket was that it's something we want, but aren't
> >> implementing right now. The idea is to have a state store that's shared
> >> amongst all tasks in a container. The store would be immutable, and
> >> would be restored on startup via a stream that had all required data.
> >>
> >> An alternative to this is to just have a static variable that all tasks
> >> use. This will allow all tasks within one container to use the object.
> >> We've done this before, and it works reasonably well for immutable
> >> objects, which you have.
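> >>
> >> A minimal version of the static approach looks something like this
> >> (names are made up; the one subtlety is making initialization
> >> race-free when several tasks hit it at once):
> >>
> >>   public class SharedModel {
> >>     // One copy per JVM (container), shared by every task instance.
> >>     private static volatile MyLargeObject instance;
> >>
> >>     public static MyLargeObject get() {
> >>       if (instance == null) {
> >>         synchronized (SharedModel.class) {
> >>           if (instance == null) {
> >>             instance = MyLargeObject.load(); // expensive, done once
> >>           }
> >>         }
> >>       }
> >>       return instance;
> >>     }
> >>   }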
> >>
> >> For (2), we've actively tried to avoid adding threading to the
> >> SamzaContainer. Having a single threaded container has worked out pretty
> >> well for us, and greatly simplifies the mental model that people need to
> >> have to use Samza. Our advice to people who ask about adding parallelism
> >> is to tell them to add more containers.
> >>
> >> That said, it is possible to run threads inside a StreamTask if you
> >> really want to increase your parallelism. Again, I would advise
> >> against this. If not implemented properly, doing so can lead to data
> >> loss. The proper way to implement threading inside a StreamTask is to
> >> have a thread pool executor, and hand messages to its threads as
> >> process() is called. You must then disable automatic offset
> >> checkpointing by setting task.commit.ms to -1. Lastly, your task must
> >> implement WindowableTask. In the window method, it must block on all
> >> threads that are currently processing a message. When all threads have
> >> finished processing, it's then safe to checkpoint offsets, and the
> >> window method must call coordinator.commit().
> >>
> >> We've written a task that does this as well, and it works, but you
> >> have to know what you're doing to get it right. See the sketch below.
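> >>
> >> In outline, the shape of it is roughly this (a sketch, not our actual
> >> code; the pool size and handle() are placeholders):
> >>
> >>   import java.util.ArrayList;
> >>   import java.util.List;
> >>   import java.util.concurrent.ExecutorService;
> >>   import java.util.concurrent.Executors;
> >>   import java.util.concurrent.Future;
> >>
> >>   import org.apache.samza.system.IncomingMessageEnvelope;
> >>   import org.apache.samza.task.MessageCollector;
> >>   import org.apache.samza.task.StreamTask;
> >>   import org.apache.samza.task.TaskCoordinator;
> >>   import org.apache.samza.task.WindowableTask;
> >>
> >>   public class ThreadedTask implements StreamTask, WindowableTask {
> >>     private final ExecutorService pool = Executors.newFixedThreadPool(8);
> >>     private final List<Future<?>> inFlight = new ArrayList<Future<?>>();
> >>
> >>     public void process(final IncomingMessageEnvelope envelope,
> >>         MessageCollector collector, TaskCoordinator coordinator) {
> >>       // Hand the message to the pool; remember the Future so that
> >>       // window() can wait for it.
> >>       inFlight.add(pool.submit(new Runnable() {
> >>         public void run() { handle(envelope); }
> >>       }));
> >>     }
> >>
> >>     // Runs on the container thread. With task.commit.ms=-1, this is
> >>     // the only place offsets get checkpointed.
> >>     public void window(MessageCollector collector,
> >>         TaskCoordinator coordinator) throws Exception {
> >>       for (Future<?> f : inFlight) {
> >>         f.get(); // block until every in-flight message is finished
> >>       }
> >>       inFlight.clear();
> >>       coordinator.commit(); // now safe to checkpoint offsets
> >>     }
> >>
> >>     private void handle(IncomingMessageEnvelope envelope) {
> >>       // actual per-message work goes here
> >>     }
> >>   }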
> >>
> >> So, I think the two state options are:
> >>
> >> 1. Wait for global state to be implemented (or implement it yourself
> >> :)). This could take a while.
> >> 2. Use static objects to share state among StreamTasks in a given
> >> SamzaContainer.
> >>
> >> And for parallelism:
> >>
> >> 1. Increase partition/container count for your job.
> >> 2. Add threads to your StreamTasks.
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 10/27/14 12:52 PM, "Jordan Lewis" <jordan@knewton.com> wrote:
> >>
> >> >Hi,
> >> >
> >> >My team is interested in trying out Samza to augment or replace our
> >> >hand-rolled Kafka-based stream processing system. I have a question
> >> >about sharing memory across task instances.
> >> >
> >> >Currently, our main stream processing application has some large,
> >> >immutable objects that need to be loaded into JVM heap memory in
> >> >order to process messages on any partition of certain topics. We use
> >> >thread-based parallelism in our system, so that the Kafka consumer
> >> >threads on each machine listening to these topics can use the same
> >> >instance of these large heap objects. This is very convenient, as
> >> >these objects are so large that storing multiple copies of them would
> >> >be quite wasteful.
> >> >
> >> >To use Samza, it seems as though each JVM would have to store copies
> >> >of these objects separately, even if we were to use LevelDB's
> >> >off-heap storage - each JVM would eventually have to inflate the
> >> >off-heap memory into regular Java objects to be usable. One solution
> >> >to this problem could be using something like Google's Flatbuffers
> >> >[0] for these large objects - so that we could use accessors on
> >> >large, off-heap ByteBuffers without having to actually deserialize
> >> >them. However, we think that doing this for all of the relevant data
> >> >we have would be a lot of work.
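> >> >
> >> >In rough terms, the Flatbuffers idea would look like this ("Model"
> >> >stands in for a class that flatc would generate from our schema, and
> >> >loadModelBytes() is a placeholder):
> >> >
> >> >  import java.nio.ByteBuffer;
> >> >
> >> >  // Generated accessors read directly out of the ByteBuffer, so the
> >> >  // large object is never inflated into regular Java objects.
> >> >  ByteBuffer buf = loadModelBytes();        // e.g. a memory-mapped file
> >> >  Model model = Model.getRootAsModel(buf);  // O(1), no copying
> >> >  double w = model.weights(42);             // lazy per-field access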
> >> >
> >> >Have you guys considered implementing a thread-based parallelism
> >> >model for Samza, whether as a replacement for or alongside the
> >> >current JVM-based parallelism approach? What obstacles are there to
> >> >making this happen, assuming you decided not to do it? This approach
> >> >would be invaluable for our use case, since we rely so heavily
> >> >(perhaps unfortunately so) on these shared heap data structures.
> >> >
> >> >Thanks,
> >> >Jordan Lewis
> >> >
> >> >[0]: http://google.github.io/flatbuffers/
> >>
> >>
>
>
