samza-dev mailing list archives

From David Pick <pickda...@gmail.com>
Subject Re: Question on table to table joins
Date Wed, 19 Nov 2014 19:21:11 GMT
Hey Chris,

1. How large is the per-partition state that we're talking about? We've
tested our RocksDB (0.8.0) implementation in excess of 80GB, and it
continues to perform at the same rate as smaller store sizes. The only
real trade-off appears to be restoration time, which seems to scale
linearly (2x the data takes 2x as long to restore). Depending on your
latency requirements, this may or may not be acceptable.

Right now it's hundreds of gigabytes. As I mentioned before, we'd love to
partition our data better, but because we don't have a common join key on
every table we haven't really come up with a good scheme for doing so.

2. "And from my understanding of how Samza tasks work you can't consume
partition 0 from one topic and an arbitrary partition from another topic."
In 0.8.0, we allow pluggable partition assignment, using the
SystemStreamPartitionGrouper interface. There is still a limit that
prevents you from assigning the same partition to more than one task,
though. This is discussed in detail in this ticket:
https://issues.apache.org/jira/browse/SAMZA-353.

This is great, though I think ideally what we would want is a Kafka topic per
database table, and a Samza task that could consume all of one topic for a
fairly small table (e.g. customers) plus a single partition of a big table
(e.g. posts).
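
To make that concrete, the kind of assignment I have in mind would look
roughly like the sketch below. This is only a sketch: the topic names are
placeholders for our tables, I haven't verified the exact 0.8.0 grouper
interface or package names, and as you point out, handing the same partition
to more than one task is exactly what SAMZA-353 doesn't allow yet.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
// Assumed package for the grouper interface; I'd need to check where it
// actually lives in the 0.8.0 release.
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;

public class SmallTablePlusBigPartitionGrouper implements SystemStreamPartitionGrouper {
  // Placeholder topic names standing in for our tables.
  private static final String SMALL_TOPIC = "customers"; // whole topic to every task
  private static final String BIG_TOPIC = "posts";       // one partition per task

  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    Set<SystemStreamPartition> customers = new HashSet<SystemStreamPartition>();
    Set<SystemStreamPartition> posts = new HashSet<SystemStreamPartition>();
    for (SystemStreamPartition ssp : ssps) {
      if (SMALL_TOPIC.equals(ssp.getStream())) {
        customers.add(ssp);
      } else if (BIG_TOPIC.equals(ssp.getStream())) {
        posts.add(ssp);
      }
    }

    // One task per posts partition; each task also sees every customers
    // partition. This is the part SAMZA-353 doesn't permit yet, since the
    // same customers partitions would be assigned to more than one task.
    Map<TaskName, Set<SystemStreamPartition>> groups =
        new HashMap<TaskName, Set<SystemStreamPartition>>();
    for (SystemStreamPartition postsPartition : posts) {
      Set<SystemStreamPartition> assigned = new HashSet<SystemStreamPartition>(customers);
      assigned.add(postsPartition);
      int id = postsPartition.getPartition().getPartitionId();
      groups.put(new TaskName("posts-partition-" + id), assigned);
    }
    return groups;
  }
}

I assume something like this would get wired in through the grouper factory
config, but I'd need to double-check the property name.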

3. As Martin said, we ended up moving the YARN NMs off of the Kafka broker
nodes at LinkedIn due to page cache contention between the Kafka broker
and stateful Samza jobs. We actually published an engineering blog post
today (http://engineering.linkedin.com/samza/operating-apache-samza-scale)
that documents how we run Samza. Kafka brokers rely on page cache as their
"cache", so taking page cache away to use for stateful stream processors
can impact the latency and throughput of the brokers (they start hitting
disk more).

Thanks! I was surprised when I heard you mention (I think it was you, Chris)
that LinkedIn was running Samza on the same boxes as Kafka. I'll read through
that post now.

David

On Wed, Nov 19, 2014 at 12:59 PM, Chris Riccomini <
criccomini@linkedin.com.invalid> wrote:

> Hey David,
>
> Hmm, this is interesting. Some notes/questions:
>
> 1. How large is the per-partition state that we're talking about? We've
> tested our RocksDB (0.8.0) implementation in excess of 80GB, and it
> continues to perform at the same rate as smaller store sizes. The only
> real trade-off appears to be restoration time, which seems to scale
> linearly (2x the data takes 2x as long to restore). Depending on your
> latency requirements, this may or may not be acceptable.
> 2. "And from my understanding of how Samza tasks work you can't consume
> partition 0 from one topic and an arbitrary partition from another topic."
> In 0.8.0, we allow pluggable partition assignment, using the
> SystemStreamPartitionGrouper interface. There is still a limit that
> prevents you from assigning the same partition to more than one task,
> though. This is discussed in detail in this ticket:
> https://issues.apache.org/jira/browse/SAMZA-353.
> 3. As Martin said, we ended up moving the YARN NMs off of the Kafka broker
> nodes at LinkedIn due to page cache contention between the Kafka broker
> and stateful Samza jobs. We actually published an engineering blog post
> today (http://engineering.linkedin.com/samza/operating-apache-samza-scale)
> that documents how we run Samza. Kafka brokers rely on page cache as their
> "cache", so taking page cache away to use for stateful stream processors
> can impact the latency and throughput of the brokers (they start hitting
> disk more).
>
> In the past, we've been able to sidestep these kinds of problems by
> either repartitioning the data, or remotely querying a system and caching
> the responses locally (to avoid a DDOS, and maintain high throughput).
>
> Cheers,
> Chris
>
> On 11/19/14 9:18 AM, "David Pick" <pickdavid@gmail.com> wrote:
>
> >Hey Milinda,
> >
> >Unfortunately, not all of our tables have the same join attribute to
> >partition on. As an example, we have several tables that join to our
> >customers table through another table, and since our producer is stateless
> >we have no way of knowing which customer that row was for.
> >
> >Customers => {:id int}
> >Customer_Transactions => {:customer_id int, :transaction_id int}
> >Transactions => {:id int}
> >
> >Given a schema like the one above, we could partition both the customers
> >table and the customer_transactions table on the customer_id, but we
> >couldn't partition the transactions table the same way. And from my
> >understanding of how Samza tasks work you can't consume partition 0 from
> >one topic and an arbitrary partition from another topic.
> >
> >David
> >
> >On Wed, Nov 19, 2014 at 11:04 AM, Milinda Pathirage
> ><mpathira@umail.iu.edu>
> >wrote:
> >
> >> Hi David,
> >>
> >> One way to have more control over the number of partitions is to
> >> partition based on a combination of shard identifier + join attribute.
> >> This will give you more control over the number of partitions, and you
> >> can change the number of partitions based on the availability of
> >> resources and the amount of data processed at a single node. Also, this
> >> won't affect the ordering.
> >>
> >> Thanks
> >> Milinda
> >>
> >> On Wed, Nov 19, 2014 at 10:50 AM, David Pick <pickdavid@gmail.com>
> >>wrote:
> >>
> >> > Hey Martin and Milinda,
> >> >
> >> > Thanks for the quick replies!
> >> >
> >> > Our primary database is Postgres, which we've sharded. To get data to
> >> > Kafka we use a tool called PGQ
> >> > (https://wiki.postgresql.org/wiki/PGQ_Tutorial), which is just a simple
> >> > queueing system built inside of Postgres. So we have a database trigger
> >> > that pushes any state change into PGQ. Then a small Clojure script picks
> >> > up all the changes from the database and pushes them into Kafka, where
> >> > there is a single topic called the datastream. The datastream topic has
> >> > a partition for each shard of our database so that we can ensure
> >> > messages from each individual Postgres instance come out of Kafka in the
> >> > order we're expecting.
> >> >
> >> > If my understanding is correct, it seems that with our partitioning
> >> > scheme we would have a Samza task for each partition of our datastream
> >> > topic for a job that was generating data for something like our search
> >> > platform. But given the amount of data we have, our LevelDB instance
> >> > would get significantly larger than a few gigabytes. Is there a better
> >> > way to partition the data that would keep those ordering guarantees?
> >> >
> >> > For my example we'd like to publish a feed of all posts with their
> >> > associated user information merged onto the document.
> >> >
> >> > As for why we're storing the merged document, we certainly don't have
> >> > to. I was just going based on the example on the state management page
> >> > (http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html)
> >> > which says:
> >> >
> >> > *Example: Join a table of user profiles to a table of user settings by
> >> > user_id and emit the joined stream*
> >> >
> >> > Implementation: The job subscribes to the change streams for the user
> >> > profiles database and the user settings database, both partitioned by
> >> > user_id. The job keeps a key-value store keyed by user_id, which
> >> > contains the latest profile record and the latest settings record for
> >> > each user_id. When a new event comes in from either stream, the job
> >> > looks up the current value in its store, updates the appropriate fields
> >> > (depending on whether it was a profile update or a settings update), and
> >> > writes back the new joined record to the store. The changelog of the
> >> > store doubles as the output stream of the task.
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > David
> >> >
> >> > On Wed, Nov 19, 2014 at 9:10 AM, Milinda Pathirage <mpathira@umail.iu.edu>
> >> > wrote:
> >> >
> >> > > Hi David,
> >> > >
> >> > > I am also wondering why you are storing the merged document in local
> >> > > LevelDB. If you need to check for duplicates, how about using a bloom
> >> > > filter to handle them?
> >> > >
> >> > > Thanks
> >> > > Milinda
> >> > >
> >> > > On Wed, Nov 19, 2014 at 8:50 AM, Martin Kleppmann <martin@kleppmann.com>
> >> > > wrote:
> >> > >
> >> > > > Hi David,
> >> > > >
> >> > > > On 19 Nov 2014, at 04:52, David Pick <pickdavid@gmail.com> wrote:
> >> > > > > First off, if this is the wrong place to ask these kinds of
> >> > > > > questions please let me know. I tried in IRC but didn't get an
> >> > > > > answer within a few hours so I'm trying here.
> >> > > >
> >> > > > It's the right place, and they're good questions :)
> >> > > >
> >> > > > > I had a couple of questions around implementing a table to table
> >> > > > > join with data coming from a database changelog through Kafka.
> >> > > >
> >> > > > Cool! Out of curiosity, can I ask what database you're using and how
> >> > > > you're getting the changelog?
> >> > > >
> >> > > > > Let's say I've got two tables, users and posts, in my primary db
> >> > > > > where the posts table has a user_id column. I've written a Samza
> >> > > > > job that joins those two tables together by storing every user
> >> > > > > record and the merged document in LevelDB and then outputting the
> >> > > > > resulting document to the changelog Kafka topic.
> >> > > >
> >> > > > Not sure exactly what you mean by a merged document here. Is it a
> >> > > > list of all the posts by a particular user? Or post records augmented
> >> > > > with user information? Or something else?
> >> > > >
> >> > > > > Is this the right way to implement that kind of job?
> >> > > >
> >> > > > On a high level, this sounds reasonable. One detail question is
> >> > > > whether you're using the changelog feature for the LevelDB store. If
> >> > > > the contents of the store can be rebuilt by reprocessing the input
> >> > > > stream, you could get away with turning off the changelog on the
> >> > > > store, and making the input stream a bootstrap stream instead. That
> >> > > > will save some overhead on writes, but still give you fault
> >> > > > tolerance.
> >> > > >
> >> > > > Another question is whether the values you're storing in LevelDB are
> >> > > > several merged records together, or whether you store each record
> >> > > > individually and use a range query to retrieve them if necessary.
> >> > > > You could benchmark which works best for you.
> >> > > >
> >> > > > > It seems that even with a decent partitioning scheme the LevelDB
> >> > > > > instance in each task will get quite large, especially if we're
> >> > > > > joining several tables together that have millions of rows (our
> >> > > > > real world use case would be 7 or 8 tables each with many millions
> >> > > > > of records).
> >> > > >
> >> > > > The goal of Samza's local key-value stores is to support a few
> >> > > > gigabytes per partition. So millions of rows distributed across
> >> > > > (say) tens of partitions should be fine, assuming the rows aren't
> >> > > > huge. You might want to try the new RocksDB support, which may
> >> > > > perform better than LevelDB.
> >> > > >
> >> > > > > Also, given a task that's processing that much data, where do you
> >> > > > > recommend running Samza? Should we spin up another set of boxes or
> >> > > > > is it ok to run it on the Kafka brokers (I heard it mentioned that
> >> > > > > this is how LinkedIn is running Samza)?
> >> > > >
> >> > > > IIRC LinkedIn saw some issues with the page cache when Kafka and
> >> > > > LevelDB were on the same machine, which was resolved by putting them
> >> > > > on different machines. But I'll let one of the LinkedIn folks comment
> >> > > > on that.
> >> > > >
> >> > > > Best,
> >> > > > Martin
> >> > > >
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > Milinda Pathirage
> >> > >
> >> > > PhD Student | Research Assistant
> >> > > School of Informatics and Computing | Data to Insight Center
> >> > > Indiana University
> >> > >
> >> > > twitter: milindalakmal
> >> > > skype: milinda.pathirage
> >> > > blog: http://milinda.pathirage.org
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Milinda Pathirage
> >>
> >> PhD Student | Research Assistant
> >> School of Informatics and Computing | Data to Insight Center
> >> Indiana University
> >>
> >> twitter: milindalakmal
> >> skype: milinda.pathirage
> >> blog: http://milinda.pathirage.org
> >>
>
>
