impala-user mailing list archives

From Philipp Krause <philippkrause.m...@googlemail.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Sun, 29 Apr 2018 19:21:07 GMT
Hi Alex,
I got the modified version working on my cluster. The query plan looks 
exactly as wanted (see attachment). This is awesome! Unfortunately the
result set is empty. As you can see in query_state.png, the scan 
progress always shows 50% although the query has finished.

The only modification in the code is the if statement you pointed me to 
(I set it to true). Maybe I have to give Impala the information about 
the lhs / rhs join partition since there are no exchange nodes now (like 
in the following lines)? The corresponding partitions / blocks of each 
table are on the same node.
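For context, a minimal, self-contained sketch of what that forced codepath conceptually does: the hash join is placed directly into the left child's fragment and the right child's plan root becomes the build side, so no exchange nodes are created. The classes below (PlanNode, PlanFragment, createHashJoinFragment) are simplified stand-ins, not the actual Impala planner code.

    import java.util.ArrayList;
    import java.util.List;

    class PlanNode {
        final String name;
        final List<PlanNode> children = new ArrayList<>();
        PlanNode(String name) { this.name = name; }
        void print(String indent) {
            System.out.println(indent + name);
            for (PlanNode c : children) c.print(indent + "  ");
        }
    }

    class PlanFragment {
        PlanNode planRoot;
        PlanFragment(PlanNode root) { this.planRoot = root; }
    }

    public class LocalJoinFragmentSketch {
        // Rough equivalent of the hacked codepath: when both join inputs are
        // assumed to be co-partitioned and co-located, attach the hash join
        // directly to the existing child fragments and add no exchange nodes.
        static PlanFragment createHashJoinFragment(PlanFragment lhs, PlanFragment rhs) {
            PlanNode join = new PlanNode("HASH JOIN (no exchange)");
            join.children.add(lhs.planRoot);  // probe side: local scan of table 1
            join.children.add(rhs.planRoot);  // build side: local scan of table 2
            lhs.planRoot = join;              // the join lives in the lhs scan fragment
            return lhs;
        }

        public static void main(String[] args) {
            PlanFragment scanA = new PlanFragment(new PlanNode("SCAN HDFS business_partition_1"));
            PlanFragment scanB = new PlanFragment(new PlanNode("SCAN HDFS business_partition_2"));
            createHashJoinFragment(scanA, scanB).planRoot.print("");
        }
    }

With both scans in one fragment, a correct result also depends on the scheduler actually assigning the matching blocks of both tables to the same host, which is the co-location requirement discussed further down in this thread.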

I think we are very close to the final result and I hope you can help me 
once more. Thank you so much!

Best regards
Philipp


On 24.04.2018 at 18:00, Alexander Behm wrote:
> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause 
> <philippkrause.mail@googlemail.com> wrote:
>
>     To prevent the broadcast join I could simply use the shuffle
>     operator in the query:
>
>     SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>     business_partition_2 WHERE
>     business_partition_1.businessid=business_partition_2.businessid
>
>
> Not sure what version of Impala you are using, and whether hints 
> override any changes you might make. I suggest you make the code work 
> as you wish without requiring hints.
>
>
>     I think the broadcast is currently only used because of my very
>     small test tables.
>
>     This gives me the plan attached as partitioned_shuffle.png. Since
>     my modified version isn't working yet, I partitioned both tables
>     on businessid in Impala. The "hack" should only help to get into
>     the if-condition if I partition the data manually, right? But in
>     this case (if the partitioning is done by Impala itself) Impala
>     should get into the if-condition anyway. Unfortunately I can't see
>     a difference in the plan compared to my unpartitioned tables
>     (unpartitioned.png) concerning the exchange nodes. My goal is
>     actually to get rid of all exchange nodes since the corresponding
>     data is already present on each node. Actually the plan should
>     look the same except for the 03 and 04 exchange then.
>
>
> I understand the picture and goal. I suggest you read, understand, and 
> modify the code in DistributedPlanner.createHashJoinFragment() to 
> create the plan shape that you want.
> I don't know how you are producing these plans. Are you sure your code 
> changes are taking effect?
>
>
>     Are there other changes necessary to just take
>     Table1-partition/block X and Table2-partition/block Y on each node
>     and join them without any data exchange? Actually each node should
>     take all its local blocks for both tables, join them and pass the
>     results back to the coordinator where all results come together
>     (please see explanation.png).
>
>
> No. You just need to create the plan without exchange nodes, as we've 
> already gone through early in this thread.
>
>
>     I'm looking forward to hearing from you. I hope we can realise this.
>
>     Best regards
>     Philipp
>
>
>     On 24.04.2018 at 07:03, Alexander Behm wrote:
>>     On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause
>>     <philippkrause.mail@googlemail.com> wrote:
>>
>>         Hi Alex,
>>         thanks for the information! I've compiled the
>>         cdh5.13.1-release and replaced
>>         impala-frontend-0.1-SNAPSHOT.jar (which seems to include the
>>         changes in DistributedPlanner.java). There still seems to be a
>>         method missing after replacing the jar, but I'll try to figure
>>         that out.
>>
>>         I have two questions concerning the code fragment in
>>         DistributedPlanner.java that you pointed me to.
>>
>>         First:
>>         The attached graphic shows the query plan for two tables
>>         which are partitioned on the join attribute (standard impala
>>         version without any changes). If I change the if-condition to
>>         true for my modified version I expect to get the same result
>>         for my "hand made" partitions (outside of impala). But why is
>>         there an exchange broadcast (even in the standard version)? I
>>         mean, if I have a partition of Table 1 with ID=0 on Node X
>>         why is there a broadcast of the partition of Table 2 with
>>         ID=0 on Node Y? Actually this partition only has to be sent
>>         to Node X where the matching partition of Table 1 is located
>>         (instead of sending it to all nodes (broadcast)). Or is this
>>         exactly the case and it's only shown as a broadcast here in
>>         the graphics?
>>
>>
>>     You are reading the plan correctly. Impala simply does not
>>     implement that optimization.
>>
>>
>>         Second:
>>         All corresponding blocks of the partitioned tables are on the
>>         same node (e.g. Table 1 Partition with ID=0, Table 2
>>         Partition with ID=0 => Node 1 etc.). This is what I did
>>         manually. As already mentioned before I want to join these
>>         partitions (blocks) locally on each node. But if I'm correct,
>>         the modification in the DistributedPlanner will also only
>>         lead to the plan in the attached graphic so that no further
>>         exchanges are created if I use my "hand made" partitions. But
>>         there is still the broadcast exchange which distributes the
>>         partitions across the cluster which isn't necessary because
>>         all needed blocks are already on the same node and are ready
>>         to get joined locally. Is there a way to realise that and get
>>         rid of the broadcast exchange?
>>
>>     You are right. That hack in DistributedPlanner only works for
>>     partitioned hash joins. You can probably stitch the plan together
>>     at an earlier place, e.g.:
>>     https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
>>
>>         Please correct me if I'm wrong with my assumptions.
>>
>>         Thank you very much!
>>
>>         Best regards
>>         Philipp
>>
>>         On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>         Your CM-managed cluster must be running a "compatible"
>>>         Impala version already for this trick to work. It looks like
>>>         your catalog binary is trying to find a method which does
>>>         not exist in the .jar, presumably because your .jar is built
>>>         based on a different version of Impala where that method
>>>         does not exist anymore.
>>>
>>>         It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is
>>>         based on Impala 2.10, see:
>>>         https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>
>>>         That means this binary copying trick will only work with a
>>>         modified version of Impala 2.10, and very likely will not
>>>         work with a different version.
>>>
>>>         It's probably easier to test with the mini cluster first.
>>>         Alternatively, it "might" work if you replace all the
>>>         binaries mentioned above, but it's quite possible that will
>>>         not work.
>>>
>>>         On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause
>>>         <philippkrause.mail@googlemail.com> wrote:
>>>
>>>             Hi Alex! Thank you for the list! The build of the
>>>             modified cdh5-trunk branch (debug mode) was successful.
>>>             After replacing "impala-frontend-0.1-SNAPSHOT.jar" in
>>>             /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>>>             I got the following error in my existing cluster:
>>>             F0416 01:16:45.402997 17897 catalog.cc:69]
>>>             NoSuchMethodError: getCatalogObjects
>>>             When I switch back to the original jar file the error is
>>>             gone. So it must be something wrong with this file I
>>>             guess. But I wonder about the error in catalog.cc
>>>             because I didn't touch any .cc files.
>>>
>>>             I also replaced
>>>             "impala-data-source-api-1.0-SNAPSHOT.jar". The other jar
>>>             files do not exist in my impala installation (CDH-5.13.1).
>>>
>>>             What am I doing wrong?
>>>
>>>             Best regards
>>>             Philipp
>>>
>>>
>>>             On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>             Here's the full list. It might not be minimal, but
>>>>             copying/overwriting these should work.
>>>>
>>>>             debug/service/impalad
>>>>             debug/service/libfesupport.so
>>>>             debug/service/libService.a
>>>>             release/service/impalad
>>>>             release/service/libfesupport.so
>>>>             release/service/libService.a
>>>>             yarn-extras-0.1-SNAPSHOT.jar
>>>>             impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>             impala-data-source-api-1.0-SNAPSHOT.jar
>>>>             impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>             impala-frontend-0.1-SNAPSHOT.jar
>>>>             libkudu_client.so.0.1.0
>>>>             libstdc++.so.6.0.20
>>>>             impala-no-sse.bc
>>>>             impala-sse.bc
>>>>             libimpalalzo.so
>>>>
>>>>             If you are only modifying the Java portion (like
>>>>             DistributedPlanner), then only copying/replacing the
>>>>             *.jar files should be sufficient.
>>>>
>>>>             On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause
>>>>             <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>                 Yes, I have a running (virtual) cluster. I would
>>>>                 try to follow your way with the custom impala build
>>>>                 (DistributedPlanner.java is the only modified file
>>>>                 at the moment). Thank you in advance for the file
>>>>                 list!
>>>>
>>>>                 Best regards
>>>>                 Philipp
>>>>
>>>>                 Alexander Behm <alex.behm@cloudera.com> wrote on
>>>>                 Fri, 13 Apr 2018, 18:45:
>>>>
>>>>                     I'm not really following your
>>>>                     installation/setup and am not an expert on
>>>>                     Cloudera Manager installation/config. If you
>>>>                     are going to build Impala anyway, it's probably
>>>>                     easiest to test on Impala's minicluster first.
>>>>
>>>>                     In general, if you have a running Cloudera
>>>>                     Managed cluster, you can deploy a custom Impala
>>>>                     build by simply overwriting the existing Impala
>>>>                     binaries and jars with the new build. If you
>>>>                     want to go this route, I can give you a full
>>>>                     list of files you need to replace.
>>>>
>>>>                     On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
>>>>                     <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>                         Thank you for the explanation! Yes, I'm
>>>>                         using HDFS. The single replica setup is
>>>>                         only for test purposes at the moment. I
>>>>                         think this makes it easier to gain some
>>>>                         first results since fewer modifications
>>>>                         (scheduler etc.) are necessary.
>>>>                         I would like to test the DistributedPlanner
>>>>                         modification in my virtual cluster. I used
>>>>                         a customized Vagrant script to install
>>>>                         Impala on multiple hosts (see attachment). It
>>>>                         simply installs cloudera-manager-server-db,
>>>>                         cloudera-manager-server and
>>>>                         cloudera-manager-daemons via apt-get. What
>>>>                         would be the simplest solution to setup my
>>>>                         modified version? Could I simply call
>>>>                         ./buildall.sh and change the script to
>>>>                         something like this?
>>>>
>>>>                         echo "Install java..."
>>>>                         apt-get -q -y --force-yes install
>>>>                         oracle-j2sdk1.7
>>>>                         echo "Download impala..."
>>>>                         wget https://... where I uploaded my
>>>>                         modified version
>>>>                         echo "Extract impala..."
>>>>                         tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>                         cd Impala-cdh5-trunk
>>>>                         echo "Build impala..."
>>>>                         ./buildall.sh
>>>>                         echo "Start impala instances..."
>>>>                         service cloudera-scm-server-db initdb
>>>>                         service cloudera-scm-server-db start
>>>>                         service cloudera-scm-server start
>>>>
>>>>                         Or is there another, maybe even easier
>>>>                         method, to test the code? Maybe via
>>>>                         bootstrap_development.sh / minicluster?
>>>>
>>>>                         Best regards
>>>>                         Philipp
>>>>
>>>>                         2018-04-05 18:39 GMT+02:00 Alexander Behm
>>>>                         <alex.behm@cloudera.com>:
>>>>
>>>>                             Apologies for the late response. Btw,
>>>>                             your previous post was clear enough to
>>>>                             me, so no worries :)
>>>>
>>>>
>>>>                             On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
>>>>                             <philippkrause.mail@googlemail.com>
>>>>                             wrote:
>>>>
>>>>                                 Hello Alex,
>>>>
>>>>                                 I think my previous post has been
>>>>                                 too long and confusing. I apologize
>>>>                                 for that!
>>>>
>>>>                                 If replicas are completely
>>>>                                 deactivated, all scan ranges of a
>>>>                                 block are mapped to the one host
>>>>                                 where the block is located. This
>>>>                                 host is the "executor"/reader for
>>>>                                 all the scan ranges of this block.
>>>>                                 Is that correct?
>>>>
>>>>
>>>>                             Yes, assuming you are using HDFS.
>>>>
>>>>
>>>>                                 I tried to visualize my
>>>>                                 understanding of the scan_range to
>>>>                                 host mapping for my use case (see
>>>>                                 attachment). Could you please have
>>>>                                 a quick look at it and tell me if
>>>>                                 this is correct?
>>>>
>>>>                                 "The existing scan range assignment
>>>>                                 is scan-node centric. For each scan
>>>>                                 node, we independently decide which
>>>>                                 of its scan ranges should be
>>>>                                 processed by which host."
>>>>                                 Without replicas, all scan ranges
>>>>                                 of a block would be assigned to the
>>>>                                 same host where this block is
>>>>                                 located. Isn't everything local
>>>>                                 here, so that Table_A - Block_0 and
>>>>                                 Table_B - Block_0 can be joined
>>>>                                 locally, or are further steps
>>>>                                 necessary? The condition in the
>>>>                                 DistributedPlanner you pointed to
>>>>                                 me is set to false (no exchange nodes).
>>>>
>>>>                                 "You want it to be host-centric.
>>>>                                 For each host, collect the local
>>>>                                 scan ranges of *all* scan nodes,
>>>>                                 and assign them to that host."
>>>>                                 Wouldn't the standard setup from
>>>>                                 above work? Wouldn't I assign all
>>>>                                 (the same) scan ranges to each host
>>>>                                 in this case here?
>>>>
>>>>
>>>>                             The standard setup works only if
>>>>                             every block has exactly one
>>>>                             replica. For our purposes, that is
>>>>                             basically never the case (who would
>>>>                             store production data without
>>>>                             replication?), so the single-replica
>>>>                             assumption was not clear to me.
>>>>
>>>>                             Does your current setup (only changing
>>>>                             the planner and not the scheduler)
>>>>                             produce the expected results?
>>>>
>>>>
>>>>
>>>>                                 Thank you very much!
>>>>
>>>>                                 Best regards
>>>>                                 Philipp
>>>>
>>>>                                 2018-03-28 21:04 GMT+02:00 Philipp Krause
>>>>                                 <philippkrause.mail@googlemail.com>:
>>>>
>>>>                                     Thank you for your answer and
>>>>                                     sorry for my delay!
>>>>
>>>>                                     If my understanding is correct,
>>>>                                     the list of scan nodes consists
>>>>                                     of all nodes which contain a
>>>>                                     *local* block from a table that
>>>>                                     is needed for the query
>>>>                                     (Assumption: I have no replicas
>>>>                                     in my first tests). If
>>>>                                     TableA-Block0 is on Node_0,
>>>>                                     isn't Node_0 automatically a
>>>>                                     scan node? And wouldn't this
>>>>                                     scan node always be the host
>>>>                                     for the complete scan range(s)
>>>>                                     then?
>>>>
>>>>                                     "For each scan node, we
>>>>                                     independently decide which of
>>>>                                     its scan ranges should be
>>>>                                     processed by which host."
>>>>
>>>>                                     https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>>
>>>>                                     // Loop over all scan ranges,
>>>>                                     select an executor for those
>>>>                                     with local impalads and
>>>>                                     // collect all others for later
>>>>                                     processing.
>>>>
>>>>                                     So in this whole block, scan
>>>>                                     ranges are assigned to the
>>>>                                     closest executor (=host?). But
>>>>                                     isn't the closest executor
>>>>                                     always the node the block is
>>>>                                     located on (assuming impalad is
>>>>                                     installed and I have no
>>>>                                     replicas)? And isn't this node
>>>>                                     always a scan node at the same
>>>>                                     time? Otherwise a thread on a
>>>>                                     remote host would have to read the
>>>>                                     corresponding scan range, which
>>>>                                     would be more expensive. The
>>>>                                     only exception I can think of
>>>>                                     is when all threads on the
>>>>                                     local node are busy. Or, if I
>>>>                                     use replicas and all other
>>>>                                     threads of my node with the
>>>>                                     "original" block are busy, a
>>>>                                     thread on another node which
>>>>                                     contains a replica could read a
>>>>                                     special scan range of its local
>>>>                                     block. Is my understanding
>>>>                                     correct here?
>>>>
>>>>                                     Aren't all scan ranges read
>>>>                                     locally by its scan nodes if I
>>>>                                     have impalad installed on all
>>>>                                     nodes? And am I right, that the
>>>>                                     scan range is only based on its
>>>>                                     length which refers to
>>>>                                     maxScanRangeLength in
>>>>                                     computeScanRangeLocations?
>>>>                                     https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>
>>>>                                     I hope you can help me with the
>>>>                                     scan node <-> scan range->host
>>>>                                     relationship. If I have
>>>>                                     Table_A-Block_0 and
>>>>                                     Table_B_Block_0 on the same
>>>>                                     node (which I want to join
>>>>                                     locally), I don't get the point
>>>>                                     of why scan ranges could be
>>>>                                     assigned to another host in my
>>>>                                     scenario.
>>>>
>>>>                                     Best regards and thank you very
>>>>                                     much!
>>>>                                     Philipp Krause
>>>>
>>>>
>>>>                                     On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>> Thanks for following up. I think I understand your setup.
>>>>>
>>>>> If you want to not think about scan ranges, then you can modify
>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could change
>>>>> it to produce one scan range per file or per HDFS block. That way you'd
>>>>> know exactly what a scan range corresponds to.
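To illustrate the "one scan range per HDFS block" idea, here is a small hypothetical sketch with simplified stand-in types; it is not Impala's HdfsScanNode.computeScanRangeLocations(), only the splitting logic described above. The file path in main() is made up.

    import java.util.ArrayList;
    import java.util.List;

    public class OneRangePerBlockSketch {
        // A scan range described by file path, start offset and length.
        static class ScanRange {
            final String path;
            final long offset;
            final long length;
            ScanRange(String path, long offset, long length) {
                this.path = path; this.offset = offset; this.length = length;
            }
            @Override public String toString() {
                return path + " [" + offset + ", +" + length + ")";
            }
        }

        // Emit exactly one scan range per HDFS block of a file, so a scan range
        // maps 1:1 to a block (and, with a single replica, to exactly one host).
        static List<ScanRange> computeScanRanges(String path, long fileLength, long blockSize) {
            List<ScanRange> ranges = new ArrayList<>();
            for (long offset = 0; offset < fileLength; offset += blockSize) {
                ranges.add(new ScanRange(path, offset, Math.min(blockSize, fileLength - offset)));
            }
            return ranges;
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // A 150 MB file with 64 MB blocks -> ranges of 64 MB, 64 MB and 22 MB.
            for (ScanRange r : computeScanRanges("/data/table_a/part-0.parquet", 150 * mb, 64 * mb)) {
                System.out.println(r);
            }
        }
    }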
>>>>>
>>>>> I think the easiest/fastest way for you to make progress is to
>>>>> re-implement the existing scan range assignment logic in that place in
>>>>> the code I had pointed you to. There is no quick fix to change the
>>>>> existing behavior. The existing scan range assignment is scan-node
>>>>> centric. For each scan node, we independently decide which of its scan
>>>>> ranges should be processed by which host.
>>>>>
>>>>> I believe an algorithm to achieve your goal would look completely
>>>>> different. You want it to be host-centric. For each host, collect the
>>>>> local scan ranges of *all* scan nodes, and assign them to that host.
>>>>>
>>>>> Does that make sense?
>>>>>
>>>>> Alex
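As an illustration of that host-centric idea only (the types and names below are hypothetical stand-ins, not Impala's scheduler code), grouping the local scan ranges of all scan nodes by host might look roughly like this in the single-replica case:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HostCentricAssignmentSketch {
        // A scan range that is local to exactly one host (single-replica case).
        static class ScanRange {
            final String scanNode;  // which scan node (table) the range belongs to
            final String localHost; // host holding the only replica of the block
            ScanRange(String scanNode, String localHost) {
                this.scanNode = scanNode; this.localHost = localHost;
            }
            @Override public String toString() { return scanNode; }
        }

        // Host-centric assignment: group the local scan ranges of *all* scan
        // nodes by host, so each host scans and joins exactly the blocks it stores.
        static Map<String, List<ScanRange>> assignByHost(List<ScanRange> allRanges) {
            Map<String, List<ScanRange>> byHost = new HashMap<>();
            for (ScanRange r : allRanges) {
                byHost.computeIfAbsent(r.localHost, h -> new ArrayList<>()).add(r);
            }
            return byHost;
        }

        public static void main(String[] args) {
            List<ScanRange> ranges = Arrays.asList(
                new ScanRange("Table_A-Block_0", "node_0"),
                new ScanRange("Table_B-Block_0", "node_0"),
                new ScanRange("Table_A-Block_1", "node_1"),
                new ScanRange("Table_B-Block_1", "node_1"));
            for (Map.Entry<String, List<ScanRange>> e : assignByHost(ranges).entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }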
>>>>>
>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause
>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>> I'd like to provide a small example for our purpose. The last post may
>>>>> be a bit confusing, so here's a very simple example in the attached pdf
>>>>> file. I hope it's understandable. Otherwise, please give me a short
>>>>> feedback.
>>>>>
>>>>> Basically, I only want each data node to join all its local blocks. Is
>>>>> there a range mapping needed or is it possible to easily join all local
>>>>> blocks (regardless of their content) since everything is already
>>>>> "prepared"? Maybe you can clarify this for me.
>>>>>
>>>>> As you can see in the example, the tables are not partitioned by ID.
>>>>> The files are manually prepared with the help of the modulo function.
>>>>> So I don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>>
>>>>> I hope I didn't make it too complicated and confusing. I think the
>>>>> actual idea behind this is really simple and I hope you can help me to
>>>>> get this working.
>>>>>
>>>>> Best regards and thank you very much for your time!
>>>>> Philipp
>>>>>
>>>>>
>>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>
>>>>>> Hi! At the moment the data to parquet (block) mapping is based on a
>>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes all
>>>>>> rows with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are
>>>>>> written to Parquet_1 etc. That's what I did manually. Since the parquet
>>>>>> file size and the block size are both set to 64MB, each parquet file
>>>>>> will result in one block when I transfer the parquet files to HDFS. By
>>>>>> default, HDFS distributes the blocks randomly. For test purposes I
>>>>>> transferred corresponding blocks from Table_A and Table_B to the same
>>>>>> data node (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y
>>>>>> with Id's 0,5,10). In this case, they are transferred to data_node_0
>>>>>> because the modulo function (which I want to implement in the
>>>>>> scheduler) returns 0 for these Id's. This is also done manually at the
>>>>>> moment.
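The bucketing rule described above boils down to the following tiny sketch (hypothetical helper code, independent of Impala and of how the Parquet files are actually written):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ModuloBucketingSketch {
        // Route a row id to its bucket: Parquet_k / data_node_k with k = id % numDataNodes.
        static int bucketFor(long id, int numDataNodes) {
            return (int) (id % numDataNodes);
        }

        public static void main(String[] args) {
            int numDataNodes = 5;
            Map<Integer, List<Long>> buckets = new HashMap<>();
            for (long id = 0; id < 15; id++) {
                buckets.computeIfAbsent(bucketFor(id, numDataNodes), k -> new ArrayList<>()).add(id);
            }
            // Prints Parquet_0 <- [0, 5, 10], Parquet_1 <- [1, 6, 11], and so on.
            for (Map.Entry<Integer, List<Long>> e : buckets.entrySet()) {
                System.out.println("Parquet_" + e.getKey() + " <- " + e.getValue());
            }
        }
    }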
>>>>>>
>>>>>> 1.) DistributedPlanner: For the first upcoming tests I simply changed
>>>>>> the first condition in the DistributedPlanner to true to avoid
>>>>>> exchange nodes.
>>>>>>
>>>>>> 2.) The scheduler: That's the part I'm currently struggling with. For
>>>>>> the first tests, block replication is deactivated. I'm not sure how /
>>>>>> where to implement the modulo function for the scan range to host
>>>>>> mapping. Without the modulo function, I would have to implement a
>>>>>> hard-coded mapping (something like "range" 0-0, 5-5, 10-10 ->
>>>>>> Data_node_0 etc.). Is that correct? Instead I would like to use a
>>>>>> slightly more flexible solution with the help of this modulo function
>>>>>> for the host mapping.
>>>>>>
>>>>>> I would be really grateful if you could give me a hint for the
>>>>>> scheduling implementation. I'll try to dig deeper into the code in the
>>>>>> meantime.
>>>>>>
>>>>>> Best regards and thank you in advance
>>>>>> Philipp
>>>>>>
>>>>>>
>>>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>> Thank you very much for this information! I'll try to implement these
>>>>>>> two steps and post some updates within the next days!
>>>>>>>
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>>
>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.behm@cloudera.com>:
>>>>>>>
>>>>>>> Cool that you are working on a research project with Impala!
>>>>>>>
>>>>>>> Properly adding such a feature to Impala is a substantial effort, but
>>>>>>> hacking the code for an experiment or two seems doable.
>>>>>>>
>>>>>>> I think you will need to modify two things: (1) the planner to not add
>>>>>>> exchange nodes, and (2) the scheduler to assign the co-located scan
>>>>>>> ranges to the same host.
>>>>>>>
>>>>>>> Here are a few starting points in the code:
>>>>>>>
>>>>>>> 1) DistributedPlanner
>>>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>
>>>>>>> The first condition handles the case where no exchange nodes need to
>>>>>>> be added because the join inputs are already suitably partitioned.
>>>>>>> You could hack the code to always go into that codepath, so no
>>>>>>> exchanges are added.
>>>>>>>
>>>>>>> 2) The scheduler
>>>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>>
>>>>>>> You'll need to dig through and understand that code so that you can
>>>>>>> make the necessary changes. Change the scan range to host mapping to
>>>>>>> your liking. The rest of the code should just work.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Alex
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause
>>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>>
>>>>>>> Thank you very much for your quick answers! The intention behind this
>>>>>>> is to improve the execution time and (primarily) to examine the impact
>>>>>>> of block co-location (research project) for this particular query
>>>>>>> (simplified):
>>>>>>>
>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on A.id=B.id
>>>>>>>
>>>>>>> The "real" query includes three joins and the data size is in the PB
>>>>>>> range. Therefore several nodes (5 in the test environment with less
>>>>>>> data) are used (without any load balancer).
>>>>>>>
>>>>>>> Could you give me some hints about which code changes are required and
>>>>>>> which files are affected? I don't know how to give Impala the
>>>>>>> information that it should only join the local data blocks on each
>>>>>>> node and then pass them to the "final" node which receives all
>>>>>>> intermediate results. I hope you can help me to get this working. That
>>>>>>> would be awesome!
>>>>>>>
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>>
>>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>>> I suppose one exception is if your data lives only on a single node.
>>>>>>>> Then you can set num_nodes=1 and make sure to send the query request
>>>>>>>> to the impalad running on the same data node as the target data. Then
>>>>>>>> you should get a local join.
>>>>>>>>
>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm
>>>>>>>> <alex.behm@cloudera.com> wrote:
>>>>>>>>
>>>>>>>> Such a specific block arrangement is very uncommon for typical Impala
>>>>>>>> setups, so we don't attempt to recognize and optimize this narrow
>>>>>>>> case. In particular, such an arrangement tends to be short-lived if
>>>>>>>> you have the HDFS balancer turned on.
>>>>>>>>
>>>>>>>> Without making code changes, there is no way today to remove the data
>>>>>>>> exchanges and make sure that the scheduler assigns scan splits to
>>>>>>>> nodes in the desired way (co-located, but with possible load
>>>>>>>> imbalance).
>>>>>>>>
>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>> premature optimization? If you have certain performance
>>>>>>>> expectations/requirements for specific queries we might be able to
>>>>>>>> help you improve those. If you want to pursue this route, please help
>>>>>>>> us by posting complete query profiles.
>>>>>>>>
>>>>>>>> Alex
>>>>>>>>
>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause
>>>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>>>
>>>>>>>> Hello everyone!
>>>>>>>>
>>>>>>>> In order to prevent network traffic, I'd like to perform local joins
>>>>>>>> on each node instead of exchanging the data and performing a join
>>>>>>>> over the complete data afterwards. My query is basically a join over
>>>>>>>> three tables on an ID attribute. The blocks are perfectly
>>>>>>>> distributed, so that e.g. Table A - Block 0 and Table B - Block 0 are
>>>>>>>> on the same node. These blocks contain all data rows with an ID range
>>>>>>>> [0,1]. Table A - Block 1 and Table B - Block 1 with an ID range [2,3]
>>>>>>>> are on another node etc. So I want to perform a local join per node
>>>>>>>> because any data exchange would be unnecessary (except for the last
>>>>>>>> step when the final node receives all results of the other nodes). Is
>>>>>>>> this possible? At the moment the query plan includes multiple data
>>>>>>>> exchanges, although the blocks are already perfectly distributed
>>>>>>>> (manually). I would be grateful for any help!
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Philipp Krause
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>

