samza-dev mailing list archives

From David Pick <pickda...@gmail.com>
Subject Re: Question on table to table joins
Date Wed, 19 Nov 2014 17:18:27 GMT
Hey Milinda,

Unfortunately, not all of our tables have the same join attribute to
partition on. For example, we have several tables that join to our
customers table through another table, and since our producer is stateless
we have no way of knowing which customer a given row belongs to.

Customers => {:id int}
Customer_Transactions => {:customer_id int, :transaction_id int}
Transactions => {:id int}

Given a schema like the one above, we could partition both the customers
table and the customer_transactions table on the customer_id, but we
couldn't partition the transactions table the same way. And from my
understanding of how Samza tasks work, you can't consume partition 0 from
one topic and an arbitrary partition from another topic.
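
For concreteness, the kind of extra re-partitioning stage we'd presumably
need looks something like the rough sketch below (class, store, and topic
names are placeholders, not anything we actually run): it keeps the
customer_transactions mapping in a local store and re-emits each transaction
keyed by customer_id, so a downstream job could then do the co-partitioned
join.

    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class RekeyTransactionsTask implements StreamTask, InitableTask {
      // transaction_id -> customer_id, built from the customer_transactions stream
      private KeyValueStore<String, String> txnToCustomer;

      @Override
      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        txnToCustomer =
            (KeyValueStore<String, String>) context.getStore("txn-to-customer");
      }

      @Override
      @SuppressWarnings("unchecked")
      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        String stream = envelope.getSystemStreamPartition().getStream();
        Map<String, Object> row = (Map<String, Object>) envelope.getMessage();

        if ("customer_transactions".equals(stream)) {
          // Remember which customer each transaction belongs to.
          txnToCustomer.put(row.get("transaction_id").toString(),
                            row.get("customer_id").toString());
        } else if ("transactions".equals(stream)) {
          String customerId = txnToCustomer.get(row.get("id").toString());
          if (customerId != null) {
            // Re-emit the transaction keyed by customer_id so it lands in the
            // same partition as the matching customer record downstream.
            collector.send(new OutgoingMessageEnvelope(
                new SystemStream("kafka", "transactions-by-customer"), customerId, row));
          }
        }
      }
    }

Both input streams to this stage would themselves need to be partitioned by
transaction_id, and a transaction that arrives before its mapping row would
need buffering rather than being dropped as in the sketch, so this adds an
extra hop rather than solving everything for free.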

David

On Wed, Nov 19, 2014 at 11:04 AM, Milinda Pathirage <mpathira@umail.iu.edu>
wrote:

> Hi David,
>
> One way to get more control over the number of partitions is to partition
> based on a combination of shard identifier + join attribute. This lets you
> change the number of partitions based on the availability of resources and
> the amount of data processed at a single node. Also, this won't affect the
> ordering.
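>
> A rough sketch of what that could look like on the producer side, using the
> newer Kafka producer API (topic name, serializers, and the key format are
> just placeholders):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.producer.KafkaProducer;
>     import org.apache.kafka.clients.producer.Producer;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     public class CompositeKeyProducer {
>       public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>             "org.apache.kafka.common.serialization.StringSerializer");
>         Producer<String, String> producer = new KafkaProducer<String, String>(props);
>
>         int shardId = 3;       // which Postgres shard the row came from
>         int customerId = 42;   // the join attribute
>         String jsonRow = "{\"customer_id\": 42}";
>
>         // Key = shard identifier + join attribute. Rows for the same
>         // (shard, customer) always hash to the same partition, so their
>         // relative order is preserved, while the topic's partition count
>         // can be chosen independently of the number of shards.
>         String compositeKey = shardId + ":" + customerId;
>         producer.send(new ProducerRecord<String, String>(
>             "datastream", compositeKey, jsonRow));
>         producer.close();
>       }
>     }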
>
> Thanks
> Milinda
>
> On Wed, Nov 19, 2014 at 10:50 AM, David Pick <pickdavid@gmail.com> wrote:
>
> > Hey Martin and Milinda,
> >
> > Thanks for the quick replies!
> >
> > Our primary database is Postgres, which we've sharded. To get data to
> > Kafka we use a tool called PGQ
> > (https://wiki.postgresql.org/wiki/PGQ_Tutorial), which is just a simple
> > queueing system built inside of Postgres. So we have a database trigger
> > that pushes any state change into PGQ. Then a small Clojure script picks
> > up all the changes from the database and pushes them into Kafka, where
> > there is a single topic called the datastream. The datastream topic has
> > a partition for each shard of our database so that we can ensure
> > messages from each individual Postgres instance come out of Kafka in the
> > order we're expecting.
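> >
> > (Not our actual Clojure code, but as a rough sketch of that routing: with
> > the newer Kafka producer API the partition index can simply be the shard
> > number; the topic name and the producer setup are placeholders.)
> >
> >     import org.apache.kafka.clients.producer.Producer;
> >     import org.apache.kafka.clients.producer.ProducerRecord;
> >
> >     public class DatastreamWriter {
> >       // Send a change event from one Postgres shard to the Kafka partition
> >       // with the same index, so per-shard ordering is preserved.
> >       static void sendChange(Producer<String, String> producer, int shardId,
> >                              String key, String jsonRow) {
> >         producer.send(
> >             new ProducerRecord<String, String>("datastream", shardId, key, jsonRow));
> >       }
> >     }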
> >
> > If my understanding is correct, it seems that with our partitioning
> > scheme we would have a Samza task for each partition of our datastream
> > topic for a job that was generating data for something like our search
> > platform. But given the amount of data we have, our LevelDB instance
> > would get significantly larger than a few gigabytes. Is there a better
> > way to partition the data that would keep those ordering guarantees?
> >
> > For my example we'd like to publish a feed of all posts with their
> > associated user information merged onto the document.
> >
> > As for why we're storing the merged document, we certainly don't have
> > to. I was just going based on the example on the state management page
> > (http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html)
> > which says:
> >
> > *Example: Join a table of user profiles to a table of user settings by
> > user_id and emit the joined stream*
> >
> > Implementation: The job subscribes to the change streams for the user
> > profiles database and the user settings database, both partitioned by
> > user_id. The job keeps a key-value store keyed by user_id, which
> > contains the latest profile record and the latest settings record for
> > each user_id. When a new event comes in from either stream, the job
> > looks up the current value in its store, updates the appropriate fields
> > (depending on whether it was a profile update or a settings update), and
> > writes back the new joined record to the store. The changelog of the
> > store doubles as the output stream of the task.
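> >
> > A rough StreamTask sketch of that documented pattern (store, stream, and
> > field names are placeholders, not our actual job); the joined record is
> > only written to the store, and the store's changelog is what downstream
> > consumers read:
> >
> >     import java.util.HashMap;
> >     import java.util.Map;
> >     import org.apache.samza.config.Config;
> >     import org.apache.samza.storage.kv.KeyValueStore;
> >     import org.apache.samza.system.IncomingMessageEnvelope;
> >     import org.apache.samza.task.InitableTask;
> >     import org.apache.samza.task.MessageCollector;
> >     import org.apache.samza.task.StreamTask;
> >     import org.apache.samza.task.TaskContext;
> >     import org.apache.samza.task.TaskCoordinator;
> >
> >     public class ProfileSettingsJoinTask implements StreamTask, InitableTask {
> >       // user_id -> latest joined record; the changelog of this store
> >       // doubles as the output stream of the task.
> >       private KeyValueStore<String, Map<String, Object>> joined;
> >
> >       @Override
> >       @SuppressWarnings("unchecked")
> >       public void init(Config config, TaskContext context) {
> >         joined = (KeyValueStore<String, Map<String, Object>>)
> >             context.getStore("joined-users");
> >       }
> >
> >       @Override
> >       @SuppressWarnings("unchecked")
> >       public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
> >                           TaskCoordinator coordinator) {
> >         String userId = (String) envelope.getKey();
> >         Map<String, Object> update = (Map<String, Object>) envelope.getMessage();
> >
> >         Map<String, Object> record = joined.get(userId);
> >         if (record == null) {
> >           record = new HashMap<String, Object>();
> >         }
> >         // Fold the update in under "profile" or "settings" depending on
> >         // which change stream it came from, then write the joined record back.
> >         String stream = envelope.getSystemStreamPartition().getStream();
> >         record.put(stream.contains("profile") ? "profile" : "settings", update);
> >         joined.put(userId, record);
> >       }
> >     }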
> >
> >
> > Thanks,
> >
> > David
> >
> > On Wed, Nov 19, 2014 at 9:10 AM, Milinda Pathirage <mpathira@umail.iu.edu>
> > wrote:
> >
> > > Hi David,
> > >
> > > I am also wondering why you are storing the merged document in local
> > > LevelDB. If you need to check for duplicates, how about using a Bloom
> > > filter?
> > >
> > > Thanks
> > > Milinda
> > >
> > > On Wed, Nov 19, 2014 at 8:50 AM, Martin Kleppmann <martin@kleppmann.com>
> > > wrote:
> > >
> > > > Hi David,
> > > >
> > > > On 19 Nov 2014, at 04:52, David Pick <pickdavid@gmail.com> wrote:
> > > > > First off, if this is the wrong place to ask these kinds of
> > > > > questions please let me know. I tried in IRC but didn't get an
> > > > > answer within a few hours so I'm trying here.
> > > >
> > > > It's the right place, and they're good questions :)
> > > >
> > > > > I had a couple of questions around implementing a table to table
> > > > > join with data coming from a database changelog through Kafka.
> > > >
> > > > Cool! Out of curiosity, can I ask what database you're using and how
> > > > you're getting the changelog?
> > > >
> > > > > Let's say I've got two tables, users and posts, in my primary db,
> > > > > where the posts table has a user_id column. I've written a Samza
> > > > > job that joins those two tables together by storing every user
> > > > > record and the merged document in LevelDB and then outputting the
> > > > > resulting document to the changelog Kafka topic.
> > > >
> > > > Not sure exactly what you mean by a merged document here. Is it a
> > > > list of all the posts by a particular user? Or post records augmented
> > > > with user information? Or something else?
> > > >
> > > > > Is this the right way to implement that kind of job?
> > > >
> > > > On a high level, this sounds reasonable. One detail question is
> > > > whether you're using the changelog feature for the LevelDB store. If
> > > > the contents of the store can be rebuilt by reprocessing the input
> > > > stream, you could get away with turning off the changelog on the
> > > > store, and making the input stream a bootstrap stream instead. That
> > > > will save some overhead on writes, but still give you fault
> > > > tolerance.
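> > > >
> > > > In config terms the two options look roughly like this (system,
> > > > stream, and store names are placeholders): either give the store a
> > > > changelog topic, or drop the changelog and mark the input as a
> > > > bootstrap stream that is re-read from the beginning on restart.
> > > >
> > > >     # Option 1: fault tolerance via a store changelog topic
> > > >     stores.user-store.changelog=kafka.user-store-changelog
> > > >
> > > >     # Option 2: no changelog; rebuild the store from the input stream
> > > >     systems.kafka.streams.users.samza.bootstrap=true
> > > >     systems.kafka.streams.users.samza.reset.offset=true
> > > >     systems.kafka.streams.users.samza.offset.default=oldest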
> > > >
> > > > Another question is whether the values you're storing in LevelDB are
> > > > several merged records together, or whether you store each record
> > > > individually and use a range query to retrieve them if necessary.
> > > > You could benchmark which works best for you.
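> > > >
> > > > For the second variant, a rough sketch of the range-query access
> > > > pattern, assuming string keys like "user_id:post_id" under a string
> > > > serde so that all of one user's posts sit next to each other in the
> > > > store (names are placeholders):
> > > >
> > > >     import java.util.ArrayList;
> > > >     import java.util.List;
> > > >     import java.util.Map;
> > > >     import org.apache.samza.storage.kv.KeyValueIterator;
> > > >     import org.apache.samza.storage.kv.KeyValueStore;
> > > >
> > > >     public class PostsByUser {
> > > >       // Posts are stored one per key under "userId:postId", so a
> > > >       // key-range scan returns all posts for one user without keeping
> > > >       // a big merged document. The iterator must be closed when done.
> > > >       static List<Map<String, Object>> postsFor(
> > > >           KeyValueStore<String, Map<String, Object>> store, String userId) {
> > > >         List<Map<String, Object>> posts = new ArrayList<Map<String, Object>>();
> > > >         KeyValueIterator<String, Map<String, Object>> it =
> > > >             store.range(userId + ":", userId + ";"); // ';' sorts just after ':'
> > > >         try {
> > > >           while (it.hasNext()) {
> > > >             posts.add(it.next().getValue());
> > > >           }
> > > >         } finally {
> > > >           it.close();
> > > >         }
> > > >         return posts;
> > > >       }
> > > >     }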
> > > >
> > > > > It seems that even with a decent partitioning scheme the LevelDB
> > > > > instance in each task will get quite large, especially if we're
> > > > > joining several tables together that have millions of rows (our
> > > > > real-world use case would be 7 or 8 tables each with many millions
> > > > > of records).
> > > >
> > > > The goal of Samza's local key-value stores is to support a few
> > > > gigabytes per partition. So millions of rows distributed across
> > > > (say) tens of partitions should be fine, assuming the rows aren't
> > > > huge. You might want to try the new RocksDB support, which may
> > > > perform better than LevelDB.
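> > > >
> > > > If you want to try that, it's a per-store setting, something along
> > > > these lines (the store name is a placeholder):
> > > >
> > > >     stores.merged-docs.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory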
> > > >
> > > > > Also, given a task that's processing that much data, where do you
> > > > > recommend running Samza? Should we spin up another set of boxes, or
> > > > > is it ok to run it on the Kafka brokers? (I heard it mentioned that
> > > > > this is how LinkedIn is running Samza.)
> > > >
> > > > IIRC LinkedIn saw some issues with the page cache when Kafka and
> > > > LevelDB were on the same machine, which was resolved by putting them
> > > > on different machines. But I'll let one of the LinkedIn folks comment
> > > > on that.
> > > >
> > > > Best,
> > > > Martin
> > > >
> > > >
> > >
> > >
> > > --
> > > Milinda Pathirage
> > >
> > > PhD Student | Research Assistant
> > > School of Informatics and Computing | Data to Insight Center
> > > Indiana University
> > >
> > > twitter: milindalakmal
> > > skype: milinda.pathirage
> > > blog: http://milinda.pathirage.org
> > >
> >
>
>
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>
