kudu-user mailing list archives

From Clifford Resnick <cresn...@mediamath.com>
Subject Re: "broadcast" tablet replication for kudu?
Date Mon, 23 Jul 2018 14:09:59 GMT
Great! We’re on 2.11 now. I’ll do some before/after benchmarks this week.

From: Todd Lipcon <todd@cloudera.com>
Reply-To: "user@kudu.apache.org" <user@kudu.apache.org>
Date: Monday, July 23, 2018 at 10:05 AM
To: "user@kudu.apache.org" <user@kudu.apache.org>
Subject: Re: "broadcast" tablet replication for kudu?

Impala 2.12. The external RPC protocol is still Thrift.


On Mon, Jul 23, 2018, 7:02 AM Clifford Resnick <cresnick@mediamath.com>
Is this Impala 3.0? I’m concerned about breaking changes, and our RPC to Impala is Thrift-based.

From: Todd Lipcon <todd@cloudera.com>
Reply-To: "user@kudu.apache.org" <user@kudu.apache.org>
Date: Monday, July 23, 2018 at 9:46 AM
To: "user@kudu.apache.org" <user@kudu.apache.org>
Subject: Re: "broadcast" tablet replication for kudu?

Are you on the latest release of Impala? It switched from using Thrift for RPC to a new implementation
(actually borrowed from Kudu) which might help broadcast performance a bit.


On Mon, Jul 23, 2018, 6:43 AM Boris Tyukin <boris@boristyukin.com>
Sorry to revive the old thread, but I am curious whether there is a good way to speed up requests
to frequently used tables in Kudu.

On Thu, Apr 12, 2018 at 8:19 AM Boris Tyukin <boris@boristyukin.com>
Bummer. After reading your conversation, I wish there was an easier way. We will have
the same issue, as we have a few dozen tables which are used very frequently in joins, and
I was hoping there was an easy way to replicate them on most of the nodes to avoid broadcasts
every time.

On Thu, Apr 12, 2018 at 7:26 AM, Clifford Resnick <cresnick@mediamath.com>
The table in our case is 12x hashed and ranged by month, so the broadcasts were often to all
(12) nodes.

On Apr 12, 2018 12:58 AM, Mauricio Aristizabal <mauricio@impact.com>
Sorry I left that out, Cliff. FWIW it does seem to have been broadcast.

Not sure, though, how a shuffle would be much different from a broadcast if the entire table is
1 file/block on 1 node.

On Wed, Apr 11, 2018 at 8:52 PM, Cliff Resnick <cresny@gmail.com>
From the screenshot it does not look like there was a broadcast of the dimension table(s),
so it could be the case here that the multiple smaller sends help. Our dim tables are generally
in the single-digit millions and Impala chooses to broadcast them. Since the fact result cardinality
is always much smaller, we've found that forcing a [shuffle] dimension join is actually faster,
since it sends the dims only once rather than sending all of them to all nodes. The degraded
performance of broadcast is especially obvious when the query returns zero results. I don't have
much experience here, but it does seem that Kudu's efficient predicate scans can sometimes "break"
Impala's query plan.
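
The trade-off described above can be illustrated with a rough back-of-the-envelope model. This is only a sketch, not how Impala's planner actually costs joins: it assumes a broadcast join ships every dimension row to every node, while a shuffle join ships each dimension row and each surviving fact row to exactly one node. The function names and the sample numbers are hypothetical.

```python
def broadcast_rows_sent(dim_rows: int, num_nodes: int) -> int:
    """Rows moved when the whole dimension table is copied to every node.

    Simplification: ignores that one copy may already be local.
    """
    return dim_rows * num_nodes


def shuffle_rows_sent(dim_rows: int, fact_rows: int) -> int:
    """Rows moved when both sides are hash-partitioned on the join key.

    Each row goes to exactly one node, so the cost is the sum of both
    inputs. fact_rows is the fact-side row count after predicates.
    """
    return dim_rows + fact_rows


def cheaper_strategy(dim_rows: int, fact_rows: int, num_nodes: int) -> str:
    """Pick the strategy that moves fewer rows under this toy model."""
    broadcast = broadcast_rows_sent(dim_rows, num_nodes)
    shuffle = shuffle_rows_sent(dim_rows, fact_rows)
    return "shuffle" if shuffle < broadcast else "broadcast"


if __name__ == "__main__":
    # Hypothetical numbers: 5M-row dimension, 12 nodes,
    # 100k fact rows left after filtering.
    print(broadcast_rows_sent(5_000_000, 12))
    print(shuffle_rows_sent(5_000_000, 100_000))
    print(cheaper_strategy(5_000_000, 100_000, 12))
```

Under these assumptions the broadcast cost grows with the node count while the shuffle cost does not, which is consistent with shuffle winning when the filtered fact side is small.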


On Wed, Apr 11, 2018 at 5:41 PM, Mauricio Aristizabal <mauricio@impact.com>
@Todd not to belabor the point, but when I suggested breaking up small dim tables into multiple
Parquet files (and in this thread's context perhaps partitioning a Kudu table, even if small, into
multiple tablets), it was to speed up joins/exchanges, not to parallelize the scan.

For example, we recently ran into a slow query where the 14M-record dimension fit into a
single file & block, so it got scanned on a single node. The scan was still pretty quick (300ms),
but it caused the join to take 25+ seconds and bogged down the entire query. See highlighted
fragment and its parent.

So we broke it into several small files the way I described in my previous post, and now join
and query are fast (6s).



On Fri, Mar 16, 2018 at 3:55 PM, Todd Lipcon <todd@cloudera.com>
I suppose in the case that the dimension table scan makes a non-trivial portion of your workload
time, then yea, parallelizing the scan as you suggest would be beneficial. That said, in typical
analytic queries, scanning the dimension tables is very quick compared to scanning the much-larger
fact tables, so the extra parallelism on the dim table scan isn't worth too much.


On Fri, Mar 16, 2018 at 2:56 PM, Mauricio Aristizabal <mauricio@impactradius.com>
@Todd, working with Parquet in the past, I've seen small dimensions that fit in a single
file/block limit the parallelism of join/exchange/aggregation nodes. I've forced those dims
to spread across 20 or so blocks by using SET PARQUET_FILE_SIZE=8m; or similar when doing
the INSERT OVERWRITE that loads them, which then allows these operations to parallelize across
that many nodes.

Wouldn't it be useful here for Cliff's small dims to be partitioned into a couple of tablets
to similarly improve parallelism?


On Fri, Mar 16, 2018 at 2:29 PM, Todd Lipcon <todd@cloudera.com>
On Fri, Mar 16, 2018 at 2:19 PM, Cliff Resnick <cresny@gmail.com>
Hey Todd,

Thanks for that explanation, as well as all the great work you're doing -- it's much appreciated!
I just have one last follow-up question. Reading about BROADCAST operations (Kudu, Spark,
Flink, etc.), it seems the smaller table is always copied in its entirety BEFORE the predicate
is evaluated.

That's not quite true. If you have a predicate on a joined column, or on one of the columns
in the joined table, it will be pushed down to the "scan" operator, which happens before the
"exchange". In addition, there is a feature called "runtime filters" that can push dynamically-generated
filters from one side of the exchange to the other.

But since the Kudu client provides a serialized scanner as part of the ScanToken API, why
wouldn't Impala use that instead if it knows that the table is Kudu and the query has any
type of predicate? Perhaps if I hash-partition the table I could maybe force this (because
that complicates a BROADCAST)? I guess this is really a question for Impala but perhaps there
is a more basic reason.

Impala could definitely be smarter, just a matter of programming Kudu-specific join strategies
into the optimizer. Today, the optimizer isn't aware of the unique properties of Kudu scans
vs other storage mechanisms.



On Fri, Mar 16, 2018 at 4:10 PM, Todd Lipcon <todd@cloudera.com>
On Fri, Mar 16, 2018 at 12:30 PM, Clifford Resnick <cresnick@mediamath.com>
I thought I had read that the Kudu client can configure a scan for CLOSEST_REPLICA and assumed
this was a way to take advantage of data collocation.

Yea, when a client uses CLOSEST_REPLICA it will read a local one if available. However, that
doesn't influence the higher level operation of the Impala (or Spark) planner. The planner
isn't aware of the replication policy, so it will use one of the existing supported JOIN strategies.
Given statistics, it will choose to broadcast the small table, which means that it will create
a plan that looks like:

                                   |                         |
                        +---------->build      JOIN          |
                        |          |                         |
                        |          |              probe      |
                 +--------------+  +-------------------------+
                 |              |                  |
                 | Exchange     |                  |
            +----+ (broadcast)  |                  |
            |    |              |                  |
            |    +--------------+                  |
            |                                      |
      +---------+                                  |
      |         |                        +-----------------------+
      |  SCAN   |                        |                       |
      |  KUDU   |                        |   SCAN (other side)   |
      |         |                        |                       |
      +---------+                        +-----------------------+

(hopefully the ASCII art comes through)

In other words, the "scan kudu" operator scans the table once, and then replicates the results
of that scan into the JOIN operator. The "scan kudu" operator of course will read its local
copy, but it will still go through the exchange process.

For the use case you're talking about, where the join is just looking up a single row by PK
in a dimension table, ideally we'd be using an altogether different join strategy such as
nested-loop join, with the inner "loop" actually being a Kudu PK lookup, but that strategy
isn't implemented by Impala.
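
The difference between the two strategies mentioned above can be sketched in a few lines. This is a toy, in-memory illustration only: it does not use the Kudu client or reflect Impala internals, and `PkStore` is a hypothetical stand-in for any store that serves point lookups by primary key.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

DimRow = Tuple[int, str]      # (primary key, attribute)
FactRow = Tuple[int, float]   # (foreign key, measure)
Joined = Tuple[int, str, float]


def broadcast_hash_join(fact: Iterable[FactRow],
                        dim: Iterable[DimRow]) -> Iterator[Joined]:
    """Scan the whole dimension side, build a hash table, then probe it."""
    build: Dict[int, str] = dict(dim)  # full scan of the dimension table
    for fk, measure in fact:
        if fk in build:
            yield fk, build[fk], measure


class PkStore:
    """Hypothetical store that answers point lookups by primary key."""

    def __init__(self, rows: Iterable[DimRow]) -> None:
        self._rows: Dict[int, str] = dict(rows)
        self.lookups = 0  # number of point reads issued

    def get(self, pk: int) -> Optional[str]:
        self.lookups += 1
        return self._rows.get(pk)


def pk_lookup_join(fact: Iterable[FactRow], store: PkStore) -> Iterator[Joined]:
    """Nested-loop join whose inner 'loop' is a single lookup by key."""
    for fk, measure in fact:
        attr = store.get(fk)
        if attr is not None:
            yield fk, attr, measure


if __name__ == "__main__":
    dim = [(1, "US"), (2, "CA"), (3, "MX")]
    fact = [(2, 9.5), (4, 1.0), (2, 3.0)]
    store = PkStore(dim)
    print(list(broadcast_hash_join(fact, dim)))
    print(list(pk_lookup_join(fact, store)))
    print(store.lookups)
```

Both functions return the same rows; the lookup variant touches the dimension side once per probe row instead of scanning and shipping all of it, which is why it would suit the case of very few probe rows.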


If this exists then how far out of context is my understanding of it? Reading about HDFS
cache replication, I do know that Impala will choose a random replica there to more evenly
distribute load. But especially compared to Kudu upsert, managing mutable data using Parquet
is painful. So, perhaps to sum things up, if nearly 100% of my metadata scans are single Primary
Key lookups followed by a tiny broadcast, then am I really just splitting hairs performance-wise
between Kudu and HDFS-cached Parquet?

From: Todd Lipcon <todd@cloudera.com>
Reply-To: "user@kudu.apache.org" <user@kudu.apache.org>
Date: Friday, March 16, 2018 at 2:51 PM
To: "user@kudu.apache.org" <user@kudu.apache.org>
Subject: Re: "broadcast" tablet replication for kudu?

It's worth noting that, even if your table is replicated, Impala's planner is unaware of this
fact and it will give the same plan regardless. That is to say, rather than every node scanning
its local copy, instead a single node will perform the whole scan (assuming it's a small table)
and broadcast it from there within the scope of a single query. So, I don't think you'll see
any performance improvements on Impala queries by attempting something like an extremely high
replication count.

I could see bumping the replication count to 5 for these tables since the extra storage cost
is low and it will ensure higher availability of the important central tables, but I'd be
surprised if there is any measurable perf impact.


On Fri, Mar 16, 2018 at 11:35 AM, Clifford Resnick <cresnick@mediamath.com>
Thanks for that, glad I was wrong there! Aside from replication considerations, is it also
recommended that the number of tablet servers be odd?

I will check forums as you suggested, but from what I read after searching, Impala
relies on user-configured caching strategies using HDFS cache. The workload for these tables
is very light on writes, maybe a dozen or so records per hour across 6 or 7 tables. The size of
the tables ranges from thousands to low millions of rows, so sub-partitioning would not
be required. So perhaps this is not a typical use case, but I think it could work quite well
with Kudu.

From: Dan Burkert <danburkert@apache.org>
Reply-To: "user@kudu.apache.org" <user@kudu.apache.org>
Date: Friday, March 16, 2018 at 2:09 PM
To: "user@kudu.apache.org" <user@kudu.apache.org>
Subject: Re: "broadcast" tablet replication for kudu?

The replication count is the number of tablet servers on which Kudu will host copies. So
if you set the replication level to 5, Kudu will put the data on 5 separate tablet servers.
There's no built-in broadcast table feature; upping the replication factor is the closest
thing. A couple of things to keep in mind:

- Always use an odd replication count. This is important due to how the Raft algorithm works.
Recent versions of Kudu won't even let you specify an even number without flipping some flags.
- We don't test much beyond 5 replicas. It should work, but you may run into issues
since it's a relatively rare configuration. With a heavy write workload and many replicas
you are even more likely to encounter issues.
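
The reason for the odd-count advice can be seen from generic majority-quorum arithmetic. The sketch below is not Kudu code; it only assumes, as Raft does, that a strict majority of replicas must be available to make progress. The function names are illustrative.

```python
def majority(replicas: int) -> int:
    """Smallest number of replicas that forms a strict majority."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can be lost while a majority still remains."""
    return replicas - majority(replicas)


if __name__ == "__main__":
    for n in range(3, 8):
        print(n, majority(n), tolerated_failures(n))
```

Going from 3 to 4 replicas (or from 5 to 6) raises the majority size without raising the number of failures that can be tolerated, so the even count costs an extra copy and buys no additional fault tolerance.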

It's also worth checking in an Impala forum whether it has features that make joins against
small broadcast tables better. Perhaps Impala can cache small tables locally when doing joins.

- Dan

On Fri, Mar 16, 2018 at 10:55 AM, Clifford Resnick <cresnick@mediamath.com>
The problem is, AFAIK, that replication count is not necessarily the distribution count, so
you can't guarantee all tablet servers will have a copy.

On Mar 16, 2018 1:41 PM, Boris Tyukin <boris@boristyukin.com>
I'm new to Kudu but we are also going to use Impala mostly with Kudu. We have a few tables
that are small but used a lot. My plan is to replicate them more than 3 times. When you create
a Kudu table, you can specify the number of replicated copies (3 by default), and I guess you can
put a number there corresponding to the node count in your cluster. The downside is that you cannot
change that number unless you recreate the table.

On Fri, Mar 16, 2018 at 10:42 AM, Cliff Resnick <cresny@gmail.com>
We will soon be moving our analytics from AWS Redshift to Impala/Kudu. One Redshift feature
that we will miss is its ALL Distribution, where a copy of a table is maintained on each server.
We define a number of metadata tables this way since they are used in nearly every query.
We are considering using Parquet in HDFS cache for these. Kudu would be a much better
fit for the update semantics, but we are worried about the additional contention. I'm wondering
if having a Broadcast, or ALL, tablet replication might be an easy feature to add to Kudu?


Todd Lipcon
Software Engineer, Cloudera

Architect - Business Intelligence + Data Science
mauricio@impactradius.com | (m) +1 323 309 4260
223 E. De La Guerra St. | Santa Barbara, CA 93101

Mauricio Aristizabal
Architect - Data Pipeline

M  323 309 4260
E  mauricio@impact.com  |  W  https://impact.com