kudu-user mailing list archives

From Jean-Daniel Cryans <jdcry...@apache.org>
Subject Re: Spark kudu - partitioning, filter, a whole lot of upserts
Date Thu, 08 Jun 2017 18:09:23 GMT
Hi Frank,

Q3 is really the main problem I would focus on here. Architecturally, Kudu
was built to handle workloads that consist mostly of inserts and scans,
with a sprinkle of updates and random reads. Not only that, but we also
haven't been spending much time (if at all) optimizing the update code path.

So, an upsert over an existing row will indeed update all the columns you
write, which turns your use case from something that should work into
something that plays directly into Kudu's weaknesses. Another thing I'm aware of is that the
maintenance manager is bad at scheduling flushes quickly, so if you do a
ton of updates you'll probably fill the DeltaMemStores but it won't flush
fast enough so you'll end up with a lot of memory backpressure (and
eventually timeouts). FWIW this is better in the upcoming 1.4.0, but still
likely not ideal.
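One mitigation for the recompute pass is to write back only the primary key
columns plus the three recomputed columns, rather than the full row, so each
upsert carries three cell values instead of the whole record (Kudu leaves
columns you don't set untouched when upserting an existing row). A rough
sketch, with made-up column names:

```python
# Sketch: project the DataFrame down to the PK plus the recomputed
# columns before writing back. Column names here are invented for
# illustration; substitute your real schema.

PK_COLS = ['pk_string', 'month', 'day']      # the full primary key
RECOMPUTED = ['val_a', 'val_b', 'val_c']     # the three changed columns

def upsert_projection(all_columns):
    """Columns to write back: every PK column plus the recomputed ones,
    kept in the order they appear in the source DataFrame."""
    keep = set(PK_COLS) | set(RECOMPUTED)
    return [c for c in all_columns if c in keep]

# With Spark this would be used roughly as:
#   df.select(*upsert_projection(df.columns)) \
#     .write.format('org.apache.kudu.spark.kudu') \
#     .option('kudu.master', kudu_master) \
#     .option('kudu.table', target_table) \
#     .mode('append').save()
```

This doesn't change the flush-scheduling issue above, but it does shrink
what lands in the DeltaMemStores per updated row.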

Relatedly, see Dan's comment on how we could make your use case work better.
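On Q2: a lambda inside rdd.filter() is opaque to Spark, so the whole table
gets scanned and the 400-day cut happens client side. Filtering on the
DataFrame column instead gives the kudu-spark connector a chance to push the
comparison down to Kudu as a scan predicate. A sketch (assuming b_date is
stored as a zero-padded 'YYYYMMDD' string, so lexicographic and
chronological order agree):

```python
# Sketch: compute the cutoff as a 'YYYYMMDD' string and filter on the
# DataFrame column rather than with an opaque Python lambda.
from datetime import datetime, timedelta

today_minus_400 = (datetime.today() - timedelta(days=400)).strftime('%Y%m%d')

# df = sqlContext.read.format('org.apache.kudu.spark.kudu') \
#        .option('kudu.master', kudu_master) \
#        .option('kudu.table', target_table).load()
# df_400 = df.filter(df.b_date > today_minus_400)  # candidate for pushdown
```

You can check whether the predicate was actually pushed down with
df_400.explain().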

Hope this helps,


On Thu, Jun 8, 2017 at 6:03 AM, Frank Heimerzheim <fh.ordix@gmail.com> wrote:

> Hello,
> we are using Kudu 1.2 on 6 worker/data nodes and 3 highly available Kudu
> master nodes. Development is in Python 2.7.12 against Spark 2.1.1. To
> connect Kudu with Spark we use kudu-spark2_2.11-1.2.0.jar.
> Our data is about 600 GB, roughly 1.5 billion records.
> Partitioning is 100 hash partitions on a string-typed PK column, plus
> 2 range partitions for 2016 and 2017. Additional PK columns are day and
> month. Data is evenly distributed from 04/2016 to 06/2017 and will grow by
> about 100 million records a month.
> Essentially there are three use cases:
> 1. inserting about 5 million records every working day
> A Spark job reads *.csv files, massages the data, and writes it to Kudu with:
> df.write.format('org.apache.kudu.spark.kudu') \
>     .option('kudu.master', self.kudu_master) \
>     .option('kudu.table', self.target_table) \
>     .mode('append').save()
> 2. reading data for the last 400 calendar days
> df = self.sqlContext.read.format('org.apache.kudu.spark.kudu') \
>     .option('kudu.master', self.kudu_master) \
>     .option('kudu.table', self.target_table).load()
> rdd = df.rdd.map(lambda x: whatever(x))
> rdd_400 = rdd.filter(lambda x: datetime.strptime(x.b_date, '%Y%m%d') >
>     today_minus_400_days)
> 3. updating three columns for all 1.5 billion records
> About two or three times a year it is necessary to recalculate the values
> stored in three (non-PK) columns. For this we read the whole table in a
> Spark job (like 2), recompute the values, and write them back to the same
> table (like 1).
> My questions:
> Q1: Is the chosen type of partitioning suitable for this kind of use case
> and data? I think the way we write the data is suitable.
> Q2: Is the way we read the data suitable? Will the filter make sure
> that only data for the last 400 days is physically read from Kudu?
> Q3: Will only the three altered columns be physically written, or will
> complete records be written, as the write() suggests? In another use case
> we upsert about 10% of 5 million records every day. That data comes from
> another source, not from the table it will be written into. Is Kudu
> able to handle such massive upserts?
> Thanks very much for your opinions and suggestions.
> Greetings
> Frank
