impala-user mailing list archives

From Philipp Krause <philippkrause.m...@googlemail.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Tue, 24 Apr 2018 12:31:16 GMT
To prevent the broadcast join I could simply use the shuffle hint in the
query:

SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE] business_partition_2
WHERE business_partition_1.businessid = business_partition_2.businessid

I think the broadcast is currently only used because of my very small 
test tables.

This gives me the plan attached as partitioned_shuffle.png. Since my
modified version isn't working yet, I partitioned both tables on
businessid in Impala. The "hack" should only be needed to get into the
if-condition when I partition the data manually, right? In this case
(where the partitioning is done by Impala itself) Impala should get into
the if-condition anyway. Unfortunately, I can't see any difference in the
exchange nodes compared to the plan for my unpartitioned tables
(unpartitioned.png). My goal is to get rid of all exchange nodes, since
the corresponding data is already present on each node. The plan should
then look the same except for the 03 and 04 exchanges.

Are there other changes necessary to just take Table1-partition/block X
and Table2-partition/block Y on each node and join them without any data
exchange? Each node should take all its local blocks for both tables,
join them, and pass the results back to the coordinator, where all
results come together (please see explanation.png).
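
To make the intended execution concrete, here is a small standalone
simulation (plain Java, deliberately not Impala code) of the plan I am
after, assuming both tables are co-partitioned by businessid % numNodes:
each node joins only its local blocks, and the coordinator merely
collects the per-node results.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Toy simulation of a co-located join: every node hash-joins only the
    // blocks it stores locally; no row crosses node boundaries before the
    // final collection step at the coordinator.
    public class ColocatedJoinSketch {
        public static void main(String[] args) {
            int numNodes = 3;
            int[] table1 = {0, 1, 2, 3, 4, 5, 6, 7, 8};  // businessids of table 1
            int[] table2 = {0, 2, 3, 5, 7, 8};            // businessids of table 2

            List<Integer> joined = new ArrayList<>();
            for (int node = 0; node < numNodes; node++) {
                // "Local block" of table 1: the ids this node stores under
                // the businessid % numNodes placement scheme.
                Set<Integer> local1 = new HashSet<>();
                for (int id : table1) {
                    if (id % numNodes == node) local1.add(id);
                }
                // Local hash join against the matching block of table 2.
                for (int id : table2) {
                    if (id % numNodes == node && local1.contains(id)) joined.add(id);
                }
            }
            // Coordinator step: only the per-node results are merged.
            System.out.println(joined);  // ids present in both tables
        }
    }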

I'm looking forward to hearing from you. I hope we can realise this.

Best regards
Philipp


On 24.04.2018 at 07:03, Alexander Behm wrote:
> On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause
> <philippkrause.mail@googlemail.com> wrote:
>
>     Hi Alex,
>     thanks for the information! I've compiled the cdh5.13.1-release and
>     replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include
>     the changes in DistributedPlanner.java). There still seems to be a
>     method missing after replacing the jar, but I'll try to figure that
>     out.
>
>     I have two questions concerning the code fragment in
>     DistributedPlanner.java that you pointed me to.
>
>     First:
>     The attached graphic shows the query plan for two tables which are
>     partitioned on the join attribute (standard Impala version without
>     any changes). If I change the if-condition to true for my modified
>     version, I expect to get the same result for my "hand-made"
>     partitions (outside of Impala). But why is there an exchange
>     broadcast (even in the standard version)? I mean, if I have a
>     partition of Table 1 with ID=0 on Node X, why is there a broadcast
>     of the partition of Table 2 with ID=0 on Node Y? Actually this
>     partition only has to be sent to Node X, where the matching
>     partition of Table 1 is located, instead of being sent to all nodes
>     (broadcast). Or is this exactly what happens and it's only shown as
>     a broadcast here in the graphic?
>
>
> You are reading the plan correctly. Impala simply does not implement 
> that optimization.
>
>
>     Second:
>     All corresponding blocks of the partitioned tables are on the same
>     node (e.g. Table 1 partition with ID=0 and Table 2 partition with
>     ID=0 => Node 1, etc.). This is what I did manually. As already
>     mentioned, I want to join these partitions (blocks) locally on each
>     node. But if I'm correct, the modification in the DistributedPlanner
>     will also only lead to the plan in the attached graphic, so that no
>     further exchanges are created if I use my "hand-made" partitions.
>     But there is still the broadcast exchange which distributes the
>     partitions across the cluster, which isn't necessary because all
>     needed blocks are already on the same node, ready to be joined
>     locally. Is there a way to realise that and get rid of the broadcast
>     exchange?
>
> You are right. That hack in DistributedPlanner only works for
> partitioned hash joins. You can probably stitch the plan together at an
> earlier place, e.g.:
> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
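
Just to make sure we mean the same thing, this is the plan shape I am
aiming for, as a toy rendering (plain Java, no Impala classes; "MERGE"
stands for the final collection step at the coordinator):

    // Prints the current plan versus the plan I would like the stitched
    // fragments to produce for co-located, co-partitioned blocks.
    public class PlanShapes {
        public static void main(String[] args) {
            System.out.println("current plan:");
            System.out.println("  MERGE (coordinator)");
            System.out.println("    HASH JOIN");
            System.out.println("      EXCHANGE <- SCAN business_partition_1");
            System.out.println("      EXCHANGE <- SCAN business_partition_2");
            System.out.println();
            System.out.println("desired plan (no exchanges 03/04):");
            System.out.println("  MERGE (coordinator)");
            System.out.println("    HASH JOIN");
            System.out.println("      SCAN business_partition_1");
            System.out.println("      SCAN business_partition_2");
        }
    }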
>
>     Please correct me if I'm wrong with my assumptions.
>
>     Thank you very much!
>
>     Best regards
>     Philipp
>
>
>
>
>
>
>
>     On 18.04.2018 at 07:16, Alexander Behm wrote:
>>     Your CM-managed cluster must already be running a "compatible"
>>     Impala version for this trick to work. It looks like your catalog
>>     binary is trying to find a method which does not exist in the .jar,
>>     presumably because your .jar is built from a different version of
>>     Impala where that method does not exist anymore.
>>
>>     It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based
>>     on Impala 2.10, see:
>>     https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>
>>     That means this binary copying trick will only work with a
>>     modified version of Impala 2.10, and very likely will not work
>>     with a different version.
>>
>>     It's probably easier to test with the minicluster first.
>>     Alternatively, it "might" work if you replace all the binaries
>>     mentioned above, but it's quite possible that it will not.
>>
>>     On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause
>>     <philippkrause.mail@googlemail.com> wrote:
>>
>>         Hi Alex! Thank you for the list! The build of the modified
>>         cdh5-trunk branch (debug mode) was successful. After replacing
>>         "impala-frontend-0.1-SNAPSHOT.jar" in
>>         /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/ I got
>>         the following error in my existing cluster:
>>         F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError:
>>         getCatalogObjects
>>         When I switch back to the original jar file, the error is gone.
>>         So something must be wrong with this file, I guess. But I
>>         wonder about the error in catalog.cc, because I didn't touch
>>         any .cc files.
>>
>>         I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The
>>         other jar files do not exist in my Impala installation
>>         (CDH-5.13.1).
>>
>>         What am I doing wrong?
>>
>>         Best regards
>>         Philipp
>>
>>
>>         On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>         Here's the full list. It might not be minimal, but
>>>         copying/overwriting these should work.
>>>
>>>         debug/service/impalad
>>>         debug/service/libfesupport.so
>>>         debug/service/libService.a
>>>         release/service/impalad
>>>         release/service/libfesupport.so
>>>         release/service/libService.a
>>>         yarn-extras-0.1-SNAPSHOT.jar
>>>         impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>         impala-data-source-api-1.0-SNAPSHOT.jar
>>>         impala-frontend-0.1-SNAPSHOT-tests.jar
>>>         impala-frontend-0.1-SNAPSHOT.jar
>>>         libkudu_client.so.0.1.0
>>>         libstdc++.so.6.0.20
>>>         impala-no-sse.bc
>>>         impala-sse.bc
>>>         libimpalalzo.so
>>>
>>>         If you are only modifying the Java portion (like
>>>         DistributedPlanner), then only copying/replacing the *.jar
>>>         files should be sufficient.
>>>
>>>         On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause
>>>         <philippkrause.mail@googlemail.com> wrote:
>>>
>>>             Yes, I have a running (virtual) cluster. I would try to
>>>             follow your way with the custom Impala build
>>>             (DistributedPlanner.java is the only modified file at the
>>>             moment). Thank you in advance for the file list!
>>>
>>>             Best regards
>>>             Philipp
>>>
>>>             Alexander Behm <alex.behm@cloudera.com> wrote on Fri,
>>>             Apr 13, 2018, 18:45:
>>>
>>>                 I'm not really following your installation/setup and
>>>                 am not an expert on Cloudera Manager
>>>                 installation/config. If you are going to build Impala
>>>                 anyway, it's probably easiest to test on Impala's
>>>                 minicluster first.
>>>
>>>                 In general, if you have a running Cloudera Managed
>>>                 cluster, you can deploy a custom Impala build by
>>>                 simply overwriting the existing Impala binaries and
>>>                 jars with the new build. If you want to go this route,
>>>                 I can give you a full list of files you need to
>>>                 replace.
>>>
>>>                 On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
>>>                 <philippkrause.mail@googlemail.com> wrote:
>>>
>>>                     Thank you for the explanation! Yes, I'm using
>>>                     HDFS. The single-replica setup is only for test
>>>                     purposes at the moment. I think this makes it
>>>                     easier to gain some first results, since fewer
>>>                     modifications (scheduler etc.) are necessary. I
>>>                     would like to test the DistributedPlanner
>>>                     modification in my virtual cluster. I used a
>>>                     customized Vagrant script to install Impala on
>>>                     multiple hosts (see attachment). It simply
>>>                     installs cloudera-manager-server-db,
>>>                     cloudera-manager-server and
>>>                     cloudera-manager-daemons via apt-get. What would
>>>                     be the simplest way to set up my modified version?
>>>                     Could I simply call ./buildall.sh and change the
>>>                     script to something like this?
>>>
>>>                     echo "Install java..."
>>>                     apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>                     echo "Download impala..."
>>>                     wget https://...   # where I uploaded my modified version
>>>                     echo "Extract impala..."
>>>                     tar -xvzf Impala-cdh5-trunk.tar.gz
>>>                     cd Impala-cdh5-trunk
>>>                     echo "Build impala..."
>>>                     ./buildall.sh
>>>                     echo "Start impala instances..."
>>>                     service cloudera-scm-server-db initdb
>>>                     service cloudera-scm-server-db start
>>>                     service cloudera-scm-server start
>>>
>>>                     Or is there another, maybe even easier, method to
>>>                     test the code? Maybe via bootstrap_development.sh
>>>                     / the minicluster?
>>>
>>>                     Best regards
>>>                     Philipp
>>>
>>>                     2018-04-05 18:39 GMT+02:00 Alexander Behm
>>>                     <alex.behm@cloudera.com>:
>>>
>>>                         Apologies for the late response. Btw, your
>>>                         previous post was clear enough to me, so no
>>>                         worries :)
>>>
>>>
>>>                         On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
>>>                         <philippkrause.mail@googlemail.com> wrote:
>>>
>>>                             Hello Alex,
>>>
>>>                             I think my previous post was too long and
>>>                             confusing. I apologize for that!
>>>
>>>                             If replicas are completely deactivated,
>>>                             all scan ranges of a block are mapped to
>>>                             the one host where the block is located.
>>>                             This host is the "executor"/reader for all
>>>                             the scan ranges of this block. Is that
>>>                             correct?
>>>
>>>
>>>                         Yes, assuming you are using HDFS.
>>>
>>>
>>>                             I tried to visualize my understanding of
>>>                             the scan-range-to-host mapping for my use
>>>                             case (see attachment). Could you please
>>>                             have a quick look at it and tell me if
>>>                             this is correct?
>>>
>>>                             "The existing scan range assignment is
>>>                             scan-node centric. For each scan node, we
>>>                             independently decide which of its scan
>>>                             ranges should be processed by which host."
>>>                             Without replicas, all scan ranges of a
>>>                             block would be assigned to the same host
>>>                             where this block is located. Isn't
>>>                             everything local here, so that Table_A -
>>>                             Block_0 and Table_B - Block_0 can be
>>>                             joined locally, or are further steps
>>>                             necessary? The condition in the
>>>                             DistributedPlanner you pointed me to is
>>>                             set to false (no exchange nodes).
>>>
>>>                             "You want it to be host-centric. For each
>>>                             host, collect the local scan ranges of
>>>                             *all* scan nodes, and assign them to that
>>>                             host."
>>>                             Wouldn't the standard setup from above
>>>                             work? Wouldn't I assign all (the same)
>>>                             scan ranges to each host in this case?
>>>
>>>
>>>                         The standard setup works only if every block
>>>                         has exactly one replica. For our purposes,
>>>                         that is basically never the case (who would
>>>                         store production data without replication?),
>>>                         so the single-replica assumption was not clear
>>>                         to me.
>>>
>>>                         Does your current setup (only changing the
>>>                         planner and not the scheduler) produce the
>>>                         expected results?
>>>
>>>
>>>
>>>                             Thank you very much!
>>>
>>>                             Best regards
>>>                             Philipp
>>>
>>>                             2018-03-28 21:04 GMT+02:00 Philipp Krause
>>>                             <philippkrause.mail@googlemail.com>:
>>>
>>>                                 Thank you for your answer and sorry for my delay!
>>>
>>>                                 If my understanding is correct, the list of scan nodes
>>>                                 consists of all nodes which contain a *local* block of a
>>>                                 table that is needed for the query (assumption: I have no
>>>                                 replicas in my first tests). If TableA-Block0 is on Node_0,
>>>                                 isn't Node_0 automatically a scan node? And wouldn't this
>>>                                 scan node always be the host for the complete scan range(s)
>>>                                 then?
>>>
>>>                                 "For each scan node, we independently decide which of its
>>>                                 scan ranges should be processed by which host."
>>>
>>>                                 https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>
>>>                                 // Loop over all scan ranges, select an executor for those
>>>                                 // with local impalads and collect all others for later
>>>                                 // processing.
>>>
>>>                                 So in this whole block, scan ranges are assigned to the
>>>                                 closest executor (= host?). But isn't the closest executor
>>>                                 always the node the block is located on (assuming impalad is
>>>                                 installed and I have no replicas)? And isn't this node always
>>>                                 a scan node at the same time? Otherwise a thread on a remote
>>>                                 host would have to read the corresponding scan range, which
>>>                                 would be more expensive. The only exception I can think of is
>>>                                 when all threads on the local node are busy. Or, if I use
>>>                                 replicas and all other threads of my node with the "original"
>>>                                 block are busy, a thread on another node which contains a
>>>                                 replica could read a specific scan range of its local block.
>>>                                 Is my understanding correct here?
>>>
>>>                                 Aren't all scan ranges read locally by their scan nodes if I
>>>                                 have impalad installed on all nodes? And am I right that the
>>>                                 scan range is only based on its length, which refers to
>>>                                 maxScanRangeLength in computeScanRangeLocations?
>>>                                 https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>
>>>                                 I hope you can help me with the scan node <-> scan range ->
>>>                                 host relationship. If I have Table_A-Block_0 and
>>>                                 Table_B-Block_0 on the same node (which I want to join
>>>                                 locally), I don't get the point of why scan ranges could be
>>>                                 assigned to another host in my scenario.
>>>
>>>                                 Best regards and thank you very much!
>>>                                 Philipp Krause
>>>
>>>
>>>                                 On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>                                 Thanks for following up. I think I understand your setup.
>>>>
>>>>                                 If you want to not think about scan ranges, then you can
>>>>                                 modify HdfsScanNode.computeScanRangeLocations(). For
>>>>                                 example, you could change it to produce one scan range per
>>>>                                 file or per HDFS block. That way you'd know exactly what a
>>>>                                 scan range corresponds to.
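
If I understand this suggestion correctly, "one scan range per HDFS block"
would mean something like the following standalone sketch (plain Java; the
real logic lives in HdfsScanNode.computeScanRangeLocations(), and details
such as maxScanRangeLength are left out):

    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch of "one scan range per HDFS block": a file of
    // length fileLen stored with block size blockSize yields one
    // (offset, length) pair per block.
    public class OneRangePerBlock {
        static List<long[]> scanRanges(long fileLen, long blockSize) {
            List<long[]> ranges = new ArrayList<>();
            for (long offset = 0; offset < fileLen; offset += blockSize) {
                ranges.add(new long[] { offset, Math.min(blockSize, fileLen - offset) });
            }
            return ranges;
        }

        public static void main(String[] args) {
            // 64MB parquet files on 64MB blocks (as in my setup) yield
            // exactly one range per file, so a scan range would then
            // correspond 1:1 to a block.
            for (long[] r : scanRanges(64L << 20, 64L << 20)) {
                System.out.println("offset=" + r[0] + ", length=" + r[1]);
            }
        }
    }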
>>>>
>>>>                                 I think the easiest/fastest way for you to make progress is
>>>>                                 to re-implement the existing scan range assignment logic in
>>>>                                 that place in the code I had pointed you to. There is no
>>>>                                 quick fix to change the existing behavior.
>>>>                                 The existing scan range assignment is scan-node centric.
>>>>                                 For each scan node, we independently decide which of its
>>>>                                 scan ranges should be processed by which host.
>>>>
>>>>                                 I believe an algorithm to achieve your goal would look
>>>>                                 completely different. You want it to be host-centric. For
>>>>                                 each host, collect the local scan ranges of *all* scan
>>>>                                 nodes, and assign them to that host.
>>>>
>>>>                                 Does that make sense?
>>>>
>>>>                                 Alex
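
If I understand the host-centric idea above correctly, it would look
roughly like this toy example (plain Java; the real change would go into
the scheduler, which is C++, so this only illustrates the algorithm):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Toy version of host-centric assignment: instead of assigning each
    // scan node's ranges independently, walk over the hosts and hand every
    // host the local scan ranges of *all* scan nodes at once.
    public class HostCentricAssignment {
        static class ScanRange {
            final String table;  // scan node this range belongs to
            final int block;     // block id (co-located by id % numNodes)
            final String host;   // host storing the block locally
            ScanRange(String table, int block, String host) {
                this.table = table; this.block = block; this.host = host;
            }
            @Override public String toString() { return table + "-Block_" + block; }
        }

        public static void main(String[] args) {
            List<ScanRange> ranges = new ArrayList<>();
            ranges.add(new ScanRange("Table_A", 0, "node_0"));
            ranges.add(new ScanRange("Table_B", 0, "node_0"));
            ranges.add(new ScanRange("Table_A", 1, "node_1"));
            ranges.add(new ScanRange("Table_B", 1, "node_1"));

            // host -> local ranges of all scan nodes (both tables together)
            Map<String, List<ScanRange>> assignment = new TreeMap<>();
            for (ScanRange r : ranges) {
                assignment.computeIfAbsent(r.host, h -> new ArrayList<>()).add(r);
            }
            // Each host can now join its matching blocks locally.
            assignment.forEach((host, rs) -> System.out.println(host + " -> " + rs));
        }
    }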
>>>>
>>>>
>>>>                                 On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause
>>>>                                 <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>                                     I'd like to provide a small example for our purpose.
>>>>                                     The last post may be a bit confusing, so here's a very
>>>>                                     simple example in the attached pdf file. I hope it's
>>>>                                     understandable. Otherwise, please give me a short
>>>>                                     feedback.
>>>>
>>>>                                     Basically, I only want each data node to join all its
>>>>                                     local blocks. Is a range mapping needed, or is it
>>>>                                     possible to simply join all local blocks (regardless of
>>>>                                     their content) since everything is already "prepared"?
>>>>                                     Maybe you can clarify this for me.
>>>>
>>>>                                     As you can see in the example, the tables are not
>>>>                                     partitioned by ID. The files are manually prepared with
>>>>                                     the help of the modulo function. So I don't have a
>>>>                                     range like [0,10], but something like 0,5,10,15 etc.
>>>>
>>>>                                     I hope I didn't make it too complicated and confusing.
>>>>                                     I think the actual idea behind this is really simple,
>>>>                                     and I hope you can help me to get this working.
>>>>
>>>>                                     Best regards and thank you very much for your time!
>>>>                                     Philipp
>>>>
>>>>
>>>>                                     On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>
>>>>>                                     Hi! At the moment the data to parquet (block) mapping
>>>>>                                     is based on a simple modulo function:
>>>>>                                     Id % #data_nodes. So with 5 data nodes, all rows with
>>>>>                                     Id 0,5,10,... are written to Parquet_0, rows with
>>>>>                                     Id 1,6,11,... to Parquet_1, etc. That's what I did
>>>>>                                     manually. Since the parquet file size and the block
>>>>>                                     size are both set to 64MB, each parquet file will
>>>>>                                     result in one block when I transfer the parquet files
>>>>>                                     to HDFS. By default, HDFS distributes the blocks
>>>>>                                     randomly. For test purposes I transferred
>>>>>                                     corresponding blocks from Table_A and Table_B to the
>>>>>                                     same data node (Table_A - Block_X with Id's 0,5,10 and
>>>>>                                     Table_B - Block_Y with Id's 0,5,10). In this case,
>>>>>                                     they are transferred to data_node_0 because the modulo
>>>>>                                     function (which I want to implement in the scheduler)
>>>>>                                     returns 0 for these Id's. This is also done manually
>>>>>                                     at the moment.
>>>>>
>>>>>                                     1.) DistributedPlanner: For the first upcoming tests I
>>>>>                                     simply changed the first condition in the
>>>>>                                     DistributedPlanner to true to avoid exchange nodes.
>>>>>
>>>>>                                     2.) The scheduler: That's the part I'm currently
>>>>>                                     struggling with. For the first tests, block
>>>>>                                     replication is deactivated. I'm not sure how / where
>>>>>                                     to implement the modulo function for the
>>>>>                                     scan-range-to-host mapping. Without the modulo
>>>>>                                     function, I would have to implement a hard-coded
>>>>>                                     mapping (something like "range" 0-0, 5-5, 10-10 ->
>>>>>                                     data_node_0 etc.). Is that correct? Instead, I would
>>>>>                                     like to use a slightly more flexible solution with the
>>>>>                                     help of this modulo function for the host mapping.
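
The mapping I have in mind is nothing more than this standalone toy
(hypothetical names; not the actual scheduler code, which is C++):

    // Toy illustration of the intended scan-range-to-host mapping: the
    // same modulo function that placed the rows into parquet files also
    // determines the host a scan range should be assigned to.
    public class ModuloHostMapping {
        static int hostForId(long id, int numNodes) {
            return (int) (id % numNodes);  // same function used to write the files
        }

        public static void main(String[] args) {
            int numNodes = 5;
            for (long id : new long[] {0, 5, 10, 1, 6, 11}) {
                System.out.println("Id " + id + " -> data_node_" + hostForId(id, numNodes));
            }
        }
    }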
>>>>>
>>>>>                                     I would be really grateful if you could give me a hint
>>>>>                                     for the scheduling implementation. I'll try to go
>>>>>                                     deeper through the code meanwhile.
>>>>>
>>>>>                                     Best regards and thank you in advance
>>>>>                                     Philipp
>>>>>
>>>>>
>>>>>                                     On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>                                     Thank you very much for this information! I'll try to
>>>>>>                                     implement these two steps and post some updates within
>>>>>>                                     the next days!
>>>>>>
>>>>>>                                     Best regards
>>>>>>                                     Philipp
>>>>>>
>>>>>>                                     2018-03-13 5:38 GMT+01:00 Alexander Behm
>>>>>>                                     <alex.behm@cloudera.com>:
>>>>>>
>>>>>>                                         Cool that you're working on a research project
>>>>>>                                         with Impala!
>>>>>>
>>>>>>                                         Properly adding such a feature to Impala is a
>>>>>>                                         substantial effort, but hacking the code for an
>>>>>>                                         experiment or two seems doable.
>>>>>>
>>>>>>                                         I think you will need to modify two things: (1)
>>>>>>                                         the planner to not add exchange nodes, and (2) the
>>>>>>                                         scheduler to assign the co-located scan ranges to
>>>>>>                                         the same host.
>>>>>>
>>>>>>                                         Here are a few starting points in the code:
>>>>>>
>>>>>>                                         1) DistributedPlanner
>>>>>>                                         https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>
>>>>>>                                         The first condition handles the case where no
>>>>>>                                         exchange nodes need to be added because the join
>>>>>>                                         inputs are already suitably partitioned. You could
>>>>>>                                         hack the code to always go into that codepath, so
>>>>>>                                         no exchanges are added.
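
For reference, the effect of that hack as I understand it, as a standalone
toy (hypothetical names; not the real createHashJoinFragment() code):

    // Models the branch being discussed: if both join inputs are already
    // partitioned on the join exprs, the join stays in the existing
    // fragments and no exchange nodes are created. The experimental hack
    // replaces the condition with 'true', which is only valid when the
    // data really is co-partitioned on the join key outside of Impala.
    public class JoinFragmentChoice {
        static String choosePlan(boolean lhsCompat, boolean rhsCompat,
                                 boolean forceHack) {
            if (forceHack || (lhsCompat && rhsCompat)) {
                return "partitioned join, no exchanges";
            }
            return "insert exchange nodes (hash or broadcast)";
        }

        public static void main(String[] args) {
            // My manually co-located tables look unpartitioned to Impala:
            System.out.println(choosePlan(false, false, false));
            // With the hack, the planner skips the exchanges anyway:
            System.out.println(choosePlan(false, false, true));
        }
    }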
>>>>>>
>>>>>>                                         2) The scheduler
>>>>>>                                         https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>
>>>>>>                                         You'll need to dig through and understand that
>>>>>>                                         code so that you can make the necessary changes.
>>>>>>                                         Change the scan-range-to-host mapping to your
>>>>>>                                         liking. The rest of the code should just work.
>>>>>>
>>>>>>                                         Cheers,
>>>>>>
>>>>>>                                         Alex
>>>>>>
>>>>>>
>>>>>>                                         On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause
>>>>>>                                         <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>>                                             Thank you very much for your quick answers!
>>>>>>                                             The intention behind this is to improve the
>>>>>>                                             execution time and (primarily) to examine the
>>>>>>                                             impact of block co-location (research project)
>>>>>>                                             for this particular query (simplified):
>>>>>>
>>>>>>                                             select A.x, B.y, A.z
>>>>>>                                             from tableA as A
>>>>>>                                             inner join tableB as B on A.id=B.id
>>>>>>
>>>>>>                                             The "real" query includes three joins, and the
>>>>>>                                             data size is in the PB range. Therefore
>>>>>>                                             several nodes (5 in the test environment with
>>>>>>                                             less data) are used, without any load
>>>>>>                                             balancer.
>>>>>>
>>>>>>                                             Could you give me some hints on which code
>>>>>>                                             changes are required and which files are
>>>>>>                                             affected? I don't know how to give Impala the
>>>>>>                                             information that it should only join the local
>>>>>>                                             data blocks on each node and then pass them to
>>>>>>                                             the "final" node which receives all
>>>>>>                                             intermediate results. I hope you can help me
>>>>>>                                             to get this working. That would be awesome!
>>>>>>
>>>>>>                                             Best regards
>>>>>>                                             Philipp
>>>>>>
>>>>>>                                             On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>>                                             I suppose one exception is if your data
>>>>>>>                                             lives only on a single node. Then you can
>>>>>>>                                             set num_nodes=1 and make sure to send the
>>>>>>>                                             query request to the impalad running on the
>>>>>>>                                             same data node as the target data. Then you
>>>>>>>                                             should get a local join.
>>>>>>>
>>>>>>>                                             On Mon, Mar 12, 2018 at 9:30 AM, Alexander
>>>>>>>                                             Behm <alex.behm@cloudera.com> wrote:
>>>>>>>
>>>>>>>                                                 Such a specific block arrangement is
>>>>>>>                                                 very uncommon for typical Impala setups,
>>>>>>>                                                 so we don't attempt to recognize and
>>>>>>>                                                 optimize this narrow case. In particular,
>>>>>>>                                                 such an arrangement tends to be
>>>>>>>                                                 short-lived if you have the HDFS balancer
>>>>>>>                                                 turned on.
>>>>>>>
>>>>>>>                                                 Without making code changes, there is no
>>>>>>>                                                 way today to remove the data exchanges
>>>>>>>                                                 and make sure that the scheduler assigns
>>>>>>>                                                 scan splits to nodes in the desired way
>>>>>>>                                                 (co-located, but with possible load
>>>>>>>                                                 imbalance).
>>>>>>>
>>>>>>>                                                 In what way is the current setup
>>>>>>>                                                 unacceptable to you? Is this premature
>>>>>>>                                                 optimization? If you have certain
>>>>>>>                                                 performance expectations/requirements
>>>>>>>                                                 for specific queries, we might be able
>>>>>>>                                                 to help you improve those. If you want
>>>>>>>                                                 to pursue this route, please help us by
>>>>>>>                                                 posting complete query profiles.
>>>>>>>
>>>>>>>                                                 Alex
>>>>>>>
>>>>>>>                                                 On Mon, Mar 12, 2018 at 6:29 AM, Philipp
>>>>>>>                                                 Krause
>>>>>>>                                                 <philippkrause.mail@googlemail.com>
>>>>>>>                                                 wrote:
>>>>>>>
>>>>>>>                                                     Hello everyone!
>>>>>>>
>>>>>>>                                                     In order to prevent network traffic,
>>>>>>>                                                     I'd like to perform local joins on
>>>>>>>                                                     each node instead of exchanging the
>>>>>>>                                                     data and performing a join over the
>>>>>>>                                                     complete data afterwards. My query is
>>>>>>>                                                     basically a join over three tables on
>>>>>>>                                                     an ID attribute. The blocks are
>>>>>>>                                                     perfectly distributed, so that e.g.
>>>>>>>                                                     Table A - Block 0 and Table B -
>>>>>>>                                                     Block 0 are on the same node. These
>>>>>>>                                                     blocks contain all data rows with an
>>>>>>>                                                     ID range [0,1]. Table A - Block 1 and
>>>>>>>                                                     Table B - Block 1 with an ID range
>>>>>>>                                                     [2,3] are on another node, etc. So I
>>>>>>>                                                     want to perform a local join per node
>>>>>>>                                                     because any data exchange would be
>>>>>>>                                                     unnecessary (except for the last step
>>>>>>>                                                     when the final node receives all
>>>>>>>                                                     results of the other nodes). Is this
>>>>>>>                                                     possible?
>>>>>>>                                                     At the moment the query plan includes
>>>>>>>                                                     multiple data exchanges, although the
>>>>>>>                                                     blocks are already perfectly
>>>>>>>                                                     distributed (manually). I would be
>>>>>>>                                                     grateful for any help!
>>>>>>>
>>>>>>>                                                     Best regards
>>>>>>>                                                     Philipp Krause
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>

