samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yan Fang <yanfang...@gmail.com>
Subject Re: Sharing heap memory in Samza
Date Thu, 30 Oct 2014 23:55:51 GMT
looked at Chronicle Map a little, seems we could have a Chronicle Map
backed state store, which allows sharing the state through all nodes/ all
processes in one machine.

Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108

On Thu, Oct 30, 2014 at 10:05 AM, Roger Hoover <roger.hoover@gmail.com>
wrote:

> One other possibility is to use shared memory with something like Chronicle
> Map <http://openhft.net/products/chronicle-map/>.
>
> On Wed, Oct 29, 2014 at 12:53 PM, Jordan Lewis <jordan@knewton.com> wrote:
>
> > On Tue, Oct 28, 2014 at 5:39 PM, Chris Riccomini <
> > criccomini@linkedin.com.invalid> wrote:
> > >
> > > The problem is that coordinator.commit doesn't take parameters. It just
> > > tells Samza to commit the offset that *it* knows you've processed up
> to.
> > > The way Samza knows which offsets you've processed up to is implicit:
> > when
> > > StreamTask.process returns, Samza assumes that your task has processed
> > the
> > > message, and the offset is therefore safe to commit.
> > >
> >
> > Oh, I see! That makes sense. I didn't realize that the coordinator only
> > lets you request a commit in that way.
> >
> > > This is a big deal to us mostly because of the large object memory
> > > >sharing I was talking about before, but also probably because JVMs
> have
> > > >non-trivial overhead in memory and CPU.
> > >
> > > Ah! I think I understand now. The problem is you want a high level of
> > > parallelism, but every time you add it with a container, you pay for it
> > in
> > > memory by having another copy of this large object.
> >
> >
> > Yep -  exactly.
> >
> >
> > >
> > > Yea, unfortunately, right now the best you can do is to run a thread
> pool
> > > inside the container.
> > >
> >
> > Okay. Are there any plans in the works to expose a thread-based
> parallelism
> > model? In other words, keep the same mental model of one TaskInstance per
> > partition, but have the RunLoop distribute work to the TaskInstances in a
> > container in a concurrent manner instead of a serial one. I would be very
> > interested in such a project.
> >
> >
> > - Jordan
> >
> >
> > >
> > > Cheers,
> > > Chris
> > >
> > > On 10/28/14 2:29 PM, "Jordan Lewis" <jordan@knewton.com> wrote:
> > >
> > > >On Tue, Oct 28, 2014 at 5:17 PM, Chris Riccomini <
> > > >criccomini@linkedin.com.invalid> wrote:
> > > >
> > > >> Hey Jordan,
> > > >>
> > > >> > Couldn't you instead concurrently commit offsets for each owned
> > > >> >partition by taking the minimum offset of the threads working
on
> that
> > > >> >partition, minus one? That way, in the worst case, you'd only
screw
> > up
> > > >>by
> > > >> >forgetting to commit some just-finished work until the next call
to
> > > >> >window().
> > > >>
> > > >> Yes, you could, but this would require changes to Samza, itself. The
> > > >> window() method can be done today with no changes to Samza.
> > > >>
> > > >
> > > >I must be missing something - since in your suggested implementation
> the
> > > >Task itself manages the thread pool, what's stopping window() from
> doing
> > > >what I suggested without changing Samza? Oh, I guess the problem is
> that
> > > >Samza makes one Task instance per partition regardless of your
> > parallelism
> > > >settings? So the thread pool you suggest is actually parallelism
> within
> > a
> > > >single partition?
> > > >
> > > >
> > > >
> > > >> One other random aside on the threading situation is that, if you
> care
> > > >> about message ordering, you'll need to make sure that messages that
> > are
> > > >> handed off to threads are done so based on their key or the
> partition
> > > >>they
> > > >> came from. Otherwise, t2 could get m1, and t1 could get m2, and t1
> > might
> > > >> finish processing first, which would lead to out-of-order processing
> > > >> (multi-subscriber partitions within a single job).
> > > >
> > > >
> > > >Right - that makes sense.
> > > >
> > > >
> > > >
> > > >> > However, we recently switched to having each machine have as
many
> > > >> >Kafka-managed consumer threads as cores, and did away with the
> > separate
> > > >> >thread pool.
> > > >>
> > > >> Unless I'm misunderstanding, this is exactly what Samza does,
> doesn't
> > > >>it?
> > > >> Each SamzaContainer is single threaded, so running N of them on a
> > > >>machine,
> > > >> where N is the number of cores, results in the exact same model
> (since
> > > >> each SamzaContainer has its own consumer threads).
> > > >>
> > > >
> > > >The only difference is that Samza has one JVM per core, each with a
> > single
> > > >(or perhaps more than one, but at least blocking on each other?)
> > consumer
> > > >thread, whereas what we've been working with is one thread per core.
> > This
> > > >is a big deal to us mostly because of the large object memory sharing
> I
> > > >was
> > > >talking about before, but also probably because JVMs have non-trivial
> > > >overhead in memory and CPU.
> > > >
> > > >
> > > >
> > > >> > Since Samza was built with single-threaded containers in mind,
it
> > > >>seems
> > > >> >to me that it might be tricky to get Samza to tell YARN that it
> > wants n
> > > >> >compute units for a single container. Is there a way to accomplish
> > > >>this?
> > > >>
> > > >>
> > > >> This trickiness is why we are encouraging the one core per container
> > > >> model. You can get around this by using the yarn.container.cpu.cores
> > > >> setting, though. Setting this to a higher number will tell YARN that
> > > >>more
> > > >> cores are being used.
> > > >>
> > > >
> > > >Got it.
> > > >
> > > >Thanks,
> > > >Jordan
> > > >
> > > >
> > > >On 10/28/14 12:16 PM, "Jordan Lewis" <jordan@knewton.com> wrote:
> > > >>
> > > >> >Hey Chris,
> > > >> >
> > > >> >Thanks for the detailed response.
> > > >> >
> > > >> >Your proposed solution for adding parallelism makes sense, but
I
> > don't
> > > >>yet
> > > >> >understand the importance of the blocking step in window().
> Couldn't
> > > >>you
> > > >> >instead concurrently commit offsets for each owned partition by
> > taking
> > > >>the
> > > >> >minimum offset of the threads working on that partition, minus
one?
> > > >>That
> > > >> >way, in the worst case, you'd only screw up by forgetting to commit
> > > >>some
> > > >> >just-finished work until the next call to window().
> > > >> >
> > > >> >We've had some experience with this strategy before, actually.
We
> > used
> > > >>to
> > > >> >have each machine use a single Kafka worker thread that read from
> all
> > > >>of
> > > >> >the partitions that it owned, and send the messages it consumes
to
> a
> > > >> >worker
> > > >> >pool (sized proportionally to the number of cores on the machine)
> for
> > > >> >processing. As you mention it's tricky to do the offset management
> > > >>right
> > > >> >in
> > > >> >this way. However, we recently switched to having each machine
have
> > as
> > > >> >many
> > > >> >Kafka-managed consumer threads as cores, and did away with the
> > separate
> > > >> >thread pool. We like this approach a lot - it's simple, easy to
> > manage,
> > > >> >and
> > > >> >doesn't expose us to data loss. Have you considered adding this
> kind
> > of
> > > >> >partition/task based parallelism to Samza? It seems to me that
this
> > > >>isn't
> > > >> >so hard to understand, and seems like it might produce less
> overhead.
> > > >> >However, I can also see the appeal of having the simple one thread
> > per
> > > >> >container model.
> > > >> >
> > > >> >Let's pretend for a moment that cross-task memory sharing was
> > > >>implemented,
> > > >> >and that we also choose the dangerous road of adding multithreading
> > to
> > > >>our
> > > >> >task implementations. Since Samza was built with single-threaded
> > > >> >containers
> > > >> >in mind, it seems to me that it might be tricky to get Samza to
> tell
> > > >>YARN
> > > >> >that it wants n compute units for a single container. Is there
a
> way
> > to
> > > >> >accomplish this?
> > > >> >
> > > >> >Thanks,
> > > >> >Jordan Lewis
> > > >> >
> > > >> >On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <
> > > >> >criccomini@linkedin.com.invalid> wrote:
> > > >> >
> > > >> >> Hey Jordan,
> > > >> >>
> > > >> >> Your question touches on a couple of things:
> > > >> >>
> > > >> >> 1. Shared objects between Samza tasks within one container.
> > > >> >> 2. Multi-threaded SamzaContainers.
> > > >> >>
> > > >> >> For (1), there is some discussion on shared state here:
> > > >> >>
> > > >> >>   https://issues.apache.org/jira/browse/SAMZA-402
> > > >> >>
> > > >> >> The outcome of this ticket was that it's something we want,
but
> > > >>aren't
> > > >> >> implementing right now. The idea is to have a state shore
that's
> > > >>shared
> > > >> >> amongst all tasks in a container. The store would be immutable,
> and
> > > >> >>would
> > > >> >> be restored on startup via a stream that had all required
data.
> > > >> >>
> > > >> >> An alternative to this is to just have a static variable
that all
> > > >>tasks
> > > >> >> use. This will allow all tasks within one container to use
the
> > > >>object.
> > > >> >> We've done this before, and it works reasonably well for
> immutable
> > > >> >> objects, which you have.
> > > >> >>
> > > >> >> For (2), we've actively tried to avoid adding threading to
the
> > > >> >> SamzaContainer. Having a single threaded container has worked
out
> > > >>pretty
> > > >> >> well for us, and greatly simplifies the mental model that
people
> > > >>need to
> > > >> >> have to use Samza. Our advice to people who ask about adding
> > > >>parallelism
> > > >> >> is to tell them to add more containers.
> > > >> >>
> > > >> >> That said, it is possible to run threads inside a StreamTask
if
> you
> > > >> >>really
> > > >> >> want to increase your parallelism. Again, I would advise
against
> > > >>this.
> > > >> >>If
> > > >> >> not implemented properly, doing so can lead to data loss.
The
> > proper
> > > >>way
> > > >> >> to implement threading inside a StreamTask is to have an
thread
> > pool
> > > >> >> execute, and give threads messages as process() is called.
You
> must
> > > >>then
> > > >> >> disable offset checkpointing by setting task.commit.ms to
-1.
> > > Lastly,
> > > >> >>your
> > > >> >> task must implement WindowableTask. In the window method,
it must
> > > >>block
> > > >> >>on
> > > >> >> all threads that are currently processing a message. When
all
> > threads
> > > >> >>have
> > > >> >> finished processing, it's then safe to checkpoint offsets,
and
> the
> > > >> >>window
> > > >> >> method must call coordinator.commit().
> > > >> >>
> > > >> >> We've written a task that does this as well, and it works,
but
> you
> > > >>have
> > > >> >>to
> > > >> >> know what you're doing to get it right.
> > > >> >>
> > > >> >> So, I think the two state options are:
> > > >> >>
> > > >> >> 1. Wait for global state to be implemented (or implement
it
> > yourself
> > > >> >>:)).
> > > >> >> This could take a while.
> > > >> >> 2. Use static objects to share state among StreamTasks in
a given
> > > >> >> SamzaContainer.
> > > >> >>
> > > >> >> And for parallelism:
> > > >> >>
> > > >> >> 1. Increase partition/container count for your job.
> > > >> >> 2. Add threads to your StreamTasks.
> > > >> >>
> > > >> >> Cheers,
> > > >> >> Chris
> > > >> >>
> > > >> >> On 10/27/14 12:52 PM, "Jordan Lewis" <jordan@knewton.com>
wrote:
> > > >> >>
> > > >> >> >Hi,
> > > >> >> >
> > > >> >> >My team is interested in trying out Samza to augment
or replace
> > our
> > > >> >> >hand-rolled Kafka-based stream processing system. I have
a
> > question
> > > >> >>about
> > > >> >> >sharing memory across task instances.
> > > >> >> >
> > > >> >> >Currently, our main stream processing application has
some
> large,
> > > >> >> >immutable
> > > >> >> >objects that need to be loaded into JVM heap memory in
order to
> > > >>process
> > > >> >> >messages on any partition of certain topics. We use thread-based
> > > >> >> >parallelism in our system, so that the Kafka consumer
threads on
> > > >>each
> > > >> >> >machine listening to these topics can use the same instance
of
> > these
> > > >> >>large
> > > >> >> >heap objects. This is very convenient, as these objects
are so
> > large
> > > >> >>that
> > > >> >> >storing multiple copies of them would be quite wasteful.
> > > >> >> >
> > > >> >> >To use Samza, it seems as though each JVM would have
to store
> > > >>copies of
> > > >> >> >these objects separately, even if we were to use LevelDB's
> > off-heap
> > > >> >> >storage
> > > >> >> >- each JVM would eventually have to inflate the off-heap
memory
> > into
> > > >> >> >regular Java objects to be usable. One solution to this
problem
> > > >>could
> > > >> >>be
> > > >> >> >using something like Google's Flatbuffers [0] for these
large
> > > >>objects
> > > >> >>- so
> > > >> >> >that we could use accessors on large, off-heap ByteBuffers
> without
> > > >> >>having
> > > >> >> >to actually deserialize them. However, we think that
doing this
> > for
> > > >> >>all of
> > > >> >> >the relevant data we have would be a lot of work.
> > > >> >> >
> > > >> >> >Have you guys considered implementing a thread-based
parallelism
> > > >>model
> > > >> >>for
> > > >> >> >Samza, whether as a replacement or alongside the current
> JVM-based
> > > >> >> >parallelism approach? What obstacles are there to making
this
> > > >>happen,
> > > >> >> >assuming that decided not to do it? This approach would
be
> > > >>invaluable
> > > >> >>for
> > > >> >> >our use case, since we rely so heavily (perhaps unfortunately
> so)
> > on
> > > >> >>these
> > > >> >> >shared heap data structures.
> > > >> >> >
> > > >> >> >Thanks,
> > > >> >> >Jordan Lewis
> > > >> >> >
> > > >> >> >[0]: http://google.github.io/flatbuffers/
> > > >> >>
> > > >> >>
> > > >>
> > > >>
> > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message